rcvr.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. // Package impl implements all interfaces of the vehicle microservice.
  4. package impl
  5. import (
  6. "context"
  7. "encoding/json"
  8. "runtime"
  9. "sync"
  10. "time"
  11. "gd_vehicle/apis"
  12. "gd_vehicle/impl/thirdparty_impl/adm"
  13. "gd_vehicle/impl/vehicle"
  14. "gd_vehicle/common.in/cache"
  15. "gd_vehicle/common.in/jsonrpc2"
  16. "gd_vehicle/common.in/task"
  17. "gd_vehicle/common.in/utils"
  18. "go.uber.org/zap"
  19. )
  20. // 具体实现
  21. type Rcvr struct {
  22. }
  23. // 操作pm的公共锁
  24. var mutex sync.Mutex
  25. var pm = map[string]*ParamReuse{}
  26. //短时复用说明:
  27. //1.相同接口相同参数,如果一次调用未完成,又发起另一次调用,则复用第一次调用的结果
  28. //2.第一次是指全局的第一次(服务器集群内该接口的该类参数的第一次调用)
  29. //3.如果本次为第一次调用,需设置本地参数缓存(ParamReuse)和redis参数缓存(ParamReuse)
  30. //4.如果已有本地参数缓存,则等待结果
  31. //5.如果没有本地参数缓存,且没有redis缓存,则本次为第一次,作步骤3操作
  32. //6.如果没有本地参数缓存, 但已有redis缓存,则等待redis结果
  33. //7.本地参数引用计数为0时,清除本地参数缓存
  34. //8.redis参数引用计数为0时,清除redis缓存
  35. type ParamReuse struct {
  36. //参数引用计数
  37. Used int
  38. //响应结果
  39. Res string
  40. //是否使用了redis
  41. Redis bool `json:"-"`
  42. //参数锁引用计数
  43. Locked int `json:"-"`
  44. //参数锁
  45. Mx sync.Mutex `json:"-"`
  46. }
  47. const (
  48. ParamExist = 1
  49. ParamExistInRedis = 2
  50. ParamNotExist = 3
  51. )
  52. func getFuncName() string {
  53. funcPtr, _, _, _ := runtime.Caller(1)
  54. return runtime.FuncForPC(funcPtr).Name()
  55. }
  56. func paramLock(req string) {
  57. mutex.Lock()
  58. p, ok := pm[req]
  59. if ok == false {
  60. pm[req] = &ParamReuse{}
  61. p = pm[req]
  62. }
  63. p.Locked += 1
  64. mutex.Unlock()
  65. p.Mx.Lock()
  66. }
  67. func paramUnlock(req string, clean bool) {
  68. mutex.Lock()
  69. defer mutex.Unlock()
  70. if p, ok := pm[req]; ok {
  71. defer p.Mx.Unlock()
  72. if p.Locked > 0 {
  73. p.Locked -= 1
  74. }
  75. if clean == false {
  76. return
  77. }
  78. if p.Used > 0 {
  79. p.Used -= 1
  80. }
  81. if p.Locked == 0 && p.Used == 0 {
  82. if p.Redis {
  83. cleanParamReqFromRedis(req)
  84. }
  85. delete(pm, req)
  86. if len(pm) == 0 {
  87. pm = map[string]*ParamReuse{}
  88. }
  89. }
  90. }
  91. }
  92. func setParamReq(req string) int {
  93. paramLock(req)
  94. defer paramUnlock(req, false)
  95. p, _ := pm[req]
  96. if p.Used == 0 {
  97. utils.Lock(req)
  98. defer utils.UnLock(req)
  99. s, _ := cache.Redis.Get(req)
  100. p.Used = 1
  101. p.Redis = true
  102. if s == "" {
  103. pr := ParamReuse{
  104. Used: 1,
  105. }
  106. bytes, _ := json.Marshal(pr)
  107. cache.Redis.SetEx(req, 60, string(bytes))
  108. return ParamNotExist
  109. }
  110. pr := ParamReuse{}
  111. json.Unmarshal([]byte(s), &pr)
  112. pr.Used++
  113. bytes, _ := json.Marshal(pr)
  114. cache.Redis.SetEx(req, 60, string(bytes))
  115. return ParamExistInRedis
  116. }
  117. p.Used += 1
  118. return ParamExist
  119. }
  120. func cleanParamReqFromRedis(req string) {
  121. utils.Lock(req)
  122. defer utils.UnLock(req)
  123. s, _ := cache.Redis.Get(req)
  124. if s == "" {
  125. return
  126. }
  127. pr := ParamReuse{}
  128. json.Unmarshal([]byte(s), &pr)
  129. pr.Used -= 1
  130. if pr.Used == 0 {
  131. cache.Redis.Del(req)
  132. return
  133. }
  134. bytes, _ := json.Marshal(pr)
  135. cache.Redis.SetEx(req, 60, string(bytes))
  136. }
  137. func cleanParamReq(req string) {
  138. paramLock(req)
  139. paramUnlock(req, true)
  140. }
  141. func setParamRes(req string, res string) {
  142. paramLock(req)
  143. defer paramUnlock(req, false)
  144. p, _ := pm[req]
  145. p.Res = res
  146. if p.Redis == false {
  147. return
  148. }
  149. setParamResToRedis(req, res)
  150. }
  151. func setParamResLocal(req string, res string) {
  152. paramLock(req)
  153. defer paramUnlock(req, false)
  154. p, _ := pm[req]
  155. p.Res = res
  156. }
  157. func setParamResToRedis(req string, res string) {
  158. utils.Lock(req)
  159. defer utils.UnLock(req)
  160. s, _ := cache.Redis.Get(req)
  161. if s == "" {
  162. return
  163. }
  164. pr := ParamReuse{}
  165. json.Unmarshal([]byte(s), &pr)
  166. pr.Res = res
  167. bytes, _ := json.Marshal(pr)
  168. cache.Redis.SetEx(req, 60, string(bytes))
  169. }
  170. func getParamRes(req string) string {
  171. paramLock(req)
  172. defer paramUnlock(req, false)
  173. p, _ := pm[req]
  174. return p.Res
  175. }
  176. func getParamResFromRedis(req string) string {
  177. s, _ := cache.Redis.Get(req)
  178. if s == "" {
  179. return ""
  180. }
  181. pr := ParamReuse{}
  182. json.Unmarshal([]byte(s), &pr)
  183. return pr.Res
  184. }
  185. func setOrWaitParam(req string) string {
  186. exist := setParamReq(req)
  187. if exist == ParamNotExist {
  188. return ""
  189. }
  190. count := 0
  191. if exist == ParamExist {
  192. for {
  193. res := getParamRes(req)
  194. if res != "" {
  195. return res
  196. }
  197. time.Sleep(50 * time.Millisecond)
  198. count++
  199. if count >= 1200 {
  200. break
  201. }
  202. }
  203. }
  204. if exist == ParamExistInRedis {
  205. for {
  206. res := getParamResFromRedis(req)
  207. if res != "" {
  208. setParamResLocal(req, res)
  209. return res
  210. }
  211. time.Sleep(50 * time.Millisecond)
  212. count++
  213. if count >= 1200 {
  214. break
  215. }
  216. }
  217. }
  218. return ""
  219. }
  220. func (c *Rcvr) Query(ctx context.Context, req *apis.CommonReq, reply *apis.CommonReply) error {
  221. if false {
  222. funcName := getFuncName()
  223. newReq := *req
  224. newReq.MerchantApiInfo = apis.MerchantApiInfo{}
  225. reqBytes, _ := json.Marshal(newReq)
  226. reqString := string(reqBytes) + funcName
  227. setOrWaitParam(reqString)
  228. defer cleanParamReq(reqString)
  229. defer func() {
  230. setParamRes(reqString, "-")
  231. }()
  232. }
  233. t1 := func() error {
  234. return vehicle.Query(ctx, req, reply)
  235. }
  236. err := task.Do(ctx, t1)
  237. if err != nil {
  238. var e jsonrpc2.Error
  239. merr := json.Unmarshal([]byte(err.Error()), &e)
  240. if merr != nil {
  241. reply.ErrCode = int(1001)
  242. reply.ErrMsg = "服务错误"
  243. }
  244. reply.ErrCode = e.Code
  245. reply.ErrMsg = e.Message
  246. }
  247. return nil
  248. }
  249. func SetLogger(logger *zap.Logger) {
  250. vehicle.SetLogger(logger)
  251. adm.SetLogger(logger)
  252. }