run.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package handle
  4. import (
  5. "adm-ods/apis"
  6. "adm-ods/common.in/clinit"
  7. "adm-ods/consts"
  8. "adm-ods/model"
  9. "encoding/json"
  10. "fmt"
  11. "time"
  12. "go.uber.org/zap"
  13. )
  14. // 处理ods数据任务
  15. func Run(data []byte) (err error) {
  16. odsMessage := &apis.OdsMessage{}
  17. err = json.Unmarshal(data, odsMessage)
  18. if err != nil {
  19. l.Error("func",
  20. zap.String("call", "Run"),
  21. zap.String("args", string(data)),
  22. zap.String("error", err.Error()))
  23. return nil
  24. }
  25. // 开始时间
  26. startTime := uint64(time.Now().UnixNano())
  27. // 捕获各个task中的异常并返回给调用者
  28. defer func() {
  29. status := "SUCCESS"
  30. if r := recover(); r != nil {
  31. err = fmt.Errorf("%+v", r)
  32. l.Error("err",
  33. zap.String("run_task", err.Error()),
  34. zap.Stack("stacktrace"))
  35. }
  36. if err != nil {
  37. //if err != nil && err != errors.UpdateTaskError && err != errors.SenMqError {
  38. consumeFail := &model.ConsumeFail{}
  39. consumeFail.Content = string(data)
  40. err = consumeFail.Insert(clinit.DB())
  41. if err != nil {
  42. l.Error("mysql",
  43. zap.String("sql", "insert into t_adm_ods_consume_fail"),
  44. zap.String("fields", string(data)),
  45. zap.String("error", err.Error()))
  46. }
  47. status = "FAIL"
  48. }
  49. printAccessLog(odsMessage.MsgType, startTime, status)
  50. }()
  51. if odsMessage.MsgType == consts.ODSPROVIDERLOG {
  52. // 处理三方日志
  53. err = HandleOnlineProviderLog(odsMessage)
  54. } else if odsMessage.MsgType == consts.ODSGDLOG {
  55. // 处理gd调用日志
  56. err = HandleOnlineGdLog(odsMessage)
  57. } else if odsMessage.MsgType == consts.ODSOFFLINEIMPORT {
  58. // 处理离线任务
  59. go HandleOfflineData(odsMessage)
  60. } else if odsMessage.MsgType == consts.ODSMANUALAMENDMENT {
  61. err = HandleManualAmendmentData(odsMessage)
  62. }
  63. return err
  64. }