run_task.go 6.8 KB


  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package handle
  4. import (
  5. "adm-dws/apis"
  6. "adm-dws/common.in/clinit"
  7. "adm-dws/common.in/utils"
  8. "adm-dws/consts"
  9. "adm-dws/impl/task"
  10. "adm-dws/model"
  11. "adm-dws/pb"
  12. "adm-dws/pb/v1"
  13. dutils "adm-dws/utils"
  14. "context"
  15. "encoding/json"
  16. "fmt"
  17. "github.com/tidwall/gjson"
  18. "go.uber.org/zap"
  19. "gorm.io/gorm"
  20. "time"
  21. )
  22. func doTask(db *gorm.DB, task DwsTask, dwsMessage *apis.DwsMessage) (adsMsgList []*apis.AdsMessage, dwsMsgList []*apis.DwsMessage, err error) {
  23. lockKey, _ := dutils.GetPrimaryKey(dwsMessage.Content, task.OutputSourceCode, task.PrimaryKeys)
  24. if lockKey != "" {
  25. lock := utils.Lock{Key: lockKey, Ttl: 3}
  26. err := lock.TryRedisLock()
  27. if err != nil {
  28. return nil, nil, err
  29. }
  30. defer lock.RedisUnlock()
  31. }
  32. // 查询redis中是否存在主键,有主键表示手动修改过,并且在复用时间内
  33. /*if taskKey != ""{
  34. _,err := cache.Redis.Get(taskKey)
  35. if err == nil {
  36. return nil,nil ,nil
  37. }
  38. // 手动修改标记数据为手动修改,设置redis 过期时间
  39. if dwsMessage.OdsMsgType == consts.ODSMANUALAMENDMENT{
  40. cache.Redis.SetEx(taskKey,1*24*60*60,"")
  41. }
  42. }*/
  43. adsMsgList, dwsMsgList, err = task.Function(db, dwsMessage, task.OutputSourceCode)
  44. if err != nil {
  45. l.Error("func",
  46. zap.String("call", "task.Function"),
  47. zap.String("args", dwsMessage.SourceCode),
  48. zap.String("error", err.Error()))
  49. return nil, nil, err
  50. }
  51. return adsMsgList, dwsMsgList, err
  52. }
  53. func execTaskImpl(db *gorm.DB, task DwsTask, dwsMessage *apis.DwsMessage) (err error) {
  54. defer func() {
  55. if r := recover(); r != nil {
  56. err = fmt.Errorf("%+v", r)
  57. }
  58. }()
  59. adsMsgs, dwsMsgs, err := doTask(db, task, dwsMessage)
  60. if err != nil {
  61. return err
  62. }
  63. for _, adsMsg := range adsMsgs {
  64. err := dutils.SendAdsMsg(adsMsg)
  65. if err != nil {
  66. return err
  67. }
  68. }
  69. // 发送消息到dws层
  70. for _, dwsMsg := range dwsMsgs {
  71. err := dutils.SendDwsMsg(dwsMsg)
  72. if err != nil {
  73. return err
  74. }
  75. }
  76. return nil
  77. }
  78. func ExecTaskImpl(taskIdList []int32, dwsMessage *apis.DwsMessage, contentLen int) (err error) {
  79. dwsMessage.MsgLen = contentLen
  80. for _, v := range taskIdList {
  81. if v == 10009 {
  82. if contentLen > 1 {
  83. continue
  84. }
  85. }
  86. /*if v == 10017 {
  87. if contentLen == 1 {
  88. continue
  89. }
  90. }*/
  91. fmt.Println("exec task:", v)
  92. if task, ok := DwsTaskMap[v]; ok {
  93. db := clinit.DB().Begin()
  94. err = execTaskImpl(db, task, dwsMessage)
  95. if err != nil {
  96. db.Rollback()
  97. return err
  98. } else {
  99. db.Commit()
  100. }
  101. }
  102. }
  103. return nil
  104. }
  105. // 执行任务
  106. func ExecTask(taskIdList []int32, dwsMessage *apis.DwsMessage) error {
  107. //var adsMsgList []*apis.AdsMessage
  108. //var dwsMsgList []*apis.DwsMessage
  109. //fmt.Println("dws content:",dwsMessage.Content)
  110. contentList := gjson.Parse(dwsMessage.Content).Array()
  111. //fmt.Println("dws contentlist :",contentList)
  112. contentLen := len(contentList)
  113. for _, v := range contentList {
  114. dwsMessage.Content = v.String()
  115. err := ExecTaskImpl(taskIdList, dwsMessage, contentLen)
  116. if err != nil {
  117. return err
  118. }
  119. /*if len(adsMsgs) > 0 {
  120. adsMsgList = append(adsMsgList,adsMsgs...)
  121. }
  122. if len(dwsMsgs) > 0 {
  123. dwsMsgList = append(dwsMsgList,dwsMsgs...)
  124. }*/
  125. }
  126. // 发送消息到ads层
  127. /*if len(adsMsgList) > 0 {
  128. err := dutils.SendAdsMsg(adsMsgList)
  129. if err != nil{
  130. return err
  131. }
  132. }*/
  133. /*for _,adsMsg := range adsMsgList{
  134. err := dutils.SendAdsMsg(adsMsg)
  135. if err != nil{
  136. return err
  137. }
  138. }
  139. // 发送消息到dws层
  140. for _,dwsMsg := range dwsMsgList {
  141. err := dutils.SendDwsMsg(dwsMsg)
  142. if err != nil{
  143. return err
  144. }
  145. }*/
  146. return nil
  147. }
  148. //(通过接口获取)通过来源获取任务列表
  149. func GetTaskList(sourceCode string) ([]int32, error) {
  150. req := &v1.GetTaskBySourceCodeRequest{SourceCode: sourceCode}
  151. reply, err := pb.AdmTask.GetTaskBySourceCode(context.Background(), req)
  152. if err != nil {
  153. l.Error("rpc",
  154. zap.String("call", "pb.AdmTask.GetTaskBySourceCode"),
  155. zap.String("args", sourceCode),
  156. zap.String("error", err.Error()))
  157. return nil, err
  158. }
  159. return reply.TaskList, nil
  160. }
  161. // 运行任务
  162. func RunTask(data []byte) (err error) {
  163. dwsMessage := &apis.DwsMessage{}
  164. err = json.Unmarshal(data, dwsMessage)
  165. if err != nil {
  166. l.Error("func",
  167. zap.String("call", "RunTask"),
  168. zap.String("args", string(data)),
  169. zap.String("error", err.Error()))
  170. return nil
  171. }
  172. if dwsMessage.SourceCode == "" && len(dwsMessage.TaskList) == 0 {
  173. l.Error("func",
  174. zap.String("call", "RunTask"),
  175. zap.String("args", string(data)),
  176. zap.String("error", "参数错误,soruce code和task list都为空"))
  177. return nil
  178. }
  179. // 开始时间
  180. startTime := uint64(time.Now().UnixNano())
  181. // 捕获各个task中的异常并返回给调用者
  182. defer func() {
  183. status := "SUCCESS"
  184. if r := recover(); r != nil {
  185. err = fmt.Errorf("%+v", r)
  186. l.Error("err",
  187. zap.String("run_task", err.Error()),
  188. zap.Stack("stacktrace"))
  189. }
  190. if err != nil {
  191. status = "FAIL"
  192. consumeFail := &model.ConsumeFail{}
  193. consumeFail.Content = string(data)
  194. err = consumeFail.Insert(clinit.DB())
  195. if err != nil {
  196. l.Error("mysql",
  197. zap.String("sql", "insert into t_adm_dws_consume_fail"),
  198. zap.String("fields", string(data)),
  199. zap.String("error", err.Error()))
  200. }
  201. }
  202. printAccessLog(dwsMessage.OdsMsgType, dwsMessage.SourceCode, startTime, status)
  203. }()
  204. // 离线导入先采用直选任务
  205. var taskList []int32
  206. if dwsMessage.OdsMsgType == consts.ODSOFFLINEIMPORT || dwsMessage.OdsMsgType == consts.ODSMANUALAMENDMENT{
  207. taskList = dwsMessage.TaskList
  208. }
  209. // 如果直选任务为空,采用默认任务
  210. if len(taskList) == 0 {
  211. taskList, err = GetTaskList(dwsMessage.SourceCode)
  212. if err != nil {
  213. l.Error("func",
  214. zap.String("call", "GetTaskList"),
  215. zap.String("args", dwsMessage.SourceCode),
  216. zap.String("error", err.Error()))
  217. return err
  218. }
  219. }
  220. // 任务拆分
  221. var adsTaskList []int32
  222. var dwsTaskList []int32
  223. for _, v := range taskList {
  224. if v >= 20000 { // 大于2万表示是ads层任务
  225. adsTaskList = append(adsTaskList, v)
  226. } else { // 小于表示为dws层任务
  227. dwsTaskList = append(dwsTaskList, v)
  228. }
  229. }
  230. // 发送消息到ads层
  231. if len(adsTaskList) > 0 {
  232. //var adsMsgList []*apis.AdsMessage
  233. adsMsg := task.NewAdsMessage(dwsMessage, "", "")
  234. adsMsg.TaskList = adsTaskList
  235. contentList := gjson.Parse(dwsMessage.Content).Array()
  236. for _, v := range contentList {
  237. adsMsg.Content = v.String()
  238. err = dutils.SendAdsMsg(adsMsg)
  239. if err != nil {
  240. return err
  241. }
  242. //adsMsgList = append(adsMsgList,adsMsg)
  243. }
  244. /*if len(adsMsgList) > 0 {
  245. err = dutils.SendAdsMsg(adsMsgList)
  246. if err != nil{
  247. return err
  248. }
  249. }*/
  250. }
  251. // 处理dws层任务
  252. if len(dwsTaskList) > 0 {
  253. err = ExecTask(dwsTaskList, dwsMessage)
  254. if err != nil {
  255. l.Error("func",
  256. zap.String("call", "ExecTask"),
  257. zap.String("args", dwsMessage.SourceCode),
  258. zap.String("error", err.Error()))
  259. }
  260. return err
  261. }
  262. return nil
  263. }