run_task.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package handle
  4. import (
  5. "adm-ads/apis"
  6. "adm-ads/common.in/clinit"
  7. "adm-ads/common.in/utils"
  8. "adm-ads/consts"
  9. "adm-ads/model"
  10. "adm-ads/pb"
  11. "adm-ads/pb/v1"
  12. dutils "adm-ads/utils"
  13. "context"
  14. "encoding/json"
  15. "fmt"
  16. "go.uber.org/zap"
  17. "time"
  18. )
  19. func doTask(task DwsTask,adsMessage *apis.AdsMessage)error{
  20. lockKey,_ := dutils.GetPrimaryKey(adsMessage.Content,task.OutputSourceCode,task.PrimaryKeys)
  21. // 分布式锁
  22. if lockKey != ""{
  23. lock := utils.Lock{Key:lockKey,Ttl:3}
  24. err := lock.TryRedisLock()
  25. if err != nil{
  26. return err
  27. }
  28. defer lock.RedisUnlock()
  29. }
  30. // 查询redis中是否存在主键,有主键表示手动修改过,并且在复用时间内
  31. /*if taskKey != ""{
  32. _,err := cache.Redis.Get(taskKey)
  33. if err == nil {
  34. return nil
  35. }
  36. }*/
  37. err := task.Function(adsMessage)
  38. if err != nil{
  39. l.Error("func",
  40. zap.String("call", "task"),
  41. zap.String("args", adsMessage.SourceCode),
  42. zap.String("error", err.Error()))
  43. return err
  44. }
  45. return nil
  46. }
  47. // 执行任务
  48. func ExecTask(taskId []int32,adsMessage *apis.AdsMessage) error {
  49. for _,v := range taskId{
  50. fmt.Println("exec task:",v)
  51. if task,ok:= taskMap[v];ok{
  52. err := doTask(task,adsMessage)
  53. if err != nil{
  54. return err
  55. }
  56. }
  57. }
  58. return nil
  59. }
  60. // (通过接口获取)通过来源获取任务列表
  61. func GetTaskList(sourceCode string) ([]int32,error) {
  62. req := &v1.GetTaskBySourceCodeRequest{SourceCode:sourceCode}
  63. reply,err := pb.AdmTask.GetTaskBySourceCode(context.Background(),req)
  64. if err != nil{
  65. l.Error("rpc",
  66. zap.String("call", "pb.AdmTask.GetTaskBySourceCode"),
  67. zap.String("args", sourceCode),
  68. zap.String("error", err.Error()))
  69. return nil,err
  70. }
  71. return reply.TaskList,nil
  72. }
  73. func RunTaskBak(data []byte)(err error) {
  74. fmt.Println("msg:",string(data))
  75. var adsMsgList []apis.AdsMessage
  76. err = json.Unmarshal(data,&adsMsgList)
  77. if err != nil{
  78. l.Error("func",
  79. zap.String("call", "RunTask"),
  80. zap.String("args", string(data)),
  81. zap.String("error", err.Error()))
  82. return nil
  83. }
  84. /*for _,v := range adsMsgList{
  85. err = RunTaskImpl(&v)
  86. if err != nil{
  87. return err
  88. }
  89. }*/
  90. return nil
  91. }
  92. // 运行任务
  93. func RunTask(data []byte)(err error) {
  94. adsMessage := &apis.AdsMessage{}
  95. err = json.Unmarshal(data,adsMessage)
  96. if err != nil{
  97. l.Error("func",
  98. zap.String("call", "RunTask"),
  99. zap.String("args", string(data)),
  100. zap.String("error", err.Error()))
  101. return nil
  102. }
  103. if adsMessage.SourceCode == "" && len(adsMessage.TaskList) == 0 {
  104. l.Error("func",
  105. zap.String("call", "RunTask"),
  106. zap.String("args",adsMessage.Content),
  107. zap.String("error", "参数错误,soruce code和task list都为空"))
  108. return nil
  109. }
  110. // 开始时间
  111. startTime := uint64(time.Now().UnixNano())
  112. // 捕获各个task中的异常并返回给调用者
  113. defer func() {
  114. status := "SUCCESS"
  115. if r := recover(); r != nil {
  116. err = fmt.Errorf("%+v", r)
  117. l.Error("err",
  118. zap.String("run_task", err.Error()),
  119. zap.Stack("stacktrace"))
  120. }
  121. if err != nil{
  122. status = "FAIL"
  123. consumeFail := &model.ConsumeFail{}
  124. consumeFail.Content = utils.MarshalJsonString(adsMessage)
  125. err = consumeFail.Insert(clinit.DB())
  126. if err != nil{
  127. l.Error("mysql",
  128. zap.String("sql", "insert into t_adm_ads_consume_fail"),
  129. zap.String("fields", consumeFail.Content),
  130. zap.String("error", err.Error()))
  131. }
  132. }
  133. printAccessLog(adsMessage.SourceCode,adsMessage.Action,startTime, status)
  134. }()
  135. var taskList []int32
  136. if adsMessage.OdsMsgType == consts.ODSOFFLINEIMPORT{
  137. taskList = adsMessage.TaskList
  138. }
  139. // 任务不为空,只执行消息中的任务,为空采用默认任务
  140. if len(taskList) == 0{
  141. taskList,err = GetTaskList(adsMessage.SourceCode)
  142. if err != nil{
  143. l.Error("func",
  144. zap.String("call", "GetTaskList"),
  145. zap.String("args", adsMessage.SourceCode),
  146. zap.String("error", err.Error()))
  147. return err
  148. }
  149. }
  150. err = ExecTask(taskList,adsMessage)
  151. if err != nil{
  152. l.Error("func",
  153. zap.String("call", "ExecTask"),
  154. zap.String("args", adsMessage.SourceCode),
  155. zap.String("error", err.Error()))
  156. }
  157. return err
  158. }