hystrix_config.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. // Copyright 2019 getensh.com. All rights reserved.
  2. // Use of this source code is governed by getensh.com.
  3. package utils
  4. import (
  5. "encoding/json"
  6. "fmt"
  7. "gd_vehicle/consts"
  8. "gd_vehicle/errors"
  9. "io/ioutil"
  10. "net/http"
  11. "strings"
  12. "sync"
  13. "time"
  14. "gd_vehicle/common.in/cache"
  15. "gd_vehicle/common.in/config"
  16. "github.com/astaxie/beego/orm"
  17. "github.com/afex/hystrix-go/hystrix"
  18. )
  19. const (
  20. HystrixClose = 0
  21. HystrixOpen = 1
  22. )
  23. type HystrixConfig struct {
  24. ProviderApiName string `json:"provider_api_name"`
  25. ProviderApiCode string `json:"provider_api_code"`
  26. MaxConcurrentRequests int `json:"max_concurrent_requests"`
  27. RequestVolumeThreshold int `json:"request_volume_threshold"`
  28. SleepWindow int `json:"sleep_window"`
  29. ErrorPercentThreshold int `json:"error_percent_threshold"`
  30. Period int `json:"period"` // 熔断统计周期
  31. HystrixStatus int `json:"hystrix_status"`
  32. IsOn int `json:"is_on"`
  33. LastSendTime int64 `json:"last_send_time"`
  34. Mutex sync.Mutex
  35. }
  36. var settingsMutex *sync.RWMutex
  37. var HystrixConfigMap map[string]*HystrixConfig
  38. var HystrixConfigStatusMap map[string]int
  39. // 构建熔断map
  40. func constructHystrixMap() map[string]*HystrixConfig {
  41. hystrixConfigMap := make(map[string]*HystrixConfig)
  42. hystrixConfigs := []HystrixConfig{}
  43. orm.NewOrm().Raw("select * from db_gd_management.t_gd_provider_api_hystrix where is_on=1").QueryRows(&hystrixConfigs)
  44. for index, _ := range hystrixConfigs {
  45. hystrixConfigMap[hystrixConfigs[index].ProviderApiCode] = &hystrixConfigs[index]
  46. /*if _, ok := HystrixConfigStatusMap[hystrixConfigs[index].ProviderApiCode]; !ok {
  47. HystrixConfigStatusMap[hystrixConfigs[index].ProviderApiCode] = 0
  48. }*/
  49. }
  50. return hystrixConfigMap
  51. }
  52. // 加载数据源熔断
  53. func ProviderApiHystrixLoad() error {
  54. //HystrixConfigStatusMap = make(map[string]int)
  55. HystrixConfigMap = constructHystrixMap()
  56. settingsMutex = &sync.RWMutex{}
  57. // 注册熔断
  58. for k, v := range HystrixConfigMap {
  59. fmt.Println("HystrixConfigMap:", v.ProviderApiCode, v.SleepWindow, v.RequestVolumeThreshold, v.ErrorPercentThreshold, v.Period)
  60. hystrix.ConfigureCommand(k, hystrix.CommandConfig{Timeout: int(20000000), //cmd的超时时间,一旦超时则返回失败
  61. MaxConcurrentRequests: v.MaxConcurrentRequests, //最大并发请求数
  62. RequestVolumeThreshold: v.RequestVolumeThreshold, //一个统计窗口10(修改为60s了)秒内请求数量。达到这个请求数量后才去判断是否要开启熔断
  63. SleepWindow: v.SleepWindow, //熔断发生后的等待恢复时间
  64. ErrorPercentThreshold: v.ErrorPercentThreshold, //失败占比
  65. Period: v.Period,
  66. })
  67. }
  68. go SubscribeHystrix()
  69. //go HystrixTimer()
  70. return nil
  71. }
  72. func CheckProviderApiHystrix(cmd string) (*HystrixConfig, bool) {
  73. settingsMutex.RLock()
  74. defer settingsMutex.RUnlock()
  75. v, ok := HystrixConfigMap[cmd]
  76. return v, ok
  77. }
  78. func CheckHystrixStatus(hystrixConfig *HystrixConfig, err error, response string) {
  79. now := time.Now().UnixNano() / 1e6
  80. if hystrixConfig.HystrixStatus == HystrixOpen {
  81. // 熔断状态开启时,判断是否超过了窗口时间,没超过不发送任何通知
  82. if now < (hystrixConfig.LastSendTime + int64(hystrixConfig.SleepWindow)) {
  83. return
  84. }
  85. }
  86. if err == nil {
  87. // 如果已开启熔断,错误为空表示恢复
  88. if hystrixConfig.HystrixStatus == HystrixOpen {
  89. hystrixConfig.HystrixStatus = HystrixClose
  90. content := fmt.Sprintf("数据源%s(%s)已恢复(%s)",
  91. hystrixConfig.ProviderApiName,
  92. hystrixConfig.ProviderApiCode,
  93. time.Now().Format(consts.DaySecLayout))
  94. SendToDingTalk(content)
  95. //updateHystrixMap(hystrixConfig)
  96. }
  97. } else if err == errors.VendorError {
  98. // 如果是三方错误,并且熔断开启,表示尝试调用数据源,任然不通
  99. if hystrixConfig.HystrixStatus == HystrixOpen {
  100. content := fmt.Sprintf("数据源%s(%s)未恢复(%s),数据源响应:%s",
  101. hystrixConfig.ProviderApiName,
  102. hystrixConfig.ProviderApiCode,
  103. time.Now().Format(consts.DaySecLayout),
  104. response)
  105. SendToDingTalk(content)
  106. }
  107. } else {
  108. // 出现熔断错误,判断本地熔断状态,如果是关闭,将熔断状态设置为开启,并发送钉钉
  109. hystrixConfig.Mutex.Lock()
  110. defer hystrixConfig.Mutex.Unlock()
  111. if hystrixConfig.HystrixStatus == HystrixClose {
  112. hystrixConfig.HystrixStatus = HystrixOpen
  113. // xxx数据源请求失败超过xx%,触发熔断(时间段),数据源返回值
  114. content := fmt.Sprintf("数据源%s(%s)请求失败超过百分之%d,触发熔断(%s)",
  115. hystrixConfig.ProviderApiName,
  116. hystrixConfig.ProviderApiCode,
  117. hystrixConfig.ErrorPercentThreshold,
  118. time.Now().Format(consts.DaySecLayout))
  119. SendToDingTalk(content)
  120. hystrixConfig.LastSendTime = now
  121. }
  122. }
  123. }
  124. type Text struct {
  125. Content string `json:"content"`
  126. }
  127. type At struct {
  128. AtMobiles []string `json:"atMobiles"`
  129. IsAtAll bool `json:"isAtAll"`
  130. }
  131. type TextMsg struct {
  132. MsgType string `json:"msgtype"`
  133. Text Text `json:"text"`
  134. At At `json:"at"`
  135. }
  136. func SendToDingTalk(content string) {
  137. textMsg := TextMsg{}
  138. textMsg.MsgType = "text"
  139. textMsg.Text = Text{}
  140. textMsg.Text.Content = content
  141. webHook := config.Conf.ThirdPart.DingTalkWebhook
  142. contentByte, _ := json.Marshal(textMsg)
  143. req, err := http.NewRequest("POST", webHook, strings.NewReader(string(contentByte)))
  144. client := &http.Client{}
  145. req.Header.Set("Content-Type", "application/json")
  146. resp, err := client.Do(req)
  147. defer resp.Body.Close()
  148. if err != nil {
  149. fmt.Println("send error:", err)
  150. return
  151. }
  152. fmt.Println(resp.StatusCode)
  153. body, _ := ioutil.ReadAll(resp.Body)
  154. fmt.Println(string(body))
  155. }
  156. func ReloadHystrix() {
  157. hystrixConfigMapNew := constructHystrixMap()
  158. for k, v := range HystrixConfigMap {
  159. // 新的和老的都存在判断是否更新
  160. if value, ok := hystrixConfigMapNew[k]; ok {
  161. // 存在,判断是否更新
  162. if v.ErrorPercentThreshold != value.ErrorPercentThreshold ||
  163. v.MaxConcurrentRequests != value.MaxConcurrentRequests ||
  164. v.RequestVolumeThreshold != value.RequestVolumeThreshold ||
  165. v.SleepWindow != value.SleepWindow ||
  166. v.Period != value.Period {
  167. fmt.Println("update:", k, value.RequestVolumeThreshold, value.RequestVolumeThreshold, value.SleepWindow, value.Period)
  168. hystrix.ConfigureCommand(k, hystrix.CommandConfig{Timeout: int(20000000), //cmd的超时时间,一旦超时则返回失败
  169. MaxConcurrentRequests: value.MaxConcurrentRequests, //最大并发请求数
  170. RequestVolumeThreshold: value.RequestVolumeThreshold, //一个统计窗口10(修改为60s了)秒内请求数量。达到这个请求数量后才去判断是否要开启熔断
  171. SleepWindow: value.SleepWindow, //熔断发生后的等待恢复时间
  172. ErrorPercentThreshold: value.ErrorPercentThreshold, //失败占比
  173. Period: value.Period,
  174. })
  175. v.ErrorPercentThreshold = value.ErrorPercentThreshold
  176. v.MaxConcurrentRequests = value.MaxConcurrentRequests
  177. v.RequestVolumeThreshold = value.RequestVolumeThreshold
  178. v.SleepWindow = value.SleepWindow
  179. v.Period = value.Period
  180. }
  181. } else {
  182. // 新的不存在
  183. settingsMutex.Lock()
  184. delete(HystrixConfigMap, k)
  185. settingsMutex.Unlock()
  186. }
  187. }
  188. for k, v := range hystrixConfigMapNew {
  189. if _, ok := HystrixConfigMap[k]; !ok {
  190. hystrix.ConfigureCommand(k, hystrix.CommandConfig{Timeout: int(20000000), //cmd的超时时间,一旦超时则返回失败
  191. MaxConcurrentRequests: v.MaxConcurrentRequests, //最大并发请求数
  192. RequestVolumeThreshold: v.RequestVolumeThreshold, //一个统计窗口10(修改为60s了)秒内请求数量。达到这个请求数量后才去判断是否要开启熔断
  193. SleepWindow: v.SleepWindow, //熔断发生后的等待恢复时间
  194. ErrorPercentThreshold: v.ErrorPercentThreshold, //失败占比
  195. Period: v.Period,
  196. })
  197. settingsMutex.Lock()
  198. HystrixConfigMap[k] = v
  199. settingsMutex.Unlock()
  200. }
  201. }
  202. }
  203. func SubscribeHystrix() {
  204. pubsub := cache.Redis.Subscribe(config.Conf.ThirdPart.HystrixPublishChannel)
  205. defer pubsub.Close()
  206. for msg := range pubsub.Channel() {
  207. if msg.Payload == "hystrix-update" {
  208. ReloadHystrix()
  209. }
  210. }
  211. }