gate.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package timetask
  2. import (
  3. "fmt"
  4. "git.getensh.com/common/gopkgs/database"
  5. "git.getensh.com/common/gopkgs/logger"
  6. "go.uber.org/zap"
  7. "property-device/errors"
  8. "property-device/model"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. )
  14. var gateStatus = map[string]int64{}
  15. var gateMutex sync.Mutex
  16. func setGateStatus(sn string, protocol int32, status int32) error {
  17. if sn == "" || protocol == 0 {
  18. return nil
  19. }
  20. p := model.TGate{}
  21. where := [][2]interface{}{}
  22. where = model.WhereAdd(where, "sn", sn)
  23. where = model.WhereAdd(where, "protocol", protocol)
  24. values := map[string]interface{}{
  25. "status": status,
  26. }
  27. err := p.Update(database.DB(), where, values)
  28. if err != nil {
  29. return errors.DataBaseError
  30. }
  31. return nil
  32. }
  33. func initGateStatus() {
  34. gateMutex.Lock()
  35. defer gateMutex.Unlock()
  36. p := model.TGate{}
  37. where := [][2]interface{}{}
  38. where = model.WhereAdd(where, "status", 1)
  39. page := 1
  40. for {
  41. list, err := p.List(database.DB(), where, nil, page, 100)
  42. if err != nil {
  43. panic("init status failed:" + err.Error())
  44. }
  45. if len(list) == 0 {
  46. break
  47. }
  48. page += 1
  49. for _, v := range list {
  50. key := GetStatusKey(v.Sn, v.Protocol)
  51. gateStatus[key] = time.Now().Unix()
  52. }
  53. }
  54. }
  55. func gateStatusTaskImpl() {
  56. defer func() {
  57. if r := recover(); r != nil {
  58. err := fmt.Errorf("%+v", r)
  59. logger.Error("err",
  60. zap.String("system_err", err.Error()),
  61. zap.Stack("stacktrace"))
  62. }
  63. }()
  64. timeNow := time.Now().Unix()
  65. for k, v := range gateStatus {
  66. // 超过3分钟没有连接认为超时
  67. if (timeNow - v) > 60*3 {
  68. gateMutex.Lock()
  69. // 设置状态成功后清除缓存
  70. sn, proto := ParseStatusKey(k)
  71. err := setGateStatus(sn, proto, 2)
  72. if err == nil {
  73. delete(gateStatus, k)
  74. }
  75. gateMutex.Unlock()
  76. }
  77. }
  78. }
  79. /*
  80. func checkGateStatusDb() {
  81. p := model.TGate{}
  82. where := map[string]interface{}{
  83. "status": 1,
  84. }
  85. page := 1
  86. for {
  87. list, err := p.List(database.DB(), where, nil, 1, 100)
  88. if err != nil {
  89. return
  90. }
  91. if len(list) == 0 {
  92. return
  93. }
  94. page++
  95. ids := []string{}
  96. for _, v := range list {
  97. gateMutex.Lock()
  98. if _, ok := gateStatus[v.ID]; !ok {
  99. ids = append(ids, v.ID)
  100. }
  101. gateMutex.Unlock()
  102. }
  103. if len(ids) == 0 {
  104. continue
  105. }
  106. // 批量设置离线
  107. batchSetGateStatus(ids, 2)
  108. }
  109. }
  110. func batchSetGateStatus(deviceIds []string, status int32) error {
  111. p := model.TGate{}
  112. where := map[string]interface{}{
  113. "id in":deviceIds,
  114. }
  115. values := map[string]interface{}{
  116. "status": status,
  117. }
  118. err := p.Update(database.DB(), where, values)
  119. if err != nil {
  120. return errors.DataBaseError
  121. }
  122. return nil
  123. }
  124. */
  125. func GateStatusTask() {
  126. initGateStatus()
  127. for {
  128. // 任务实现
  129. gateStatusTaskImpl()
  130. // 休眠60s
  131. time.Sleep(60 * time.Second)
  132. }
  133. }
  134. func GetStatusKey(sn string, protocol int32) string {
  135. return fmt.Sprintf("%v----%v", sn, protocol)
  136. }
  137. func ParseStatusKey(key string) (string, int32) {
  138. array := strings.Split(key, "----")
  139. if len(array) < 2 {
  140. return "", 0
  141. }
  142. sn := array[0]
  143. protocol, _ := strconv.Atoi(array[1])
  144. return sn, int32(protocol)
  145. }
  146. func UpdateGateTime(sn string, protocol int32) {
  147. timeNow := time.Now().Unix()
  148. key := GetStatusKey(sn, protocol)
  149. gateMutex.Lock()
  150. defer gateMutex.Unlock()
  151. // 重新启动或设备上线
  152. if _, ok := gateStatus[key]; !ok {
  153. // 设置为在线
  154. err := setGateStatus(sn, protocol, 1)
  155. if err != nil {
  156. // 设置失败不更新本地缓存
  157. return
  158. }
  159. }
  160. // 更新时间为当前时间搓
  161. gateStatus[key] = timeNow
  162. }