// Copyright 2019 getensh.com. All rights reserved. // Use of this source code is governed by getensh.com. package handle import ( "adm-ods/apis" "adm-ods/common.in/utils" dutils "adm-ods/utils" "fmt" "github.com/tidwall/gjson" "go.uber.org/zap" ) // 实时处理数据源日志 func HandleOnlineProviderLog(msg *apis.OdsMessage) (err error) { fmt.Println(utils.MarshalJsonString(*msg)) //thirdpartyLog := gjson.Parse(msg.Content) providerApiCode := msg.SourceCode //providerApiCode := thirdpartyLog.Get("provider_api_code").String() if v, ok := ProviderApiCodeToSoucreCodeMap[providerApiCode]; ok { if task, ok1 := OdsOnlineTaskMap[v]; ok1 { // 赋值源 msg.SourceCode = v // 获取数据源时间 data := gjson.Parse(msg.Content) msg.Timestamp = data.Get("timestamp").Int() dataMapList, err := task.OnlineFunction(msg) if err == nil { if len(dataMapList) > 0 { dwsMsg := dutils.NewDwsMessage(msg) dwsMsg.Content = utils.MarshalJsonString(dataMapList) err = dutils.SendDwsMsg(dwsMsg) if err != nil { return err } } /* dwsMsgListLen := len(dataMapList) if dwsMsgListLen > 0 { for _, dwsMsg := range dwsMsgList { err = dutils.SendDwsMsg(dwsMsg) if err != nil { return err } } }*/ } return err } else { l.Error("func", zap.String("call", "HandleOnlineProviderLog.ProviderApiCodeToSoucreCodeMap"), zap.String("args", v), zap.String("error", "provider msg no handle")) } } else { l.Error("func", zap.String("call", "HandleOnlineProviderLog.OdsOnlineTaskMap"), zap.String("args", providerApiCode), zap.String("error", "provider msg no mapping")) } return nil } func HandleOnlineGdLog(msg *apis.OdsMessage) error { return nil }