123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- package timetask
- import (
- "dust-monitor/errors"
- "dust-monitor/model"
- "dust-monitor/util"
- "encoding/json"
- "fmt"
- "time"
- )
// layout is the "YYYY-MM-DD hh:mm:ss" reference layout used by the hourly
// aggregation helpers in this file whenever a time.Time is rendered as a string.
var layout = "2006-01-02 15:04:05"
// DustInfoMean is one row returned by the `mean(*)` aggregation query: every
// averaged measurement is decoded from a JSON key carrying the "mean_" prefix.
//
// The fields mirror DustInfo one-to-one (same names, types and order); only
// the struct tags differ.
type DustInfoMean struct {
	// Particulate readings.
	Pm10Avg float64 `protobuf:"fixed64,1,opt,name=pm10_avg,json=pm10Avg,proto3" json:"mean_pm10"`
	Pm25Avg float64 `protobuf:"fixed64,2,opt,name=pm25_avg,json=pm25Avg,proto3" json:"mean_pm25"`

	// Time is the timestamp of the aggregation bucket, as a string.
	Time string `protobuf:"bytes,6,opt,name=time,proto3" json:"time"`

	// Remaining averaged channels, named after their measurement codes.
	TspAvg float64 `protobuf:"fixed64,7,opt,name=tsp_avg,json=tspAvg,proto3" json:"mean_tsp"`
	T01Avg float64 `protobuf:"fixed64,8,opt,name=t01_avg,json=t01Avg,proto3" json:"mean_t01"`
	H01Avg float64 `protobuf:"fixed64,9,opt,name=h01_avg,json=h01Avg,proto3" json:"mean_h01"`
	W01Avg float64 `protobuf:"fixed64,10,opt,name=w01_avg,json=w01Avg,proto3" json:"mean_w01"`
	W02Avg float64 `protobuf:"fixed64,11,opt,name=w02_avg,json=w02Avg,proto3" json:"mean_w02"`
	R01Avg float64 `protobuf:"fixed64,12,opt,name=r01_avg,json=r01Avg,proto3" json:"mean_r01"`
	P01Avg float64 `protobuf:"fixed64,13,opt,name=p01_avg,json=p01Avg,proto3" json:"mean_p01"`
	B03Avg float64 `protobuf:"fixed64,14,opt,name=b03_avg,json=b03Avg,proto3" json:"mean_b03"`

	// Position of the device.
	Lng float64 `protobuf:"fixed64,15,opt,name=lng,proto3" json:"mean_lng"`
	Lat float64 `protobuf:"fixed64,16,opt,name=lat,proto3" json:"mean_lat"`

	// W03 has no protobuf tag; it is only populated from JSON.
	W03 float64 `json:"mean_w03"`
}
// DustInfo is one row of dust-monitor readings in the shape that is stored in
// the hourly table: each measurement is exposed under its plain JSON key
// ("pm10", "pm25", ...).
//
// The fields mirror DustInfoMean one-to-one (same names, types and order);
// only the struct tags differ.
type DustInfo struct {
	// Particulate readings.
	Pm10Avg float64 `protobuf:"fixed64,1,opt,name=pm10_avg,json=pm10Avg,proto3" json:"pm10"`
	Pm25Avg float64 `protobuf:"fixed64,2,opt,name=pm25_avg,json=pm25Avg,proto3" json:"pm25"`

	// Time is the timestamp of the row, as a string.
	Time string `protobuf:"bytes,6,opt,name=time,proto3" json:"time"`

	// Remaining channels, named after their measurement codes.
	TspAvg float64 `protobuf:"fixed64,7,opt,name=tsp_avg,json=tspAvg,proto3" json:"tsp"`
	T01Avg float64 `protobuf:"fixed64,8,opt,name=t01_avg,json=t01Avg,proto3" json:"t01"`
	H01Avg float64 `protobuf:"fixed64,9,opt,name=h01_avg,json=h01Avg,proto3" json:"h01"`
	W01Avg float64 `protobuf:"fixed64,10,opt,name=w01_avg,json=w01Avg,proto3" json:"w01"`
	W02Avg float64 `protobuf:"fixed64,11,opt,name=w02_avg,json=w02Avg,proto3" json:"w02"`
	R01Avg float64 `protobuf:"fixed64,12,opt,name=r01_avg,json=r01Avg,proto3" json:"r01"`
	P01Avg float64 `protobuf:"fixed64,13,opt,name=p01_avg,json=p01Avg,proto3" json:"p01"`
	B03Avg float64 `protobuf:"fixed64,14,opt,name=b03_avg,json=b03Avg,proto3" json:"b03"`

	// Position of the device.
	Lng float64 `protobuf:"fixed64,15,opt,name=lng,proto3" json:"lng"`
	Lat float64 `protobuf:"fixed64,16,opt,name=lat,proto3" json:"lat"`

	// W03 has no protobuf tag; it is only populated from JSON.
	W03 float64 `json:"w03"`
}
- // 获取小时表中已有的最大时间
- func getLastMaxHour(sn string) (string, error) {
- sql := fmt.Sprintf("select * from \"%s\" order by time desc limit 1", sn)
- dbname := "dust_monitor_2061"
- array := []DustInfo{}
- _, err := model.Query(sql, dbname, &array)
- if err != nil {
- return "", errors.DataBaseError
- }
- if len(array) == 0 {
- return "", nil
- }
- t := util.ParseTime(array[0].Time).Add(8*time.Hour)
- ret := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
- return ret.Format(layout), nil
- }
- // 获取未入库的小时数据
- func getUnwritedHourData(start string, end string, sn string) ([]DustInfo, error) {
- sql := fmt.Sprintf("SELECT mean(*) FROM \"%s\"", sn)
- whereArray := []string{}
- if start != "" {
- tstart := util.ParseTime(start)
- whereArray = append(whereArray, fmt.Sprintf("time >= '%s'", tstart.Add(-8*time.Hour).Format(layout)))
- }
- if end != "" {
- tend := util.ParseTime(end)
- whereArray = append(whereArray, fmt.Sprintf("time < '%s'", tend.Add(-8*time.Hour).Format(layout)))
- }
- where := ""
- for _, v := range whereArray {
- if where == "" {
- where = fmt.Sprintf("where %s", v)
- continue
- }
- where = fmt.Sprintf("%s and %s", where, v)
- }
- sql = fmt.Sprintf("%s %s GROUP BY time(1h)", sql, where)
- array := []DustInfoMean{}
- dbname := "dust_monitor_2011"
- _, err := model.Query(sql, dbname, &array)
- if err != nil {
- return nil, errors.DataBaseError
- }
- if len(array) == 0 {
- return nil, nil
- }
- ret := make([]DustInfo, len(array))
- for i, v := range array {
- item := DustInfo{
- Time:v.Time,
- Pm25Avg:v.Pm25Avg,
- Pm10Avg:v.Pm10Avg,
- W01Avg:v.W01Avg,
- W03:v.W03,
- W02Avg:v.W02Avg,
- P01Avg:v.P01Avg,
- TspAvg:v.TspAvg,
- T01Avg:v.T01Avg,
- H01Avg:v.H01Avg,
- R01Avg:v.R01Avg,
- Lat:v.Lat,
- Lng:v.Lng,
- B03Avg:v.B03Avg,
- }
- ret[i] = item
- }
- return ret, nil
- }
- // 写入小时数据
- func writeHourData(sn string, data []DustInfo) error {
- if len(data) == 0 {
- return nil
- }
- bytes, _ := json.Marshal(data)
- array := []map[string]interface{}{}
- json.Unmarshal(bytes, &array)
- tarray := make([]time.Time, len(array))
- for i, v := range array {
- ti, ok := v["time"]
- if !ok {
- return errors.ParamsError
- }
- str, ok := ti.(string)
- if !ok {
- return errors.ParamsError
- }
- t := util.ParseTime(str)
- tarray[i] = t.Add(8*time.Hour)
- delete(array[i], "time")
- }
- tags := map[string]string{
- "sn":sn,
- }
- return model.WriteDustDatas(sn, tags, array, tarray, "2061")
- }
- func handleDustHour(start string, end string, sn string) {
- lastMaxTime, err := getLastMaxHour(sn)
- if err != nil {
- return
- }
- if lastMaxTime == "" {
- start = ""
- } else {
- tmpStart := util.ParseTime(lastMaxTime)
- tmpStart = tmpStart.Add(60 *time.Minute)
- if tmpStart.Unix() > util.ParseTime(start).Unix() {
- return
- }
- start = tmpStart.Format("2006-01-02 15:04:05")
- }
- datas, err := getUnwritedHourData(start, end, sn)
- if err != nil {
- return
- }
- writeHourData(sn, datas)
- }
- func dustHourLoop() {
- var yesterday int
- for {
- reply, err := util.GetDustSns()
- if err != nil {
- time.Sleep(1*time.Hour)
- continue
- }
- now := time.Now()
- start := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, now.Location())
- // 过去一小时
- start = start.Add(-1*time.Hour)
- end := start.Add(1*time.Hour)
- for _, sn := range reply.Sns {
- handleDustHour(start.Format(layout), end.Format(layout), sn)
- }
- if now.Add(-24*time.Hour).Day() != yesterday {
- yesterdayTime := now.Add(-24*time.Hour)
- start := time.Date(yesterdayTime.Year(), yesterdayTime.Month(), yesterdayTime.Day(), 0, 0, 0, 0, yesterdayTime.Location())
- end := start.Add(24 *time.Hour)
- yesterday = start.Day()
- for _, sn := range reply.Sns {
- handleDustDay(start.Format(layout), end.Format(layout), sn)
- }
- }
- time.Sleep(1*time.Hour)
- }
- }
|