package timetask import ( "fmt" "git.getensh.com/common/gopkgs/database" "git.getensh.com/common/gopkgs/logger" "go.uber.org/zap" "property-device/errors" "property-device/model" "strconv" "strings" "sync" "time" ) var gateStatus = map[string]int64{} var gateMutex sync.Mutex func setGateStatus(sn string, protocol int32, status int32) error { if sn == "" || protocol == 0 { return nil } p := model.TGate{} where := [][2]interface{}{} where = model.WhereAdd(where, "sn", sn) where = model.WhereAdd(where, "protocol", protocol) values := map[string]interface{}{ "status": status, } err := p.Update(database.DB(), where, values) if err != nil { return errors.DataBaseError } return nil } func initGateStatus() { gateMutex.Lock() defer gateMutex.Unlock() p := model.TGate{} where := [][2]interface{}{} where = model.WhereAdd(where, "status", 1) page := 1 for { list, err := p.List(database.DB(), where, nil, page, 100) if err != nil { panic("init status failed:" + err.Error()) } if len(list) == 0 { break } page += 1 for _, v := range list { key := GetStatusKey(v.Sn, v.Protocol) gateStatus[key] = time.Now().Unix() } } } func gateStatusTaskImpl() { defer func() { if r := recover(); r != nil { err := fmt.Errorf("%+v", r) logger.Error("err", zap.String("system_err", err.Error()), zap.Stack("stacktrace")) } }() timeNow := time.Now().Unix() for k, v := range gateStatus { // 超过3分钟没有连接认为超时 if (timeNow - v) > 60*3 { gateMutex.Lock() // 设置状态成功后清除缓存 sn, proto := ParseStatusKey(k) err := setGateStatus(sn, proto, 2) if err == nil { delete(gateStatus, k) } gateMutex.Unlock() } } } /* func checkGateStatusDb() { p := model.TGate{} where := map[string]interface{}{ "status": 1, } page := 1 for { list, err := p.List(database.DB(), where, nil, 1, 100) if err != nil { return } if len(list) == 0 { return } page++ ids := []string{} for _, v := range list { gateMutex.Lock() if _, ok := gateStatus[v.ID]; !ok { ids = append(ids, v.ID) } gateMutex.Unlock() } if len(ids) == 0 { continue } // 批量设置离线 batchSetGateStatus(ids, 2) } } func batchSetGateStatus(deviceIds []string, status int32) error { p := model.TGate{} where := map[string]interface{}{ "id in":deviceIds, } values := map[string]interface{}{ "status": status, } err := p.Update(database.DB(), where, values) if err != nil { return errors.DataBaseError } return nil } */ func GateStatusTask() { initGateStatus() for { // 任务实现 gateStatusTaskImpl() // 休眠60s time.Sleep(60 * time.Second) } } func GetStatusKey(sn string, protocol int32) string { return fmt.Sprintf("%v----%v", sn, protocol) } func ParseStatusKey(key string) (string, int32) { array := strings.Split(key, "----") if len(array) < 2 { return "", 0 } sn := array[0] protocol, _ := strconv.Atoi(array[1]) return sn, int32(protocol) } func UpdateGateTime(sn string, protocol int32) { timeNow := time.Now().Unix() key := GetStatusKey(sn, protocol) gateMutex.Lock() defer gateMutex.Unlock() // 重新启动或设备上线 if _, ok := gateStatus[key]; !ok { // 设置为在线 err := setGateStatus(sn, protocol, 1) if err != nil { // 设置失败不更新本地缓存 return } } // 更新时间为当前时间搓 gateStatus[key] = timeNow }