gate.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package timetask
  2. import (
  3. "fmt"
  4. "git.getensh.com/common/gopkgs/database"
  5. "git.getensh.com/common/gopkgs/logger"
  6. "go.uber.org/zap"
  7. "property-mqtt/errors"
  8. "property-mqtt/model"
  9. "sync"
  10. "time"
  11. )
  12. var gateStatus = map[string]int64{}
  13. var gateMutex sync.Mutex
  14. func setGateStatus(deviceId string, status int32) error {
  15. p := model.TGate{}
  16. where := map[string]interface{}{
  17. "id": deviceId,
  18. }
  19. values := map[string]interface{}{
  20. "status": status,
  21. }
  22. err := p.Update(database.DB(), where, values)
  23. if err != nil {
  24. return errors.DataBaseError
  25. }
  26. return nil
  27. }
  28. func initGateStatus() {
  29. gateMutex.Lock()
  30. defer gateMutex.Unlock()
  31. p := model.TGate{}
  32. where := map[string]interface{}{
  33. "status": 1,
  34. }
  35. page := 1
  36. for {
  37. list, err := p.List(database.DB(), where, nil, page, 100)
  38. if err != nil {
  39. panic("init status failed:" + err.Error())
  40. }
  41. if len(list) == 0 {
  42. break
  43. }
  44. page += 1
  45. for _, v := range list {
  46. gateStatus[v.ID] = time.Now().Unix()
  47. }
  48. }
  49. }
  50. func gateStatusTaskImpl() {
  51. defer func() {
  52. if r := recover(); r != nil {
  53. err := fmt.Errorf("%+v", r)
  54. logger.Error("err",
  55. zap.String("system_err", err.Error()),
  56. zap.Stack("stacktrace"))
  57. }
  58. }()
  59. timeNow := time.Now().Unix()
  60. for k, v := range gateStatus {
  61. // 超过3分钟没有连接认为超时
  62. if (timeNow - v) > 60*3 {
  63. gateMutex.Lock()
  64. // 设置状态成功后清除缓存
  65. err := setGateStatus(k, 2)
  66. if err == nil {
  67. delete(gateStatus, k)
  68. }
  69. gateMutex.Unlock()
  70. }
  71. }
  72. }
  73. /*
  74. func checkGateStatusDb() {
  75. p := model.TGate{}
  76. where := map[string]interface{}{
  77. "status": 1,
  78. }
  79. page := 1
  80. for {
  81. list, err := p.List(database.DB(), where, nil, 1, 100)
  82. if err != nil {
  83. return
  84. }
  85. if len(list) == 0 {
  86. return
  87. }
  88. page++
  89. ids := []string{}
  90. for _, v := range list {
  91. gateMutex.Lock()
  92. if _, ok := gateStatus[v.ID]; !ok {
  93. ids = append(ids, v.ID)
  94. }
  95. gateMutex.Unlock()
  96. }
  97. if len(ids) == 0 {
  98. continue
  99. }
  100. // 批量设置离线
  101. batchSetGateStatus(ids, 2)
  102. }
  103. }
  104. func batchSetGateStatus(deviceIds []string, status int32) error {
  105. p := model.TGate{}
  106. where := map[string]interface{}{
  107. "id in":deviceIds,
  108. }
  109. values := map[string]interface{}{
  110. "status": status,
  111. }
  112. err := p.Update(database.DB(), where, values)
  113. if err != nil {
  114. return errors.DataBaseError
  115. }
  116. return nil
  117. }
  118. */
  119. func GateStatusTask() {
  120. initGateStatus()
  121. for {
  122. // 任务实现
  123. gateStatusTaskImpl()
  124. // 休眠60s
  125. time.Sleep(60 * time.Second)
  126. }
  127. }
  128. func UpdateGateTime(deviceId string) {
  129. timeNow := time.Now().Unix()
  130. gateMutex.Lock()
  131. defer gateMutex.Unlock()
  132. // 重新启动或设备上线
  133. if _, ok := gateStatus[deviceId]; !ok {
  134. // 设置为在线
  135. err := setGateStatus(deviceId, 1)
  136. if err != nil {
  137. // 设置失败不更新本地缓存
  138. return
  139. }
  140. }
  141. // 更新时间为当前时间搓
  142. gateStatus[deviceId] = timeNow
  143. }