etcd.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package etcd
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "strings"
  7. "time"
  8. "adm-vehicle-style/parser"
  9. uuid "github.com/satori/go.uuid"
  10. clientv3 "go.etcd.io/etcd/client/v3"
  11. "go.etcd.io/etcd/client/v3/naming/endpoints"
  12. )
  13. var cli *clientv3.Client
  14. func Get() *clientv3.Client {
  15. return cli
  16. }
  17. func Init(addr, serviceIp, servicePort string) {
  18. var (
  19. err error
  20. )
  21. cli, err = clientv3.New(clientv3.Config{
  22. Endpoints: strings.Split(addr, ","),
  23. DialTimeout: 5 * time.Second,
  24. })
  25. if err != nil {
  26. panic(err)
  27. }
  28. // 注册续租
  29. _ = relet(parser.Conf.Rpc.ADMVehicleStyle.ServiceName,
  30. serviceIp+":"+servicePort,
  31. parser.Conf.Rpc.Prefix,
  32. )
  33. if err != nil {
  34. panic(err)
  35. }
  36. }
  37. func getValue(addr string) string {
  38. return `{"Addr:":"http://` + addr + `"}`
  39. }
  40. // 续租
  41. func relet(serviceName, serviceAddr, prefix string) error {
  42. var ctx = context.Background()
  43. // 创建一个租约
  44. lease := clientv3.NewLease(cli)
  45. cancelCtx, cancel := context.WithTimeout(ctx, time.Second*3)
  46. defer cancel()
  47. leaseResp, err := lease.Grant(cancelCtx, 3)
  48. if err != nil {
  49. return err
  50. }
  51. // 长链接
  52. leaseChannel, err := lease.KeepAlive(ctx, leaseResp.ID)
  53. if err != nil {
  54. return err
  55. }
  56. em, err := endpoints.NewManager(cli, prefix)
  57. if err != nil {
  58. return err
  59. }
  60. cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3)
  61. defer cancel()
  62. if err := em.AddEndpoint(cancelCtx, fmt.Sprintf("%s/%s/%s", prefix, serviceName, uuid.NewV4().String()), endpoints.Endpoint{
  63. Addr: serviceAddr,
  64. }, clientv3.WithLease(leaseResp.ID)); err != nil {
  65. return err
  66. }
  67. log.Println("Register etcd success")
  68. del := func() {
  69. log.Println("Register close")
  70. cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3)
  71. defer cancel()
  72. em.DeleteEndpoint(cancelCtx, serviceName)
  73. lease.Close()
  74. }
  75. // 保持注册状态(连接断开重连)
  76. keepRegister(ctx, leaseChannel, del, serviceName,serviceAddr, prefix)
  77. return nil
  78. }
  79. func keepRegister(ctx context.Context, leaseChannel <-chan *clientv3.LeaseKeepAliveResponse, cleanFunc func(), serviceName, serviceAddr ,prefix string) {
  80. go func() {
  81. failedCount := 0
  82. for {
  83. select {
  84. case resp := <-leaseChannel:
  85. if resp != nil {
  86. //log.Println("keep alive success.")
  87. } else {
  88. log.Println("keep alive failed.")
  89. failedCount++
  90. for failedCount > 3 {
  91. cleanFunc()
  92. if err := relet(serviceName, serviceAddr, prefix); err != nil {
  93. time.Sleep(time.Second)
  94. continue
  95. }
  96. return
  97. }
  98. continue
  99. }
  100. case <-ctx.Done():
  101. cleanFunc()
  102. cli.Close()
  103. return
  104. }
  105. }
  106. }()
  107. }
  108. func UnRegisterEtcd(serviceName, serviceAddr string) {
  109. }