package parser import ( "context" "fmt" "log" "time" uuid "github.com/satori/go.uuid" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/naming/endpoints" ) var cli *clientv3.Client func GetEtcdClient() *clientv3.Client { return cli } func EctdHandler(etcdAddrs []string, serveAddr string) { var ( err error ) cli, err = clientv3.New(clientv3.Config{ Endpoints: etcdAddrs, DialTimeout: 5 * time.Second, }) if err != nil { panic(err) } // 注册续租 err = relet(Conf.Rpc.AdmTask.ServiceName, serveAddr, Conf.Rpc.Prefix, ) if err != nil { panic(err) } } func getValue(addr string) string { return `{"Addr:":"http://` + addr + `"}` } // 续租 func relet(serviceName, serviceAddr, prefix string) error { var ctx = context.Background() // 创建一个租约 lease := clientv3.NewLease(cli) cancelCtx, cancel := context.WithTimeout(ctx, time.Second*3) defer cancel() leaseResp, err := lease.Grant(cancelCtx, 3) if err != nil { return err } // 长链接 leaseChannel, err := lease.KeepAlive(ctx, leaseResp.ID) if err != nil { return err } em, err := endpoints.NewManager(cli, prefix) if err != nil { return err } cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3) defer cancel() if err := em.AddEndpoint(cancelCtx, fmt.Sprintf("%s/%s/%s", prefix, serviceName, uuid.NewV4().String()), endpoints.Endpoint{ Addr: serviceAddr, }, clientv3.WithLease(leaseResp.ID)); err != nil { return err } log.Println("Register etcd success") del := func() { log.Println("Register close") cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3) defer cancel() em.DeleteEndpoint(cancelCtx, serviceName) lease.Close() } // 保持注册状态(连接断开重连) keepRegister(ctx, leaseChannel, del, serviceName, serviceAddr, prefix) return nil } func keepRegister(ctx context.Context, leaseChannel <-chan *clientv3.LeaseKeepAliveResponse, cleanFunc func(), serviceName, serviceAddr, prefix string) { go func() { failedCount := 0 for { select { case resp := <-leaseChannel: if resp != nil { //log.Println("keep alive success.") } else { log.Println("keep alive failed.") failedCount++ for failedCount > 3 { cleanFunc() if err := relet(serviceName, serviceAddr, prefix); err != nil { time.Sleep(time.Second) continue } return } continue } case <-ctx.Done(): cleanFunc() cli.Close() return } } }() } func UnRegisterEtcd(serviceName, serviceAddr string) { }