// Copyright 2019 getensh.com. All rights reserved. // Use of this source code is governed by getensh.com. package handle import ( "context" "fmt" "strings" "time" "adm-ods/apis" "adm-ods/common.in/clinit" "adm-ods/common.in/utils" "adm-ods/consts" "adm-ods/errors" "adm-ods/pb" v1 "adm-ods/pb/v1" dutils "adm-ods/utils" "github.com/tealeg/xlsx" "go.uber.org/zap" "gorm.io/gorm" ) func updateTask(taskId, total, finishCount, isFinish int32, lastId int64) error { req := &v1.UpdateOfflineTaskRequest{Total: total, TaskId: taskId, FinishCount: finishCount, IsFinish: isFinish} _, err := pb.AdmTask.UpdateOfflineTask(context.Background(), req) if err != nil { l.Error("rpc", zap.String("call", "pb.AdmTask.UpdateOfflineTask"), zap.String("args", utils.MarshalJsonString(*req)), zap.String("error", err.Error())) return errors.UpdateTaskError } if finishCount != 0 { err = dutils.SetLastId(int64(taskId), lastId) if err != nil { return errors.UpdateTaskError } } if isFinish != 0 { dutils.DeleteLastId(int64(taskId)) } return err } func handleOfflineFromExecl(msg *apis.OdsMessage) (err error) { if len(msg.TaskList) == 0 { l.Error("func", zap.String("call", "HandleOfflineData"), zap.String("args", utils.MarshalJsonString(*msg)), zap.String("error", "excel导入任务必须指定任务")) return nil } nameMap := make(map[int]string) downloadedFileName := msg.Content if true { err = utils.OssDownloadFile(msg.Content, downloadedFileName) // 下载文件失败 if err != nil { l.Error("func", zap.String("call", "handleOfflineFromExecl.OssDownloadFile"), zap.String("args", msg.Content), zap.String("error", err.Error())) return err } } file, err := xlsx.OpenFile(downloadedFileName) if err != nil { l.Error("func", zap.String("call", "handleOfflineFromExecl.OpenFile"), zap.String("args", downloadedFileName), zap.String("error", err.Error())) return err } if len(file.Sheets) == 0 { l.Error("func", zap.String("call", "handleOfflineFromExecl.Sheets"), zap.String("args", downloadedFileName), zap.String("error", "not found sheets")) return nil } sheet := file.Sheets[0] if len(sheet.Rows) == 0 { l.Error("func", zap.String("call", "handleOfflineFromExecl.Rows"), zap.String("args", downloadedFileName), zap.String("error", "not found rows")) return nil } // 获取最后一次发送id lastId := dutils.GetLastId(msg.OfflineTaskId) if lastId == 0 { // 设置任务总数 err = updateTask(int32(msg.OfflineTaskId), int32(len(sheet.Rows)-1), 0, 0, 0) if err != nil { return errors.UpdateTaskError } } for index, row := range sheet.Rows { dataMap := make(map[string]interface{}) if index == 0 { for cellIndex, cell := range row.Cells { value := strings.TrimSpace(cell.Value) if value != "" { nameMap[cellIndex] = value } } continue } if len(nameMap) == 0 { l.Error("func", zap.String("call", "handleOfflineFromExecl.Rows"), zap.String("args", downloadedFileName), zap.String("error", "name map len is 0")) return fmt.Errorf("表头错误") } if int64(index) <= lastId { fmt.Println("already send:", index) // 小于最后一次发送,表示已经发送过,继续后面的内容 continue } for cellIndex, cell := range row.Cells { if v, ok := nameMap[cellIndex]; ok { dataMap[v] = cell.Value } } if len(dataMap) > 0 { dwsMsg := dutils.NewDwsMessage(msg) var dataMapList []map[string]interface{} dataMapList = append(dataMapList, dataMap) dwsMsg.Content = utils.MarshalJsonString(dataMapList) err = dutils.SendDwsMsg(dwsMsg) if err != nil { return errors.SenMqError } else { err = updateTask(int32(msg.OfflineTaskId), 0, 1, 0, int64(index)) if err != nil { return errors.UpdateTaskError } } } } /*err = updateTask(int32(msg.OfflineTaskId), 0, 0, 1, 0) if err != nil { return err }*/ // 已处理完返回正常,结束消费 return nil } func handleOfflineFromDb(msg *apis.OdsMessage) (err error) { pageSize := consts.DEFAULTPAGESIZE pageNum := 0 total := int64(0) countSql := strings.Replace(msg.Content, "*", "count(*)", -1) err = clinit.DB().Raw(countSql).Count(&total).Error if err != nil { if err == gorm.ErrRecordNotFound { return nil } return err } if total == 0 { return nil } offset := dutils.GetLastId(msg.OfflineTaskId) if offset == 0 { err = updateTask(int32(msg.OfflineTaskId), int32(total), 0, 0, 0) if err != nil { return errors.UpdateTaskError } } task, ok := OdsOnlineTaskMap[msg.SourceCode] isFirst := true id := int64(0) for { // 判断是否结束任务 if dutils.IsStop(msg.OfflineTaskId) { err = updateTask(int32(msg.OfflineTaskId), int32(total), 0, 2, 0) if err != nil { return errors.UpdateTaskError } return nil } // 循环查询数据 var results []map[string]interface{} sql := msg.Content if isFirst { // 第一次判断 sql = fmt.Sprintf("%s limit %d offset %d", sql, pageSize, offset) isFirst = false } else { if !strings.Contains(sql, "where") && id > 0 { sql = fmt.Sprintf("%s where id>%d limit %d", sql, id, pageSize) } else { sql = fmt.Sprintf("%s limit %d offset %d", sql, pageSize, offset) } } fmt.Println("sql11111111111111111111111111111111111:", sql, offset, id) err = clinit.DB().Raw(sql).Find(&results).Error if err != nil { // 查无返回正常,没有数据处理 if err == gorm.ErrRecordNotFound { return nil } return err } resultLen := int64(len(results)) if resultLen == 0 { return nil } offset = offset + resultLen for _, v := range results { if v, ok := v["id"]; ok { switch v.(type) { case int: id = int64(v.(int)) case int32: id = int64(v.(int32)) case int64: id = v.(int64) } } dwsMsg := dutils.NewDwsMessage(msg) if ok { dataMapList, err := task.ParasFunction(v["content"].(string)) if err != nil { // 更新完成进度数 err = updateTask(int32(msg.OfflineTaskId), 0, 1, 0, offset) if err != nil { return errors.UpdateTaskError } continue } if len(dataMapList) > 0 { dwsMsg.Content = utils.MarshalJsonString(dataMapList) err = dutils.SendDwsMsg(dwsMsg) if err != nil { return errors.SenMqError } } } else { delete(v, "id") delete(v, "created_at") delete(v, "updated_at") var dataMapList []map[string]interface{} dataMapList = append(dataMapList, v) dwsMsg.Content = utils.MarshalJsonString(dataMapList) err = dutils.SendDwsMsg(dwsMsg) if err != nil { return errors.UpdateTaskError } } // 更新完成进度数 err = updateTask(int32(msg.OfflineTaskId), 0, 1, 0, offset) if err != nil { return errors.UpdateTaskError } } if len(results) < pageSize { return nil } pageNum++ time.Sleep(2 * time.Second) } return nil } // 处理离线任务 func HandleOfflineData(msg *apis.OdsMessage) (err error) { defer func() { if err != nil { //if err != errors.UpdateTaskError && err != errors.SenMqError { err = updateTask(int32(msg.OfflineTaskId), 0, 0, 2, 0) //} } else { err = updateTask(int32(msg.OfflineTaskId), 0, 0, 1, 0) } }() if msg.OfflineTaskId == 0 { l.Error("func", zap.String("call", "HandleOfflineData"), zap.String("args", utils.MarshalJsonString(*msg)), zap.String("error", "离线任务id为空")) return errors.ArgsError } if msg.From == consts.FromDb { err = handleOfflineFromDb(msg) } else if msg.From == consts.FromExcel { // excel 必须指定任务 err = handleOfflineFromExecl(msg) } return err }