// Copyright 2019 getensh.com. All rights reserved. // Use of this source code is governed by getensh.com. package handle import ( "adm-ads/apis" "adm-ads/common.in/clinit" "adm-ads/common.in/utils" "adm-ads/consts" "adm-ads/model" "adm-ads/pb" "adm-ads/pb/v1" dutils "adm-ads/utils" "context" "encoding/json" "fmt" "go.uber.org/zap" "time" ) func doTask(task DwsTask,adsMessage *apis.AdsMessage)error{ lockKey,_ := dutils.GetPrimaryKey(adsMessage.Content,task.OutputSourceCode,task.PrimaryKeys) // 分布式锁 if lockKey != ""{ lock := utils.Lock{Key:lockKey,Ttl:3} err := lock.TryRedisLock() if err != nil{ return err } defer lock.RedisUnlock() } // 查询redis中是否存在主键,有主键表示手动修改过,并且在复用时间内 /*if taskKey != ""{ _,err := cache.Redis.Get(taskKey) if err == nil { return nil } }*/ err := task.Function(adsMessage) if err != nil{ l.Error("func", zap.String("call", "task"), zap.String("args", adsMessage.SourceCode), zap.String("error", err.Error())) return err } return nil } // 执行任务 func ExecTask(taskId []int32,adsMessage *apis.AdsMessage) error { for _,v := range taskId{ fmt.Println("exec task:",v) if task,ok:= taskMap[v];ok{ err := doTask(task,adsMessage) if err != nil{ return err } } } return nil } // (通过接口获取)通过来源获取任务列表 func GetTaskList(sourceCode string) ([]int32,error) { req := &v1.GetTaskBySourceCodeRequest{SourceCode:sourceCode} reply,err := pb.AdmTask.GetTaskBySourceCode(context.Background(),req) if err != nil{ l.Error("rpc", zap.String("call", "pb.AdmTask.GetTaskBySourceCode"), zap.String("args", sourceCode), zap.String("error", err.Error())) return nil,err } return reply.TaskList,nil } func RunTaskBak(data []byte)(err error) { fmt.Println("msg:",string(data)) var adsMsgList []apis.AdsMessage err = json.Unmarshal(data,&adsMsgList) if err != nil{ l.Error("func", zap.String("call", "RunTask"), zap.String("args", string(data)), zap.String("error", err.Error())) return nil } /*for _,v := range adsMsgList{ err = RunTaskImpl(&v) if err != nil{ return err } }*/ return nil } // 运行任务 func RunTask(data []byte)(err error) { adsMessage := &apis.AdsMessage{} err = json.Unmarshal(data,adsMessage) if err != nil{ l.Error("func", zap.String("call", "RunTask"), zap.String("args", string(data)), zap.String("error", err.Error())) return nil } if adsMessage.SourceCode == "" && len(adsMessage.TaskList) == 0 { l.Error("func", zap.String("call", "RunTask"), zap.String("args",adsMessage.Content), zap.String("error", "参数错误,soruce code和task list都为空")) return nil } // 开始时间 startTime := uint64(time.Now().UnixNano()) // 捕获各个task中的异常并返回给调用者 defer func() { status := "SUCCESS" if r := recover(); r != nil { err = fmt.Errorf("%+v", r) l.Error("err", zap.String("run_task", err.Error()), zap.Stack("stacktrace")) } if err != nil{ status = "FAIL" consumeFail := &model.ConsumeFail{} consumeFail.Content = utils.MarshalJsonString(adsMessage) err = consumeFail.Insert(clinit.DB()) if err != nil{ l.Error("mysql", zap.String("sql", "insert into t_adm_ads_consume_fail"), zap.String("fields", consumeFail.Content), zap.String("error", err.Error())) } } printAccessLog(adsMessage.SourceCode,adsMessage.Action,startTime, status) }() var taskList []int32 if adsMessage.OdsMsgType == consts.ODSOFFLINEIMPORT{ taskList = adsMessage.TaskList } // 任务不为空,只执行消息中的任务,为空采用默认任务 if len(taskList) == 0{ taskList,err = GetTaskList(adsMessage.SourceCode) if err != nil{ l.Error("func", zap.String("call", "GetTaskList"), zap.String("args", adsMessage.SourceCode), zap.String("error", err.Error())) return err } } err = ExecTask(taskList,adsMessage) if err != nil{ l.Error("func", zap.String("call", "ExecTask"), zap.String("args", adsMessage.SourceCode), zap.String("error", err.Error())) } return err }