log_add_access_log.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
/*
Package access_log provides access-log writing functionality.
*/
  6. package access_log
  7. import (
  8. "context"
  9. "encoding/json"
  10. "fmt"
  11. "gd_access_log/apis"
  12. "gd_access_log/consts"
  13. "gd_access_log/errors"
  14. "gd_access_log/impl/warning"
  15. "gd_access_log/rpc_apis"
  16. "gd_access_log/rpc_apis/gd_management"
  17. "os"
  18. "time"
  19. "gd_access_log/common.in/mq"
  20. "gd_access_log/common.in/storage"
  21. "gd_access_log/common.in/utils"
  22. "github.com/astaxie/beego/orm"
  23. "go.uber.org/zap"
  24. )
// MaxTimes is the failure threshold used by the retry loops in this file:
// a loop gives up once its failure counter exceeds this value.
const MaxTimes = 5
  26. // 发送消息到ods
  27. func SendOdsMsg(odsMsg *apis.OdsMessage) error {
  28. mqMsg, _ := json.Marshal(*odsMsg)
  29. err := mq.OdsMq.PublishMsg(mqMsg)
  30. if err != nil {
  31. // TODO 记录消息
  32. /*failMsg := &model.FailMsg{}
  33. failMsg.Msg = string(mqMsg)
  34. err = failMsg.Insert(clinit.DB())
  35. if err != nil {
  36. l.Error("mysql",
  37. zap.String("sql", "insert into t_gd_ods_fail_msg"),
  38. zap.String("fields", failMsg.Msg),
  39. zap.String("error", err.Error()))
  40. }*/
  41. return err
  42. }
  43. return nil
  44. }
  45. func timestampToDate(timestamp int64) string {
  46. if timestamp == 0 {
  47. return ""
  48. }
  49. return time.Unix(timestamp, 0).Format("2006-01-02")
  50. }
  51. func getMonthFromtimestamp(timestamp int64) int {
  52. return int(time.Unix(timestamp, 0).Month())
  53. }
  54. func writeAccessLogMonth(o orm.Ormer, accessLog *apis.AccessLog) (id int64, month int, err error) {
  55. month = getMonthFromtimestamp(accessLog.Timestamp)
  56. currentMonth := int(time.Now().Month())
  57. if month > 12 || month < 0 {
  58. month = currentMonth
  59. }
  60. accessLog.Id = 0
  61. switch month {
  62. case 1:
  63. id, err = o.Insert(&apis.TGdAccessLogMonth1{*accessLog})
  64. case 2:
  65. id, err = o.Insert(&apis.TGdAccessLogMonth2{*accessLog})
  66. case 3:
  67. id, err = o.Insert(&apis.TGdAccessLogMonth3{*accessLog})
  68. case 4:
  69. id, err = o.Insert(&apis.TGdAccessLogMonth4{*accessLog})
  70. case 5:
  71. id, err = o.Insert(&apis.TGdAccessLogMonth5{*accessLog})
  72. case 6:
  73. id, err = o.Insert(&apis.TGdAccessLogMonth6{*accessLog})
  74. case 7:
  75. id, err = o.Insert(&apis.TGdAccessLogMonth7{*accessLog})
  76. case 8:
  77. id, err = o.Insert(&apis.TGdAccessLogMonth8{*accessLog})
  78. case 9:
  79. id, err = o.Insert(&apis.TGdAccessLogMonth9{*accessLog})
  80. case 10:
  81. id, err = o.Insert(&apis.TGdAccessLogMonth10{*accessLog})
  82. case 11:
  83. id, err = o.Insert(&apis.TGdAccessLogMonth11{*accessLog})
  84. case 12:
  85. id, err = o.Insert(&apis.TGdAccessLogMonth12{*accessLog})
  86. }
  87. if err != nil {
  88. return id, month, errors.DataBaseError
  89. }
  90. return id, month, nil
  91. }
  92. func writeThirdAccessLogMonth(o orm.Ormer, accessId int64, month int, accessLog *apis.ThirdpartLog) (err error) {
  93. accessLog.AccessId = accessId
  94. accessLog.Id = 0
  95. switch month {
  96. case 1:
  97. _, err = o.Insert(&apis.TGdThirdpartLogMonth1{*accessLog})
  98. case 2:
  99. _, err = o.Insert(&apis.TGdThirdpartLogMonth2{*accessLog})
  100. case 3:
  101. _, err = o.Insert(&apis.TGdThirdpartLogMonth3{*accessLog})
  102. case 4:
  103. _, err = o.Insert(&apis.TGdThirdpartLogMonth4{*accessLog})
  104. case 5:
  105. _, err = o.Insert(&apis.TGdThirdpartLogMonth5{*accessLog})
  106. case 6:
  107. _, err = o.Insert(&apis.TGdThirdpartLogMonth6{*accessLog})
  108. case 7:
  109. _, err = o.Insert(&apis.TGdThirdpartLogMonth7{*accessLog})
  110. case 8:
  111. _, err = o.Insert(&apis.TGdThirdpartLogMonth8{*accessLog})
  112. case 9:
  113. _, err = o.Insert(&apis.TGdThirdpartLogMonth9{*accessLog})
  114. case 10:
  115. _, err = o.Insert(&apis.TGdThirdpartLogMonth10{*accessLog})
  116. case 11:
  117. _, err = o.Insert(&apis.TGdThirdpartLogMonth11{*accessLog})
  118. case 12:
  119. _, err = o.Insert(&apis.TGdThirdpartLogMonth12{*accessLog})
  120. }
  121. if err != nil {
  122. return errors.DataBaseError
  123. }
  124. return nil
  125. }
  126. func writeMonthLog(accessLog *apis.AccessLog, thirdLog []*apis.ThirdpartLog, o orm.Ormer) error {
  127. accessId, month, err := writeAccessLogMonth(o, accessLog)
  128. if err != nil {
  129. return err
  130. }
  131. for _, p := range thirdLog {
  132. if p.ProviderName != "本地数据" {
  133. err := writeThirdAccessLogMonth(o, accessId, month, p)
  134. if err != nil {
  135. return err
  136. }
  137. }
  138. }
  139. return nil
  140. }
  141. func IsReuse(LReq []apis.ThirdpartLogWrite) bool {
  142. if len(LReq) > 0 {
  143. for _, v := range LReq {
  144. if v.ProviderCode == "" {
  145. return false
  146. }
  147. }
  148. return true
  149. } else {
  150. return true
  151. }
  152. }
  153. func getSearch(m map[string]interface{}) string {
  154. if len(m) > 0 {
  155. if search, _ := m["plate_no"].(string); search != "" {
  156. return search
  157. } else if search, _ := m["vin"].(string); search != "" {
  158. return search
  159. } else if search, _ := m["idcard"].(string); search != "" {
  160. return search
  161. } else if search, _ := m["id_card"].(string); search != "" {
  162. return search
  163. } else if search, _ := m["degree_id"].(string); search != "" {
  164. return search
  165. } else if search, _ := m["education_id"].(string); search != "" {
  166. return search
  167. } else if search, _ := m["order_no"].(string); search != "" {
  168. return search
  169. } else {
  170. return ""
  171. }
  172. }
  173. return ""
  174. }
  175. func writeAccessLog(ctx context.Context, req *apis.LogAddAccessLogReq, o orm.Ormer) (int64, int64, *apis.AccessLog, error) {
  176. var accessLog apis.AccessLog
  177. var accessId int64
  178. var rpcCount int
  179. var dbCount int
  180. var historyId int64
  181. for {
  182. mReq := gd_management.ManagementGetNameReq{}
  183. mReq.MerchantId = req.MerchantId
  184. mReq.ApiId = req.ApiId
  185. mreply, err := rpc_apis.Management.ManagementGetName(ctx, &mReq)
  186. if err != nil {
  187. rpcCount++
  188. if rpcCount > MaxTimes {
  189. l.Error("func",
  190. zap.String("call", "ManagementGetName"),
  191. zap.String("args", utils.MarshalJsonString(mReq)),
  192. zap.String("error", err.Error()))
  193. return 0, 0, nil, err
  194. }
  195. time.Sleep(time.Second)
  196. continue
  197. }
  198. accessLog.MerchantDataApiId = req.MerchantDataApiId
  199. if accessLog.MerchantDataApiId == 0 {
  200. mreq := gd_management.ManagementGetMerchantApiIdReq{
  201. MerchantChildApiId: req.MerchantChildApiId,
  202. }
  203. reply, _ := rpc_apis.Management.ManagementGetMerchantApiId(ctx, &mreq)
  204. accessLog.MerchantDataApiId = reply.MerchantDataApiId
  205. }
  206. accessLog.RawCode = req.AccessLogWrite.Code
  207. accessLog.Code = req.AccessLogWrite.RespCode
  208. accessLog.Msg = req.AccessLogWrite.Msg
  209. accessLog.State = getState(accessLog.Code)
  210. accessLog.ApiId = req.ApiId
  211. accessLog.RemoteAddr = req.RemoteAddr
  212. accessLog.MerchantId = req.MerchantId
  213. accessLog.Elapsed = Decimal(req.AccessLogWrite.Elapsed)
  214. accessLog.OrderNo = req.OrderNo
  215. accessLog.IsReuse = IsReuse(req.ThirdpartLogWrites)
  216. /*if len(req.ThirdpartLogWrites) == 0 {
  217. accessLog.IsReuse = true
  218. } else {
  219. accessLog.IsReuse = req.AccessLogWrite.IsReuse
  220. }*/
  221. accessLog.UtSource = req.AccessLogWrite.UtSource
  222. accessLog.ProviderCount = len(req.ThirdpartLogWrites)
  223. m := map[string]interface{}{}
  224. json.Unmarshal([]byte(req.AccessLogWrite.RequestParams), &m)
  225. bytes := []byte(req.AccessLogWrite.RequestParams)
  226. if len(m) > 0 {
  227. accessLog.Search = getSearch(m)
  228. if accessLog.Search == "" {
  229. accessLog.Search = req.AccessLogWrite.Search
  230. }
  231. bytes, _ = json.Marshal(m)
  232. }
  233. m = nil
  234. // 如果入参为空,则使用出参作为关键字
  235. if accessLog.Search == "" {
  236. t := map[string]interface{}{}
  237. _ = json.Unmarshal([]byte(req.AccessLogWrite.RawResponseParams), &t)
  238. if len(t) > 0 {
  239. accessLog.Search = getSearch(t)
  240. }
  241. t = nil
  242. }
  243. // 原始请求参数search
  244. m1 := map[string]interface{}{}
  245. json.Unmarshal([]byte(req.AccessLogWrite.RawRequestParams), &m1)
  246. if len(m1) > 0 {
  247. accessLog.RawSearch = getSearch(m1)
  248. m1 = nil
  249. }
  250. accessLog.RequestParams = string(bytes)
  251. if len(req.AccessLogWrite.ResponseParams) < 1024*1024 {
  252. accessLog.ResponseParams = req.AccessLogWrite.ResponseParams
  253. } else {
  254. accessLog.ResponseParams = "数据量大于1M,未记录"
  255. }
  256. accessLog.RawRequestParams = req.AccessLogWrite.RawRequestParams
  257. if len(req.AccessLogWrite.RawResponseParams) < 1024*1024 {
  258. accessLog.RawResponseParams = req.AccessLogWrite.RawResponseParams
  259. } else {
  260. accessLog.RawResponseParams = "数据量大于1M,未记录"
  261. }
  262. accessLog.Timestamp = req.TimeStamp
  263. accessLog.ApiName = mreply.ApiName
  264. accessLog.MerchantName = mreply.MerchantName
  265. accessLog.CreateTime = timestampToDate(req.TimeStamp)
  266. if true {
  267. go warning.AccessLogWarn(&accessLog)
  268. return 0, 0, &accessLog, nil
  269. }
  270. for {
  271. accessId, err = o.Insert(&accessLog)
  272. if err != nil {
  273. dbCount++
  274. if dbCount > MaxTimes {
  275. l.Error("mysql",
  276. zap.String("sql", "insert access log"),
  277. zap.String("args", utils.MarshalJsonString(accessLog)),
  278. zap.String("error", err.Error()))
  279. return 0, 0, nil, err
  280. }
  281. time.Sleep(time.Second)
  282. continue
  283. }
  284. history := apis.TGdAccessLogHistory{}
  285. history.AccessLog = accessLog
  286. history.Id = 0
  287. historyId, err = o.Insert(&history)
  288. if err != nil {
  289. l.Error("mysql",
  290. zap.String("sql", "insert access log history"),
  291. zap.String("args", utils.MarshalJsonString(accessLog)),
  292. zap.String("error", err.Error()))
  293. return 0, 0, nil, err
  294. }
  295. go warning.AccessLogWarn(&accessLog)
  296. break
  297. }
  298. break
  299. }
  300. return accessId, historyId, &accessLog, nil
  301. }
  302. func writeThirdpartAccessLog(ctx context.Context, req *apis.LogAddAccessLogReq, id int64, historyId int64, merchantName, apiName string, o orm.Ormer) ([]*apis.ThirdpartLog, error) {
  303. var rpcCount int
  304. var dbCount int
  305. ret := []*apis.ThirdpartLog{}
  306. for index, _ := range req.ThirdpartLogWrites {
  307. for {
  308. var accessLog apis.ThirdpartLog
  309. mReq := gd_management.ManagementGetNameReq{}
  310. mReq.ProviderApiId = req.ThirdpartLogWrites[index].ProviderApiId
  311. mreply, err := rpc_apis.Management.ManagementGetName(ctx, &mReq)
  312. if err != nil {
  313. rpcCount++
  314. if rpcCount > MaxTimes {
  315. l.Error("func",
  316. zap.String("call", "ManagementGetName"),
  317. zap.String("args", utils.MarshalJsonString(mReq)),
  318. zap.String("error", err.Error()))
  319. return nil, err
  320. }
  321. time.Sleep(time.Second)
  322. continue
  323. }
  324. if req.ThirdpartLogWrites[index].Code == 20001 {
  325. accessLog.State = true
  326. } else {
  327. accessLog.State = req.ThirdpartLogWrites[index].State
  328. }
  329. accessLog.ProviderId = mreply.ProviderId
  330. accessLog.ProviderApiId = req.ThirdpartLogWrites[index].ProviderApiId
  331. accessLog.ApiId = req.ApiId
  332. accessLog.RemoteAddr = req.RemoteAddr
  333. if req.ThirdpartLogWrites[index].RawCode == "" {
  334. accessLog.RawCode = fmt.Sprintf("%d", req.ThirdpartLogWrites[index].Code)
  335. } else {
  336. accessLog.RawCode = fmt.Sprintf("%s", req.ThirdpartLogWrites[index].RawCode)
  337. }
  338. accessLog.Code = req.ThirdpartLogWrites[index].Code
  339. accessLog.Elapsed = Decimal(req.ThirdpartLogWrites[index].Elapsed)
  340. accessLog.MerchantId = req.MerchantId
  341. accessLog.Msg = req.ThirdpartLogWrites[index].Msg
  342. accessLog.RequestParams = req.ThirdpartLogWrites[index].RequestParams
  343. if len(req.ThirdpartLogWrites[index].ResponseParams) < 1024*1024 {
  344. accessLog.ResponseParams = req.ThirdpartLogWrites[index].ResponseParams
  345. } else {
  346. accessLog.ResponseParams = "数据量大于1M,未记录"
  347. }
  348. accessLog.ApiName = apiName
  349. accessLog.MerchantName = merchantName
  350. accessLog.ProviderApiName = mreply.ProviderApiName
  351. accessLog.ProviderName = mreply.ProviderName
  352. if req.ThirdpartLogWrites[index].Timestamp > 0 {
  353. accessLog.Timestamp = req.ThirdpartLogWrites[index].Timestamp
  354. } else {
  355. accessLog.Timestamp = req.TimeStamp
  356. }
  357. accessLog.RemoteAddr = req.RemoteAddr
  358. accessLog.AccessId = id
  359. accessLog.CreateTime = timestampToDate(accessLog.Timestamp)
  360. accessLog.Search = req.ThirdpartLogWrites[index].Search
  361. if true {
  362. // 发送mq消息
  363. if accessLog.ProviderName != "本地数据" && accessLog.State != false {
  364. odsMsg := &apis.OdsMessage{}
  365. odsMsg.MsgType = consts.ODSPROVIDERLOG
  366. odsMsg.SourceCode = mreply.ProviderApiCode
  367. content, _ := json.Marshal(accessLog)
  368. odsMsg.Content = string(content)
  369. go SendOdsMsg(odsMsg)
  370. }
  371. go warning.ThirdLogWarn(&accessLog)
  372. ret = append(ret, &accessLog)
  373. break
  374. }
  375. for {
  376. _, err = o.Insert(&accessLog)
  377. if err != nil {
  378. dbCount++
  379. if dbCount > MaxTimes {
  380. l.Error("mysql",
  381. zap.String("sql", "insert t_gd_thirdpart_access_log"),
  382. zap.String("args", utils.MarshalJsonString(accessLog)),
  383. zap.String("error", err.Error()))
  384. return nil, errors.DataBaseError
  385. }
  386. time.Sleep(time.Second)
  387. continue
  388. }
  389. history := apis.TGdThirdpartAccessLogHistory{}
  390. history.ThirdpartLog = accessLog
  391. history.AccessId = historyId
  392. history.Id = 0
  393. _, err = o.Insert(&history)
  394. if err != nil {
  395. l.Error("mysql",
  396. zap.String("sql", "insert t_gd_thirdpart_access_log_history"),
  397. zap.String("args", utils.MarshalJsonString(accessLog)),
  398. zap.String("error", err.Error()))
  399. return nil, errors.DataBaseError
  400. }
  401. go warning.ThirdLogWarn(&accessLog)
  402. ret = append(ret, &accessLog)
  403. break
  404. }
  405. break
  406. }
  407. }
  408. return ret, nil
  409. }
  410. func writeToLocal(req *apis.LogAddAccessLogReq) {
  411. bytes, _ := json.Marshal(req)
  412. f, err := os.OpenFile("/var/log/gd_access_log_req.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
  413. if err != nil {
  414. return
  415. }
  416. defer f.Close()
  417. value := fmt.Sprintf("%s\n", string(bytes))
  418. f.Write([]byte(value))
  419. }
  420. func LogAddAccessLog(ctx context.Context, req *apis.LogAddAccessLogReq, reply *apis.LogAddAccessLogReply) error {
  421. task := func(o orm.Ormer) error {
  422. d, _ := json.Marshal(*req)
  423. fmt.Println(string(d))
  424. id, historyId, accessLog, err := writeAccessLog(ctx, req, o)
  425. if err != nil {
  426. l.Error("access log",
  427. zap.String("args", utils.MarshalJsonString(*req)),
  428. zap.String("err", err.Error()))
  429. writeToLocal(req)
  430. return err
  431. }
  432. thirdLog, err := writeThirdpartAccessLog(ctx, req, id, historyId, accessLog.MerchantName, accessLog.ApiName, o)
  433. if err != nil {
  434. l.Error("thirdparty access log",
  435. zap.String("args", utils.MarshalJsonString(*req)),
  436. zap.String("err", err.Error()))
  437. writeToLocal(req)
  438. return err
  439. }
  440. if err := writeMonthLog(accessLog, thirdLog, o); err != nil {
  441. l.Error("month log",
  442. zap.String("args", utils.MarshalJsonString(*req)),
  443. zap.String("err", err.Error()))
  444. return err
  445. }
  446. if err := writeLogDay(accessLog, thirdLog, o); err != nil {
  447. l.Error("day log",
  448. zap.String("args", utils.MarshalJsonString(*req)),
  449. zap.String("err", err.Error()))
  450. return err
  451. }
  452. return nil
  453. }
  454. tasks := []storage.DbaTasker{}
  455. tasks = append(tasks, storage.GenerateDbaTask(task))
  456. storage.ExecTrans(tasks...)
  457. return nil
  458. }