offline_data.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package handle
  4. import (
  5. "context"
  6. "fmt"
  7. "strings"
  8. "time"
  9. "adm-ods/apis"
  10. "adm-ods/common.in/clinit"
  11. "adm-ods/common.in/utils"
  12. "adm-ods/consts"
  13. "adm-ods/errors"
  14. "adm-ods/pb"
  15. v1 "adm-ods/pb/v1"
  16. dutils "adm-ods/utils"
  17. "github.com/tealeg/xlsx"
  18. "go.uber.org/zap"
  19. "gorm.io/gorm"
  20. )
  21. func updateTask(taskId, total, finishCount, isFinish int32, lastId int64) error {
  22. req := &v1.UpdateOfflineTaskRequest{Total: total, TaskId: taskId, FinishCount: finishCount, IsFinish: isFinish}
  23. _, err := pb.AdmTask.UpdateOfflineTask(context.Background(), req)
  24. if err != nil {
  25. l.Error("rpc",
  26. zap.String("call", "pb.AdmTask.UpdateOfflineTask"),
  27. zap.String("args", utils.MarshalJsonString(*req)),
  28. zap.String("error", err.Error()))
  29. return errors.UpdateTaskError
  30. }
  31. if finishCount != 0 {
  32. err = dutils.SetLastId(int64(taskId), lastId)
  33. if err != nil {
  34. return errors.UpdateTaskError
  35. }
  36. }
  37. if isFinish != 0 {
  38. dutils.DeleteLastId(int64(taskId))
  39. }
  40. return err
  41. }
  42. func handleOfflineFromExecl(msg *apis.OdsMessage) (err error) {
  43. if len(msg.TaskList) == 0 {
  44. l.Error("func",
  45. zap.String("call", "HandleOfflineData"),
  46. zap.String("args", utils.MarshalJsonString(*msg)),
  47. zap.String("error", "excel导入任务必须指定任务"))
  48. return nil
  49. }
  50. nameMap := make(map[int]string)
  51. downloadedFileName := msg.Content
  52. if true {
  53. err = utils.OssDownloadFile(msg.Content, downloadedFileName)
  54. // 下载文件失败
  55. if err != nil {
  56. l.Error("func",
  57. zap.String("call", "handleOfflineFromExecl.OssDownloadFile"),
  58. zap.String("args", msg.Content),
  59. zap.String("error", err.Error()))
  60. return err
  61. }
  62. }
  63. file, err := xlsx.OpenFile(downloadedFileName)
  64. if err != nil {
  65. l.Error("func",
  66. zap.String("call", "handleOfflineFromExecl.OpenFile"),
  67. zap.String("args", downloadedFileName),
  68. zap.String("error", err.Error()))
  69. return err
  70. }
  71. if len(file.Sheets) == 0 {
  72. l.Error("func",
  73. zap.String("call", "handleOfflineFromExecl.Sheets"),
  74. zap.String("args", downloadedFileName),
  75. zap.String("error", "not found sheets"))
  76. return nil
  77. }
  78. sheet := file.Sheets[0]
  79. if len(sheet.Rows) == 0 {
  80. l.Error("func",
  81. zap.String("call", "handleOfflineFromExecl.Rows"),
  82. zap.String("args", downloadedFileName),
  83. zap.String("error", "not found rows"))
  84. return nil
  85. }
  86. // 获取最后一次发送id
  87. lastId := dutils.GetLastId(msg.OfflineTaskId)
  88. if lastId == 0 {
  89. // 设置任务总数
  90. err = updateTask(int32(msg.OfflineTaskId), int32(len(sheet.Rows)-1), 0, 0, 0)
  91. if err != nil {
  92. return errors.UpdateTaskError
  93. }
  94. }
  95. for index, row := range sheet.Rows {
  96. dataMap := make(map[string]interface{})
  97. if index == 0 {
  98. for cellIndex, cell := range row.Cells {
  99. value := strings.TrimSpace(cell.Value)
  100. if value != "" {
  101. nameMap[cellIndex] = value
  102. }
  103. }
  104. continue
  105. }
  106. if len(nameMap) == 0 {
  107. l.Error("func",
  108. zap.String("call", "handleOfflineFromExecl.Rows"),
  109. zap.String("args", downloadedFileName),
  110. zap.String("error", "name map len is 0"))
  111. return fmt.Errorf("表头错误")
  112. }
  113. if int64(index) <= lastId {
  114. fmt.Println("already send:", index)
  115. // 小于最后一次发送,表示已经发送过,继续后面的内容
  116. continue
  117. }
  118. for cellIndex, cell := range row.Cells {
  119. if v, ok := nameMap[cellIndex]; ok {
  120. dataMap[v] = cell.Value
  121. }
  122. }
  123. if len(dataMap) > 0 {
  124. dwsMsg := dutils.NewDwsMessage(msg)
  125. var dataMapList []map[string]interface{}
  126. dataMapList = append(dataMapList, dataMap)
  127. dwsMsg.Content = utils.MarshalJsonString(dataMapList)
  128. err = dutils.SendDwsMsg(dwsMsg)
  129. if err != nil {
  130. return errors.SenMqError
  131. } else {
  132. err = updateTask(int32(msg.OfflineTaskId), 0, 1, 0, int64(index))
  133. if err != nil {
  134. return errors.UpdateTaskError
  135. }
  136. }
  137. }
  138. }
  139. /*err = updateTask(int32(msg.OfflineTaskId), 0, 0, 1, 0)
  140. if err != nil {
  141. return err
  142. }*/
  143. // 已处理完返回正常,结束消费
  144. return nil
  145. }
  146. func handleOfflineFromDb(msg *apis.OdsMessage) (err error) {
  147. pageSize := consts.DEFAULTPAGESIZE
  148. pageNum := 0
  149. total := int64(0)
  150. countSql := strings.Replace(msg.Content, "*", "count(*)", -1)
  151. err = clinit.DB().Raw(countSql).Count(&total).Error
  152. if err != nil {
  153. if err == gorm.ErrRecordNotFound {
  154. return nil
  155. }
  156. return err
  157. }
  158. if total == 0 {
  159. return nil
  160. }
  161. offset := dutils.GetLastId(msg.OfflineTaskId)
  162. if offset == 0 {
  163. err = updateTask(int32(msg.OfflineTaskId), int32(total), 0, 0, 0)
  164. if err != nil {
  165. return errors.UpdateTaskError
  166. }
  167. }
  168. task, ok := OdsOnlineTaskMap[msg.SourceCode]
  169. isFirst := true
  170. id := int64(0)
  171. for {
  172. // 判断是否结束任务
  173. if dutils.IsStop(msg.OfflineTaskId) {
  174. err = updateTask(int32(msg.OfflineTaskId), int32(total), 0, 2, 0)
  175. if err != nil {
  176. return errors.UpdateTaskError
  177. }
  178. return nil
  179. }
  180. // 循环查询数据
  181. var results []map[string]interface{}
  182. sql := msg.Content
  183. if isFirst {
  184. // 第一次判断
  185. sql = fmt.Sprintf("%s limit %d offset %d", sql, pageSize, offset)
  186. isFirst = false
  187. } else {
  188. if !strings.Contains(sql, "where") && id > 0 {
  189. sql = fmt.Sprintf("%s where id>%d limit %d", sql, id, pageSize)
  190. } else {
  191. sql = fmt.Sprintf("%s limit %d offset %d", sql, pageSize, offset)
  192. }
  193. }
  194. fmt.Println("sql11111111111111111111111111111111111:", sql, offset, id)
  195. err = clinit.DB().Raw(sql).Find(&results).Error
  196. if err != nil {
  197. // 查无返回正常,没有数据处理
  198. if err == gorm.ErrRecordNotFound {
  199. return nil
  200. }
  201. return err
  202. }
  203. resultLen := int64(len(results))
  204. if resultLen == 0 {
  205. return nil
  206. }
  207. offset = offset + resultLen
  208. for _, v := range results {
  209. if v, ok := v["id"]; ok {
  210. switch v.(type) {
  211. case int:
  212. id = int64(v.(int))
  213. case int32:
  214. id = int64(v.(int32))
  215. case int64:
  216. id = v.(int64)
  217. }
  218. }
  219. dwsMsg := dutils.NewDwsMessage(msg)
  220. if ok {
  221. dataMapList, err := task.ParasFunction(v["content"].(string))
  222. if err != nil {
  223. // 更新完成进度数
  224. err = updateTask(int32(msg.OfflineTaskId), 0, 1, 0, offset)
  225. if err != nil {
  226. return errors.UpdateTaskError
  227. }
  228. continue
  229. }
  230. if len(dataMapList) > 0 {
  231. dwsMsg.Content = utils.MarshalJsonString(dataMapList)
  232. err = dutils.SendDwsMsg(dwsMsg)
  233. if err != nil {
  234. return errors.SenMqError
  235. }
  236. }
  237. } else {
  238. delete(v, "id")
  239. delete(v, "created_at")
  240. delete(v, "updated_at")
  241. var dataMapList []map[string]interface{}
  242. dataMapList = append(dataMapList, v)
  243. dwsMsg.Content = utils.MarshalJsonString(dataMapList)
  244. err = dutils.SendDwsMsg(dwsMsg)
  245. if err != nil {
  246. return errors.UpdateTaskError
  247. }
  248. }
  249. // 更新完成进度数
  250. err = updateTask(int32(msg.OfflineTaskId), 0, 1, 0, offset)
  251. if err != nil {
  252. return errors.UpdateTaskError
  253. }
  254. }
  255. if len(results) < pageSize {
  256. return nil
  257. }
  258. pageNum++
  259. time.Sleep(2 * time.Second)
  260. }
  261. return nil
  262. }
  263. // 处理离线任务
  264. func HandleOfflineData(msg *apis.OdsMessage) (err error) {
  265. defer func() {
  266. if err != nil {
  267. //if err != errors.UpdateTaskError && err != errors.SenMqError {
  268. err = updateTask(int32(msg.OfflineTaskId), 0, 0, 2, 0)
  269. //}
  270. } else {
  271. err = updateTask(int32(msg.OfflineTaskId), 0, 0, 1, 0)
  272. }
  273. }()
  274. if msg.OfflineTaskId == 0 {
  275. l.Error("func",
  276. zap.String("call", "HandleOfflineData"),
  277. zap.String("args", utils.MarshalJsonString(*msg)),
  278. zap.String("error", "离线任务id为空"))
  279. return errors.ArgsError
  280. }
  281. if msg.From == consts.FromDb {
  282. err = handleOfflineFromDb(msg)
  283. } else if msg.From == consts.FromExcel {
  284. // excel 必须指定任务
  285. err = handleOfflineFromExecl(msg)
  286. }
  287. return err
  288. }