package etcd import ( "context" "fmt" "log" "strings" "time" "adm-data/parser" uuid "github.com/satori/go.uuid" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/naming/endpoints" ) var cli *clientv3.Client func Get() *clientv3.Client { return cli } func Init(addr, serviceIp, servicePort string) { var ( err error ) cli, err = clientv3.New(clientv3.Config{ Endpoints: strings.Split(addr, ","), DialTimeout: 5 * time.Second, }) if err != nil { panic(err) } // 注册续租 _ = relet(parser.Conf.Rpc.ADMData.ServiceName, serviceIp+":"+servicePort, parser.Conf.Rpc.Prefix, ) if err != nil { panic(err) } } func getValue(addr string) string { return `{"Addr:":"http://` + addr + `"}` } // 续租 func relet(serviceName, serviceAddr, prefix string) error { var ctx = context.Background() // 创建一个租约 lease := clientv3.NewLease(cli) cancelCtx, cancel := context.WithTimeout(ctx, time.Second*3) defer cancel() leaseResp, err := lease.Grant(cancelCtx, 3) if err != nil { return err } // 长链接 leaseChannel, err := lease.KeepAlive(ctx, leaseResp.ID) if err != nil { return err } em, err := endpoints.NewManager(cli, prefix) if err != nil { return err } cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3) defer cancel() if err := em.AddEndpoint(cancelCtx, fmt.Sprintf("%s/%s/%s", prefix, serviceName, uuid.NewV4().String()), endpoints.Endpoint{ Addr: serviceAddr, }, clientv3.WithLease(leaseResp.ID)); err != nil { return err } log.Println("Register etcd success") del := func() { log.Println("Register close") cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3) defer cancel() em.DeleteEndpoint(cancelCtx, serviceName) lease.Close() } // 保持注册状态(连接断开重连) keepRegister(ctx, leaseChannel, del, serviceName,serviceAddr, prefix) return nil } func keepRegister(ctx context.Context, leaseChannel <-chan *clientv3.LeaseKeepAliveResponse, cleanFunc func(), serviceName, serviceAddr ,prefix string) { go func() { failedCount := 0 for { select { case resp := <-leaseChannel: if resp != nil { //log.Println("keep alive success.") } else { log.Println("keep alive failed.") failedCount++ for failedCount > 3 { cleanFunc() if err := relet(serviceName, serviceAddr, prefix); err != nil { time.Sleep(time.Second) continue } return } continue } case <-ctx.Done(): cleanFunc() cli.Close() return } } }() } func UnRegisterEtcd(serviceName, serviceAddr string) { }