online_data.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package handle
  4. import (
  5. "adm-ods/apis"
  6. "adm-ods/common.in/utils"
  7. dutils "adm-ods/utils"
  8. "fmt"
  9. "github.com/tidwall/gjson"
  10. "go.uber.org/zap"
  11. )
  12. // 实时处理数据源日志
  13. func HandleOnlineProviderLog(msg *apis.OdsMessage) (err error) {
  14. fmt.Println(utils.MarshalJsonString(*msg))
  15. //thirdpartyLog := gjson.Parse(msg.Content)
  16. providerApiCode := msg.SourceCode
  17. //providerApiCode := thirdpartyLog.Get("provider_api_code").String()
  18. if v, ok := ProviderApiCodeToSoucreCodeMap[providerApiCode]; ok {
  19. if task, ok1 := OdsOnlineTaskMap[v]; ok1 {
  20. // 赋值源
  21. msg.SourceCode = v
  22. // 获取数据源时间
  23. data := gjson.Parse(msg.Content)
  24. msg.Timestamp = data.Get("timestamp").Int()
  25. dataMapList, err := task.OnlineFunction(msg)
  26. if err == nil {
  27. if len(dataMapList) > 0 {
  28. dwsMsg := dutils.NewDwsMessage(msg)
  29. dwsMsg.Content = utils.MarshalJsonString(dataMapList)
  30. err = dutils.SendDwsMsg(dwsMsg)
  31. if err != nil {
  32. return err
  33. }
  34. }
  35. /*
  36. dwsMsgListLen := len(dataMapList)
  37. if dwsMsgListLen > 0 {
  38. for _, dwsMsg := range dwsMsgList {
  39. err = dutils.SendDwsMsg(dwsMsg)
  40. if err != nil {
  41. return err
  42. }
  43. }
  44. }*/
  45. }
  46. return err
  47. } else {
  48. l.Error("func",
  49. zap.String("call", "HandleOnlineProviderLog.ProviderApiCodeToSoucreCodeMap"),
  50. zap.String("args", v),
  51. zap.String("error", "provider msg no handle"))
  52. }
  53. } else {
  54. l.Error("func",
  55. zap.String("call", "HandleOnlineProviderLog.OdsOnlineTaskMap"),
  56. zap.String("args", providerApiCode),
  57. zap.String("error", "provider msg no mapping"))
  58. }
  59. return nil
  60. }
  61. func HandleOnlineGdLog(msg *apis.OdsMessage) error {
  62. return nil
  63. }