123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- // Copyright 2019 getensh.com. All rights reserved.
- // Use of this source code is governed by getensh.com.
- package handle
- import (
- "adm-ads/apis"
- "adm-ads/common.in/clinit"
- "adm-ads/common.in/utils"
- "adm-ads/consts"
- "adm-ads/model"
- "adm-ads/pb"
- "adm-ads/pb/v1"
- dutils "adm-ads/utils"
- "context"
- "encoding/json"
- "fmt"
- "go.uber.org/zap"
- "time"
- )
- func doTask(task DwsTask,adsMessage *apis.AdsMessage)error{
- lockKey,_ := dutils.GetPrimaryKey(adsMessage.Content,task.OutputSourceCode,task.PrimaryKeys)
- // 分布式锁
- if lockKey != ""{
- lock := utils.Lock{Key:lockKey,Ttl:3}
- err := lock.TryRedisLock()
- if err != nil{
- return err
- }
- defer lock.RedisUnlock()
- }
- // 查询redis中是否存在主键,有主键表示手动修改过,并且在复用时间内
- /*if taskKey != ""{
- _,err := cache.Redis.Get(taskKey)
- if err == nil {
- return nil
- }
- }*/
- err := task.Function(adsMessage)
- if err != nil{
- l.Error("func",
- zap.String("call", "task"),
- zap.String("args", adsMessage.SourceCode),
- zap.String("error", err.Error()))
- return err
- }
- return nil
- }
- // 执行任务
- func ExecTask(taskId []int32,adsMessage *apis.AdsMessage) error {
- for _,v := range taskId{
- fmt.Println("exec task:",v)
- if task,ok:= taskMap[v];ok{
- err := doTask(task,adsMessage)
- if err != nil{
- return err
- }
- }
- }
- return nil
- }
- // (通过接口获取)通过来源获取任务列表
- func GetTaskList(sourceCode string) ([]int32,error) {
- req := &v1.GetTaskBySourceCodeRequest{SourceCode:sourceCode}
- reply,err := pb.AdmTask.GetTaskBySourceCode(context.Background(),req)
- if err != nil{
- l.Error("rpc",
- zap.String("call", "pb.AdmTask.GetTaskBySourceCode"),
- zap.String("args", sourceCode),
- zap.String("error", err.Error()))
- return nil,err
- }
- return reply.TaskList,nil
- }
- func RunTaskBak(data []byte)(err error) {
- fmt.Println("msg:",string(data))
- var adsMsgList []apis.AdsMessage
- err = json.Unmarshal(data,&adsMsgList)
- if err != nil{
- l.Error("func",
- zap.String("call", "RunTask"),
- zap.String("args", string(data)),
- zap.String("error", err.Error()))
- return nil
- }
- /*for _,v := range adsMsgList{
- err = RunTaskImpl(&v)
- if err != nil{
- return err
- }
- }*/
- return nil
- }
- // 运行任务
- func RunTask(data []byte)(err error) {
- adsMessage := &apis.AdsMessage{}
- err = json.Unmarshal(data,adsMessage)
- if err != nil{
- l.Error("func",
- zap.String("call", "RunTask"),
- zap.String("args", string(data)),
- zap.String("error", err.Error()))
- return nil
- }
- if adsMessage.SourceCode == "" && len(adsMessage.TaskList) == 0 {
- l.Error("func",
- zap.String("call", "RunTask"),
- zap.String("args",adsMessage.Content),
- zap.String("error", "参数错误,soruce code和task list都为空"))
- return nil
- }
- // 开始时间
- startTime := uint64(time.Now().UnixNano())
- // 捕获各个task中的异常并返回给调用者
- defer func() {
- status := "SUCCESS"
- if r := recover(); r != nil {
- err = fmt.Errorf("%+v", r)
- l.Error("err",
- zap.String("run_task", err.Error()),
- zap.Stack("stacktrace"))
- }
- if err != nil{
- status = "FAIL"
- consumeFail := &model.ConsumeFail{}
- consumeFail.Content = utils.MarshalJsonString(adsMessage)
- err = consumeFail.Insert(clinit.DB())
- if err != nil{
- l.Error("mysql",
- zap.String("sql", "insert into t_adm_ads_consume_fail"),
- zap.String("fields", consumeFail.Content),
- zap.String("error", err.Error()))
- }
- }
- printAccessLog(adsMessage.SourceCode,adsMessage.Action,startTime, status)
- }()
- var taskList []int32
- if adsMessage.OdsMsgType == consts.ODSOFFLINEIMPORT{
- taskList = adsMessage.TaskList
- }
- // 任务不为空,只执行消息中的任务,为空采用默认任务
- if len(taskList) == 0{
- taskList,err = GetTaskList(adsMessage.SourceCode)
- if err != nil{
- l.Error("func",
- zap.String("call", "GetTaskList"),
- zap.String("args", adsMessage.SourceCode),
- zap.String("error", err.Error()))
- return err
- }
- }
- err = ExecTask(taskList,adsMessage)
- if err != nil{
- l.Error("func",
- zap.String("call", "ExecTask"),
- zap.String("args", adsMessage.SourceCode),
- zap.String("error", err.Error()))
- }
- return err
- }
|