online_data.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. // Copyright 2019 autocareai.com. All rights reserved.
  2. // Use of this source code is governed by autocareai.com.
  3. package handle
  4. import (
  5. "gadm-ods/apis"
  6. "gadm-ods/common.in/config"
  7. "gadm-ods/common.in/utils"
  8. dutils "gadm-ods/utils"
  9. "fmt"
  10. "github.com/tidwall/gjson"
  11. "go.uber.org/zap"
  12. )
  13. // 实时处理数据源日志
  14. func HandleOnlineProviderLog(msg *apis.OdsMessage) (err error) {
  15. if config.Conf.RunMode != "prod"{
  16. fmt.Println(utils.MarshalJsonString(*msg))
  17. }
  18. //thirdpartyLog := gjson.Parse(msg.Content)
  19. providerApiCode := msg.SourceCode
  20. //providerApiCode := thirdpartyLog.Get("provider_api_code").String()
  21. if v, ok := ProviderApiCodeToSoucreCodeMap[providerApiCode]; ok {
  22. if task, ok1 := OdsOnlineTaskMap[v]; ok1 {
  23. // 赋值源
  24. msg.SourceCode = v
  25. // 获取数据源时间
  26. data := gjson.Parse(msg.Content)
  27. msg.Timestamp = data.Get("timestamp").Int()
  28. dataMapList, err := task.OnlineFunction(msg)
  29. if err == nil {
  30. if len(dataMapList) > 0 {
  31. dwsMsg := dutils.NewDwsMessage(msg)
  32. dwsMsg.Content = utils.MarshalJsonString(dataMapList)
  33. err = dutils.SendDwsMsg(dwsMsg)
  34. if err != nil {
  35. return err
  36. }
  37. }
  38. /*
  39. dwsMsgListLen := len(dataMapList)
  40. if dwsMsgListLen > 0 {
  41. for _, dwsMsg := range dwsMsgList {
  42. err = dutils.SendDwsMsg(dwsMsg)
  43. if err != nil {
  44. return err
  45. }
  46. }
  47. }*/
  48. }
  49. return err
  50. } else {
  51. l.Error("func",
  52. zap.String("call", "HandleOnlineProviderLog.ProviderApiCodeToSoucreCodeMap"),
  53. zap.String("args", v),
  54. zap.String("error", "provider msg no handle"))
  55. }
  56. } else {
  57. l.Error("func",
  58. zap.String("call", "HandleOnlineProviderLog.OdsOnlineTaskMap"),
  59. zap.String("args", providerApiCode),
  60. zap.String("error", "provider msg no mapping"))
  61. }
  62. return nil
  63. }
  64. func HandleOnlineDipLog(msg *apis.OdsMessage) error {
  65. return nil
  66. }