12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- // Copyright 2019 getensh.com. All rights reserved.
- // Use of this source code is governed by getensh.com.
- package handle
- import (
- "adm-ods/apis"
- "adm-ods/common.in/clinit"
- "adm-ods/consts"
- "adm-ods/model"
- "encoding/json"
- "fmt"
- "time"
- "go.uber.org/zap"
- )
- // 处理ods数据任务
- func Run(data []byte) (err error) {
- odsMessage := &apis.OdsMessage{}
- err = json.Unmarshal(data, odsMessage)
- if err != nil {
- l.Error("func",
- zap.String("call", "Run"),
- zap.String("args", string(data)),
- zap.String("error", err.Error()))
- return nil
- }
- // 开始时间
- startTime := uint64(time.Now().UnixNano())
- // 捕获各个task中的异常并返回给调用者
- defer func() {
- status := "SUCCESS"
- if r := recover(); r != nil {
- err = fmt.Errorf("%+v", r)
- l.Error("err",
- zap.String("run_task", err.Error()),
- zap.Stack("stacktrace"))
- }
- if err != nil {
- //if err != nil && err != errors.UpdateTaskError && err != errors.SenMqError {
- consumeFail := &model.ConsumeFail{}
- consumeFail.Content = string(data)
- err = consumeFail.Insert(clinit.DB())
- if err != nil {
- l.Error("mysql",
- zap.String("sql", "insert into t_adm_ods_consume_fail"),
- zap.String("fields", string(data)),
- zap.String("error", err.Error()))
- }
- status = "FAIL"
- }
- printAccessLog(odsMessage.MsgType, startTime, status)
- }()
- if odsMessage.MsgType == consts.ODSPROVIDERLOG {
- // 处理三方日志
- err = HandleOnlineProviderLog(odsMessage)
- } else if odsMessage.MsgType == consts.ODSGDLOG {
- // 处理gd调用日志
- err = HandleOnlineGdLog(odsMessage)
- } else if odsMessage.MsgType == consts.ODSOFFLINEIMPORT {
- // 处理离线任务
- go HandleOfflineData(odsMessage)
- } else if odsMessage.MsgType == consts.ODSMANUALAMENDMENT {
- err = HandleManualAmendmentData(odsMessage)
- }
- return err
- }
|