123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287 |
- // Copyright 2019 getensh.com. All rights reserved.
- // Use of this source code is governed by getensh.com.
- // package impl implement all interfaces of micro service for vehicle
- package impl
- import (
- "context"
- "encoding/json"
- "runtime"
- "sync"
- "time"
- "gd_vehicle/apis"
- "gd_vehicle/impl/thirdparty_impl/adm"
- "gd_vehicle/impl/vehicle"
- "gd_vehicle/common.in/cache"
- "gd_vehicle/common.in/jsonrpc2"
- "gd_vehicle/common.in/task"
- "gd_vehicle/common.in/utils"
- "go.uber.org/zap"
- )
- // 具体实现
- type Rcvr struct {
- }
- // 操作pm的公共锁
- var mutex sync.Mutex
- var pm = map[string]*ParamReuse{}
- //短时复用说明:
- //1.相同接口相同参数,如果一次调用未完成,又发起另一次调用,则复用第一次调用的结果
- //2.第一次是指全局的第一次(服务器集群内该接口的该类参数的第一次调用)
- //3.如果本次为第一次调用,需设置本地参数缓存(ParamReuse)和redis参数缓存(ParamReuse)
- //4.如果已有本地参数缓存,则等待结果
- //5.如果没有本地参数缓存,且没有redis缓存,则本次为第一次,作步骤3操作
- //6.如果没有本地参数缓存, 但已有redis缓存,则等待redis结果
- //7.本地参数引用计数为0时,清除本地参数缓存
- //8.redis参数引用计数为0时,清除redis缓存
- type ParamReuse struct {
- //参数引用计数
- Used int
- //响应结果
- Res string
- //是否使用了redis
- Redis bool `json:"-"`
- //参数锁引用计数
- Locked int `json:"-"`
- //参数锁
- Mx sync.Mutex `json:"-"`
- }
- const (
- ParamExist = 1
- ParamExistInRedis = 2
- ParamNotExist = 3
- )
- func getFuncName() string {
- funcPtr, _, _, _ := runtime.Caller(1)
- return runtime.FuncForPC(funcPtr).Name()
- }
- func paramLock(req string) {
- mutex.Lock()
- p, ok := pm[req]
- if ok == false {
- pm[req] = &ParamReuse{}
- p = pm[req]
- }
- p.Locked += 1
- mutex.Unlock()
- p.Mx.Lock()
- }
- func paramUnlock(req string, clean bool) {
- mutex.Lock()
- defer mutex.Unlock()
- if p, ok := pm[req]; ok {
- defer p.Mx.Unlock()
- if p.Locked > 0 {
- p.Locked -= 1
- }
- if clean == false {
- return
- }
- if p.Used > 0 {
- p.Used -= 1
- }
- if p.Locked == 0 && p.Used == 0 {
- if p.Redis {
- cleanParamReqFromRedis(req)
- }
- delete(pm, req)
- if len(pm) == 0 {
- pm = map[string]*ParamReuse{}
- }
- }
- }
- }
- func setParamReq(req string) int {
- paramLock(req)
- defer paramUnlock(req, false)
- p, _ := pm[req]
- if p.Used == 0 {
- utils.Lock(req)
- defer utils.UnLock(req)
- s, _ := cache.Redis.Get(req)
- p.Used = 1
- p.Redis = true
- if s == "" {
- pr := ParamReuse{
- Used: 1,
- }
- bytes, _ := json.Marshal(pr)
- cache.Redis.SetEx(req, 60, string(bytes))
- return ParamNotExist
- }
- pr := ParamReuse{}
- json.Unmarshal([]byte(s), &pr)
- pr.Used++
- bytes, _ := json.Marshal(pr)
- cache.Redis.SetEx(req, 60, string(bytes))
- return ParamExistInRedis
- }
- p.Used += 1
- return ParamExist
- }
- func cleanParamReqFromRedis(req string) {
- utils.Lock(req)
- defer utils.UnLock(req)
- s, _ := cache.Redis.Get(req)
- if s == "" {
- return
- }
- pr := ParamReuse{}
- json.Unmarshal([]byte(s), &pr)
- pr.Used -= 1
- if pr.Used == 0 {
- cache.Redis.Del(req)
- return
- }
- bytes, _ := json.Marshal(pr)
- cache.Redis.SetEx(req, 60, string(bytes))
- }
- func cleanParamReq(req string) {
- paramLock(req)
- paramUnlock(req, true)
- }
- func setParamRes(req string, res string) {
- paramLock(req)
- defer paramUnlock(req, false)
- p, _ := pm[req]
- p.Res = res
- if p.Redis == false {
- return
- }
- setParamResToRedis(req, res)
- }
- func setParamResLocal(req string, res string) {
- paramLock(req)
- defer paramUnlock(req, false)
- p, _ := pm[req]
- p.Res = res
- }
- func setParamResToRedis(req string, res string) {
- utils.Lock(req)
- defer utils.UnLock(req)
- s, _ := cache.Redis.Get(req)
- if s == "" {
- return
- }
- pr := ParamReuse{}
- json.Unmarshal([]byte(s), &pr)
- pr.Res = res
- bytes, _ := json.Marshal(pr)
- cache.Redis.SetEx(req, 60, string(bytes))
- }
- func getParamRes(req string) string {
- paramLock(req)
- defer paramUnlock(req, false)
- p, _ := pm[req]
- return p.Res
- }
- func getParamResFromRedis(req string) string {
- s, _ := cache.Redis.Get(req)
- if s == "" {
- return ""
- }
- pr := ParamReuse{}
- json.Unmarshal([]byte(s), &pr)
- return pr.Res
- }
- func setOrWaitParam(req string) string {
- exist := setParamReq(req)
- if exist == ParamNotExist {
- return ""
- }
- count := 0
- if exist == ParamExist {
- for {
- res := getParamRes(req)
- if res != "" {
- return res
- }
- time.Sleep(50 * time.Millisecond)
- count++
- if count >= 1200 {
- break
- }
- }
- }
- if exist == ParamExistInRedis {
- for {
- res := getParamResFromRedis(req)
- if res != "" {
- setParamResLocal(req, res)
- return res
- }
- time.Sleep(50 * time.Millisecond)
- count++
- if count >= 1200 {
- break
- }
- }
- }
- return ""
- }
- func (c *Rcvr) Query(ctx context.Context, req *apis.CommonReq, reply *apis.CommonReply) error {
- if false {
- funcName := getFuncName()
- newReq := *req
- newReq.MerchantApiInfo = apis.MerchantApiInfo{}
- reqBytes, _ := json.Marshal(newReq)
- reqString := string(reqBytes) + funcName
- setOrWaitParam(reqString)
- defer cleanParamReq(reqString)
- defer func() {
- setParamRes(reqString, "-")
- }()
- }
- t1 := func() error {
- return vehicle.Query(ctx, req, reply)
- }
- err := task.Do(ctx, t1)
- if err != nil {
- var e jsonrpc2.Error
- merr := json.Unmarshal([]byte(err.Error()), &e)
- if merr != nil {
- reply.ErrCode = int(1001)
- reply.ErrMsg = "服务错误"
- }
- reply.ErrCode = e.Code
- reply.ErrMsg = e.Message
- }
- return nil
- }
- func SetLogger(logger *zap.Logger) {
- vehicle.SetLogger(logger)
- adm.SetLogger(logger)
- }
|