// Copyright 2019 getensh.com. All rights reserved. // Use of this source code is governed by getensh.com. package handle import ( "adm-dws/apis" "adm-dws/common.in/clinit" "adm-dws/common.in/utils" "adm-dws/consts" "adm-dws/impl/task" "adm-dws/model" "adm-dws/pb" "adm-dws/pb/v1" dutils "adm-dws/utils" "context" "encoding/json" "fmt" "github.com/tidwall/gjson" "go.uber.org/zap" "gorm.io/gorm" "time" ) func doTask(db *gorm.DB, task DwsTask, dwsMessage *apis.DwsMessage) (adsMsgList []*apis.AdsMessage, dwsMsgList []*apis.DwsMessage, err error) { lockKey, _ := dutils.GetPrimaryKey(dwsMessage.Content, task.OutputSourceCode, task.PrimaryKeys) if lockKey != "" { lock := utils.Lock{Key: lockKey, Ttl: 3} err := lock.TryRedisLock() if err != nil { return nil, nil, err } defer lock.RedisUnlock() } // 查询redis中是否存在主键,有主键表示手动修改过,并且在复用时间内 /*if taskKey != ""{ _,err := cache.Redis.Get(taskKey) if err == nil { return nil,nil ,nil } // 手动修改标记数据为手动修改,设置redis 过期时间 if dwsMessage.OdsMsgType == consts.ODSMANUALAMENDMENT{ cache.Redis.SetEx(taskKey,1*24*60*60,"") } }*/ adsMsgList, dwsMsgList, err = task.Function(db, dwsMessage, task.OutputSourceCode) if err != nil { l.Error("func", zap.String("call", "task.Function"), zap.String("args", dwsMessage.SourceCode), zap.String("error", err.Error())) return nil, nil, err } return adsMsgList, dwsMsgList, err } func execTaskImpl(db *gorm.DB, task DwsTask, dwsMessage *apis.DwsMessage) (err error) { defer func() { if r := recover(); r != nil { err = fmt.Errorf("%+v", r) } }() adsMsgs, dwsMsgs, err := doTask(db, task, dwsMessage) if err != nil { return err } for _, adsMsg := range adsMsgs { err := dutils.SendAdsMsg(adsMsg) if err != nil { return err } } // 发送消息到dws层 for _, dwsMsg := range dwsMsgs { err := dutils.SendDwsMsg(dwsMsg) if err != nil { return err } } return nil } func ExecTaskImpl(taskIdList []int32, dwsMessage *apis.DwsMessage, contentLen int) (err error) { dwsMessage.MsgLen = contentLen for _, v := range taskIdList { if v == 10009 { if contentLen > 1 { continue } } /*if v == 10017 { if contentLen == 1 { continue } }*/ fmt.Println("exec task:", v) if task, ok := DwsTaskMap[v]; ok { db := clinit.DB().Begin() err = execTaskImpl(db, task, dwsMessage) if err != nil { db.Rollback() return err } else { db.Commit() } } } return nil } // 执行任务 func ExecTask(taskIdList []int32, dwsMessage *apis.DwsMessage) error { //var adsMsgList []*apis.AdsMessage //var dwsMsgList []*apis.DwsMessage //fmt.Println("dws content:",dwsMessage.Content) contentList := gjson.Parse(dwsMessage.Content).Array() //fmt.Println("dws contentlist :",contentList) contentLen := len(contentList) for _, v := range contentList { dwsMessage.Content = v.String() err := ExecTaskImpl(taskIdList, dwsMessage, contentLen) if err != nil { return err } /*if len(adsMsgs) > 0 { adsMsgList = append(adsMsgList,adsMsgs...) } if len(dwsMsgs) > 0 { dwsMsgList = append(dwsMsgList,dwsMsgs...) }*/ } // 发送消息到ads层 /*if len(adsMsgList) > 0 { err := dutils.SendAdsMsg(adsMsgList) if err != nil{ return err } }*/ /*for _,adsMsg := range adsMsgList{ err := dutils.SendAdsMsg(adsMsg) if err != nil{ return err } } // 发送消息到dws层 for _,dwsMsg := range dwsMsgList { err := dutils.SendDwsMsg(dwsMsg) if err != nil{ return err } }*/ return nil } //(通过接口获取)通过来源获取任务列表 func GetTaskList(sourceCode string) ([]int32, error) { req := &v1.GetTaskBySourceCodeRequest{SourceCode: sourceCode} reply, err := pb.AdmTask.GetTaskBySourceCode(context.Background(), req) if err != nil { l.Error("rpc", zap.String("call", "pb.AdmTask.GetTaskBySourceCode"), zap.String("args", sourceCode), zap.String("error", err.Error())) return nil, err } return reply.TaskList, nil } // 运行任务 func RunTask(data []byte) (err error) { dwsMessage := &apis.DwsMessage{} err = json.Unmarshal(data, dwsMessage) if err != nil { l.Error("func", zap.String("call", "RunTask"), zap.String("args", string(data)), zap.String("error", err.Error())) return nil } if dwsMessage.SourceCode == "" && len(dwsMessage.TaskList) == 0 { l.Error("func", zap.String("call", "RunTask"), zap.String("args", string(data)), zap.String("error", "参数错误,soruce code和task list都为空")) return nil } // 开始时间 startTime := uint64(time.Now().UnixNano()) // 捕获各个task中的异常并返回给调用者 defer func() { status := "SUCCESS" if r := recover(); r != nil { err = fmt.Errorf("%+v", r) l.Error("err", zap.String("run_task", err.Error()), zap.Stack("stacktrace")) } if err != nil { status = "FAIL" consumeFail := &model.ConsumeFail{} consumeFail.Content = string(data) err = consumeFail.Insert(clinit.DB()) if err != nil { l.Error("mysql", zap.String("sql", "insert into t_adm_dws_consume_fail"), zap.String("fields", string(data)), zap.String("error", err.Error())) } } printAccessLog(dwsMessage.OdsMsgType, dwsMessage.SourceCode, startTime, status) }() // 离线导入先采用直选任务 var taskList []int32 if dwsMessage.OdsMsgType == consts.ODSOFFLINEIMPORT || dwsMessage.OdsMsgType == consts.ODSMANUALAMENDMENT{ taskList = dwsMessage.TaskList } // 如果直选任务为空,采用默认任务 if len(taskList) == 0 { taskList, err = GetTaskList(dwsMessage.SourceCode) if err != nil { l.Error("func", zap.String("call", "GetTaskList"), zap.String("args", dwsMessage.SourceCode), zap.String("error", err.Error())) return err } } // 任务拆分 var adsTaskList []int32 var dwsTaskList []int32 for _, v := range taskList { if v >= 20000 { // 大于2万表示是ads层任务 adsTaskList = append(adsTaskList, v) } else { // 小于表示为dws层任务 dwsTaskList = append(dwsTaskList, v) } } // 发送消息到ads层 if len(adsTaskList) > 0 { //var adsMsgList []*apis.AdsMessage adsMsg := task.NewAdsMessage(dwsMessage, "", "") adsMsg.TaskList = adsTaskList contentList := gjson.Parse(dwsMessage.Content).Array() for _, v := range contentList { adsMsg.Content = v.String() err = dutils.SendAdsMsg(adsMsg) if err != nil { return err } //adsMsgList = append(adsMsgList,adsMsg) } /*if len(adsMsgList) > 0 { err = dutils.SendAdsMsg(adsMsgList) if err != nil{ return err } }*/ } // 处理dws层任务 if len(dwsTaskList) > 0 { err = ExecTask(dwsTaskList, dwsMessage) if err != nil { l.Error("func", zap.String("call", "ExecTask"), zap.String("args", dwsMessage.SourceCode), zap.String("error", err.Error())) } return err } return nil }