dust_hour.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package timetask
  2. import (
  3. "dust-monitor/errors"
  4. "dust-monitor/model"
  5. "dust-monitor/util"
  6. "encoding/json"
  7. "fmt"
  8. "time"
  9. )
  10. var layout = "2006-01-02 15:04:05"
  11. type DustInfoMean struct {
  12. Pm10Avg float64 `protobuf:"fixed64,1,opt,name=pm10_avg,json=pm10Avg,proto3" json:"mean_pm10"`
  13. Pm25Avg float64 `protobuf:"fixed64,2,opt,name=pm25_avg,json=pm25Avg,proto3" json:"mean_pm25"`
  14. Time string `protobuf:"bytes,6,opt,name=time,proto3" json:"time"`
  15. TspAvg float64 `protobuf:"fixed64,7,opt,name=tsp_avg,json=tspAvg,proto3" json:"mean_tsp"`
  16. T01Avg float64 `protobuf:"fixed64,8,opt,name=t01_avg,json=t01Avg,proto3" json:"mean_t01"`
  17. H01Avg float64 `protobuf:"fixed64,9,opt,name=h01_avg,json=h01Avg,proto3" json:"mean_h01"`
  18. W01Avg float64 `protobuf:"fixed64,10,opt,name=w01_avg,json=w01Avg,proto3" json:"mean_w01"`
  19. W02Avg float64 `protobuf:"fixed64,11,opt,name=w02_avg,json=w02Avg,proto3" json:"mean_w02"`
  20. R01Avg float64 `protobuf:"fixed64,12,opt,name=r01_avg,json=r01Avg,proto3" json:"mean_r01"`
  21. P01Avg float64 `protobuf:"fixed64,13,opt,name=p01_avg,json=p01Avg,proto3" json:"mean_p01"`
  22. B03Avg float64 `protobuf:"fixed64,14,opt,name=b03_avg,json=b03Avg,proto3" json:"mean_b03"`
  23. Lng float64 `protobuf:"fixed64,15,opt,name=lng,proto3" json:"mean_lng"`
  24. Lat float64 `protobuf:"fixed64,16,opt,name=lat,proto3" json:"mean_lat"`
  25. W03 float64 `json:"mean_w03"`
  26. }
  27. type DustInfo struct {
  28. Pm10Avg float64 `protobuf:"fixed64,1,opt,name=pm10_avg,json=pm10Avg,proto3" json:"pm10"`
  29. Pm25Avg float64 `protobuf:"fixed64,2,opt,name=pm25_avg,json=pm25Avg,proto3" json:"pm25"`
  30. Time string `protobuf:"bytes,6,opt,name=time,proto3" json:"time"`
  31. TspAvg float64 `protobuf:"fixed64,7,opt,name=tsp_avg,json=tspAvg,proto3" json:"tsp"`
  32. T01Avg float64 `protobuf:"fixed64,8,opt,name=t01_avg,json=t01Avg,proto3" json:"t01"`
  33. H01Avg float64 `protobuf:"fixed64,9,opt,name=h01_avg,json=h01Avg,proto3" json:"h01"`
  34. W01Avg float64 `protobuf:"fixed64,10,opt,name=w01_avg,json=w01Avg,proto3" json:"w01"`
  35. W02Avg float64 `protobuf:"fixed64,11,opt,name=w02_avg,json=w02Avg,proto3" json:"w02"`
  36. R01Avg float64 `protobuf:"fixed64,12,opt,name=r01_avg,json=r01Avg,proto3" json:"r01"`
  37. P01Avg float64 `protobuf:"fixed64,13,opt,name=p01_avg,json=p01Avg,proto3" json:"p01"`
  38. B03Avg float64 `protobuf:"fixed64,14,opt,name=b03_avg,json=b03Avg,proto3" json:"b03"`
  39. Lng float64 `protobuf:"fixed64,15,opt,name=lng,proto3" json:"lng"`
  40. Lat float64 `protobuf:"fixed64,16,opt,name=lat,proto3" json:"lat"`
  41. W03 float64 `json:"w03"`
  42. }
  43. // 获取小时表中已有的最大时间
  44. func getLastMaxHour(sn string) (string, error) {
  45. sql := fmt.Sprintf("select * from \"%s\" order by time desc limit 1", sn)
  46. dbname := "dust_monitor_2061"
  47. array := []DustInfo{}
  48. _, err := model.Query(sql, dbname, &array)
  49. if err != nil {
  50. return "", errors.DataBaseError
  51. }
  52. if len(array) == 0 {
  53. return "", nil
  54. }
  55. t := util.ParseTime(array[0].Time).Add(8*time.Hour)
  56. ret := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
  57. return ret.Format(layout), nil
  58. }
  59. // 获取未入库的小时数据
  60. func getUnwritedHourData(start string, end string, sn string) ([]DustInfo, error) {
  61. sql := fmt.Sprintf("SELECT mean(*) FROM \"%s\"", sn)
  62. whereArray := []string{}
  63. if start != "" {
  64. tstart := util.ParseTime(start)
  65. whereArray = append(whereArray, fmt.Sprintf("time >= '%s'", tstart.Add(-8*time.Hour).Format(layout)))
  66. }
  67. if end != "" {
  68. tend := util.ParseTime(end)
  69. whereArray = append(whereArray, fmt.Sprintf("time < '%s'", tend.Add(-8*time.Hour).Format(layout)))
  70. }
  71. where := ""
  72. for _, v := range whereArray {
  73. if where == "" {
  74. where = fmt.Sprintf("where %s", v)
  75. continue
  76. }
  77. where = fmt.Sprintf("%s and %s", where, v)
  78. }
  79. sql = fmt.Sprintf("%s %s GROUP BY time(1h)", sql, where)
  80. array := []DustInfoMean{}
  81. dbname := "dust_monitor_2011"
  82. _, err := model.Query(sql, dbname, &array)
  83. if err != nil {
  84. return nil, errors.DataBaseError
  85. }
  86. if len(array) == 0 {
  87. return nil, nil
  88. }
  89. ret := make([]DustInfo, len(array))
  90. for i, v := range array {
  91. item := DustInfo{
  92. Time:v.Time,
  93. Pm25Avg:v.Pm25Avg,
  94. Pm10Avg:v.Pm10Avg,
  95. W01Avg:v.W01Avg,
  96. W03:v.W03,
  97. W02Avg:v.W02Avg,
  98. P01Avg:v.P01Avg,
  99. TspAvg:v.TspAvg,
  100. T01Avg:v.T01Avg,
  101. H01Avg:v.H01Avg,
  102. R01Avg:v.R01Avg,
  103. Lat:v.Lat,
  104. Lng:v.Lng,
  105. B03Avg:v.B03Avg,
  106. }
  107. ret[i] = item
  108. }
  109. return ret, nil
  110. }
  111. // 写入小时数据
  112. func writeHourData(sn string, data []DustInfo) error {
  113. if len(data) == 0 {
  114. return nil
  115. }
  116. bytes, _ := json.Marshal(data)
  117. array := []map[string]interface{}{}
  118. json.Unmarshal(bytes, &array)
  119. tarray := make([]time.Time, len(array))
  120. for i, v := range array {
  121. ti, ok := v["time"]
  122. if !ok {
  123. return errors.ParamsError
  124. }
  125. str, ok := ti.(string)
  126. if !ok {
  127. return errors.ParamsError
  128. }
  129. t := util.ParseTime(str)
  130. tarray[i] = t.Add(8*time.Hour)
  131. delete(array[i], "time")
  132. }
  133. tags := map[string]string{
  134. "sn":sn,
  135. }
  136. return model.WriteDustDatas(sn, tags, array, tarray, "2061")
  137. }
  138. func handleDustHour(start string, end string, sn string) {
  139. lastMaxTime, err := getLastMaxHour(sn)
  140. if err != nil {
  141. return
  142. }
  143. if lastMaxTime == "" {
  144. start = ""
  145. } else {
  146. tmpStart := util.ParseTime(lastMaxTime)
  147. tmpStart = tmpStart.Add(60 *time.Minute)
  148. if tmpStart.Unix() > util.ParseTime(start).Unix() {
  149. return
  150. }
  151. start = tmpStart.Format("2006-01-02 15:04:05")
  152. }
  153. datas, err := getUnwritedHourData(start, end, sn)
  154. if err != nil {
  155. return
  156. }
  157. writeHourData(sn, datas)
  158. }
  159. func dustHourLoop() {
  160. var yesterday int
  161. for {
  162. reply, err := util.GetDustSns()
  163. if err != nil {
  164. time.Sleep(1*time.Hour)
  165. continue
  166. }
  167. now := time.Now()
  168. start := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
  169. // 过去一小时
  170. start = start.Add(-1*time.Hour)
  171. end := start.Add(1*time.Hour)
  172. for _, sn := range reply.Sns {
  173. handleDustHour(start.Format(layout), end.Format(layout), sn)
  174. }
  175. if now.Add(-24*time.Hour).Day() != yesterday {
  176. yesterdayTime := now.Add(-24*time.Hour)
  177. start := time.Date(yesterdayTime.Year(), yesterdayTime.Month(), yesterdayTime.Day(), 0, 0, 0, 0, yesterdayTime.Location())
  178. end := start.Add(24 *time.Hour)
  179. yesterday = start.Day()
  180. for _, sn := range reply.Sns {
  181. handleDustDay(start.Format(layout), end.Format(layout), sn)
  182. }
  183. }
  184. time.Sleep(1*time.Hour)
  185. }
  186. }