// Package timetask contains periodic background tasks. This file tracks the
// last time each gate device was heard from and marks silent gates offline.
package timetask

import (
	"fmt"
	"sync"
	"time"

	"git.getensh.com/common/gopkgs/database"
	"git.getensh.com/common/gopkgs/logger"
	"go.uber.org/zap"

	"property-mqtt/errors"
	"property-mqtt/model"
)

const (
	// gateStatusOnline is the status value written when a gate comes online.
	gateStatusOnline int32 = 1
	// gateStatusOffline is the status value written when a gate times out.
	gateStatusOffline int32 = 2

	// gateOfflineTimeout is how many seconds a gate may stay silent before it
	// is considered offline (3 minutes).
	gateOfflineTimeout int64 = 60 * 3

	// gateCheckInterval is the pause between two timeout sweeps.
	gateCheckInterval = 60 * time.Second
)

var (
	// gateStatus maps a gate device ID to the Unix time (seconds) at which the
	// device was last seen. Every access must hold gateMutex.
	gateStatus = map[string]int64{}

	// gateMutex guards gateStatus.
	gateMutex sync.Mutex
)

// setGateStatus writes status to the gate record whose id equals deviceId.
//
// Any failure is reported as errors.DataBaseError; the underlying error is
// logged here because callers only ever see the sentinel.
func setGateStatus(deviceId string, status int32) error {
	p := model.TGate{}
	where := map[string]interface{}{
		"id": deviceId,
	}
	values := map[string]interface{}{
		"status": status,
	}

	if err := p.Update(database.DB(), where, values); err != nil {
		logger.Error("set gate status failed",
			zap.String("device_id", deviceId),
			zap.Int32("status", status),
			zap.Error(err))
		return errors.DataBaseError
	}
	return nil
}

// initGateStatus seeds gateStatus with every gate currently stored with the
// online status, stamping each one with the current time.
//
// It reads the gates in pages of 100 and panics if a page cannot be loaded.
func initGateStatus() {
	gateMutex.Lock()
	defer gateMutex.Unlock()

	p := model.TGate{}
	where := map[string]interface{}{
		"status": 1,
	}

	for page := 1; ; page++ {
		list, err := p.List(database.DB(), where, nil, page, 100)
		if err != nil {
			panic("init status failed:" + err.Error())
		}
		if len(list) == 0 {
			return
		}
		for _, v := range list {
			gateStatus[v.ID] = time.Now().Unix()
		}
	}
}

// gateStatusTaskImpl runs one timeout sweep: every gate that has been silent
// for longer than gateOfflineTimeout is marked offline and dropped from the
// cache. A panic raised during the sweep is recovered and logged so the
// caller's loop keeps running.
func gateStatusTaskImpl() {
	defer func() {
		if r := recover(); r != nil {
			err := fmt.Errorf("%+v", r)
			logger.Error("err", zap.String("system_err", err.Error()), zap.Stack("stacktrace"))
		}
	}()

	now := time.Now().Unix()
	for _, id := range expiredGateIDs(now) {
		expireGate(id, now)
	}
}

// expiredGateIDs returns the IDs of all cached gates whose last-seen time is
// more than gateOfflineTimeout seconds before now.
//
// The map is iterated under gateMutex: ranging over it while UpdateGateTime
// writes to it from another goroutine would abort the process with
// "concurrent map iteration and map write".
func expiredGateIDs(now int64) []string {
	gateMutex.Lock()
	defer gateMutex.Unlock()

	var ids []string
	for id, lastSeen := range gateStatus {
		if now-lastSeen > gateOfflineTimeout {
			ids = append(ids, id)
		}
	}
	return ids
}

// expireGate marks deviceId offline and removes it from the cache, provided
// it is still expired relative to now.
//
// The expiry is re-checked under the lock because the device may have
// reported in after the snapshot was taken. The cache entry is only removed
// when the status update succeeded, so a failed update is retried on the next
// sweep. The deferred unlock releases the mutex even if the update panics.
func expireGate(deviceId string, now int64) {
	gateMutex.Lock()
	defer gateMutex.Unlock()

	lastSeen, ok := gateStatus[deviceId]
	if !ok || now-lastSeen <= gateOfflineTimeout {
		return
	}
	if err := setGateStatus(deviceId, gateStatusOffline); err == nil {
		delete(gateStatus, deviceId)
	}
}

/*
NOTE(review): disabled code kept for reference. checkGateStatusDb passes the
literal page 1 to p.List instead of the page variable, so it would re-read the
first page on every iteration if re-enabled.

func checkGateStatusDb() {
	p := model.TGate{}
	where := map[string]interface{}{
		"status": 1,
	}

	page := 1
	for {
		list, err := p.List(database.DB(), where, nil, 1, 100)
		if err != nil {
			return
		}
		if len(list) == 0 {
			return
		}
		page++

		ids := []string{}
		for _, v := range list {
			gateMutex.Lock()
			if _, ok := gateStatus[v.ID]; !ok {
				ids = append(ids, v.ID)
			}
			gateMutex.Unlock()
		}
		if len(ids) == 0 {
			continue
		}
		// Mark the whole batch offline.
		batchSetGateStatus(ids, 2)
	}
}

func batchSetGateStatus(deviceIds []string, status int32) error {
	p := model.TGate{}
	where := map[string]interface{}{
		"id in":deviceIds,
	}
	values := map[string]interface{}{
		"status": status,
	}
	err := p.Update(database.DB(), where, values)
	if err != nil {
		return errors.DataBaseError
	}
	return nil
}
*/

// GateStatusTask seeds the gate cache and then runs the timeout sweep forever,
// sleeping gateCheckInterval between sweeps. It never returns, so callers
// should run it in its own goroutine.
func GateStatusTask() {
	initGateStatus()
	for {
		// Run one sweep.
		gateStatusTaskImpl()
		// Wait before the next sweep.
		time.Sleep(gateCheckInterval)
	}
}

// UpdateGateTime records that deviceId was seen just now.
//
// If the device is not in the cache (service restart, or the device is coming
// online) it is first marked online in the database; when that update fails
// the cache is left untouched.
func UpdateGateTime(deviceId string) {
	now := time.Now().Unix()

	gateMutex.Lock()
	defer gateMutex.Unlock()

	// Not cached yet: service restarted or the device just came online.
	if _, ok := gateStatus[deviceId]; !ok {
		// Mark it online; on failure do not update the local cache.
		if err := setGateStatus(deviceId, gateStatusOnline); err != nil {
			return
		}
	}

	// Refresh the last-seen timestamp.
	gateStatus[deviceId] = now
}