export.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package style
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/csv"
  6. "encoding/json"
  7. "fmt"
  8. "reflect"
  9. "strconv"
  10. "time"
  11. "adm-vehicle-style/errors"
  12. "adm-vehicle-style/model"
  13. "adm-vehicle-style/parser"
  14. "adm-vehicle-style/pb"
  15. v1 "adm-vehicle-style/pb/v1"
  16. "github.com/aliyun/aliyun-oss-go-sdk/oss"
  17. jsoniter "github.com/json-iterator/go"
  18. "git.getensh.com/common/gopkgsv2/database"
  19. "git.getensh.com/common/gopkgsv2/logger"
  20. "go.uber.org/zap"
  21. "google.golang.org/grpc/status"
  22. "gorm.io/gorm"
  23. )
  24. func createFileAndUpload(id int64, list []model.GdSyStyle, fields []string) {
  25. values := make([][]string, 0, 10000)
  26. values = append(values, fields)
  27. for _, v := range list {
  28. refType := reflect.TypeOf(v)
  29. refVal := reflect.ValueOf(v)
  30. length := refType.NumField()
  31. item := make([]string, 0, length)
  32. for i := 0; i < length; i++ {
  33. vField := refVal.Field(i)
  34. f := refType.Field(i)
  35. j := f.Tag.Get("json")
  36. for _, field := range fields {
  37. if field == j {
  38. switch vField.Kind() {
  39. case reflect.String:
  40. item = append(item, vField.String())
  41. case reflect.Int64:
  42. item = append(item, strconv.FormatInt(vField.Int(), 10))
  43. }
  44. }
  45. }
  46. }
  47. values = append(values, item)
  48. }
  49. client, err := oss.New(
  50. parser.Conf.Oss.EndPoint,
  51. parser.Conf.Oss.AccessKey,
  52. parser.Conf.Oss.AccessSecret,
  53. )
  54. if err != nil {
  55. return
  56. }
  57. bucket, err := client.Bucket(parser.Conf.Oss.DownLoadBucket)
  58. if err != nil {
  59. return
  60. }
  61. path := "export_style/" + strconv.FormatInt(time.Now().UnixNano(), 10) + ".csv"
  62. var b bytes.Buffer
  63. _, _ = b.WriteString("\xEF\xBB\xBF")
  64. w := csv.NewWriter(&b)
  65. w.UseCRLF = true
  66. w.WriteAll(values)
  67. reader := bytes.NewReader(b.Bytes())
  68. err = bucket.PutObject(path, reader)
  69. if err != nil {
  70. return
  71. }
  72. model.NewExportStyleTaskModel().Update(database.DB().Where("id = ?", id), map[string]interface{}{
  73. "path": "https://" + parser.Conf.Oss.DownLoadBucket + "." + parser.Conf.Oss.EndPoint + "/" + path,
  74. })
  75. }
  76. func Export(ctx context.Context, req *v1.ExportStyleRequest) (reply *v1.ExportStyleReply, err error) {
  77. reply = &v1.ExportStyleReply{}
  78. // 捕获各个task中的异常并返回给调用者
  79. defer func() {
  80. if r := recover(); r != nil {
  81. err = fmt.Errorf("%+v", r)
  82. e := &status.Status{}
  83. if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
  84. logger.Error("err",
  85. zap.String("system_err", err.Error()),
  86. zap.Stack("stacktrace"))
  87. }
  88. }
  89. }()
  90. if req.TaskName == "" || req.Fields == "" {
  91. return reply, errors.ParamsError
  92. }
  93. type p05Request struct {
  94. TaskName string `json:"task_name"`
  95. Type int `json:"type"`
  96. StartTime int64 `json:"start_time"`
  97. EndTime int64 `json:"end_time"`
  98. List string `json:"list"`
  99. }
  100. r := p05Request{
  101. TaskName: req.TaskName,
  102. List: req.Fields,
  103. }
  104. str, _ := jsoniter.MarshalToString(r)
  105. _, err = pb.Data.Query(ctx, &v1.QueryRequest{
  106. Code: "P05",
  107. Params: str,
  108. })
  109. if err != nil {
  110. logger.Error("func",
  111. zap.String("call", "pb.Data.Query.P05"),
  112. zap.String("params", str),
  113. zap.String("error", err.Error()))
  114. return reply, err
  115. }
  116. return reply, nil
  117. }
  118. func ExportTaskList(ctx context.Context, req *v1.EmptyReply) (reply *v1.ExportStyleTaskListReply, err error) {
  119. reply = &v1.ExportStyleTaskListReply{}
  120. // 捕获各个task中的异常并返回给调用者
  121. defer func() {
  122. if r := recover(); r != nil {
  123. err = fmt.Errorf("%+v", r)
  124. e := &status.Status{}
  125. if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
  126. logger.Error("err",
  127. zap.String("system_err", err.Error()),
  128. zap.Stack("stacktrace"))
  129. }
  130. }
  131. }()
  132. list, err := model.NewExportStyleTaskModel().List(database.DB())
  133. if err != nil && err != gorm.ErrRecordNotFound {
  134. return reply, errors.SystemError
  135. }
  136. if err == gorm.ErrRecordNotFound {
  137. return reply, nil
  138. }
  139. reply.List = make([]*v1.ExportStyleTaskList, 0, len(list))
  140. for _, v := range list {
  141. reply.List = append(reply.List, &v1.ExportStyleTaskList{
  142. Id: v.ID,
  143. TaskName: v.TaskName,
  144. Path: v.Path,
  145. CreatedAt: v.CreatedAt.Format("2006-01-02 15:04:05"),
  146. })
  147. }
  148. return reply, nil
  149. }
  150. func DeleteExportTask(ctx context.Context, req *v1.DeleteExportStyleTaskRequest) (reply *v1.EmptyReply, err error) {
  151. reply = &v1.EmptyReply{}
  152. // 捕获各个task中的异常并返回给调用者
  153. defer func() {
  154. if r := recover(); r != nil {
  155. err = fmt.Errorf("%+v", r)
  156. e := &status.Status{}
  157. if er := json.Unmarshal([]byte(err.Error()), e); er != nil {
  158. logger.Error("err",
  159. zap.String("system_err", err.Error()),
  160. zap.Stack("stacktrace"))
  161. }
  162. }
  163. }()
  164. if req.Id <= 0 {
  165. return reply, errors.ParamsError
  166. }
  167. db := database.DB().Where("id = ?", req.Id)
  168. task, err := model.NewExportStyleTaskModel().Get(db)
  169. if err != nil && err != gorm.ErrRecordNotFound {
  170. return reply, errors.SystemError
  171. }
  172. if err == gorm.ErrRecordNotFound {
  173. return reply, errors.DataNotExistError
  174. }
  175. if task.Path != "" {
  176. client, err := oss.New(
  177. parser.Conf.Oss.EndPoint,
  178. parser.Conf.Oss.AccessKey,
  179. parser.Conf.Oss.AccessSecret,
  180. )
  181. if err != nil {
  182. return reply, errors.SystemError
  183. }
  184. bucket, err := client.Bucket(parser.Conf.Oss.DownLoadBucket)
  185. if err != nil {
  186. return reply, errors.SystemError
  187. }
  188. path := task.Path[len("https://"+parser.Conf.Oss.DownLoadBucket+"."+parser.Conf.Oss.EndPoint+"/"):]
  189. err = bucket.DeleteObject(path)
  190. if err != nil {
  191. return reply, errors.SystemError
  192. }
  193. }
  194. if err = model.NewExportStyleTaskModel().Delete(db); err != nil {
  195. return reply, errors.SystemError
  196. }
  197. return reply, nil
  198. }