// Copyright 2019 getensh.com. All rights reserved. // Use of this source code is governed by getensh.com. package handle import ( "adm-ods/apis" "adm-ods/common.in/clinit" "adm-ods/consts" "adm-ods/model" "encoding/json" "fmt" "time" "go.uber.org/zap" ) // 处理ods数据任务 func Run(data []byte) (err error) { odsMessage := &apis.OdsMessage{} err = json.Unmarshal(data, odsMessage) if err != nil { l.Error("func", zap.String("call", "Run"), zap.String("args", string(data)), zap.String("error", err.Error())) return nil } // 开始时间 startTime := uint64(time.Now().UnixNano()) // 捕获各个task中的异常并返回给调用者 defer func() { status := "SUCCESS" if r := recover(); r != nil { err = fmt.Errorf("%+v", r) l.Error("err", zap.String("run_task", err.Error()), zap.Stack("stacktrace")) } if err != nil { //if err != nil && err != errors.UpdateTaskError && err != errors.SenMqError { consumeFail := &model.ConsumeFail{} consumeFail.Content = string(data) err = consumeFail.Insert(clinit.DB()) if err != nil { l.Error("mysql", zap.String("sql", "insert into t_adm_ods_consume_fail"), zap.String("fields", string(data)), zap.String("error", err.Error())) } status = "FAIL" } printAccessLog(odsMessage.MsgType, startTime, status) }() if odsMessage.MsgType == consts.ODSPROVIDERLOG { // 处理三方日志 err = HandleOnlineProviderLog(odsMessage) } else if odsMessage.MsgType == consts.ODSGDLOG { // 处理gd调用日志 err = HandleOnlineGdLog(odsMessage) } else if odsMessage.MsgType == consts.ODSOFFLINEIMPORT { // 处理离线任务 go HandleOfflineData(odsMessage) } else if odsMessage.MsgType == consts.ODSMANUALAMENDMENT { err = HandleManualAmendmentData(odsMessage) } return err }