123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- package timetask
- import (
- "fmt"
- "git.getensh.com/common/gopkgs/database"
- "git.getensh.com/common/gopkgs/logger"
- "go.uber.org/zap"
- "property-mqtt/errors"
- "property-mqtt/model"
- "sync"
- "time"
- )
- var gateStatus = map[string]int64{}
- var gateMutex sync.Mutex
- func setGateStatus(deviceId string, status int32) error {
- p := model.TGate{}
- where := map[string]interface{}{
- "id": deviceId,
- }
- values := map[string]interface{}{
- "status": status,
- }
- err := p.Update(database.DB(), where, values)
- if err != nil {
- return errors.DataBaseError
- }
- return nil
- }
- func initGateStatus() {
- gateMutex.Lock()
- defer gateMutex.Unlock()
- p := model.TGate{}
- where := map[string]interface{}{
- "status": 1,
- }
- page := 1
- for {
- list, err := p.List(database.DB(), where, nil, page, 100)
- if err != nil {
- panic("init status failed:" + err.Error())
- }
- if len(list) == 0 {
- break
- }
- page += 1
- for _, v := range list {
- gateStatus[v.ID] = time.Now().Unix()
- }
- }
- }
- func gateStatusTaskImpl() {
- defer func() {
- if r := recover(); r != nil {
- err := fmt.Errorf("%+v", r)
- logger.Error("err",
- zap.String("system_err", err.Error()),
- zap.Stack("stacktrace"))
- }
- }()
- timeNow := time.Now().Unix()
- for k, v := range gateStatus {
- // 超过3分钟没有连接认为超时
- if (timeNow - v) > 60*3 {
- gateMutex.Lock()
- // 设置状态成功后清除缓存
- err := setGateStatus(k, 2)
- if err == nil {
- delete(gateStatus, k)
- }
- gateMutex.Unlock()
- }
- }
- }
- /*
- func checkGateStatusDb() {
- p := model.TGate{}
- where := map[string]interface{}{
- "status": 1,
- }
- page := 1
- for {
- list, err := p.List(database.DB(), where, nil, 1, 100)
- if err != nil {
- return
- }
- if len(list) == 0 {
- return
- }
- page++
- ids := []string{}
- for _, v := range list {
- gateMutex.Lock()
- if _, ok := gateStatus[v.ID]; !ok {
- ids = append(ids, v.ID)
- }
- gateMutex.Unlock()
- }
- if len(ids) == 0 {
- continue
- }
- // 批量设置离线
- batchSetGateStatus(ids, 2)
- }
- }
- func batchSetGateStatus(deviceIds []string, status int32) error {
- p := model.TGate{}
- where := map[string]interface{}{
- "id in":deviceIds,
- }
- values := map[string]interface{}{
- "status": status,
- }
- err := p.Update(database.DB(), where, values)
- if err != nil {
- return errors.DataBaseError
- }
- return nil
- }
- */
- func GateStatusTask() {
- initGateStatus()
- for {
- // 任务实现
- gateStatusTaskImpl()
- // 休眠60s
- time.Sleep(60 * time.Second)
- }
- }
- func UpdateGateTime(deviceId string) {
- timeNow := time.Now().Unix()
- gateMutex.Lock()
- defer gateMutex.Unlock()
- // 重新启动或设备上线
- if _, ok := gateStatus[deviceId]; !ok {
- // 设置为在线
- err := setGateStatus(deviceId, 1)
- if err != nil {
- // 设置失败不更新本地缓存
- return
- }
- }
- // 更新时间为当前时间搓
- gateStatus[deviceId] = timeNow
- }
|