123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package timetask
- import (
- "fmt"
- "git.getensh.com/common/gopkgs/database"
- "git.getensh.com/common/gopkgs/logger"
- "go.uber.org/zap"
- "property-device/errors"
- "property-device/model"
- "strconv"
- "strings"
- "sync"
- "time"
- )
- var gateStatus = map[string]int64{}
- var gateMutex sync.Mutex
- func setGateStatus(sn string, protocol int32, status int32) error {
- if sn == "" || protocol == 0 {
- return nil
- }
- p := model.TGate{}
- where := [][2]interface{}{}
- where = model.WhereAdd(where, "sn", sn)
- where = model.WhereAdd(where, "protocol", protocol)
- values := map[string]interface{}{
- "status": status,
- }
- err := p.Update(database.DB(), where, values)
- if err != nil {
- return errors.DataBaseError
- }
- return nil
- }
- func initGateStatus() {
- gateMutex.Lock()
- defer gateMutex.Unlock()
- p := model.TGate{}
- where := [][2]interface{}{}
- where = model.WhereAdd(where, "status", 1)
- page := 1
- for {
- list, err := p.List(database.DB(), where, nil, page, 100)
- if err != nil {
- panic("init status failed:" + err.Error())
- }
- if len(list) == 0 {
- break
- }
- page += 1
- for _, v := range list {
- key := GetStatusKey(v.Sn, v.Protocol)
- gateStatus[key] = time.Now().Unix()
- }
- }
- }
- func gateStatusTaskImpl() {
- defer func() {
- if r := recover(); r != nil {
- err := fmt.Errorf("%+v", r)
- logger.Error("err",
- zap.String("system_err", err.Error()),
- zap.Stack("stacktrace"))
- }
- }()
- timeNow := time.Now().Unix()
- for k, v := range gateStatus {
- // 超过3分钟没有连接认为超时
- if (timeNow - v) > 60*3 {
- gateMutex.Lock()
- // 设置状态成功后清除缓存
- sn, proto := ParseStatusKey(k)
- err := setGateStatus(sn, proto, 2)
- if err == nil {
- delete(gateStatus, k)
- }
- gateMutex.Unlock()
- }
- }
- }
- /*
- func checkGateStatusDb() {
- p := model.TGate{}
- where := map[string]interface{}{
- "status": 1,
- }
- page := 1
- for {
- list, err := p.List(database.DB(), where, nil, 1, 100)
- if err != nil {
- return
- }
- if len(list) == 0 {
- return
- }
- page++
- ids := []string{}
- for _, v := range list {
- gateMutex.Lock()
- if _, ok := gateStatus[v.ID]; !ok {
- ids = append(ids, v.ID)
- }
- gateMutex.Unlock()
- }
- if len(ids) == 0 {
- continue
- }
- // 批量设置离线
- batchSetGateStatus(ids, 2)
- }
- }
- func batchSetGateStatus(deviceIds []string, status int32) error {
- p := model.TGate{}
- where := map[string]interface{}{
- "id in":deviceIds,
- }
- values := map[string]interface{}{
- "status": status,
- }
- err := p.Update(database.DB(), where, values)
- if err != nil {
- return errors.DataBaseError
- }
- return nil
- }
- */
- func GateStatusTask() {
- initGateStatus()
- for {
- // 任务实现
- gateStatusTaskImpl()
- // 休眠60s
- time.Sleep(60 * time.Second)
- }
- }
- func GetStatusKey(sn string, protocol int32) string {
- return fmt.Sprintf("%v----%v", sn, protocol)
- }
- func ParseStatusKey(key string) (string, int32) {
- array := strings.Split(key, "----")
- if len(array) < 2 {
- return "", 0
- }
- sn := array[0]
- protocol, _ := strconv.Atoi(array[1])
- return sn, int32(protocol)
- }
- func UpdateGateTime(sn string, protocol int32) {
- timeNow := time.Now().Unix()
- key := GetStatusKey(sn, protocol)
- gateMutex.Lock()
- defer gateMutex.Unlock()
- // 重新启动或设备上线
- if _, ok := gateStatus[key]; !ok {
- // 设置为在线
- err := setGateStatus(sn, protocol, 1)
- if err != nil {
- // 设置失败不更新本地缓存
- return
- }
- }
- // 更新时间为当前时间搓
- gateStatus[key] = timeNow
- }
|