|
- // Copyright 2019 getensh.com. All rights reserved.
- // Use of this source code is governed by getensh.com.
- package utils
- import (
- "adm-ods/apis"
- "adm-ods/common.in/cache"
- "adm-ods/common.in/config"
- "adm-ods/common.in/utils"
- "adm-ods/model"
- "adm-ods/pb"
- "adm-ods/pb/v1"
- "context"
- "encoding/json"
- "fmt"
- "google.golang.org/grpc/status"
- "strconv"
- "strings"
- "time"
- "unicode"
- "adm-ods/common.in/clinit"
- "adm-ods/common.in/mq"
- "go.uber.org/zap"
- )
- func getKey(taskId int64) string {
- return fmt.Sprintf("offline-task-%d", taskId)
- }
- func getStopKey(taskId int64) string {
- return fmt.Sprintf("offline-task-stop-%d", taskId)
- }
- func DeleteLastId(taskId int64) {
- key := getKey(taskId)
- cache.Redis.Del(key)
- }
- func GetLastId(taskId int64) int64 {
- key := getKey(taskId)
- idstr, err := cache.Redis.Get(key)
- if err != nil {
- return 0
- }
- id, _ := strconv.Atoi(idstr)
- return int64(id)
- //return 9
- }
- func IsStop(taskId int64) bool {
- key := getStopKey(taskId)
- _, err := cache.Redis.Get(key)
- if err != nil {
- return false
- }
- DeleteLastId(taskId)
- return true
- }
- func SetLastId(taskId int64, lastId int64) error {
- key := getKey(taskId)
- //_, err := cache.Redis.Set(key, fmt.Sprintf("%d", lastId))
- _, err := cache.Redis.IncrBy(key, 1)
- return err
- }
- func BuildSql(msg *apis.OdsMessage, pageSize, pageNum int) string {
- //content := gjson.Parse(msg.Content)
- // oss excel 地址
- lastId := GetLastId(msg.OfflineTaskId)
- //sql := content.Get("sql").String()
- sql := msg.Content
- if lastId != 0 {
- sqlList := strings.Split(sql, " ")
- existWhere := false
- for _, v := range sqlList {
- if strings.TrimSpace(v) == "where" {
- // 存在条件
- existWhere = true
- }
- }
- if existWhere {
- sql = fmt.Sprintf("%s and id>%d ", sql, lastId)
- } else {
- sql = fmt.Sprintf("%s where id>%d ", sql, lastId)
- }
- }
- sql = fmt.Sprintf("%s limit %d offset %d", sql, pageSize, pageNum*pageSize)
- return sql
- }
- func checkField(checkMap map[string]string) error {
- content := utils.MarshalJsonString(checkMap)
- req := &v1.FieldCheckRequest{Content: content}
- _, err := pb.AdmTask.FieldCheck(context.Background(), req)
- if err != nil {
- l.Error("rpc",
- zap.String("call", "pb.AdmTask.FieldCheck"),
- zap.String("args", content),
- zap.String("error", err.Error()))
- return err
- }
- return nil
- }
- func checkDwsMsgParams(checkMap map[string]string) error {
- for key, value := range checkMap {
- if value == "0" {
- checkMap[key] = ""
- } else {
- checkMap[key] = strings.TrimSpace(value)
- }
- }
- var plateNo, plateType, vin, idcert string
- if v, ok := checkMap["plate_no"]; ok {
- plateNo = v
- }
- if v, ok := checkMap["plate_type"]; ok {
- plateType = v
- }
- if v, ok := checkMap["vin"]; ok {
- vin = v
- }
- if v, ok := checkMap["idcert"]; ok {
- idcert = v
- }
- if plateNo != "" || vin != "" || idcert != "" {
- req := &v1.CheckFormatRequest{PlateNo: plateNo, PlateType: plateType, Vin: vin, Idcert: idcert}
- reply, err := pb.AdmTask.CheckFormat(context.Background(), req)
- fmt.Println("check format :", reply, err)
- if err != nil {
- l.Error("rpc",
- zap.String("call", "pb.AdmTask.CheckFormat"),
- zap.String("args", utils.MarshalJsonString(*req)),
- zap.String("error", err.Error()))
- return err
- }
- if plateType == "" && reply.PlateType != "" {
- checkMap["plate_type"] = reply.PlateType
- }
- checkMap["vin"] = reply.Vin
- checkMap["idcert"] = reply.Idcert
- checkMap["plate_no"] = reply.PlateNo
- }
- return nil
- }
- // 发送消息到dws
- func SendDwsMsg(dwsMsg *apis.DwsMessage) error {
- var checkMapList []map[string]string
- json.Unmarshal([]byte(dwsMsg.Content), &checkMapList)
- if len(checkMapList) <= 0 {
- return nil
- }
- if config.Conf.RunMode != "prod" {
- fmt.Println("Check Start:", checkMapList)
- }
- // 标准字段检查
- err := checkField(checkMapList[0])
- if err != nil {
- status := status.Convert(err)
- if status.Code() == 20003 {
- // 参数错误
- fmt.Println("字段检查失败:", status.Message())
- return nil
- } else {
- // 服务错误
- fmt.Println("参数检查调用失败:", err)
- return err
- }
- }
- // 参数解析不通过不发送消息
- for index, _ := range checkMapList {
- err = checkDwsMsgParams(checkMapList[index])
- if err != nil {
- status := status.Convert(err)
- if status.Code() == 20003 {
- // 参数错误
- return nil
- } else {
- // 服务错误
- return err
- }
- }
- }
- if config.Conf.RunMode != "prod" {
- fmt.Println("Check End:", checkMapList)
- }
- dwsMsg.Content = utils.MarshalJsonString(checkMapList)
- mqMsg, _ := json.Marshal(*dwsMsg)
- err = mq.DwsMq.PublishMsg(mqMsg)
- if err != nil {
- failMsg := &model.FailMsg{}
- failMsg.Msg = string(mqMsg)
- err = failMsg.Insert(clinit.DB())
- if err != nil {
- l.Error("mysql",
- zap.String("sql", "insert into t_adm-ods_fail_msg"),
- zap.String("fields", failMsg.Msg),
- zap.String("error", err.Error()))
- }
- return err
- }
- return nil
- }
- func NewDwsMessage(msg *apis.OdsMessage) *apis.DwsMessage {
- dwsMsg := &apis.DwsMessage{}
- dwsMsg.OdsMsgType = msg.MsgType
- dwsMsg.SourceCode = msg.SourceCode
- dwsMsg.TaskList = msg.TaskList
- dwsMsg.OfflineTaskId = msg.OfflineTaskId
- //dwsMsg.SourceLevel = msg.SourceLevel
- if msg.Timestamp != 0 {
- dwsMsg.Timestamp = msg.Timestamp
- } else {
- dwsMsg.Timestamp = time.Now().Unix()
- }
- /* if msg.SourceCode == consts.SOURCEODS2 || msg.SourceCode == consts.SOURCEODS4 {
- dwsMsg.NeedSleep = true
- }
- */
- return dwsMsg
- }
- func IsPlate(str string) bool {
- for _, r := range str {
- if unicode.Is(unicode.Han, r) {
- return true
- }
- }
- return false
- }
|