package query import ( "bufio" "bytes" "context" "encoding/csv" "encoding/json" "fmt" "reflect" "runtime" "strconv" "strings" "time" "adm-data/consts" "adm-data/errors" "adm-data/model" "adm-data/parser" v1 "adm-data/pb/v1" "github.com/aliyun/aliyun-oss-go-sdk/oss" "github.com/go-redis/redis" "github.com/tealeg/xlsx" jsoniter "github.com/json-iterator/go" "git.getensh.com/common/gopkgsv2/cache" "git.getensh.com/common/gopkgsv2/database" "git.getensh.com/common/gopkgsv2/id" "git.getensh.com/common/gopkgsv2/logger" "go.uber.org/zap" "google.golang.org/grpc/status" ) type p05Request struct { TaskName string `json:"task_name"` Type int `json:"type"` StartTime int64 `json:"start_time"` EndTime int64 `json:"end_time"` List string `json:"list"` } type p05Response struct { ExportId string `json:"export_id"` } type p05Msg struct { ExportId string `json:"export_id"` p05Request } func p05(ctx context.Context, params string) (reply *v1.QueryResponse, err error) { reply = &v1.QueryResponse{} // 捕获各个task中的异常并返回给调用者 defer func() { if r := recover(); r != nil { err = fmt.Errorf("%+v", r) e := &status.Status{} if er := json.Unmarshal([]byte(err.Error()), e); er != nil { logger.Error("err", zap.String("system_err", err.Error()), zap.Stack("stacktrace")) } } }() var req p05Request err = jsoniter.UnmarshalFromString(params, &req) if err != nil && (req.Type < 0 || req.Type > 2) { return nil, errors.ParamsError } if req.StartTime < 0 { return nil, status.Error(10003, "参数错误,开始时间不能为负数") } if req.EndTime < 0 { return nil, status.Error(10003, "参数错误,结束时间不能为负数") } if req.StartTime != 0 && req.EndTime != 0 && req.StartTime > req.EndTime { return nil, status.Error(10003, "参数错误,开始时间大于结束时间") } if req.List != "" { arr := strings.Split(req.List, ",") m := model.GdSyStyle{} refType := reflect.TypeOf(m) for _, v := range arr { exist := false for i := 0; i < refType.NumField(); i++ { f := refType.Field(i) j := f.Tag.Get("json") if v == j { exist = true break } } if !exist { return nil, status.Error(11003, "字段:"+v+"不存在") } } } uid, err := id.GetUniqueID() if err != nil { return nil, errors.CreateIdError } exportId := strconv.FormatUint(uid, 10) data := model.ExportStyleTask{ ExportId: exportId, TaskName: req.TaskName, CreatedAt: time.Now(), } if req.TaskName == "" { data.TaskName = "GD车型导出" } if err = model.NewExportStyleTaskModel().Insert(database.DB(), &data); err != nil { return nil, errors.ExportError } msg := p05Msg{ exportId, req, } m, err := jsoniter.Marshal(msg) if err != nil { return nil, errors.CreateMsgError } if _, err = cache.Redis().RPush(consts.ExportStylelist, m); err != nil { return nil, errors.CreateMsgError } if _, err = cache.Redis().Publish(consts.ExportStyleChannel, "export"); err != nil { return nil, errors.SendMsgError } reply.Data = `{"export_id":"` + exportId + `"}` return reply, nil } func Export(req p05Request) (string, error) { var ( fieldsArr []string fields []string m = model.GdSyStyle{} ) if req.List != "" { arr := strings.Split(req.List, ",") refType := reflect.TypeOf(m) for _, v := range arr { exist := false for i := 0; i < refType.NumField(); i++ { f := refType.Field(i) j := f.Tag.Get("json") if strings.TrimSpace(v) == j { exist = true break } } if !exist { return "", status.Error(11003, "字段:"+v+"不存在") } } fieldsArr = make([]string, 0, len(arr)) for _, v := range arr { fieldsArr = append(fieldsArr, "`"+v+"`") fields = append(fields, strings.TrimSpace(v)) } } else { refType := reflect.TypeOf(m) for i := 0; i < refType.NumField(); i++ { fieldsArr = append(fieldsArr, "`"+refType.Field(i).Tag.Get("json")+"`") fields = append(fields, refType.Field(i).Tag.Get("json")) } } db := database.DB().Where("is_on = 1") if req.Type != 0 { if req.StartTime != 0 || req.EndTime != 0 { if req.EndTime == 0 { req.EndTime = time.Now().Unix() } switch req.Type { case 1: db = db.Where("created_at >= ? AND created_at <= ?", req.StartTime, req.EndTime) case 2: db = db.Where("updated_at >= ? AND updated_at <= ?", req.StartTime, req.EndTime) } } } file := xlsx.NewFile() sheet, _ := file.AddSheet("车型列表") sheet.Rows = make([]*xlsx.Row, 0, 60000) // 第一行 row := sheet.AddRow() for _, v := range fields { cell := row.AddCell() cell.Value = v } var id int for { list, err := model.NewSyStyleModel().StyleList(db.Where("id > ?", id), strings.Join(fieldsArr, ","), 1000) if err != nil { break } for _, v := range list { row := sheet.AddRow() refType := reflect.TypeOf(v) refVal := reflect.ValueOf(v) for i := 0; i < refType.NumField(); i++ { f := refType.Field(i) j := f.Tag.Get("json") vField := refVal.Field(i) for _, field := range fields { if field == j { cell := row.AddCell() cell.Value = vField.String() } } } } break } client, err := oss.New( parser.Conf.Oss.EndPoint, parser.Conf.Oss.AccessKey, parser.Conf.Oss.AccessSecret, ) if err != nil { return "", err } bucket, err := client.Bucket(parser.Conf.Oss.DownLoadBucket) if err != nil { return "", err } path := "export_style/" + strconv.FormatInt(time.Now().Unix(), 10) + ".xlsx" var b bytes.Buffer writer := bufio.NewWriter(&b) file.Write(writer) reader := bytes.NewReader(b.Bytes()) err = bucket.PutObject(path, reader) if err != nil { return "", err } return path, nil } func ExportCsv(req p05Request) (string, error) { var ( fieldsArr []string m = model.GdSyStyle{} values = make([][]string, 0, 10000) fields []string hasId bool ) if req.List != "" { arr := strings.Split(req.List, ",") for k := range arr { arr[k] = strings.TrimSpace(arr[k]) if arr[k] == "id" { hasId = true } } refType := reflect.TypeOf(m) for _, v := range arr { exist := false for i := 0; i < refType.NumField(); i++ { f := refType.Field(i) j := f.Tag.Get("json") if v == j { exist = true break } } if !exist { return "", status.Error(11003, "字段:"+v+"不存在") } } fieldsArr = make([]string, 0, len(arr)) fields = make([]string, 0, len(arr)) for _, v := range arr { fieldsArr = append(fieldsArr, "`"+v+"`") fields = append(fields, v) } values = append(values, fields) } else { hasId = true refType := reflect.TypeOf(m) fields = make([]string, 0, refType.NumField()) for i := 0; i < refType.NumField(); i++ { fieldsArr = append(fieldsArr, "`"+refType.Field(i).Tag.Get("json")+"`") fields = append(fields, refType.Field(i).Tag.Get("json")) } values = append(values, fields) } var id int fieldStr := strings.Join(fieldsArr, ",") if !hasId { fieldStr = "`id`," + fieldStr } for { db := database.DB().Where("is_on = 1") if req.Type != 0 { if req.StartTime != 0 || req.EndTime != 0 { if req.EndTime == 0 { req.EndTime = time.Now().Unix() } switch req.Type { case 1: db = db.Where("created_at >= ? AND created_at <= ?", req.StartTime, req.EndTime) case 2: db = db.Where("updated_at >= ? AND updated_at <= ?", req.StartTime, req.EndTime) } } } db = db.Where("id > ?", id) list, err := model.NewSyStyleModel().StyleListByMap(db, fieldStr, 1000) if err != nil { break } for _, v := range list { item := make([]string, 0, len(v)) for _, field := range fields { if i, ok := v[field]; ok { switch i.(type) { case string: item = append(item, i.(string)) case int64: item = append(item, strconv.FormatInt(i.(int64), 10)) } } } values = append(values, item) id = int(v["id"].(int64)) } // for _, v := range list { // refType := reflect.TypeOf(v) // refVal := reflect.ValueOf(v) // length := refType.NumField() // item := make([]string, 0, length) // for _, field := range fields { // for i := 0; i < length; i++ { // vField := refVal.Field(i) // f := refType.Field(i) // j := f.Tag.Get("json") // if field == j { // switch vField.Kind() { // case reflect.String: // item = append(item, vField.String()) // case reflect.Int64: // item = append(item, strconv.FormatInt(vField.Int(), 10)) // } // } // } // } // values = append(values, item) // id = int(v.ID) // } } client, err := oss.New( parser.Conf.Oss.EndPoint, parser.Conf.Oss.AccessKey, parser.Conf.Oss.AccessSecret, ) if err != nil { return "", err } bucket, err := client.Bucket(parser.Conf.Oss.DownLoadBucket) if err != nil { return "", err } path := "export_style/" + strconv.FormatInt(time.Now().UnixNano(), 10) + ".csv" var b bytes.Buffer _, _ = b.WriteString("\xEF\xBB\xBF") w := csv.NewWriter(&b) w.UseCRLF = true w.WriteAll(values) reader := bytes.NewReader(b.Bytes()) err = bucket.PutObject(path, reader) if err != nil { return "", err } return path, nil } func HandleExportMsg(message *redis.Message) { if message.Payload != "export" { return } for { res, err := cache.Redis().LPop(consts.ExportStylelist) if err != nil || res == "" { return } var msg p05Msg err = jsoniter.UnmarshalFromString(res, &msg) if err != nil { return } if msg.ExportId == "" { return } task, err := model.NewExportStyleTaskModel().Get(database.DB().Where("export_id = ?", msg.ExportId)) if err != nil { return } values := make(map[string]interface{}) if time.Now().Unix()-task.CreatedAt.Unix() > 3600 { // 超过两个小时,消息不处理 values["status"] = 2 } else { // path, err := Export(msg.p05Request) path, err := ExportCsv(msg.p05Request) if err != nil { values["status"] = 2 } else { values["status"] = 1 values["path"] = "https://" + parser.Conf.Oss.DownLoadBucket + "." + parser.Conf.Oss.EndPoint + "/" + path } } model.NewExportStyleTaskModel().Update(database.DB().Where("export_id = ?", msg.ExportId), values) runtime.GC() } }