common.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package utils
  4. import (
  5. "adm-ods/apis"
  6. "adm-ods/common.in/cache"
  7. "adm-ods/common.in/config"
  8. "adm-ods/common.in/utils"
  9. "adm-ods/model"
  10. "adm-ods/pb"
  11. "adm-ods/pb/v1"
  12. "context"
  13. "encoding/json"
  14. "fmt"
  15. "google.golang.org/grpc/status"
  16. "strconv"
  17. "strings"
  18. "time"
  19. "unicode"
  20. "adm-ods/common.in/clinit"
  21. "adm-ods/common.in/mq"
  22. "go.uber.org/zap"
  23. )
  24. func getKey(taskId int64) string {
  25. return fmt.Sprintf("offline-task-%d", taskId)
  26. }
  27. func getStopKey(taskId int64) string {
  28. return fmt.Sprintf("offline-task-stop-%d", taskId)
  29. }
  30. func DeleteLastId(taskId int64) {
  31. key := getKey(taskId)
  32. cache.Redis.Del(key)
  33. }
  34. func GetLastId(taskId int64) int64 {
  35. key := getKey(taskId)
  36. idstr, err := cache.Redis.Get(key)
  37. if err != nil {
  38. return 0
  39. }
  40. id, _ := strconv.Atoi(idstr)
  41. return int64(id)
  42. //return 9
  43. }
  44. func IsStop(taskId int64) bool {
  45. key := getStopKey(taskId)
  46. _, err := cache.Redis.Get(key)
  47. if err != nil {
  48. return false
  49. }
  50. DeleteLastId(taskId)
  51. return true
  52. }
  53. func SetLastId(taskId int64, lastId int64) error {
  54. key := getKey(taskId)
  55. //_, err := cache.Redis.Set(key, fmt.Sprintf("%d", lastId))
  56. _, err := cache.Redis.IncrBy(key, 1)
  57. return err
  58. }
  59. func BuildSql(msg *apis.OdsMessage, pageSize, pageNum int) string {
  60. //content := gjson.Parse(msg.Content)
  61. // oss excel 地址
  62. lastId := GetLastId(msg.OfflineTaskId)
  63. //sql := content.Get("sql").String()
  64. sql := msg.Content
  65. if lastId != 0 {
  66. sqlList := strings.Split(sql, " ")
  67. existWhere := false
  68. for _, v := range sqlList {
  69. if strings.TrimSpace(v) == "where" {
  70. // 存在条件
  71. existWhere = true
  72. }
  73. }
  74. if existWhere {
  75. sql = fmt.Sprintf("%s and id>%d ", sql, lastId)
  76. } else {
  77. sql = fmt.Sprintf("%s where id>%d ", sql, lastId)
  78. }
  79. }
  80. sql = fmt.Sprintf("%s limit %d offset %d", sql, pageSize, pageNum*pageSize)
  81. return sql
  82. }
  83. func checkField(checkMap map[string]string) error {
  84. content := utils.MarshalJsonString(checkMap)
  85. req := &v1.FieldCheckRequest{Content: content}
  86. _, err := pb.AdmTask.FieldCheck(context.Background(), req)
  87. if err != nil {
  88. l.Error("rpc",
  89. zap.String("call", "pb.AdmTask.FieldCheck"),
  90. zap.String("args", content),
  91. zap.String("error", err.Error()))
  92. return err
  93. }
  94. return nil
  95. }
  96. func checkDwsMsgParams(checkMap map[string]string) error {
  97. for key, value := range checkMap {
  98. if value == "0" {
  99. checkMap[key] = ""
  100. } else {
  101. checkMap[key] = strings.TrimSpace(value)
  102. }
  103. }
  104. var plateNo, plateType, vin, idcert string
  105. if v, ok := checkMap["plate_no"]; ok {
  106. plateNo = v
  107. }
  108. if v, ok := checkMap["plate_type"]; ok {
  109. plateType = v
  110. }
  111. if v, ok := checkMap["vin"]; ok {
  112. vin = v
  113. }
  114. if v, ok := checkMap["idcert"]; ok {
  115. idcert = v
  116. }
  117. if plateNo != "" || vin != "" || idcert != "" {
  118. req := &v1.CheckFormatRequest{PlateNo: plateNo, PlateType: plateType, Vin: vin, Idcert: idcert}
  119. reply, err := pb.AdmTask.CheckFormat(context.Background(), req)
  120. fmt.Println("check format :", reply, err)
  121. if err != nil {
  122. l.Error("rpc",
  123. zap.String("call", "pb.AdmTask.CheckFormat"),
  124. zap.String("args", utils.MarshalJsonString(*req)),
  125. zap.String("error", err.Error()))
  126. return err
  127. }
  128. if plateType == "" && reply.PlateType != "" {
  129. checkMap["plate_type"] = reply.PlateType
  130. }
  131. checkMap["vin"] = reply.Vin
  132. checkMap["idcert"] = reply.Idcert
  133. checkMap["plate_no"] = reply.PlateNo
  134. }
  135. return nil
  136. }
  137. // 发送消息到dws
  138. func SendDwsMsg(dwsMsg *apis.DwsMessage) error {
  139. var checkMapList []map[string]string
  140. json.Unmarshal([]byte(dwsMsg.Content), &checkMapList)
  141. if len(checkMapList) <= 0 {
  142. return nil
  143. }
  144. if config.Conf.RunMode != "prod" {
  145. fmt.Println("Check Start:", checkMapList)
  146. }
  147. // 标准字段检查
  148. err := checkField(checkMapList[0])
  149. if err != nil {
  150. status := status.Convert(err)
  151. if status.Code() == 20003 {
  152. // 参数错误
  153. fmt.Println("字段检查失败:", status.Message())
  154. return nil
  155. } else {
  156. // 服务错误
  157. fmt.Println("参数检查调用失败:", err)
  158. return err
  159. }
  160. }
  161. // 参数解析不通过不发送消息
  162. for index, _ := range checkMapList {
  163. err = checkDwsMsgParams(checkMapList[index])
  164. if err != nil {
  165. status := status.Convert(err)
  166. if status.Code() == 20003 {
  167. // 参数错误
  168. return nil
  169. } else {
  170. // 服务错误
  171. return err
  172. }
  173. }
  174. }
  175. if config.Conf.RunMode != "prod" {
  176. fmt.Println("Check End:", checkMapList)
  177. }
  178. dwsMsg.Content = utils.MarshalJsonString(checkMapList)
  179. mqMsg, _ := json.Marshal(*dwsMsg)
  180. err = mq.DwsMq.PublishMsg(mqMsg)
  181. if err != nil {
  182. failMsg := &model.FailMsg{}
  183. failMsg.Msg = string(mqMsg)
  184. err = failMsg.Insert(clinit.DB())
  185. if err != nil {
  186. l.Error("mysql",
  187. zap.String("sql", "insert into t_adm-ods_fail_msg"),
  188. zap.String("fields", failMsg.Msg),
  189. zap.String("error", err.Error()))
  190. }
  191. return err
  192. }
  193. return nil
  194. }
  195. func NewDwsMessage(msg *apis.OdsMessage) *apis.DwsMessage {
  196. dwsMsg := &apis.DwsMessage{}
  197. dwsMsg.OdsMsgType = msg.MsgType
  198. dwsMsg.SourceCode = msg.SourceCode
  199. dwsMsg.TaskList = msg.TaskList
  200. dwsMsg.OfflineTaskId = msg.OfflineTaskId
  201. //dwsMsg.SourceLevel = msg.SourceLevel
  202. if msg.Timestamp != 0 {
  203. dwsMsg.Timestamp = msg.Timestamp
  204. } else {
  205. dwsMsg.Timestamp = time.Now().Unix()
  206. }
  207. /* if msg.SourceCode == consts.SOURCEODS2 || msg.SourceCode == consts.SOURCEODS4 {
  208. dwsMsg.NeedSleep = true
  209. }
  210. */
  211. return dwsMsg
  212. }
  213. func IsPlate(str string) bool {
  214. for _, r := range str {
  215. if unicode.Is(unicode.Han, r) {
  216. return true
  217. }
  218. }
  219. return false
  220. }