首页 > golang > go操作etcd实现服务注册和发现亲测版
2023
03-05

go操作etcd实现服务注册和发现亲测版

服务发现

etc/discovery/discovery.go

package main

import (
   "context"
   "go.etcd.io/etcd/api/v3/mvccpb"
   clientv3 "go.etcd.io/etcd/client/v3"
   "log"
   "os"
   "os/signal"
   "sync"
   "syscall"
   "time"
)

// etcd client put/get demo
// go get go.etcd.io/etcd/client/v3
// 网上很多例子都写 go.etcd.io/etcd/client ,目测这个client目录中已经没有v3了,还是用 go get go.etcd.io/etcd/client/v3 靠谱

// ServiceDiscovery 服务发现
type ServiceDiscovery struct {
   cli        *clientv3.Client  // etcd client
   serverList map[string]string // 服务列表
   lock       sync.RWMutex
}

// NewServiceDiscovery 新建服务发现
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
   // 初始化etcd client
   cli, err := clientv3.New(clientv3.Config{
      Endpoints:   endpoints,
      DialTimeout: time.Duration(5) * time.Second,
   })
   if err != nil {
      log.Fatalln(err)
   }

   return &ServiceDiscovery{
      cli:        cli,
      serverList: make(map[string]string),
   }
}

// WatchService 初始化服务列表和监视
func (s *ServiceDiscovery) WatchService(prefix string) error {
   // 根据前缀获取现有的key
   resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
   if err != nil {
      return err
   }

   // 遍历获取得到的k和v
   for _, ev := range resp.Kvs {
      s.SetServiceList(string(ev.Key), string(ev.Value))
   }

   // 监视前缀,修改变更server
   go s.watcher(prefix)
   return nil
}

// watcher 监听Key的前缀
func (s *ServiceDiscovery) watcher(prefix string) {
   rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
   log.Printf("watching prefix:%s now...", prefix)
   for wresp := range rch {
      for _, ev := range wresp.Events {
         switch ev.Type {
         case mvccpb.PUT: // 修改或者新增
            s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
         case mvccpb.DELETE: // 删除
            s.DelServiceList(string(ev.Kv.Key))
         }
      }
   }
}

func (s *ServiceDiscovery) SetServiceList(key, val string) {
   s.lock.Lock()
   defer s.lock.Unlock()
   s.serverList[key] = string(val)
   log.Println("put key:", key, "val:", val)
}

func (s *ServiceDiscovery) DelServiceList(key string) {
   s.lock.Lock()
   defer s.lock.Unlock()
   delete(s.serverList, key)
   log.Println("Del key :", key)
}

// GetServices 获取服务地址
func (s *ServiceDiscovery) GetServices() []string {
   s.lock.RLock()
   defer s.lock.RUnlock()
   addrs := make([]string, 0, len(s.serverList))

   for _, v := range s.serverList {
      addrs = append(addrs, v)
   }
   return addrs
}

// Close 关闭服务
func (s *ServiceDiscovery) Close() error {
   return s.cli.Close()
}

func main() {

   var endpoints = []string{"127.0.0.1:2379"}
   ser := NewServiceDiscovery(endpoints)
   defer ser.Close()

   err := ser.WatchService("/server/")
   if err != nil {
      log.Fatal(err)
   }

   // 监控系统信号,等待 ctrl + c 系统信号通知服务关闭
   c := make(chan os.Signal, 1)
   go func() {
      signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
   }()
   for {
      select {
      case <-time.Tick(10 * time.Second):
         log.Println(ser.GetServices())
      case <-c:
         log.Println("server discovery exit")
         return
      }
   }

}

运行结果

$ go run discovery.go 

2023/04/13 14:24:09 watching prefix:/server/ now...
2023/04/13 14:24:19 []
2023/04/13 14:24:29 []
2023/04/13 14:24:36 put key: /server/node1 val: localhost:8000
2023/04/13 14:24:39 [localhost:8000]
2023/04/13 14:24:49 [localhost:8000]
2023/04/13 14:24:59 Del key : /server/node1
2023/04/13 14:24:59 []
2023/04/13 14:25:09 server discovery exit

服务注册

etc/register/register.go

package main

import (
   "context"
   "log"
   "os"
   "os/signal"
   "syscall"
   "time"

   clientv3 "go.etcd.io/etcd/client/v3"
)

// etcd client put/get demo
// go get go.etcd.io/etcd/client/v3
// 网上很多例子都写 go.etcd.io/etcd/client ,目测这个client目录中已经没有v3了,还是用 go get go.etcd.io/etcd/client/v3 靠谱

// ServiceRegister 创建租约注册服务
type ServiceRegister struct {
   cli     *clientv3.Client // etcd v3 client
   leaseID clientv3.LeaseID // 租约ID
   // 租约keepalive相应的chan
   keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
   key           string // key
   val           string // value
}

// NewServiceRegister 新建服务注册
func NewServiceRegister(endpoints []string, key, val string, lease int64, dailTimeout int) (*ServiceRegister, error) {
   cli, err := clientv3.New(clientv3.Config{
      Endpoints:   endpoints,
      DialTimeout: time.Duration(dailTimeout) * time.Second,
   })
   if err != nil {
      return nil, err
   }

   ser := &ServiceRegister{
      cli: cli,
      key: key,
      val: val,
   }

   // 申请租约设置时间keepalive
   if err := ser.putKeyWithLease(lease); err != nil {
      return nil, err
   }
   return ser, nil
}

// 设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
   // 创建一个租约,并且设置ttl时间
   resp, err := s.cli.Grant(context.Background(), lease)
   if err != nil {
      return err
   }

   // 注册服务并且绑定租约
   _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))

   if err != nil {
      return err
   }

   // 设置续租 定期发送需求请求
   // KeepAlive 使给定的租约永远有效。如果发布到通道的keepalive响应没有立即使用,
   // 则租约客户端至少每秒钟向etcd服务器发送保持活动请求,知道获取最新响应为止
   // etcd client 会自动发送ttl到etcd server, 从而保证租约一直有效
   leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)
   if err != nil {
      return err
   }

   s.leaseID = resp.ID
   log.Println(s.leaseID)
   s.keepAliveChan = leaseRespChan
   log.Printf("Put key:%s val:%s success!", s.key, s.val)
   return nil
}

// ListenLeaseRespChan 监听续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
   for leaseKeepResp := range s.keepAliveChan {
      log.Println("续约成功", leaseKeepResp)
   }
   log.Println("关闭租约")
}

// Close 注销服务
func (s *ServiceRegister) Close() error {
   // 撤销租约
   if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
      return err
   }
   log.Println("撤销租约")
   return s.cli.Close()
}

func main() {

   var endpoints = []string{"127.0.0.1:2379"}

   ser, err := NewServiceRegister(endpoints, "/server/node1", "localhost:8000", 6, 5)
   if err != nil {
      log.Fatalln(err)
   }

   // 监听相应的租约
   go ser.ListenLeaseRespChan()

   // 监控系统型号,等待 ctrl + c 系统信号通知关闭
   c := make(chan os.Signal, 1)
   go func() {
      signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
   }()
   log.Printf("exit %s", <-c)
   ser.Close()

}

运行结果

$ go run register.go 

2023/04/13 14:24:36 7587869747887312076
2023/04/13 14:24:36 Put key:/server/node1 val:localhost:8000 success!
2023/04/13 14:24:36 续约成功 cluster_id:14841639068965178418 member_id:10276657743932975437 revision:62 raft_term:5 
2023/04/13 14:24:38 续约成功 cluster_id:14841639068965178418 member_id:10276657743932975437 revision:62 raft_term:5 
2023/04/13 14:24:40 续约成功 cluster_id:14841639068965178418 member_id:10276657743932975437 revision:62 raft_term:5 
...
...
^C2023/04/13 14:24:59 exit interrupt
2023/04/13 14:24:59 撤销租约
2023/04/13 14:24:59 关闭租约


本文》有 0 条评论

留下一个回复