// Copyright 2019 getensh.com. All rights reserved. // Use of this source code is governed by getensh.com. // package impl implement all interfaces of micro service for vehicle package impl import ( "context" "encoding/json" "runtime" "sync" "time" "gd_vehicle/apis" "gd_vehicle/impl/thirdparty_impl/adm" "gd_vehicle/impl/vehicle" "gd_vehicle/common.in/cache" "gd_vehicle/common.in/jsonrpc2" "gd_vehicle/common.in/task" "gd_vehicle/common.in/utils" "go.uber.org/zap" ) // 具体实现 type Rcvr struct { } // 操作pm的公共锁 var mutex sync.Mutex var pm = map[string]*ParamReuse{} //短时复用说明: //1.相同接口相同参数,如果一次调用未完成,又发起另一次调用,则复用第一次调用的结果 //2.第一次是指全局的第一次(服务器集群内该接口的该类参数的第一次调用) //3.如果本次为第一次调用,需设置本地参数缓存(ParamReuse)和redis参数缓存(ParamReuse) //4.如果已有本地参数缓存,则等待结果 //5.如果没有本地参数缓存,且没有redis缓存,则本次为第一次,作步骤3操作 //6.如果没有本地参数缓存, 但已有redis缓存,则等待redis结果 //7.本地参数引用计数为0时,清除本地参数缓存 //8.redis参数引用计数为0时,清除redis缓存 type ParamReuse struct { //参数引用计数 Used int //响应结果 Res string //是否使用了redis Redis bool `json:"-"` //参数锁引用计数 Locked int `json:"-"` //参数锁 Mx sync.Mutex `json:"-"` } const ( ParamExist = 1 ParamExistInRedis = 2 ParamNotExist = 3 ) func getFuncName() string { funcPtr, _, _, _ := runtime.Caller(1) return runtime.FuncForPC(funcPtr).Name() } func paramLock(req string) { mutex.Lock() p, ok := pm[req] if ok == false { pm[req] = &ParamReuse{} p = pm[req] } p.Locked += 1 mutex.Unlock() p.Mx.Lock() } func paramUnlock(req string, clean bool) { mutex.Lock() defer mutex.Unlock() if p, ok := pm[req]; ok { defer p.Mx.Unlock() if p.Locked > 0 { p.Locked -= 1 } if clean == false { return } if p.Used > 0 { p.Used -= 1 } if p.Locked == 0 && p.Used == 0 { if p.Redis { cleanParamReqFromRedis(req) } delete(pm, req) if len(pm) == 0 { pm = map[string]*ParamReuse{} } } } } func setParamReq(req string) int { paramLock(req) defer paramUnlock(req, false) p, _ := pm[req] if p.Used == 0 { utils.Lock(req) defer utils.UnLock(req) s, _ := cache.Redis.Get(req) p.Used = 1 p.Redis = true if s == "" { pr := ParamReuse{ Used: 1, } bytes, _ := json.Marshal(pr) cache.Redis.SetEx(req, 60, string(bytes)) return ParamNotExist } pr := ParamReuse{} json.Unmarshal([]byte(s), &pr) pr.Used++ bytes, _ := json.Marshal(pr) cache.Redis.SetEx(req, 60, string(bytes)) return ParamExistInRedis } p.Used += 1 return ParamExist } func cleanParamReqFromRedis(req string) { utils.Lock(req) defer utils.UnLock(req) s, _ := cache.Redis.Get(req) if s == "" { return } pr := ParamReuse{} json.Unmarshal([]byte(s), &pr) pr.Used -= 1 if pr.Used == 0 { cache.Redis.Del(req) return } bytes, _ := json.Marshal(pr) cache.Redis.SetEx(req, 60, string(bytes)) } func cleanParamReq(req string) { paramLock(req) paramUnlock(req, true) } func setParamRes(req string, res string) { paramLock(req) defer paramUnlock(req, false) p, _ := pm[req] p.Res = res if p.Redis == false { return } setParamResToRedis(req, res) } func setParamResLocal(req string, res string) { paramLock(req) defer paramUnlock(req, false) p, _ := pm[req] p.Res = res } func setParamResToRedis(req string, res string) { utils.Lock(req) defer utils.UnLock(req) s, _ := cache.Redis.Get(req) if s == "" { return } pr := ParamReuse{} json.Unmarshal([]byte(s), &pr) pr.Res = res bytes, _ := json.Marshal(pr) cache.Redis.SetEx(req, 60, string(bytes)) } func getParamRes(req string) string { paramLock(req) defer paramUnlock(req, false) p, _ := pm[req] return p.Res } func getParamResFromRedis(req string) string { s, _ := cache.Redis.Get(req) if s == "" { return "" } pr := ParamReuse{} json.Unmarshal([]byte(s), &pr) return pr.Res } func setOrWaitParam(req string) string { exist := setParamReq(req) if exist == ParamNotExist { return "" } count := 0 if exist == ParamExist { for { res := getParamRes(req) if res != "" { return res } time.Sleep(50 * time.Millisecond) count++ if count >= 1200 { break } } } if exist == ParamExistInRedis { for { res := getParamResFromRedis(req) if res != "" { setParamResLocal(req, res) return res } time.Sleep(50 * time.Millisecond) count++ if count >= 1200 { break } } } return "" } func (c *Rcvr) Query(ctx context.Context, req *apis.CommonReq, reply *apis.CommonReply) error { if false { funcName := getFuncName() newReq := *req newReq.MerchantApiInfo = apis.MerchantApiInfo{} reqBytes, _ := json.Marshal(newReq) reqString := string(reqBytes) + funcName setOrWaitParam(reqString) defer cleanParamReq(reqString) defer func() { setParamRes(reqString, "-") }() } t1 := func() error { return vehicle.Query(ctx, req, reply) } err := task.Do(ctx, t1) if err != nil { var e jsonrpc2.Error merr := json.Unmarshal([]byte(err.Error()), &e) if merr != nil { reply.ErrCode = int(1001) reply.ErrMsg = "服务错误" } reply.ErrCode = e.Code reply.ErrMsg = e.Message } return nil } func SetLogger(logger *zap.Logger) { vehicle.SetLogger(logger) adm.SetLogger(logger) }