123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- // Copyright 2019 getensh.com. All rights reserved.
- // Use of this source code is governed by getensh.com.
- package utils
- import (
- "encoding/json"
- "fmt"
- "gd_vehicle/consts"
- "gd_vehicle/errors"
- "io/ioutil"
- "net/http"
- "strings"
- "sync"
- "time"
- "gd_vehicle/common.in/cache"
- "gd_vehicle/common.in/config"
- "github.com/astaxie/beego/orm"
- "github.com/afex/hystrix-go/hystrix"
- )
// Local circuit-breaker states stored in HystrixConfig.HystrixStatus.
const (
	// HystrixClose (0) marks the breaker as closed.
	HystrixClose = iota
	// HystrixOpen (1) marks the breaker as open.
	HystrixOpen
)
// HystrixConfig is one row of circuit-breaker settings for a provider API,
// together with the breaker state tracked locally by this package.
type HystrixConfig struct {
	// ProviderApiName is the provider name printed in DingTalk notices.
	ProviderApiName string `json:"provider_api_name"`
	// ProviderApiCode keys HystrixConfigMap and is used as the hystrix command name.
	ProviderApiCode string `json:"provider_api_code"`
	// MaxConcurrentRequests is forwarded to hystrix.CommandConfig.
	MaxConcurrentRequests int `json:"max_concurrent_requests"`
	// RequestVolumeThreshold is forwarded to hystrix.CommandConfig.
	RequestVolumeThreshold int `json:"request_volume_threshold"`
	// SleepWindow is forwarded to hystrix.CommandConfig and is also added to
	// LastSendTime (milliseconds) to throttle notices while the breaker is open.
	SleepWindow int `json:"sleep_window"`
	// ErrorPercentThreshold is forwarded to hystrix.CommandConfig and quoted
	// in the "breaker tripped" notice.
	ErrorPercentThreshold int `json:"error_percent_threshold"`
	// Period is the circuit-breaker statistics period.
	Period int `json:"period"`
	// HystrixStatus is HystrixClose or HystrixOpen.
	HystrixStatus int `json:"hystrix_status"`
	// IsOn mirrors the is_on column; only rows with is_on=1 are loaded.
	IsOn int `json:"is_on"`
	// LastSendTime is the Unix time in milliseconds at which the last
	// "breaker tripped" notice was sent.
	LastSendTime int64 `json:"last_send_time"`
	// Mutex serialises the closed-to-open transition in CheckHystrixStatus.
	// Because of this field the struct must be shared by pointer, not copied.
	Mutex sync.Mutex
}
- var settingsMutex *sync.RWMutex
- var HystrixConfigMap map[string]*HystrixConfig
- var HystrixConfigStatusMap map[string]int
- // 构建熔断map
- func constructHystrixMap() map[string]*HystrixConfig {
- hystrixConfigMap := make(map[string]*HystrixConfig)
- hystrixConfigs := []HystrixConfig{}
- orm.NewOrm().Raw("select * from db_gd_management.t_gd_provider_api_hystrix where is_on=1").QueryRows(&hystrixConfigs)
- for index, _ := range hystrixConfigs {
- hystrixConfigMap[hystrixConfigs[index].ProviderApiCode] = &hystrixConfigs[index]
- /*if _, ok := HystrixConfigStatusMap[hystrixConfigs[index].ProviderApiCode]; !ok {
- HystrixConfigStatusMap[hystrixConfigs[index].ProviderApiCode] = 0
- }*/
- }
- return hystrixConfigMap
- }
- // 加载数据源熔断
- func ProviderApiHystrixLoad() error {
- //HystrixConfigStatusMap = make(map[string]int)
- HystrixConfigMap = constructHystrixMap()
- settingsMutex = &sync.RWMutex{}
- // 注册熔断
- for k, v := range HystrixConfigMap {
- fmt.Println("HystrixConfigMap:", v.ProviderApiCode, v.SleepWindow, v.RequestVolumeThreshold, v.ErrorPercentThreshold, v.Period)
- hystrix.ConfigureCommand(k, hystrix.CommandConfig{Timeout: int(20000000), //cmd的超时时间,一旦超时则返回失败
- MaxConcurrentRequests: v.MaxConcurrentRequests, //最大并发请求数
- RequestVolumeThreshold: v.RequestVolumeThreshold, //一个统计窗口10(修改为60s了)秒内请求数量。达到这个请求数量后才去判断是否要开启熔断
- SleepWindow: v.SleepWindow, //熔断发生后的等待恢复时间
- ErrorPercentThreshold: v.ErrorPercentThreshold, //失败占比
- Period: v.Period,
- })
- }
- go SubscribeHystrix()
- //go HystrixTimer()
- return nil
- }
- func CheckProviderApiHystrix(cmd string) (*HystrixConfig, bool) {
- settingsMutex.RLock()
- defer settingsMutex.RUnlock()
- v, ok := HystrixConfigMap[cmd]
- return v, ok
- }
- func CheckHystrixStatus(hystrixConfig *HystrixConfig, err error, response string) {
- now := time.Now().UnixNano() / 1e6
- if hystrixConfig.HystrixStatus == HystrixOpen {
- // 熔断状态开启时,判断是否超过了窗口时间,没超过不发送任何通知
- if now < (hystrixConfig.LastSendTime + int64(hystrixConfig.SleepWindow)) {
- return
- }
- }
- if err == nil {
- // 如果已开启熔断,错误为空表示恢复
- if hystrixConfig.HystrixStatus == HystrixOpen {
- hystrixConfig.HystrixStatus = HystrixClose
- content := fmt.Sprintf("数据源%s(%s)已恢复(%s)",
- hystrixConfig.ProviderApiName,
- hystrixConfig.ProviderApiCode,
- time.Now().Format(consts.DaySecLayout))
- SendToDingTalk(content)
- //updateHystrixMap(hystrixConfig)
- }
- } else if err == errors.VendorError {
- // 如果是三方错误,并且熔断开启,表示尝试调用数据源,任然不通
- if hystrixConfig.HystrixStatus == HystrixOpen {
- content := fmt.Sprintf("数据源%s(%s)未恢复(%s),数据源响应:%s",
- hystrixConfig.ProviderApiName,
- hystrixConfig.ProviderApiCode,
- time.Now().Format(consts.DaySecLayout),
- response)
- SendToDingTalk(content)
- }
- } else {
- // 出现熔断错误,判断本地熔断状态,如果是关闭,将熔断状态设置为开启,并发送钉钉
- hystrixConfig.Mutex.Lock()
- defer hystrixConfig.Mutex.Unlock()
- if hystrixConfig.HystrixStatus == HystrixClose {
- hystrixConfig.HystrixStatus = HystrixOpen
- // xxx数据源请求失败超过xx%,触发熔断(时间段),数据源返回值
- content := fmt.Sprintf("数据源%s(%s)请求失败超过百分之%d,触发熔断(%s)",
- hystrixConfig.ProviderApiName,
- hystrixConfig.ProviderApiCode,
- hystrixConfig.ErrorPercentThreshold,
- time.Now().Format(consts.DaySecLayout))
- SendToDingTalk(content)
- hystrixConfig.LastSendTime = now
- }
- }
- }
// JSON payload types for the text message that SendToDingTalk posts.
type (
	// Text is the "text" object of the message; Content is the message body.
	Text struct {
		Content string `json:"content"`
	}

	// At is the "at" object of the message, carrying the atMobiles and
	// isAtAll fields. SendToDingTalk leaves it at its zero value.
	At struct {
		AtMobiles []string `json:"atMobiles"`
		IsAtAll   bool     `json:"isAtAll"`
	}

	// TextMsg is the complete message; SendToDingTalk sets MsgType to "text".
	TextMsg struct {
		MsgType string `json:"msgtype"`
		Text    Text   `json:"text"`
		At      At     `json:"at"`
	}
)
- func SendToDingTalk(content string) {
- textMsg := TextMsg{}
- textMsg.MsgType = "text"
- textMsg.Text = Text{}
- textMsg.Text.Content = content
- webHook := config.Conf.ThirdPart.DingTalkWebhook
- contentByte, _ := json.Marshal(textMsg)
- req, err := http.NewRequest("POST", webHook, strings.NewReader(string(contentByte)))
- client := &http.Client{}
- req.Header.Set("Content-Type", "application/json")
- resp, err := client.Do(req)
- defer resp.Body.Close()
- if err != nil {
- fmt.Println("send error:", err)
- return
- }
- fmt.Println(resp.StatusCode)
- body, _ := ioutil.ReadAll(resp.Body)
- fmt.Println(string(body))
- }
- func ReloadHystrix() {
- hystrixConfigMapNew := constructHystrixMap()
- for k, v := range HystrixConfigMap {
- // 新的和老的都存在判断是否更新
- if value, ok := hystrixConfigMapNew[k]; ok {
- // 存在,判断是否更新
- if v.ErrorPercentThreshold != value.ErrorPercentThreshold ||
- v.MaxConcurrentRequests != value.MaxConcurrentRequests ||
- v.RequestVolumeThreshold != value.RequestVolumeThreshold ||
- v.SleepWindow != value.SleepWindow ||
- v.Period != value.Period {
- fmt.Println("update:", k, value.RequestVolumeThreshold, value.RequestVolumeThreshold, value.SleepWindow, value.Period)
- hystrix.ConfigureCommand(k, hystrix.CommandConfig{Timeout: int(20000000), //cmd的超时时间,一旦超时则返回失败
- MaxConcurrentRequests: value.MaxConcurrentRequests, //最大并发请求数
- RequestVolumeThreshold: value.RequestVolumeThreshold, //一个统计窗口10(修改为60s了)秒内请求数量。达到这个请求数量后才去判断是否要开启熔断
- SleepWindow: value.SleepWindow, //熔断发生后的等待恢复时间
- ErrorPercentThreshold: value.ErrorPercentThreshold, //失败占比
- Period: value.Period,
- })
- v.ErrorPercentThreshold = value.ErrorPercentThreshold
- v.MaxConcurrentRequests = value.MaxConcurrentRequests
- v.RequestVolumeThreshold = value.RequestVolumeThreshold
- v.SleepWindow = value.SleepWindow
- v.Period = value.Period
- }
- } else {
- // 新的不存在
- settingsMutex.Lock()
- delete(HystrixConfigMap, k)
- settingsMutex.Unlock()
- }
- }
- for k, v := range hystrixConfigMapNew {
- if _, ok := HystrixConfigMap[k]; !ok {
- hystrix.ConfigureCommand(k, hystrix.CommandConfig{Timeout: int(20000000), //cmd的超时时间,一旦超时则返回失败
- MaxConcurrentRequests: v.MaxConcurrentRequests, //最大并发请求数
- RequestVolumeThreshold: v.RequestVolumeThreshold, //一个统计窗口10(修改为60s了)秒内请求数量。达到这个请求数量后才去判断是否要开启熔断
- SleepWindow: v.SleepWindow, //熔断发生后的等待恢复时间
- ErrorPercentThreshold: v.ErrorPercentThreshold, //失败占比
- Period: v.Period,
- })
- settingsMutex.Lock()
- HystrixConfigMap[k] = v
- settingsMutex.Unlock()
- }
- }
- }
- func SubscribeHystrix() {
- pubsub := cache.Redis.Subscribe(config.Conf.ThirdPart.HystrixPublishChannel)
- defer pubsub.Close()
- for msg := range pubsub.Channel() {
- if msg.Payload == "hystrix-update" {
- ReloadHystrix()
- }
- }
- }
|