123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- package etcd
- import (
- "context"
- "fmt"
- "log"
- "strings"
- "time"
- "adm-data/parser"
- uuid "github.com/satori/go.uuid"
- clientv3 "go.etcd.io/etcd/client/v3"
- "go.etcd.io/etcd/client/v3/naming/endpoints"
- )
- var cli *clientv3.Client
- func Get() *clientv3.Client {
- return cli
- }
- func Init(addr, serviceIp, servicePort string) {
- var (
- err error
- )
- cli, err = clientv3.New(clientv3.Config{
- Endpoints: strings.Split(addr, ","),
- DialTimeout: 5 * time.Second,
- })
- if err != nil {
- panic(err)
- }
- // 注册续租
- _ = relet(parser.Conf.Rpc.ADMData.ServiceName,
- serviceIp+":"+servicePort,
- parser.Conf.Rpc.Prefix,
- )
- if err != nil {
- panic(err)
- }
- }
- func getValue(addr string) string {
- return `{"Addr:":"http://` + addr + `"}`
- }
- // 续租
- func relet(serviceName, serviceAddr, prefix string) error {
- var ctx = context.Background()
- // 创建一个租约
- lease := clientv3.NewLease(cli)
- cancelCtx, cancel := context.WithTimeout(ctx, time.Second*3)
- defer cancel()
- leaseResp, err := lease.Grant(cancelCtx, 3)
- if err != nil {
- return err
- }
- // 长链接
- leaseChannel, err := lease.KeepAlive(ctx, leaseResp.ID)
- if err != nil {
- return err
- }
- em, err := endpoints.NewManager(cli, prefix)
- if err != nil {
- return err
- }
- cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3)
- defer cancel()
- if err := em.AddEndpoint(cancelCtx, fmt.Sprintf("%s/%s/%s", prefix, serviceName, uuid.NewV4().String()), endpoints.Endpoint{
- Addr: serviceAddr,
- }, clientv3.WithLease(leaseResp.ID)); err != nil {
- return err
- }
- log.Println("Register etcd success")
- del := func() {
- log.Println("Register close")
- cancelCtx, cancel = context.WithTimeout(ctx, time.Second*3)
- defer cancel()
- em.DeleteEndpoint(cancelCtx, serviceName)
- lease.Close()
- }
- // 保持注册状态(连接断开重连)
- keepRegister(ctx, leaseChannel, del, serviceName,serviceAddr, prefix)
- return nil
- }
- func keepRegister(ctx context.Context, leaseChannel <-chan *clientv3.LeaseKeepAliveResponse, cleanFunc func(), serviceName, serviceAddr ,prefix string) {
- go func() {
- failedCount := 0
- for {
- select {
- case resp := <-leaseChannel:
- if resp != nil {
- //log.Println("keep alive success.")
- } else {
- log.Println("keep alive failed.")
- failedCount++
- for failedCount > 3 {
- cleanFunc()
- if err := relet(serviceName, serviceAddr, prefix); err != nil {
- time.Sleep(time.Second)
- continue
- }
- return
- }
- continue
- }
- case <-ctx.Done():
- cleanFunc()
- cli.Close()
- return
- }
- }
- }()
- }
- func UnRegisterEtcd(serviceName, serviceAddr string) {
- }
|