监听etcd中的kv变化,常用来做集群中的配置下发、状态同步,非常有价值。
package main
import (
"go.etcd.io/etcd/clientv3"
"time"
"fmt"
"context"
"go.etcd.io/etcd/mvcc/mvccpb"
)
func main() {
var (
config clientv3.Config
client *clientv3.Client
err error
kv clientv3.KV
watcher clientv3.Watcher
getResp *clientv3.GetResponse
watchStartRevision int64
watchRespChan <-chan clientv3.WatchResponse
watchResp clientv3.WatchResponse
event *clientv3.Event
)
// 客户端配置
config = clientv3.Config{
Endpoints: []string{"0.0.0.0:2379"},
DialTimeout: 5 * time.Second,
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
// KV
kv = clientv3.NewKV(client)
// 模拟etcd中KV的变化
go func() {
for {
kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7")
kv.Delete(context.TODO(), "/cron/jobs/job7")
time.Sleep(1 * time.Second)
}
}()
// 先GET到当前的值,并监听后续变化
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7"); err != nil {
fmt.Println(err)
return
}
// 现在key是存在的
if len(getResp.Kvs) != 0 {
fmt.Println("当前值:", string(getResp.Kvs[0].Value))
}
// 当前etcd集群事务ID, 单调递增的(监听/cron/jobs/job7后续的变化,也就是通过监听版本变化)
watchStartRevision = getResp.Header.Revision + 1
// 创建一个watcher(监听器)
watcher = clientv3.NewWatcher(client)
// 启动监听
fmt.Println("从该版本向后监听:", watchStartRevision)
ctx, cancelFunc := context.WithCancel(context.TODO())
//5秒钟后取消
time.AfterFunc(5 * time.Second, func() {
cancelFunc()
})
//这里ctx感知到cancel则会关闭watcher
watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))
// 处理kv变化事件
for watchResp = range watchRespChan {
for _, event = range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
}
}
}
}
输出:
[root@bogon etcd]# go run demo8.go
当前值: i am job7
从该版本向后监听: 33
删除了 Revision: 33
修改为: i am job7 Revision: 34 34
删除了 Revision: 35
修改为: i am job7 Revision: 36 36
删除了 Revision: 37
修改为: i am job7 Revision: 38 38
删除了 Revision: 39
修改为: i am job7 Revision: 40 40
删除了 Revision: 41
[root@bogon etcd]#
有疑问加站长微信联系(非本文作者)