123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295 |
- // Copyright 2019 getensh.com. All rights reserved.
- // Use of this source code is governed by getensh.com.
- package handle
- import (
- "adm-dws/apis"
- "adm-dws/common.in/clinit"
- "adm-dws/common.in/utils"
- "adm-dws/consts"
- "adm-dws/impl/task"
- "adm-dws/model"
- "adm-dws/pb"
- "adm-dws/pb/v1"
- dutils "adm-dws/utils"
- "context"
- "encoding/json"
- "fmt"
- "github.com/tidwall/gjson"
- "go.uber.org/zap"
- "gorm.io/gorm"
- "time"
- )
- func doTask(db *gorm.DB, task DwsTask, dwsMessage *apis.DwsMessage) (adsMsgList []*apis.AdsMessage, dwsMsgList []*apis.DwsMessage, err error) {
- lockKey, _ := dutils.GetPrimaryKey(dwsMessage.Content, task.OutputSourceCode, task.PrimaryKeys)
- if lockKey != "" {
- lock := utils.Lock{Key: lockKey, Ttl: 3}
- err := lock.TryRedisLock()
- if err != nil {
- return nil, nil, err
- }
- defer lock.RedisUnlock()
- }
- // 查询redis中是否存在主键,有主键表示手动修改过,并且在复用时间内
- /*if taskKey != ""{
- _,err := cache.Redis.Get(taskKey)
- if err == nil {
- return nil,nil ,nil
- }
- // 手动修改标记数据为手动修改,设置redis 过期时间
- if dwsMessage.OdsMsgType == consts.ODSMANUALAMENDMENT{
- cache.Redis.SetEx(taskKey,1*24*60*60,"")
- }
- }*/
- adsMsgList, dwsMsgList, err = task.Function(db, dwsMessage, task.OutputSourceCode)
- if err != nil {
- l.Error("func",
- zap.String("call", "task.Function"),
- zap.String("args", dwsMessage.SourceCode),
- zap.String("error", err.Error()))
- return nil, nil, err
- }
- return adsMsgList, dwsMsgList, err
- }
- func execTaskImpl(db *gorm.DB, task DwsTask, dwsMessage *apis.DwsMessage) (err error) {
- defer func() {
- if r := recover(); r != nil {
- err = fmt.Errorf("%+v", r)
- }
- }()
- adsMsgs, dwsMsgs, err := doTask(db, task, dwsMessage)
- if err != nil {
- return err
- }
- for _, adsMsg := range adsMsgs {
- err := dutils.SendAdsMsg(adsMsg)
- if err != nil {
- return err
- }
- }
- // 发送消息到dws层
- for _, dwsMsg := range dwsMsgs {
- err := dutils.SendDwsMsg(dwsMsg)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func ExecTaskImpl(taskIdList []int32, dwsMessage *apis.DwsMessage, contentLen int) (err error) {
- dwsMessage.MsgLen = contentLen
- for _, v := range taskIdList {
- if v == 10009 {
- if contentLen > 1 {
- continue
- }
- }
- /*if v == 10017 {
- if contentLen == 1 {
- continue
- }
- }*/
- fmt.Println("exec task:", v)
- if task, ok := DwsTaskMap[v]; ok {
- db := clinit.DB().Begin()
- err = execTaskImpl(db, task, dwsMessage)
- if err != nil {
- db.Rollback()
- return err
- } else {
- db.Commit()
- }
- }
- }
- return nil
- }
- // 执行任务
- func ExecTask(taskIdList []int32, dwsMessage *apis.DwsMessage) error {
- //var adsMsgList []*apis.AdsMessage
- //var dwsMsgList []*apis.DwsMessage
- //fmt.Println("dws content:",dwsMessage.Content)
- contentList := gjson.Parse(dwsMessage.Content).Array()
- //fmt.Println("dws contentlist :",contentList)
- contentLen := len(contentList)
- for _, v := range contentList {
- dwsMessage.Content = v.String()
- err := ExecTaskImpl(taskIdList, dwsMessage, contentLen)
- if err != nil {
- return err
- }
- /*if len(adsMsgs) > 0 {
- adsMsgList = append(adsMsgList,adsMsgs...)
- }
- if len(dwsMsgs) > 0 {
- dwsMsgList = append(dwsMsgList,dwsMsgs...)
- }*/
- }
- // 发送消息到ads层
- /*if len(adsMsgList) > 0 {
- err := dutils.SendAdsMsg(adsMsgList)
- if err != nil{
- return err
- }
- }*/
- /*for _,adsMsg := range adsMsgList{
- err := dutils.SendAdsMsg(adsMsg)
- if err != nil{
- return err
- }
- }
- // 发送消息到dws层
- for _,dwsMsg := range dwsMsgList {
- err := dutils.SendDwsMsg(dwsMsg)
- if err != nil{
- return err
- }
- }*/
- return nil
- }
- //(通过接口获取)通过来源获取任务列表
- func GetTaskList(sourceCode string) ([]int32, error) {
- req := &v1.GetTaskBySourceCodeRequest{SourceCode: sourceCode}
- reply, err := pb.AdmTask.GetTaskBySourceCode(context.Background(), req)
- if err != nil {
- l.Error("rpc",
- zap.String("call", "pb.AdmTask.GetTaskBySourceCode"),
- zap.String("args", sourceCode),
- zap.String("error", err.Error()))
- return nil, err
- }
- return reply.TaskList, nil
- }
- // 运行任务
- func RunTask(data []byte) (err error) {
- dwsMessage := &apis.DwsMessage{}
- err = json.Unmarshal(data, dwsMessage)
- if err != nil {
- l.Error("func",
- zap.String("call", "RunTask"),
- zap.String("args", string(data)),
- zap.String("error", err.Error()))
- return nil
- }
- if dwsMessage.SourceCode == "" && len(dwsMessage.TaskList) == 0 {
- l.Error("func",
- zap.String("call", "RunTask"),
- zap.String("args", string(data)),
- zap.String("error", "参数错误,soruce code和task list都为空"))
- return nil
- }
- // 开始时间
- startTime := uint64(time.Now().UnixNano())
- // 捕获各个task中的异常并返回给调用者
- defer func() {
- status := "SUCCESS"
- if r := recover(); r != nil {
- err = fmt.Errorf("%+v", r)
- l.Error("err",
- zap.String("run_task", err.Error()),
- zap.Stack("stacktrace"))
- }
- if err != nil {
- status = "FAIL"
- consumeFail := &model.ConsumeFail{}
- consumeFail.Content = string(data)
- err = consumeFail.Insert(clinit.DB())
- if err != nil {
- l.Error("mysql",
- zap.String("sql", "insert into t_adm_dws_consume_fail"),
- zap.String("fields", string(data)),
- zap.String("error", err.Error()))
- }
- }
- printAccessLog(dwsMessage.OdsMsgType, dwsMessage.SourceCode, startTime, status)
- }()
- // 离线导入先采用直选任务
- var taskList []int32
- if dwsMessage.OdsMsgType == consts.ODSOFFLINEIMPORT || dwsMessage.OdsMsgType == consts.ODSMANUALAMENDMENT{
- taskList = dwsMessage.TaskList
- }
- // 如果直选任务为空,采用默认任务
- if len(taskList) == 0 {
- taskList, err = GetTaskList(dwsMessage.SourceCode)
- if err != nil {
- l.Error("func",
- zap.String("call", "GetTaskList"),
- zap.String("args", dwsMessage.SourceCode),
- zap.String("error", err.Error()))
- return err
- }
- }
- // 任务拆分
- var adsTaskList []int32
- var dwsTaskList []int32
- for _, v := range taskList {
- if v >= 20000 { // 大于2万表示是ads层任务
- adsTaskList = append(adsTaskList, v)
- } else { // 小于表示为dws层任务
- dwsTaskList = append(dwsTaskList, v)
- }
- }
- // 发送消息到ads层
- if len(adsTaskList) > 0 {
- //var adsMsgList []*apis.AdsMessage
- adsMsg := task.NewAdsMessage(dwsMessage, "", "")
- adsMsg.TaskList = adsTaskList
- contentList := gjson.Parse(dwsMessage.Content).Array()
- for _, v := range contentList {
- adsMsg.Content = v.String()
- err = dutils.SendAdsMsg(adsMsg)
- if err != nil {
- return err
- }
- //adsMsgList = append(adsMsgList,adsMsg)
- }
- /*if len(adsMsgList) > 0 {
- err = dutils.SendAdsMsg(adsMsgList)
- if err != nil{
- return err
- }
- }*/
- }
- // 处理dws层任务
- if len(dwsTaskList) > 0 {
- err = ExecTask(dwsTaskList, dwsMessage)
- if err != nil {
- l.Error("func",
- zap.String("call", "ExecTask"),
- zap.String("args", dwsMessage.SourceCode),
- zap.String("error", err.Error()))
- }
- return err
- }
- return nil
- }
|