p05.go 10 KB


  1. package query
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/csv"
  7. "encoding/json"
  8. "fmt"
  9. "reflect"
  10. "runtime"
  11. "strconv"
  12. "strings"
  13. "time"
  14. "adm-data/consts"
  15. "adm-data/errors"
  16. "adm-data/model"
  17. "adm-data/parser"
  18. v1 "adm-data/pb/v1"
  19. "github.com/aliyun/aliyun-oss-go-sdk/oss"
  20. "github.com/go-redis/redis"
  21. "github.com/tealeg/xlsx"
  22. jsoniter "github.com/json-iterator/go"
  23. "git.getensh.com/common/gopkgsv2/cache"
  24. "git.getensh.com/common/gopkgsv2/database"
  25. "git.getensh.com/common/gopkgsv2/id"
  26. "git.getensh.com/common/gopkgsv2/logger"
  27. "go.uber.org/zap"
  28. "google.golang.org/grpc/status"
  29. )
  30. type p05Request struct {
  31. TaskName string `json:"task_name"`
  32. Type int `json:"type"`
  33. StartTime int64 `json:"start_time"`
  34. EndTime int64 `json:"end_time"`
  35. List string `json:"list"`
  36. }
  37. type p05Response struct {
  38. ExportId string `json:"export_id"`
  39. }
  40. type p05Msg struct {
  41. ExportId string `json:"export_id"`
  42. p05Request
  43. }
  44. func p05(ctx context.Context, params string) (reply *v1.QueryResponse, err error) {
  45. reply = &v1.QueryResponse{}
  46. // 捕获各个task中的异常并返回给调用者
  47. defer func() {
  48. if r := recover(); r != nil {
  49. err = fmt.Errorf("%+v", r)
  50. e := &status.Status{}
  51. if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
  52. logger.Error("err",
  53. zap.String("system_err", err.Error()),
  54. zap.Stack("stacktrace"))
  55. }
  56. }
  57. }()
  58. var req p05Request
  59. err = jsoniter.UnmarshalFromString(params, &req)
  60. if err != nil && (req.Type < 0 || req.Type > 2) {
  61. return nil, errors.ParamsError
  62. }
  63. if req.StartTime < 0 {
  64. return nil, status.Error(10003, "参数错误,开始时间不能为负数")
  65. }
  66. if req.EndTime < 0 {
  67. return nil, status.Error(10003, "参数错误,结束时间不能为负数")
  68. }
  69. if req.StartTime != 0 && req.EndTime != 0 && req.StartTime > req.EndTime {
  70. return nil, status.Error(10003, "参数错误,开始时间大于结束时间")
  71. }
  72. if req.List != "" {
  73. arr := strings.Split(req.List, ",")
  74. m := model.GdSyStyle{}
  75. refType := reflect.TypeOf(m)
  76. for _, v := range arr {
  77. exist := false
  78. for i := 0; i < refType.NumField(); i++ {
  79. f := refType.Field(i)
  80. j := f.Tag.Get("json")
  81. if v == j {
  82. exist = true
  83. break
  84. }
  85. }
  86. if !exist {
  87. return nil, status.Error(11003, "字段:"+v+"不存在")
  88. }
  89. }
  90. }
  91. uid, err := id.GetUniqueID()
  92. if err != nil {
  93. return nil, errors.CreateIdError
  94. }
  95. exportId := strconv.FormatUint(uid, 10)
  96. data := model.ExportStyleTask{
  97. ExportId: exportId,
  98. TaskName: req.TaskName,
  99. CreatedAt: time.Now(),
  100. }
  101. if req.TaskName == "" {
  102. data.TaskName = "GD车型导出"
  103. }
  104. if err = model.NewExportStyleTaskModel().Insert(database.DB(), &data); err != nil {
  105. return nil, errors.ExportError
  106. }
  107. msg := p05Msg{
  108. exportId,
  109. req,
  110. }
  111. m, err := jsoniter.Marshal(msg)
  112. if err != nil {
  113. return nil, errors.CreateMsgError
  114. }
  115. if _, err = cache.Redis().RPush(consts.ExportStylelist, m); err != nil {
  116. return nil, errors.CreateMsgError
  117. }
  118. if _, err = cache.Redis().Publish(consts.ExportStyleChannel, "export"); err != nil {
  119. return nil, errors.SendMsgError
  120. }
  121. reply.Data = `{"export_id":"` + exportId + `"}`
  122. return reply, nil
  123. }
  124. func Export(req p05Request) (string, error) {
  125. var (
  126. fieldsArr []string
  127. fields []string
  128. m = model.GdSyStyle{}
  129. )
  130. if req.List != "" {
  131. arr := strings.Split(req.List, ",")
  132. refType := reflect.TypeOf(m)
  133. for _, v := range arr {
  134. exist := false
  135. for i := 0; i < refType.NumField(); i++ {
  136. f := refType.Field(i)
  137. j := f.Tag.Get("json")
  138. if strings.TrimSpace(v) == j {
  139. exist = true
  140. break
  141. }
  142. }
  143. if !exist {
  144. return "", status.Error(11003, "字段:"+v+"不存在")
  145. }
  146. }
  147. fieldsArr = make([]string, 0, len(arr))
  148. for _, v := range arr {
  149. fieldsArr = append(fieldsArr, "`"+v+"`")
  150. fields = append(fields, strings.TrimSpace(v))
  151. }
  152. } else {
  153. refType := reflect.TypeOf(m)
  154. for i := 0; i < refType.NumField(); i++ {
  155. fieldsArr = append(fieldsArr, "`"+refType.Field(i).Tag.Get("json")+"`")
  156. fields = append(fields, refType.Field(i).Tag.Get("json"))
  157. }
  158. }
  159. db := database.DB().Where("is_on = 1")
  160. if req.Type != 0 {
  161. if req.StartTime != 0 || req.EndTime != 0 {
  162. if req.EndTime == 0 {
  163. req.EndTime = time.Now().Unix()
  164. }
  165. switch req.Type {
  166. case 1:
  167. db = db.Where("created_at >= ? AND created_at <= ?", req.StartTime, req.EndTime)
  168. case 2:
  169. db = db.Where("updated_at >= ? AND updated_at <= ?", req.StartTime, req.EndTime)
  170. }
  171. }
  172. }
  173. file := xlsx.NewFile()
  174. sheet, _ := file.AddSheet("车型列表")
  175. sheet.Rows = make([]*xlsx.Row, 0, 60000)
  176. // 第一行
  177. row := sheet.AddRow()
  178. for _, v := range fields {
  179. cell := row.AddCell()
  180. cell.Value = v
  181. }
  182. var id int
  183. for {
  184. list, err := model.NewSyStyleModel().StyleList(db.Where("id > ?", id), strings.Join(fieldsArr, ","), 1000)
  185. if err != nil {
  186. break
  187. }
  188. for _, v := range list {
  189. row := sheet.AddRow()
  190. refType := reflect.TypeOf(v)
  191. refVal := reflect.ValueOf(v)
  192. for i := 0; i < refType.NumField(); i++ {
  193. f := refType.Field(i)
  194. j := f.Tag.Get("json")
  195. vField := refVal.Field(i)
  196. for _, field := range fields {
  197. if field == j {
  198. cell := row.AddCell()
  199. cell.Value = vField.String()
  200. }
  201. }
  202. }
  203. }
  204. break
  205. }
  206. client, err := oss.New(
  207. parser.Conf.Oss.EndPoint,
  208. parser.Conf.Oss.AccessKey,
  209. parser.Conf.Oss.AccessSecret,
  210. )
  211. if err != nil {
  212. return "", err
  213. }
  214. bucket, err := client.Bucket(parser.Conf.Oss.DownLoadBucket)
  215. if err != nil {
  216. return "", err
  217. }
  218. path := "export_style/" + strconv.FormatInt(time.Now().Unix(), 10) + ".xlsx"
  219. var b bytes.Buffer
  220. writer := bufio.NewWriter(&b)
  221. file.Write(writer)
  222. reader := bytes.NewReader(b.Bytes())
  223. err = bucket.PutObject(path, reader)
  224. if err != nil {
  225. return "", err
  226. }
  227. return path, nil
  228. }
  229. func ExportCsv(req p05Request) (string, error) {
  230. var (
  231. fieldsArr []string
  232. m = model.GdSyStyle{}
  233. values = make([][]string, 0, 10000)
  234. fields []string
  235. hasId bool
  236. )
  237. if req.List != "" {
  238. arr := strings.Split(req.List, ",")
  239. for k := range arr {
  240. arr[k] = strings.TrimSpace(arr[k])
  241. if arr[k] == "id" {
  242. hasId = true
  243. }
  244. }
  245. refType := reflect.TypeOf(m)
  246. for _, v := range arr {
  247. exist := false
  248. for i := 0; i < refType.NumField(); i++ {
  249. f := refType.Field(i)
  250. j := f.Tag.Get("json")
  251. if v == j {
  252. exist = true
  253. break
  254. }
  255. }
  256. if !exist {
  257. return "", status.Error(11003, "字段:"+v+"不存在")
  258. }
  259. }
  260. fieldsArr = make([]string, 0, len(arr))
  261. fields = make([]string, 0, len(arr))
  262. for _, v := range arr {
  263. fieldsArr = append(fieldsArr, "`"+v+"`")
  264. fields = append(fields, v)
  265. }
  266. values = append(values, fields)
  267. } else {
  268. hasId = true
  269. refType := reflect.TypeOf(m)
  270. fields = make([]string, 0, refType.NumField())
  271. for i := 0; i < refType.NumField(); i++ {
  272. fieldsArr = append(fieldsArr, "`"+refType.Field(i).Tag.Get("json")+"`")
  273. fields = append(fields, refType.Field(i).Tag.Get("json"))
  274. }
  275. values = append(values, fields)
  276. }
  277. var id int
  278. fieldStr := strings.Join(fieldsArr, ",")
  279. if !hasId {
  280. fieldStr = "`id`," + fieldStr
  281. }
  282. for {
  283. db := database.DB().Where("is_on = 1")
  284. if req.Type != 0 {
  285. if req.StartTime != 0 || req.EndTime != 0 {
  286. if req.EndTime == 0 {
  287. req.EndTime = time.Now().Unix()
  288. }
  289. switch req.Type {
  290. case 1:
  291. db = db.Where("created_at >= ? AND created_at <= ?", req.StartTime, req.EndTime)
  292. case 2:
  293. db = db.Where("updated_at >= ? AND updated_at <= ?", req.StartTime, req.EndTime)
  294. }
  295. }
  296. }
  297. db = db.Where("id > ?", id)
  298. list, err := model.NewSyStyleModel().StyleListByMap(db, fieldStr, 1000)
  299. if err != nil {
  300. break
  301. }
  302. for _, v := range list {
  303. item := make([]string, 0, len(v))
  304. for _, field := range fields {
  305. if i, ok := v[field]; ok {
  306. switch i.(type) {
  307. case string:
  308. item = append(item, i.(string))
  309. case int64:
  310. item = append(item, strconv.FormatInt(i.(int64), 10))
  311. }
  312. }
  313. }
  314. values = append(values, item)
  315. id = int(v["id"].(int64))
  316. }
  317. // for _, v := range list {
  318. // refType := reflect.TypeOf(v)
  319. // refVal := reflect.ValueOf(v)
  320. // length := refType.NumField()
  321. // item := make([]string, 0, length)
  322. // for _, field := range fields {
  323. // for i := 0; i < length; i++ {
  324. // vField := refVal.Field(i)
  325. // f := refType.Field(i)
  326. // j := f.Tag.Get("json")
  327. // if field == j {
  328. // switch vField.Kind() {
  329. // case reflect.String:
  330. // item = append(item, vField.String())
  331. // case reflect.Int64:
  332. // item = append(item, strconv.FormatInt(vField.Int(), 10))
  333. // }
  334. // }
  335. // }
  336. // }
  337. // values = append(values, item)
  338. // id = int(v.ID)
  339. // }
  340. }
  341. client, err := oss.New(
  342. parser.Conf.Oss.EndPoint,
  343. parser.Conf.Oss.AccessKey,
  344. parser.Conf.Oss.AccessSecret,
  345. )
  346. if err != nil {
  347. return "", err
  348. }
  349. bucket, err := client.Bucket(parser.Conf.Oss.DownLoadBucket)
  350. if err != nil {
  351. return "", err
  352. }
  353. path := "export_style/" + strconv.FormatInt(time.Now().UnixNano(), 10) + ".csv"
  354. var b bytes.Buffer
  355. _, _ = b.WriteString("\xEF\xBB\xBF")
  356. w := csv.NewWriter(&b)
  357. w.UseCRLF = true
  358. w.WriteAll(values)
  359. reader := bytes.NewReader(b.Bytes())
  360. err = bucket.PutObject(path, reader)
  361. if err != nil {
  362. return "", err
  363. }
  364. return path, nil
  365. }
  366. func HandleExportMsg(message *redis.Message) {
  367. if message.Payload != "export" {
  368. return
  369. }
  370. for {
  371. res, err := cache.Redis().LPop(consts.ExportStylelist)
  372. if err != nil || res == "" {
  373. return
  374. }
  375. var msg p05Msg
  376. err = jsoniter.UnmarshalFromString(res, &msg)
  377. if err != nil {
  378. return
  379. }
  380. if msg.ExportId == "" {
  381. return
  382. }
  383. task, err := model.NewExportStyleTaskModel().Get(database.DB().Where("export_id = ?", msg.ExportId))
  384. if err != nil {
  385. return
  386. }
  387. values := make(map[string]interface{})
  388. if time.Now().Unix()-task.CreatedAt.Unix() > 3600 {
  389. // 超过两个小时,消息不处理
  390. values["status"] = 2
  391. } else {
  392. // path, err := Export(msg.p05Request)
  393. path, err := ExportCsv(msg.p05Request)
  394. if err != nil {
  395. values["status"] = 2
  396. } else {
  397. values["status"] = 1
  398. values["path"] = "https://" + parser.Conf.Oss.DownLoadBucket + "." + parser.Conf.Oss.EndPoint + "/" + path
  399. }
  400. }
  401. model.NewExportStyleTaskModel().Update(database.DB().Where("export_id = ?", msg.ExportId), values)
  402. runtime.GC()
  403. }
  404. }