etcd.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package parser
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "time"
  7. uuid "github.com/satori/go.uuid"
  8. clientv3 "go.etcd.io/etcd/client/v3"
  9. "go.etcd.io/etcd/client/v3/naming/endpoints"
  10. )
  11. var cli *clientv3.Client
  12. func GetEtcdClient() *clientv3.Client {
  13. return cli
  14. }
  15. func EctdRegister(etcdAddrs []string, serviceName,serviceAddr,prefix string) {
  16. var (
  17. err error
  18. )
  19. cli, err = clientv3.New(clientv3.Config{
  20. Endpoints: etcdAddrs,
  21. DialTimeout: 5 * time.Second,
  22. })
  23. if err != nil {
  24. panic(err)
  25. }
  26. // 注册续租
  27. err = relet(serviceName,
  28. serviceAddr,
  29. prefix,
  30. )
  31. if err != nil {
  32. panic(err)
  33. }
  34. }
  35. func getValue(addr string) string {
  36. return `{"Addr:":"http://` + addr + `"}`
  37. }
  38. // 续租
  39. func relet(serviceName, serviceAddr, prefix string) error {
  40. var ctx = context.Background()
  41. // 创建一个租约
  42. lease := clientv3.NewLease(cli)
  43. cancelCtx, cancel := context.WithTimeout(ctx, time.Second*3)
  44. defer cancel()
  45. leaseResp, err := lease.Grant(cancelCtx, 3)
  46. if err != nil {
  47. return err
  48. }
  49. // 长链接
  50. leaseChannel, err := lease.KeepAlive(ctx, leaseResp.ID)
  51. if err != nil {
  52. return err
  53. }
  54. em, err := endpoints.NewManager(cli, prefix)
  55. if err != nil {
  56. return err
  57. }
  58. cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3)
  59. defer cancel()
  60. uuid, _ := uuid.NewV4()
  61. if err := em.AddEndpoint(cancelCtx, fmt.Sprintf("%s/%s/%s", prefix, serviceName, uuid.String()), endpoints.Endpoint{
  62. Addr: serviceAddr,
  63. }, clientv3.WithLease(leaseResp.ID)); err != nil {
  64. return err
  65. }
  66. log.Println("Register etcd success")
  67. del := func() {
  68. log.Println("Register close")
  69. cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3)
  70. defer cancel()
  71. em.DeleteEndpoint(cancelCtx, serviceName)
  72. lease.Close()
  73. }
  74. // 保持注册状态(连接断开重连)
  75. keepRegister(ctx, leaseChannel, del, serviceName, serviceAddr, prefix)
  76. return nil
  77. }
  78. func keepRegister(ctx context.Context, leaseChannel <-chan *clientv3.LeaseKeepAliveResponse, cleanFunc func(), serviceName, serviceAddr, prefix string) {
  79. go func() {
  80. failedCount := 0
  81. for {
  82. select {
  83. case resp := <-leaseChannel:
  84. if resp != nil {
  85. //log.Println("keep alive success.")
  86. } else {
  87. log.Println("keep alive failed.")
  88. failedCount++
  89. for failedCount > 3 {
  90. cleanFunc()
  91. if err := relet(serviceName, serviceAddr, prefix); err != nil {
  92. time.Sleep(time.Second)
  93. continue
  94. }
  95. return
  96. }
  97. continue
  98. }
  99. case <-ctx.Done():
  100. cleanFunc()
  101. cli.Close()
  102. return
  103. }
  104. }
  105. }()
  106. }
  107. func UnRegisterEtcd(serviceName, serviceAddr string) {
  108. }