123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495 |
- // Copyright 2019 getensh.com. All rights reserved.
- // Use of this source code is governed by getensh.com.
- /*
- package access_log 提供写日志功能
- */
- package access_log
- import (
- "context"
- "encoding/json"
- "fmt"
- "gd_access_log/apis"
- "gd_access_log/consts"
- "gd_access_log/errors"
- "gd_access_log/impl/warning"
- "gd_access_log/rpc_apis"
- "gd_access_log/rpc_apis/gd_management"
- "os"
- "time"
- "gd_access_log/common.in/mq"
- "gd_access_log/common.in/storage"
- "gd_access_log/common.in/utils"
- "github.com/astaxie/beego/orm"
- "go.uber.org/zap"
- )
- const MaxTimes = 5
- // 发送消息到ods
- func SendOdsMsg(odsMsg *apis.OdsMessage) error {
- mqMsg, _ := json.Marshal(*odsMsg)
- err := mq.OdsMq.PublishMsg(mqMsg)
- if err != nil {
- // TODO 记录消息
- /*failMsg := &model.FailMsg{}
- failMsg.Msg = string(mqMsg)
- err = failMsg.Insert(clinit.DB())
- if err != nil {
- l.Error("mysql",
- zap.String("sql", "insert into t_gd_ods_fail_msg"),
- zap.String("fields", failMsg.Msg),
- zap.String("error", err.Error()))
- }*/
- return err
- }
- return nil
- }
- func timestampToDate(timestamp int64) string {
- if timestamp == 0 {
- return ""
- }
- return time.Unix(timestamp, 0).Format("2006-01-02")
- }
- func getMonthFromtimestamp(timestamp int64) int {
- return int(time.Unix(timestamp, 0).Month())
- }
- func writeAccessLogMonth(o orm.Ormer, accessLog *apis.AccessLog) (id int64, month int, err error) {
- month = getMonthFromtimestamp(accessLog.Timestamp)
- currentMonth := int(time.Now().Month())
- if month > 12 || month < 0 {
- month = currentMonth
- }
- accessLog.Id = 0
- switch month {
- case 1:
- id, err = o.Insert(&apis.TGdAccessLogMonth1{*accessLog})
- case 2:
- id, err = o.Insert(&apis.TGdAccessLogMonth2{*accessLog})
- case 3:
- id, err = o.Insert(&apis.TGdAccessLogMonth3{*accessLog})
- case 4:
- id, err = o.Insert(&apis.TGdAccessLogMonth4{*accessLog})
- case 5:
- id, err = o.Insert(&apis.TGdAccessLogMonth5{*accessLog})
- case 6:
- id, err = o.Insert(&apis.TGdAccessLogMonth6{*accessLog})
- case 7:
- id, err = o.Insert(&apis.TGdAccessLogMonth7{*accessLog})
- case 8:
- id, err = o.Insert(&apis.TGdAccessLogMonth8{*accessLog})
- case 9:
- id, err = o.Insert(&apis.TGdAccessLogMonth9{*accessLog})
- case 10:
- id, err = o.Insert(&apis.TGdAccessLogMonth10{*accessLog})
- case 11:
- id, err = o.Insert(&apis.TGdAccessLogMonth11{*accessLog})
- case 12:
- id, err = o.Insert(&apis.TGdAccessLogMonth12{*accessLog})
- }
- if err != nil {
- return id, month, errors.DataBaseError
- }
- return id, month, nil
- }
- func writeThirdAccessLogMonth(o orm.Ormer, accessId int64, month int, accessLog *apis.ThirdpartLog) (err error) {
- accessLog.AccessId = accessId
- accessLog.Id = 0
- switch month {
- case 1:
- _, err = o.Insert(&apis.TGdThirdpartLogMonth1{*accessLog})
- case 2:
- _, err = o.Insert(&apis.TGdThirdpartLogMonth2{*accessLog})
- case 3:
- _, err = o.Insert(&apis.TGdThirdpartLogMonth3{*accessLog})
- case 4:
- _, err = o.Insert(&apis.TGdThirdpartLogMonth4{*accessLog})
- case 5:
- _, err = o.Insert(&apis.TGdThirdpartLogMonth5{*accessLog})
- case 6:
- _, err = o.Insert(&apis.TGdThirdpartLogMonth6{*accessLog})
- case 7:
- _, err = o.Insert(&apis.TGdThirdpartLogMonth7{*accessLog})
- case 8:
- _, err = o.Insert(&apis.TGdThirdpartLogMonth8{*accessLog})
- case 9:
- _, err = o.Insert(&apis.TGdThirdpartLogMonth9{*accessLog})
- case 10:
- _, err = o.Insert(&apis.TGdThirdpartLogMonth10{*accessLog})
- case 11:
- _, err = o.Insert(&apis.TGdThirdpartLogMonth11{*accessLog})
- case 12:
- _, err = o.Insert(&apis.TGdThirdpartLogMonth12{*accessLog})
- }
- if err != nil {
- return errors.DataBaseError
- }
- return nil
- }
- func writeMonthLog(accessLog *apis.AccessLog, thirdLog []*apis.ThirdpartLog, o orm.Ormer) error {
- accessId, month, err := writeAccessLogMonth(o, accessLog)
- if err != nil {
- return err
- }
- for _, p := range thirdLog {
- if p.ProviderName != "本地数据" {
- err := writeThirdAccessLogMonth(o, accessId, month, p)
- if err != nil {
- return err
- }
- }
- }
- return nil
- }
- func IsReuse(LReq []apis.ThirdpartLogWrite) bool {
- if len(LReq) > 0 {
- for _, v := range LReq {
- if v.ProviderCode == "" {
- return false
- }
- }
- return true
- } else {
- return true
- }
- }
- func getSearch(m map[string]interface{}) string {
- if len(m) > 0 {
- if search, _ := m["plate_no"].(string); search != "" {
- return search
- } else if search, _ := m["vin"].(string); search != "" {
- return search
- } else if search, _ := m["idcard"].(string); search != "" {
- return search
- } else if search, _ := m["id_card"].(string); search != "" {
- return search
- } else if search, _ := m["degree_id"].(string); search != "" {
- return search
- } else if search, _ := m["education_id"].(string); search != "" {
- return search
- } else if search, _ := m["order_no"].(string); search != "" {
- return search
- } else {
- return ""
- }
- }
- return ""
- }
- func writeAccessLog(ctx context.Context, req *apis.LogAddAccessLogReq, o orm.Ormer) (int64, int64, *apis.AccessLog, error) {
- var accessLog apis.AccessLog
- var accessId int64
- var rpcCount int
- var dbCount int
- var historyId int64
- for {
- mReq := gd_management.ManagementGetNameReq{}
- mReq.MerchantId = req.MerchantId
- mReq.ApiId = req.ApiId
- mreply, err := rpc_apis.Management.ManagementGetName(ctx, &mReq)
- if err != nil {
- rpcCount++
- if rpcCount > MaxTimes {
- l.Error("func",
- zap.String("call", "ManagementGetName"),
- zap.String("args", utils.MarshalJsonString(mReq)),
- zap.String("error", err.Error()))
- return 0, 0, nil, err
- }
- time.Sleep(time.Second)
- continue
- }
- accessLog.MerchantDataApiId = req.MerchantDataApiId
- if accessLog.MerchantDataApiId == 0 {
- mreq := gd_management.ManagementGetMerchantApiIdReq{
- MerchantChildApiId: req.MerchantChildApiId,
- }
- reply, _ := rpc_apis.Management.ManagementGetMerchantApiId(ctx, &mreq)
- accessLog.MerchantDataApiId = reply.MerchantDataApiId
- }
- accessLog.RawCode = req.AccessLogWrite.Code
- accessLog.Code = req.AccessLogWrite.RespCode
- accessLog.Msg = req.AccessLogWrite.Msg
- accessLog.State = getState(accessLog.Code)
- accessLog.ApiId = req.ApiId
- accessLog.RemoteAddr = req.RemoteAddr
- accessLog.MerchantId = req.MerchantId
- accessLog.Elapsed = Decimal(req.AccessLogWrite.Elapsed)
- accessLog.OrderNo = req.OrderNo
- accessLog.IsReuse = IsReuse(req.ThirdpartLogWrites)
- /*if len(req.ThirdpartLogWrites) == 0 {
- accessLog.IsReuse = true
- } else {
- accessLog.IsReuse = req.AccessLogWrite.IsReuse
- }*/
- accessLog.UtSource = req.AccessLogWrite.UtSource
- accessLog.ProviderCount = len(req.ThirdpartLogWrites)
- m := map[string]interface{}{}
- json.Unmarshal([]byte(req.AccessLogWrite.RequestParams), &m)
- bytes := []byte(req.AccessLogWrite.RequestParams)
- if len(m) > 0 {
- accessLog.Search = getSearch(m)
- if accessLog.Search == "" {
- accessLog.Search = req.AccessLogWrite.Search
- }
- bytes, _ = json.Marshal(m)
- }
- m = nil
- // 如果入参为空,则使用出参作为关键字
- if accessLog.Search == "" {
- t := map[string]interface{}{}
- _ = json.Unmarshal([]byte(req.AccessLogWrite.RawResponseParams), &t)
- if len(t) > 0 {
- accessLog.Search = getSearch(t)
- }
- t = nil
- }
- // 原始请求参数search
- m1 := map[string]interface{}{}
- json.Unmarshal([]byte(req.AccessLogWrite.RawRequestParams), &m1)
- if len(m1) > 0 {
- accessLog.RawSearch = getSearch(m1)
- m1 = nil
- }
- accessLog.RequestParams = string(bytes)
- if len(req.AccessLogWrite.ResponseParams) < 1024*1024 {
- accessLog.ResponseParams = req.AccessLogWrite.ResponseParams
- } else {
- accessLog.ResponseParams = "数据量大于1M,未记录"
- }
- accessLog.RawRequestParams = req.AccessLogWrite.RawRequestParams
- if len(req.AccessLogWrite.RawResponseParams) < 1024*1024 {
- accessLog.RawResponseParams = req.AccessLogWrite.RawResponseParams
- } else {
- accessLog.RawResponseParams = "数据量大于1M,未记录"
- }
- accessLog.Timestamp = req.TimeStamp
- accessLog.ApiName = mreply.ApiName
- accessLog.MerchantName = mreply.MerchantName
- accessLog.CreateTime = timestampToDate(req.TimeStamp)
- if true {
- go warning.AccessLogWarn(&accessLog)
- return 0, 0, &accessLog, nil
- }
- for {
- accessId, err = o.Insert(&accessLog)
- if err != nil {
- dbCount++
- if dbCount > MaxTimes {
- l.Error("mysql",
- zap.String("sql", "insert access log"),
- zap.String("args", utils.MarshalJsonString(accessLog)),
- zap.String("error", err.Error()))
- return 0, 0, nil, err
- }
- time.Sleep(time.Second)
- continue
- }
- history := apis.TGdAccessLogHistory{}
- history.AccessLog = accessLog
- history.Id = 0
- historyId, err = o.Insert(&history)
- if err != nil {
- l.Error("mysql",
- zap.String("sql", "insert access log history"),
- zap.String("args", utils.MarshalJsonString(accessLog)),
- zap.String("error", err.Error()))
- return 0, 0, nil, err
- }
- go warning.AccessLogWarn(&accessLog)
- break
- }
- break
- }
- return accessId, historyId, &accessLog, nil
- }
- func writeThirdpartAccessLog(ctx context.Context, req *apis.LogAddAccessLogReq, id int64, historyId int64, merchantName, apiName string, o orm.Ormer) ([]*apis.ThirdpartLog, error) {
- var rpcCount int
- var dbCount int
- ret := []*apis.ThirdpartLog{}
- for index, _ := range req.ThirdpartLogWrites {
- for {
- var accessLog apis.ThirdpartLog
- mReq := gd_management.ManagementGetNameReq{}
- mReq.ProviderApiId = req.ThirdpartLogWrites[index].ProviderApiId
- mreply, err := rpc_apis.Management.ManagementGetName(ctx, &mReq)
- if err != nil {
- rpcCount++
- if rpcCount > MaxTimes {
- l.Error("func",
- zap.String("call", "ManagementGetName"),
- zap.String("args", utils.MarshalJsonString(mReq)),
- zap.String("error", err.Error()))
- return nil, err
- }
- time.Sleep(time.Second)
- continue
- }
- if req.ThirdpartLogWrites[index].Code == 20001 {
- accessLog.State = true
- } else {
- accessLog.State = req.ThirdpartLogWrites[index].State
- }
- accessLog.ProviderId = mreply.ProviderId
- accessLog.ProviderApiId = req.ThirdpartLogWrites[index].ProviderApiId
- accessLog.ApiId = req.ApiId
- accessLog.RemoteAddr = req.RemoteAddr
- if req.ThirdpartLogWrites[index].RawCode == "" {
- accessLog.RawCode = fmt.Sprintf("%d", req.ThirdpartLogWrites[index].Code)
- } else {
- accessLog.RawCode = fmt.Sprintf("%s", req.ThirdpartLogWrites[index].RawCode)
- }
- accessLog.Code = req.ThirdpartLogWrites[index].Code
- accessLog.Elapsed = Decimal(req.ThirdpartLogWrites[index].Elapsed)
- accessLog.MerchantId = req.MerchantId
- accessLog.Msg = req.ThirdpartLogWrites[index].Msg
- accessLog.RequestParams = req.ThirdpartLogWrites[index].RequestParams
- if len(req.ThirdpartLogWrites[index].ResponseParams) < 1024*1024 {
- accessLog.ResponseParams = req.ThirdpartLogWrites[index].ResponseParams
- } else {
- accessLog.ResponseParams = "数据量大于1M,未记录"
- }
- accessLog.ApiName = apiName
- accessLog.MerchantName = merchantName
- accessLog.ProviderApiName = mreply.ProviderApiName
- accessLog.ProviderName = mreply.ProviderName
- if req.ThirdpartLogWrites[index].Timestamp > 0 {
- accessLog.Timestamp = req.ThirdpartLogWrites[index].Timestamp
- } else {
- accessLog.Timestamp = req.TimeStamp
- }
- accessLog.RemoteAddr = req.RemoteAddr
- accessLog.AccessId = id
- accessLog.CreateTime = timestampToDate(accessLog.Timestamp)
- accessLog.Search = req.ThirdpartLogWrites[index].Search
- if true {
- // 发送mq消息
- if accessLog.ProviderName != "本地数据" && accessLog.State != false {
- odsMsg := &apis.OdsMessage{}
- odsMsg.MsgType = consts.ODSPROVIDERLOG
- odsMsg.SourceCode = mreply.ProviderApiCode
- content, _ := json.Marshal(accessLog)
- odsMsg.Content = string(content)
- go SendOdsMsg(odsMsg)
- }
- go warning.ThirdLogWarn(&accessLog)
- ret = append(ret, &accessLog)
- break
- }
- for {
- _, err = o.Insert(&accessLog)
- if err != nil {
- dbCount++
- if dbCount > MaxTimes {
- l.Error("mysql",
- zap.String("sql", "insert t_gd_thirdpart_access_log"),
- zap.String("args", utils.MarshalJsonString(accessLog)),
- zap.String("error", err.Error()))
- return nil, errors.DataBaseError
- }
- time.Sleep(time.Second)
- continue
- }
- history := apis.TGdThirdpartAccessLogHistory{}
- history.ThirdpartLog = accessLog
- history.AccessId = historyId
- history.Id = 0
- _, err = o.Insert(&history)
- if err != nil {
- l.Error("mysql",
- zap.String("sql", "insert t_gd_thirdpart_access_log_history"),
- zap.String("args", utils.MarshalJsonString(accessLog)),
- zap.String("error", err.Error()))
- return nil, errors.DataBaseError
- }
- go warning.ThirdLogWarn(&accessLog)
- ret = append(ret, &accessLog)
- break
- }
- break
- }
- }
- return ret, nil
- }
- func writeToLocal(req *apis.LogAddAccessLogReq) {
- bytes, _ := json.Marshal(req)
- f, err := os.OpenFile("/var/log/gd_access_log_req.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
- if err != nil {
- return
- }
- defer f.Close()
- value := fmt.Sprintf("%s\n", string(bytes))
- f.Write([]byte(value))
- }
- func LogAddAccessLog(ctx context.Context, req *apis.LogAddAccessLogReq, reply *apis.LogAddAccessLogReply) error {
- task := func(o orm.Ormer) error {
- d, _ := json.Marshal(*req)
- fmt.Println(string(d))
- id, historyId, accessLog, err := writeAccessLog(ctx, req, o)
- if err != nil {
- l.Error("access log",
- zap.String("args", utils.MarshalJsonString(*req)),
- zap.String("err", err.Error()))
- writeToLocal(req)
- return err
- }
- thirdLog, err := writeThirdpartAccessLog(ctx, req, id, historyId, accessLog.MerchantName, accessLog.ApiName, o)
- if err != nil {
- l.Error("thirdparty access log",
- zap.String("args", utils.MarshalJsonString(*req)),
- zap.String("err", err.Error()))
- writeToLocal(req)
- return err
- }
- if err := writeMonthLog(accessLog, thirdLog, o); err != nil {
- l.Error("month log",
- zap.String("args", utils.MarshalJsonString(*req)),
- zap.String("err", err.Error()))
- return err
- }
- if err := writeLogDay(accessLog, thirdLog, o); err != nil {
- l.Error("day log",
- zap.String("args", utils.MarshalJsonString(*req)),
- zap.String("err", err.Error()))
- return err
- }
- return nil
- }
- tasks := []storage.DbaTasker{}
- tasks = append(tasks, storage.GenerateDbaTask(task))
- storage.ExecTrans(tasks...)
- return nil
- }
|