|
- // Copyright 2019 getensh.com. All rights reserved.
- // Use of this source code is governed by getensh.com.
- package handle
- import (
- "context"
- "fmt"
- "strings"
- "time"
- "adm-ods/apis"
- "adm-ods/common.in/clinit"
- "adm-ods/common.in/utils"
- "adm-ods/consts"
- "adm-ods/errors"
- "adm-ods/pb"
- v1 "adm-ods/pb/v1"
- dutils "adm-ods/utils"
- "github.com/tealeg/xlsx"
- "go.uber.org/zap"
- "gorm.io/gorm"
- )
- func updateTask(taskId, total, finishCount, isFinish int32, lastId int64) error {
- req := &v1.UpdateOfflineTaskRequest{Total: total, TaskId: taskId, FinishCount: finishCount, IsFinish: isFinish}
- _, err := pb.AdmTask.UpdateOfflineTask(context.Background(), req)
- if err != nil {
- l.Error("rpc",
- zap.String("call", "pb.AdmTask.UpdateOfflineTask"),
- zap.String("args", utils.MarshalJsonString(*req)),
- zap.String("error", err.Error()))
- return errors.UpdateTaskError
- }
- if finishCount != 0 {
- err = dutils.SetLastId(int64(taskId), lastId)
- if err != nil {
- return errors.UpdateTaskError
- }
- }
- if isFinish != 0 {
- dutils.DeleteLastId(int64(taskId))
- }
- return err
- }
- func handleOfflineFromExecl(msg *apis.OdsMessage) (err error) {
- if len(msg.TaskList) == 0 {
- l.Error("func",
- zap.String("call", "HandleOfflineData"),
- zap.String("args", utils.MarshalJsonString(*msg)),
- zap.String("error", "excel导入任务必须指定任务"))
- return nil
- }
- nameMap := make(map[int]string)
- downloadedFileName := msg.Content
- if true {
- err = utils.OssDownloadFile(msg.Content, downloadedFileName)
- // 下载文件失败
- if err != nil {
- l.Error("func",
- zap.String("call", "handleOfflineFromExecl.OssDownloadFile"),
- zap.String("args", msg.Content),
- zap.String("error", err.Error()))
- return err
- }
- }
- file, err := xlsx.OpenFile(downloadedFileName)
- if err != nil {
- l.Error("func",
- zap.String("call", "handleOfflineFromExecl.OpenFile"),
- zap.String("args", downloadedFileName),
- zap.String("error", err.Error()))
- return err
- }
- if len(file.Sheets) == 0 {
- l.Error("func",
- zap.String("call", "handleOfflineFromExecl.Sheets"),
- zap.String("args", downloadedFileName),
- zap.String("error", "not found sheets"))
- return nil
- }
- sheet := file.Sheets[0]
- if len(sheet.Rows) == 0 {
- l.Error("func",
- zap.String("call", "handleOfflineFromExecl.Rows"),
- zap.String("args", downloadedFileName),
- zap.String("error", "not found rows"))
- return nil
- }
- // 获取最后一次发送id
- lastId := dutils.GetLastId(msg.OfflineTaskId)
- if lastId == 0 {
- // 设置任务总数
- err = updateTask(int32(msg.OfflineTaskId), int32(len(sheet.Rows)-1), 0, 0, 0)
- if err != nil {
- return errors.UpdateTaskError
- }
- }
- for index, row := range sheet.Rows {
- dataMap := make(map[string]interface{})
- if index == 0 {
- for cellIndex, cell := range row.Cells {
- value := strings.TrimSpace(cell.Value)
- if value != "" {
- nameMap[cellIndex] = value
- }
- }
- continue
- }
- if len(nameMap) == 0 {
- l.Error("func",
- zap.String("call", "handleOfflineFromExecl.Rows"),
- zap.String("args", downloadedFileName),
- zap.String("error", "name map len is 0"))
- return fmt.Errorf("表头错误")
- }
- if int64(index) <= lastId {
- fmt.Println("already send:", index)
- // 小于最后一次发送,表示已经发送过,继续后面的内容
- continue
- }
- for cellIndex, cell := range row.Cells {
- if v, ok := nameMap[cellIndex]; ok {
- dataMap[v] = cell.Value
- }
- }
- if len(dataMap) > 0 {
- dwsMsg := dutils.NewDwsMessage(msg)
- var dataMapList []map[string]interface{}
- dataMapList = append(dataMapList, dataMap)
- dwsMsg.Content = utils.MarshalJsonString(dataMapList)
- err = dutils.SendDwsMsg(dwsMsg)
- if err != nil {
- return errors.SenMqError
- } else {
- err = updateTask(int32(msg.OfflineTaskId), 0, 1, 0, int64(index))
- if err != nil {
- return errors.UpdateTaskError
- }
- }
- }
- }
- /*err = updateTask(int32(msg.OfflineTaskId), 0, 0, 1, 0)
- if err != nil {
- return err
- }*/
- // 已处理完返回正常,结束消费
- return nil
- }
- func handleOfflineFromDb(msg *apis.OdsMessage) (err error) {
- pageSize := consts.DEFAULTPAGESIZE
- pageNum := 0
- total := int64(0)
- countSql := strings.Replace(msg.Content, "*", "count(*)", -1)
- err = clinit.DB().Raw(countSql).Count(&total).Error
- if err != nil {
- if err == gorm.ErrRecordNotFound {
- return nil
- }
- return err
- }
- if total == 0 {
- return nil
- }
- offset := dutils.GetLastId(msg.OfflineTaskId)
- if offset == 0 {
- err = updateTask(int32(msg.OfflineTaskId), int32(total), 0, 0, 0)
- if err != nil {
- return errors.UpdateTaskError
- }
- }
- task, ok := OdsOnlineTaskMap[msg.SourceCode]
- isFirst := true
- id := int64(0)
- for {
- // 判断是否结束任务
- if dutils.IsStop(msg.OfflineTaskId) {
- err = updateTask(int32(msg.OfflineTaskId), int32(total), 0, 2, 0)
- if err != nil {
- return errors.UpdateTaskError
- }
- return nil
- }
- // 循环查询数据
- var results []map[string]interface{}
- sql := msg.Content
- if isFirst {
- // 第一次判断
- sql = fmt.Sprintf("%s limit %d offset %d", sql, pageSize, offset)
- isFirst = false
- } else {
- if !strings.Contains(sql, "where") && id > 0 {
- sql = fmt.Sprintf("%s where id>%d limit %d", sql, id, pageSize)
- } else {
- sql = fmt.Sprintf("%s limit %d offset %d", sql, pageSize, offset)
- }
- }
- fmt.Println("sql11111111111111111111111111111111111:", sql, offset, id)
- err = clinit.DB().Raw(sql).Find(&results).Error
- if err != nil {
- // 查无返回正常,没有数据处理
- if err == gorm.ErrRecordNotFound {
- return nil
- }
- return err
- }
- resultLen := int64(len(results))
- if resultLen == 0 {
- return nil
- }
- offset = offset + resultLen
- for _, v := range results {
- if v, ok := v["id"]; ok {
- switch v.(type) {
- case int:
- id = int64(v.(int))
- case int32:
- id = int64(v.(int32))
- case int64:
- id = v.(int64)
- }
- }
- dwsMsg := dutils.NewDwsMessage(msg)
- if ok {
- dataMapList, err := task.ParasFunction(v["content"].(string))
- if err != nil {
- // 更新完成进度数
- err = updateTask(int32(msg.OfflineTaskId), 0, 1, 0, offset)
- if err != nil {
- return errors.UpdateTaskError
- }
- continue
- }
- if len(dataMapList) > 0 {
- dwsMsg.Content = utils.MarshalJsonString(dataMapList)
- err = dutils.SendDwsMsg(dwsMsg)
- if err != nil {
- return errors.SenMqError
- }
- }
- } else {
- delete(v, "id")
- delete(v, "created_at")
- delete(v, "updated_at")
- var dataMapList []map[string]interface{}
- dataMapList = append(dataMapList, v)
- dwsMsg.Content = utils.MarshalJsonString(dataMapList)
- err = dutils.SendDwsMsg(dwsMsg)
- if err != nil {
- return errors.UpdateTaskError
- }
- }
- // 更新完成进度数
- err = updateTask(int32(msg.OfflineTaskId), 0, 1, 0, offset)
- if err != nil {
- return errors.UpdateTaskError
- }
- }
- if len(results) < pageSize {
- return nil
- }
- pageNum++
- time.Sleep(2 * time.Second)
- }
- return nil
- }
- // 处理离线任务
- func HandleOfflineData(msg *apis.OdsMessage) (err error) {
- defer func() {
- if err != nil {
- //if err != errors.UpdateTaskError && err != errors.SenMqError {
- err = updateTask(int32(msg.OfflineTaskId), 0, 0, 2, 0)
- //}
- } else {
- err = updateTask(int32(msg.OfflineTaskId), 0, 0, 1, 0)
- }
- }()
- if msg.OfflineTaskId == 0 {
- l.Error("func",
- zap.String("call", "HandleOfflineData"),
- zap.String("args", utils.MarshalJsonString(*msg)),
- zap.String("error", "离线任务id为空"))
- return errors.ArgsError
- }
- if msg.From == consts.FromDb {
- err = handleOfflineFromDb(msg)
- } else if msg.From == consts.FromExcel {
- // excel 必须指定任务
- err = handleOfflineFromExecl(msg)
- }
- return err
- }
|