服务发现
etc/discovery/discovery.go
package main import ( "context" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "log" "os" "os/signal" "sync" "syscall" "time" ) // etcd client put/get demo // go get go.etcd.io/etcd/client/v3 // 网上很多例子都写 go.etcd.io/etcd/client ,目测这个client目录中已经没有v3了,还是用 go get go.etcd.io/etcd/client/v3 靠谱 // ServiceDiscovery 服务发现 type ServiceDiscovery struct { cli *clientv3.Client // etcd client serverList map[string]string // 服务列表 lock sync.RWMutex } // NewServiceDiscovery 新建服务发现 func NewServiceDiscovery(endpoints []string) *ServiceDiscovery { // 初始化etcd client cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: time.Duration(5) * time.Second, }) if err != nil { log.Fatalln(err) } return &ServiceDiscovery{ cli: cli, serverList: make(map[string]string), } } // WatchService 初始化服务列表和监视 func (s *ServiceDiscovery) WatchService(prefix string) error { // 根据前缀获取现有的key resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix()) if err != nil { return err } // 遍历获取得到的k和v for _, ev := range resp.Kvs { s.SetServiceList(string(ev.Key), string(ev.Value)) } // 监视前缀,修改变更server go s.watcher(prefix) return nil } // watcher 监听Key的前缀 func (s *ServiceDiscovery) watcher(prefix string) { rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix()) log.Printf("watching prefix:%s now...", prefix) for wresp := range rch { for _, ev := range wresp.Events { switch ev.Type { case mvccpb.PUT: // 修改或者新增 s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value)) case mvccpb.DELETE: // 删除 s.DelServiceList(string(ev.Kv.Key)) } } } } func (s *ServiceDiscovery) SetServiceList(key, val string) { s.lock.Lock() defer s.lock.Unlock() s.serverList[key] = string(val) log.Println("put key:", key, "val:", val) } func (s *ServiceDiscovery) DelServiceList(key string) { s.lock.Lock() defer s.lock.Unlock() delete(s.serverList, key) log.Println("Del key :", key) } // GetServices 获取服务地址 func (s *ServiceDiscovery) GetServices() []string { s.lock.RLock() defer s.lock.RUnlock() addrs := make([]string, 0, len(s.serverList)) for _, v := range s.serverList { addrs = append(addrs, v) } return addrs } // Close 关闭服务 func (s *ServiceDiscovery) Close() error { return s.cli.Close() } func main() { var endpoints = []string{"127.0.0.1:2379"} ser := NewServiceDiscovery(endpoints) defer ser.Close() err := ser.WatchService("/server/") if err != nil { log.Fatal(err) } // 监控系统信号,等待 ctrl + c 系统信号通知服务关闭 c := make(chan os.Signal, 1) go func() { signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) }() for { select { case <-time.Tick(10 * time.Second): log.Println(ser.GetServices()) case <-c: log.Println("server discovery exit") return } } }
运行结果
$ go run discovery.go 2023/04/13 14:24:09 watching prefix:/server/ now... 2023/04/13 14:24:19 [] 2023/04/13 14:24:29 [] 2023/04/13 14:24:36 put key: /server/node1 val: localhost:8000 2023/04/13 14:24:39 [localhost:8000] 2023/04/13 14:24:49 [localhost:8000] 2023/04/13 14:24:59 Del key : /server/node1 2023/04/13 14:24:59 [] 2023/04/13 14:25:09 server discovery exit
服务注册
etc/register/register.go
package main import ( "context" "log" "os" "os/signal" "syscall" "time" clientv3 "go.etcd.io/etcd/client/v3" ) // etcd client put/get demo // go get go.etcd.io/etcd/client/v3 // 网上很多例子都写 go.etcd.io/etcd/client ,目测这个client目录中已经没有v3了,还是用 go get go.etcd.io/etcd/client/v3 靠谱 // ServiceRegister 创建租约注册服务 type ServiceRegister struct { cli *clientv3.Client // etcd v3 client leaseID clientv3.LeaseID // 租约ID // 租约keepalive相应的chan keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse key string // key val string // value } // NewServiceRegister 新建服务注册 func NewServiceRegister(endpoints []string, key, val string, lease int64, dailTimeout int) (*ServiceRegister, error) { cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: time.Duration(dailTimeout) * time.Second, }) if err != nil { return nil, err } ser := &ServiceRegister{ cli: cli, key: key, val: val, } // 申请租约设置时间keepalive if err := ser.putKeyWithLease(lease); err != nil { return nil, err } return ser, nil } // 设置租约 func (s *ServiceRegister) putKeyWithLease(lease int64) error { // 创建一个租约,并且设置ttl时间 resp, err := s.cli.Grant(context.Background(), lease) if err != nil { return err } // 注册服务并且绑定租约 _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID)) if err != nil { return err } // 设置续租 定期发送需求请求 // KeepAlive 使给定的租约永远有效。如果发布到通道的keepalive响应没有立即使用, // 则租约客户端至少每秒钟向etcd服务器发送保持活动请求,知道获取最新响应为止 // etcd client 会自动发送ttl到etcd server, 从而保证租约一直有效 leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID) if err != nil { return err } s.leaseID = resp.ID log.Println(s.leaseID) s.keepAliveChan = leaseRespChan log.Printf("Put key:%s val:%s success!", s.key, s.val) return nil } // ListenLeaseRespChan 监听续租情况 func (s *ServiceRegister) ListenLeaseRespChan() { for leaseKeepResp := range s.keepAliveChan { log.Println("续约成功", leaseKeepResp) } log.Println("关闭租约") } // Close 注销服务 func (s *ServiceRegister) Close() error { // 撤销租约 if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil { return err } log.Println("撤销租约") return s.cli.Close() } func main() { var endpoints = []string{"127.0.0.1:2379"} ser, err := NewServiceRegister(endpoints, "/server/node1", "localhost:8000", 6, 5) if err != nil { log.Fatalln(err) } // 监听相应的租约 go ser.ListenLeaseRespChan() // 监控系统型号,等待 ctrl + c 系统信号通知关闭 c := make(chan os.Signal, 1) go func() { signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) }() log.Printf("exit %s", <-c) ser.Close() }
运行结果
$ go run register.go 2023/04/13 14:24:36 7587869747887312076 2023/04/13 14:24:36 Put key:/server/node1 val:localhost:8000 success! 2023/04/13 14:24:36 续约成功 cluster_id:14841639068965178418 member_id:10276657743932975437 revision:62 raft_term:5 2023/04/13 14:24:38 续约成功 cluster_id:14841639068965178418 member_id:10276657743932975437 revision:62 raft_term:5 2023/04/13 14:24:40 续约成功 cluster_id:14841639068965178418 member_id:10276657743932975437 revision:62 raft_term:5 ... ... ^C2023/04/13 14:24:59 exit interrupt 2023/04/13 14:24:59 撤销租约 2023/04/13 14:24:59 关闭租约
《本文》有 0 条评论