sentinel.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. package redis
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "strings"
  7. "sync"
  8. "time"
  9. "gopkg.in/redis.v5/internal"
  10. "gopkg.in/redis.v5/internal/pool"
  11. )
  12. //------------------------------------------------------------------------------
  // FailoverOptions holds the settings for a Sentinel-backed failover
  // client. Pass a pointer to it to NewFailoverClient.
  type FailoverOptions struct {
  	// MasterName is the name of the master whose address is requested
  	// from the sentinels.
  	MasterName string

  	// SentinelAddrs is the seed list of host:port addresses of sentinel
  	// nodes.
  	SentinelAddrs []string

  	// The fields below mirror the Options struct; options() copies each
  	// one to the Options field of the same name.

  	Password string
  	DB       int

  	MaxRetries int

  	DialTimeout  time.Duration
  	ReadTimeout  time.Duration
  	WriteTimeout time.Duration

  	PoolSize           int
  	PoolTimeout        time.Duration
  	IdleTimeout        time.Duration
  	IdleCheckFrequency time.Duration
  }
  32. func (opt *FailoverOptions) options() *Options {
  33. return &Options{
  34. Addr: "FailoverClient",
  35. DB: opt.DB,
  36. Password: opt.Password,
  37. MaxRetries: opt.MaxRetries,
  38. DialTimeout: opt.DialTimeout,
  39. ReadTimeout: opt.ReadTimeout,
  40. WriteTimeout: opt.WriteTimeout,
  41. PoolSize: opt.PoolSize,
  42. PoolTimeout: opt.PoolTimeout,
  43. IdleTimeout: opt.IdleTimeout,
  44. IdleCheckFrequency: opt.IdleCheckFrequency,
  45. }
  46. }
  47. // NewFailoverClient returns a Redis client that uses Redis Sentinel
  48. // for automatic failover. It's safe for concurrent use by multiple
  49. // goroutines.
  50. func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
  51. opt := failoverOpt.options()
  52. opt.init()
  53. failover := &sentinelFailover{
  54. masterName: failoverOpt.MasterName,
  55. sentinelAddrs: failoverOpt.SentinelAddrs,
  56. opt: opt,
  57. }
  58. client := Client{
  59. baseClient: baseClient{
  60. opt: opt,
  61. connPool: failover.Pool(),
  62. onClose: func() error {
  63. return failover.Close()
  64. },
  65. },
  66. }
  67. client.cmdable.process = client.Process
  68. return &client
  69. }
  70. //------------------------------------------------------------------------------
  71. type sentinelClient struct {
  72. cmdable
  73. baseClient
  74. }
  75. func newSentinel(opt *Options) *sentinelClient {
  76. opt.init()
  77. client := sentinelClient{
  78. baseClient: baseClient{
  79. opt: opt,
  80. connPool: newConnPool(opt),
  81. },
  82. }
  83. client.cmdable = cmdable{client.Process}
  84. return &client
  85. }
  86. func (c *sentinelClient) PubSub() *PubSub {
  87. return &PubSub{
  88. base: baseClient{
  89. opt: c.opt,
  90. connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), false),
  91. },
  92. }
  93. }
  94. func (c *sentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
  95. cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)
  96. c.Process(cmd)
  97. return cmd
  98. }
  99. func (c *sentinelClient) Sentinels(name string) *SliceCmd {
  100. cmd := NewSliceCmd("SENTINEL", "sentinels", name)
  101. c.Process(cmd)
  102. return cmd
  103. }
  104. type sentinelFailover struct {
  105. masterName string
  106. sentinelAddrs []string
  107. opt *Options
  108. pool *pool.ConnPool
  109. poolOnce sync.Once
  110. mu sync.RWMutex
  111. sentinel *sentinelClient
  112. }
  113. func (d *sentinelFailover) Close() error {
  114. return d.resetSentinel()
  115. }
  116. func (d *sentinelFailover) dial() (net.Conn, error) {
  117. addr, err := d.MasterAddr()
  118. if err != nil {
  119. return nil, err
  120. }
  121. return net.DialTimeout("tcp", addr, d.opt.DialTimeout)
  122. }
  123. func (d *sentinelFailover) Pool() *pool.ConnPool {
  124. d.poolOnce.Do(func() {
  125. d.opt.Dialer = d.dial
  126. d.pool = newConnPool(d.opt)
  127. })
  128. return d.pool
  129. }
  130. func (d *sentinelFailover) MasterAddr() (string, error) {
  131. d.mu.Lock()
  132. defer d.mu.Unlock()
  133. // Try last working sentinel.
  134. if d.sentinel != nil {
  135. addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
  136. if err != nil {
  137. internal.Logf("sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err)
  138. d._resetSentinel()
  139. } else {
  140. addr := net.JoinHostPort(addr[0], addr[1])
  141. internal.Logf("sentinel: %q addr is %s", d.masterName, addr)
  142. return addr, nil
  143. }
  144. }
  145. for i, sentinelAddr := range d.sentinelAddrs {
  146. sentinel := newSentinel(&Options{
  147. Addr: sentinelAddr,
  148. DialTimeout: d.opt.DialTimeout,
  149. ReadTimeout: d.opt.ReadTimeout,
  150. WriteTimeout: d.opt.WriteTimeout,
  151. PoolSize: d.opt.PoolSize,
  152. PoolTimeout: d.opt.PoolTimeout,
  153. IdleTimeout: d.opt.IdleTimeout,
  154. })
  155. masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
  156. if err != nil {
  157. internal.Logf("sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err)
  158. sentinel.Close()
  159. continue
  160. }
  161. // Push working sentinel to the top.
  162. d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0]
  163. d.setSentinel(sentinel)
  164. addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
  165. internal.Logf("sentinel: %q addr is %s", d.masterName, addr)
  166. return addr, nil
  167. }
  168. return "", errors.New("redis: all sentinels are unreachable")
  169. }
  170. func (d *sentinelFailover) setSentinel(sentinel *sentinelClient) {
  171. d.discoverSentinels(sentinel)
  172. d.sentinel = sentinel
  173. go d.listen(sentinel)
  174. }
  175. func (d *sentinelFailover) resetSentinel() error {
  176. d.mu.Lock()
  177. err := d._resetSentinel()
  178. d.mu.Unlock()
  179. return err
  180. }
  181. func (d *sentinelFailover) _resetSentinel() error {
  182. var err error
  183. if d.sentinel != nil {
  184. err = d.sentinel.Close()
  185. d.sentinel = nil
  186. }
  187. return err
  188. }
  189. func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) {
  190. sentinels, err := sentinel.Sentinels(d.masterName).Result()
  191. if err != nil {
  192. internal.Logf("sentinel: Sentinels %q failed: %s", d.masterName, err)
  193. return
  194. }
  195. for _, sentinel := range sentinels {
  196. vals := sentinel.([]interface{})
  197. for i := 0; i < len(vals); i += 2 {
  198. key := vals[i].(string)
  199. if key == "name" {
  200. sentinelAddr := vals[i+1].(string)
  201. if !contains(d.sentinelAddrs, sentinelAddr) {
  202. internal.Logf(
  203. "sentinel: discovered new %q sentinel: %s",
  204. d.masterName, sentinelAddr,
  205. )
  206. d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr)
  207. }
  208. }
  209. }
  210. }
  211. }
  212. // closeOldConns closes connections to the old master after failover switch.
  213. func (d *sentinelFailover) closeOldConns(newMaster string) {
  214. // Good connections that should be put back to the pool. They
  215. // can't be put immediately, because pool.PopFree will return them
  216. // again on next iteration.
  217. cnsToPut := make([]*pool.Conn, 0)
  218. for {
  219. cn := d.pool.PopFree()
  220. if cn == nil {
  221. break
  222. }
  223. if cn.RemoteAddr().String() != newMaster {
  224. err := fmt.Errorf(
  225. "sentinel: closing connection to the old master %s",
  226. cn.RemoteAddr(),
  227. )
  228. internal.Logf(err.Error())
  229. d.pool.Remove(cn, err)
  230. } else {
  231. cnsToPut = append(cnsToPut, cn)
  232. }
  233. }
  234. for _, cn := range cnsToPut {
  235. d.pool.Put(cn)
  236. }
  237. }
  238. func (d *sentinelFailover) listen(sentinel *sentinelClient) {
  239. var pubsub *PubSub
  240. for {
  241. if pubsub == nil {
  242. pubsub = sentinel.PubSub()
  243. if err := pubsub.Subscribe("+switch-master"); err != nil {
  244. internal.Logf("sentinel: Subscribe failed: %s", err)
  245. pubsub.Close()
  246. d.resetSentinel()
  247. return
  248. }
  249. }
  250. msg, err := pubsub.ReceiveMessage()
  251. if err != nil {
  252. internal.Logf("sentinel: ReceiveMessage failed: %s", err)
  253. pubsub.Close()
  254. d.resetSentinel()
  255. return
  256. }
  257. switch msg.Channel {
  258. case "+switch-master":
  259. parts := strings.Split(msg.Payload, " ")
  260. if parts[0] != d.masterName {
  261. internal.Logf("sentinel: ignore new %s addr", parts[0])
  262. continue
  263. }
  264. addr := net.JoinHostPort(parts[3], parts[4])
  265. internal.Logf(
  266. "sentinel: new %q addr is %s",
  267. d.masterName, addr,
  268. )
  269. d.closeOldConns(addr)
  270. }
  271. }
  272. }
  // contains reports whether str is equal to at least one element of
  // slice. A nil or empty slice contains nothing.
  func contains(slice []string, str string) bool {
  	for i := 0; i < len(slice); i++ {
  		if slice[i] == str {
  			return true
  		}
  	}
  	return false
  }