package style import ( "bytes" "context" "encoding/csv" "encoding/json" "fmt" "reflect" "strconv" "time" "adm-vehicle-style/errors" "adm-vehicle-style/model" "adm-vehicle-style/parser" "adm-vehicle-style/pb" v1 "adm-vehicle-style/pb/v1" "github.com/aliyun/aliyun-oss-go-sdk/oss" jsoniter "github.com/json-iterator/go" "git.getensh.com/common/gopkgsv2/database" "git.getensh.com/common/gopkgsv2/logger" "go.uber.org/zap" "google.golang.org/grpc/status" "gorm.io/gorm" ) func createFileAndUpload(id int64, list []model.GdSyStyle, fields []string) { values := make([][]string, 0, 10000) values = append(values, fields) for _, v := range list { refType := reflect.TypeOf(v) refVal := reflect.ValueOf(v) length := refType.NumField() item := make([]string, 0, length) for i := 0; i < length; i++ { vField := refVal.Field(i) f := refType.Field(i) j := f.Tag.Get("json") for _, field := range fields { if field == j { switch vField.Kind() { case reflect.String: item = append(item, vField.String()) case reflect.Int64: item = append(item, strconv.FormatInt(vField.Int(), 10)) } } } } values = append(values, item) } client, err := oss.New( parser.Conf.Oss.EndPoint, parser.Conf.Oss.AccessKey, parser.Conf.Oss.AccessSecret, ) if err != nil { return } bucket, err := client.Bucket(parser.Conf.Oss.DownLoadBucket) if err != nil { return } path := "export_style/" + strconv.FormatInt(time.Now().UnixNano(), 10) + ".csv" var b bytes.Buffer _, _ = b.WriteString("\xEF\xBB\xBF") w := csv.NewWriter(&b) w.UseCRLF = true w.WriteAll(values) reader := bytes.NewReader(b.Bytes()) err = bucket.PutObject(path, reader) if err != nil { return } model.NewExportStyleTaskModel().Update(database.DB().Where("id = ?", id), map[string]interface{}{ "path": "https://" + parser.Conf.Oss.DownLoadBucket + "." + parser.Conf.Oss.EndPoint + "/" + path, }) } func Export(ctx context.Context, req *v1.ExportStyleRequest) (reply *v1.ExportStyleReply, err error) { reply = &v1.ExportStyleReply{} // 捕获各个task中的异常并返回给调用者 defer func() { if r := recover(); r != nil { err = fmt.Errorf("%+v", r) e := &status.Status{} if er := json.Unmarshal([]byte(err.Error()), e); er != nil { logger.Error("err", zap.String("system_err", err.Error()), zap.Stack("stacktrace")) } } }() if req.TaskName == "" || req.Fields == "" { return reply, errors.ParamsError } type p05Request struct { TaskName string `json:"task_name"` Type int `json:"type"` StartTime int64 `json:"start_time"` EndTime int64 `json:"end_time"` List string `json:"list"` } r := p05Request{ TaskName: req.TaskName, List: req.Fields, } str, _ := jsoniter.MarshalToString(r) _, err = pb.Data.Query(ctx, &v1.QueryRequest{ Code: "P05", Params: str, }) if err != nil { logger.Error("func", zap.String("call", "pb.Data.Query.P05"), zap.String("params", str), zap.String("error", err.Error())) return reply, err } return reply, nil } func ExportTaskList(ctx context.Context, req *v1.EmptyReply) (reply *v1.ExportStyleTaskListReply, err error) { reply = &v1.ExportStyleTaskListReply{} // 捕获各个task中的异常并返回给调用者 defer func() { if r := recover(); r != nil { err = fmt.Errorf("%+v", r) e := &status.Status{} if er := json.Unmarshal([]byte(err.Error()), e); er != nil { logger.Error("err", zap.String("system_err", err.Error()), zap.Stack("stacktrace")) } } }() list, err := model.NewExportStyleTaskModel().List(database.DB()) if err != nil && err != gorm.ErrRecordNotFound { return reply, errors.SystemError } if err == gorm.ErrRecordNotFound { return reply, nil } reply.List = make([]*v1.ExportStyleTaskList, 0, len(list)) for _, v := range list { reply.List = append(reply.List, &v1.ExportStyleTaskList{ Id: v.ID, TaskName: v.TaskName, Path: v.Path, CreatedAt: v.CreatedAt.Format("2006-01-02 15:04:05"), }) } return reply, nil } func DeleteExportTask(ctx context.Context, req *v1.DeleteExportStyleTaskRequest) (reply *v1.EmptyReply, err error) { reply = &v1.EmptyReply{} // 捕获各个task中的异常并返回给调用者 defer func() { if r := recover(); r != nil { err = fmt.Errorf("%+v", r) e := &status.Status{} if er := json.Unmarshal([]byte(err.Error()), e); er != nil { logger.Error("err", zap.String("system_err", err.Error()), zap.Stack("stacktrace")) } } }() if req.Id <= 0 { return reply, errors.ParamsError } db := database.DB().Where("id = ?", req.Id) task, err := model.NewExportStyleTaskModel().Get(db) if err != nil && err != gorm.ErrRecordNotFound { return reply, errors.SystemError } if err == gorm.ErrRecordNotFound { return reply, errors.DataNotExistError } if task.Path != "" { client, err := oss.New( parser.Conf.Oss.EndPoint, parser.Conf.Oss.AccessKey, parser.Conf.Oss.AccessSecret, ) if err != nil { return reply, errors.SystemError } bucket, err := client.Bucket(parser.Conf.Oss.DownLoadBucket) if err != nil { return reply, errors.SystemError } path := task.Path[len("https://"+parser.Conf.Oss.DownLoadBucket+"."+parser.Conf.Oss.EndPoint+"/"):] err = bucket.DeleteObject(path) if err != nil { return reply, errors.SystemError } } if err = model.NewExportStyleTaskModel().Delete(db); err != nil { return reply, errors.SystemError } return reply, nil }