// Copyright 2019 getensh.com. All rights reserved. // Use of this source code is governed by getensh.com. package utils import ( "adm-ods/apis" "adm-ods/common.in/cache" "adm-ods/common.in/config" "adm-ods/common.in/utils" "adm-ods/model" "adm-ods/pb" "adm-ods/pb/v1" "context" "encoding/json" "fmt" "google.golang.org/grpc/status" "strconv" "strings" "time" "unicode" "adm-ods/common.in/clinit" "adm-ods/common.in/mq" "go.uber.org/zap" ) func getKey(taskId int64) string { return fmt.Sprintf("offline-task-%d", taskId) } func getStopKey(taskId int64) string { return fmt.Sprintf("offline-task-stop-%d", taskId) } func DeleteLastId(taskId int64) { key := getKey(taskId) cache.Redis.Del(key) } func GetLastId(taskId int64) int64 { key := getKey(taskId) idstr, err := cache.Redis.Get(key) if err != nil { return 0 } id, _ := strconv.Atoi(idstr) return int64(id) //return 9 } func IsStop(taskId int64) bool { key := getStopKey(taskId) _, err := cache.Redis.Get(key) if err != nil { return false } DeleteLastId(taskId) return true } func SetLastId(taskId int64, lastId int64) error { key := getKey(taskId) //_, err := cache.Redis.Set(key, fmt.Sprintf("%d", lastId)) _, err := cache.Redis.IncrBy(key, 1) return err } func BuildSql(msg *apis.OdsMessage, pageSize, pageNum int) string { //content := gjson.Parse(msg.Content) // oss excel 地址 lastId := GetLastId(msg.OfflineTaskId) //sql := content.Get("sql").String() sql := msg.Content if lastId != 0 { sqlList := strings.Split(sql, " ") existWhere := false for _, v := range sqlList { if strings.TrimSpace(v) == "where" { // 存在条件 existWhere = true } } if existWhere { sql = fmt.Sprintf("%s and id>%d ", sql, lastId) } else { sql = fmt.Sprintf("%s where id>%d ", sql, lastId) } } sql = fmt.Sprintf("%s limit %d offset %d", sql, pageSize, pageNum*pageSize) return sql } func checkField(checkMap map[string]string) error { content := utils.MarshalJsonString(checkMap) req := &v1.FieldCheckRequest{Content: content} _, err := pb.AdmTask.FieldCheck(context.Background(), req) if err != nil { l.Error("rpc", zap.String("call", "pb.AdmTask.FieldCheck"), zap.String("args", content), zap.String("error", err.Error())) return err } return nil } func checkDwsMsgParams(checkMap map[string]string) error { for key, value := range checkMap { if value == "0" { checkMap[key] = "" } else { checkMap[key] = strings.TrimSpace(value) } } var plateNo, plateType, vin, idcert string if v, ok := checkMap["plate_no"]; ok { plateNo = v } if v, ok := checkMap["plate_type"]; ok { plateType = v } if v, ok := checkMap["vin"]; ok { vin = v } if v, ok := checkMap["idcert"]; ok { idcert = v } if plateNo != "" || vin != "" || idcert != "" { req := &v1.CheckFormatRequest{PlateNo: plateNo, PlateType: plateType, Vin: vin, Idcert: idcert} reply, err := pb.AdmTask.CheckFormat(context.Background(), req) fmt.Println("check format :", reply, err) if err != nil { l.Error("rpc", zap.String("call", "pb.AdmTask.CheckFormat"), zap.String("args", utils.MarshalJsonString(*req)), zap.String("error", err.Error())) return err } if plateType == "" && reply.PlateType != "" { checkMap["plate_type"] = reply.PlateType } checkMap["vin"] = reply.Vin checkMap["idcert"] = reply.Idcert checkMap["plate_no"] = reply.PlateNo } return nil } // 发送消息到dws func SendDwsMsg(dwsMsg *apis.DwsMessage) error { var checkMapList []map[string]string json.Unmarshal([]byte(dwsMsg.Content), &checkMapList) if len(checkMapList) <= 0 { return nil } if config.Conf.RunMode != "prod" { fmt.Println("Check Start:", checkMapList) } // 标准字段检查 err := checkField(checkMapList[0]) if err != nil { status := status.Convert(err) if status.Code() == 20003 { // 参数错误 fmt.Println("字段检查失败:", status.Message()) return nil } else { // 服务错误 fmt.Println("参数检查调用失败:", err) return err } } // 参数解析不通过不发送消息 for index, _ := range checkMapList { err = checkDwsMsgParams(checkMapList[index]) if err != nil { status := status.Convert(err) if status.Code() == 20003 { // 参数错误 return nil } else { // 服务错误 return err } } } if config.Conf.RunMode != "prod" { fmt.Println("Check End:", checkMapList) } dwsMsg.Content = utils.MarshalJsonString(checkMapList) mqMsg, _ := json.Marshal(*dwsMsg) err = mq.DwsMq.PublishMsg(mqMsg) if err != nil { failMsg := &model.FailMsg{} failMsg.Msg = string(mqMsg) err = failMsg.Insert(clinit.DB()) if err != nil { l.Error("mysql", zap.String("sql", "insert into t_adm-ods_fail_msg"), zap.String("fields", failMsg.Msg), zap.String("error", err.Error())) } return err } return nil } func NewDwsMessage(msg *apis.OdsMessage) *apis.DwsMessage { dwsMsg := &apis.DwsMessage{} dwsMsg.OdsMsgType = msg.MsgType dwsMsg.SourceCode = msg.SourceCode dwsMsg.TaskList = msg.TaskList dwsMsg.OfflineTaskId = msg.OfflineTaskId //dwsMsg.SourceLevel = msg.SourceLevel if msg.Timestamp != 0 { dwsMsg.Timestamp = msg.Timestamp } else { dwsMsg.Timestamp = time.Now().Unix() } /* if msg.SourceCode == consts.SOURCEODS2 || msg.SourceCode == consts.SOURCEODS4 { dwsMsg.NeedSleep = true } */ return dwsMsg } func IsPlate(str string) bool { for _, r := range str { if unicode.Is(unicode.Han, r) { return true } } return false }