Etcd使用

busgo · · 1809 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

## 前言 我们在使用go语言来构建分布式应用集群的时候,通常会选择一个分布式协调框架来协调整个分布式集群正常工作(如:服务注册/发现、选主等)。Java语言有hadoop体系中zookeeper(不得不说现在zookeeper太过于臃肿),现在容器(docker)盛行的时代 k8s采用的是etcd来作为自己的协调框架。现在越来越多的项目开始采用etcd来作为自己的服务协调框架。go语言中目前发现基于Raft算法中有[Etcd](https://github.com/etcd-io/etcd)和[Consul](https://github.com/hashicorp/consul)。不得不说下这个hashicorp公司自己的产品中使用[Etcd](https://github.com/etcd-io/etcd) 而不是自己的[Consul](https://github.com/hashicorp/consul)。那我们今天了解下这个大名鼎鼎的[Etcd](https://github.com/etcd-io/etcd) 怎么使用的,本篇涉及到etcd的KV、Lease、Tx等内容。 ## 开始 #### 1.初始化 ```go // 定义一个etcd客户端结构体 type Etcd struct { endpoints []string client *clientv3.Client kv clientv3.KV timeout time.Duration } // 定义key变更事件常量 const ( KeyCreateChangeEvent = iota KeyUpdateChangeEvent KeyDeleteChangeEvent ) // key 变化事件 type KeyChangeEvent struct { Type int Key string Value []byte } // 监听key 变化响应 type WatchKeyChangeResponse struct { Event chan *KeyChangeEvent CancelFunc context.CancelFunc Watcher clientv3.Watcher } type TxResponse struct { Success bool LeaseID clientv3.LeaseID Lease clientv3.Lease Key string Value string } ``` ##### 1.1.初始化函数 ```go // create a etcd func NewEtcd(endpoints []string, timeout time.Duration) (etcd *Etcd, err error) { var ( client *clientv3.Client ) conf := clientv3.Config{ Endpoints: endpoints, DialTimeout: timeout, } if client, err = clientv3.New(conf); err != nil { return } etcd = &Etcd{ endpoints: endpoints, client: client, kv: clientv3.NewKV(client), timeout: timeout, } return } ``` #### 2.Get操作 ##### 2.1.根据key获取value ```go // get value from a key func (etcd *Etcd) Get(key string) (value []byte, err error) { var ( getResponse *clientv3.GetResponse ) ctx, cancelFunc := context.WithTimeout(context.Background(), etcd.timeout) defer cancelFunc() if getResponse, err = etcd.kv.Get(ctx, key); err != nil { return } if len(getResponse.Kvs) == 0 { return } value = getResponse.Kvs[0].Value return } ``` ##### 2.2.根据key前缀获取value列表 ```go // get values from prefixKey func (etcd *Etcd) GetWithPrefixKey(prefixKey string) (keys [][]byte, values [][]byte, err error) { var ( getResponse *clientv3.GetResponse ) ctx, cancelFunc := context.WithTimeout(context.Background(), etcd.timeout) defer cancelFunc() if getResponse, err = etcd.kv.Get(ctx, prefixKey, clientv3.WithPrefix()); err != nil { return } if len(getResponse.Kvs) == 0 { return } keys = make([][]byte, 0) values = make([][]byte, 0) for i := 0; i < len(getResponse.Kvs); i++ { keys = append(keys, getResponse.Kvs[i].Key) values = append(values, getResponse.Kvs[i].Value) } return } ``` ##### 2.3.根据key前缀获取指定条数 ```go // get values from prefixKey limit func (etcd *Etcd) GetWithPrefixKeyLimit(prefixKey string, limit int64) (keys [][]byte, values [][]byte, err error) { var ( getResponse *clientv3.GetResponse ) ctx, cancelFunc := context.WithTimeout(context.Background(), etcd.timeout) defer cancelFunc() if getResponse, err = etcd.kv.Get(ctx, prefixKey, clientv3.WithPrefix(), clientv3.WithLimit(limit)); err != nil { return } if len(getResponse.Kvs) == 0 { return } keys = make([][]byte, 0) values = make([][]byte, 0) for i := 0; i < len(getResponse.Kvs); i++ { keys = append(keys, getResponse.Kvs[i].Key) values = append(values, getResponse.Kvs[i].Value) } return } ``` #### 3.Put操作 ##### 3.1.put一个值 ```go // put a key func (etcd *Etcd) Put(key, value string) (err error) { ctx, cancelFunc := context.WithTimeout(context.Background(), etcd.timeout) defer cancelFunc() if _, err = etcd.kv.Put(ctx, key, value); err != nil { return } return } ``` ##### 3.2.Put一个不存在的值 ```go // put a key not exist func (etcd *Etcd) PutNotExist(key, value string) (success bool, oldValue []byte, err error) { var ( txnResponse *clientv3.TxnResponse ) ctx, cancelFunc := context.WithTimeout(context.Background(), etcd.timeout) defer cancelFunc() txn := etcd.client.Txn(ctx) txnResponse, err = txn.If(clientv3.Compare(clientv3.Version(key), "=", 0)). Then(clientv3.OpPut(key, value)). Else(clientv3.OpGet(key)). Commit() if err != nil { return } if txnResponse.Succeeded { success = true } else { oldValue = make([]byte, 0) oldValue = txnResponse.Responses[0].GetResponseRange().Kvs[0].Value } return } ``` ##### 3.3.更新一个已经存在的值 ```go func (etcd *Etcd) Update(key, value, oldValue string) (success bool, err error) { var ( txnResponse *clientv3.TxnResponse ) ctx, cancelFunc := context.WithTimeout(context.Background(), etcd.timeout) defer cancelFunc() txn := etcd.client.Txn(ctx) txnResponse, err = txn.If(clientv3.Compare(clientv3.Value(key), "=", oldValue)). Then(clientv3.OpPut(key, value)). Commit() if err != nil { return } if txnResponse.Succeeded { success = true } return } ``` #### 4.Delete操作 ##### 4.1.根据key删除 ```go func (etcd *Etcd) Delete(key string) (err error) { ctx, cancelFunc := context.WithTimeout(context.Background(), etcd.timeout) defer cancelFunc() _, err = etcd.kv.Delete(ctx, key) return } ``` ##### 4.2.根据一个key前缀删除 ```go // delete the keys with prefix key func (etcd *Etcd) DeleteWithPrefixKey(prefixKey string) (err error) { ctx, cancelFunc := context.WithTimeout(context.Background(), etcd.timeout) defer cancelFunc() _, err = etcd.kv.Delete(ctx, prefixKey, clientv3.WithPrefix()) return } ``` #### 5.Watch ##### 5.1.watch 一个key ```go // watch a key func (etcd *Etcd) Watch(key string) (keyChangeEventResponse *WatchKeyChangeResponse) { watcher := clientv3.NewWatcher(etcd.client) watchChans := watcher.Watch(context.Background(), key) keyChangeEventResponse = &WatchKeyChangeResponse{ Event: make(chan *KeyChangeEvent, 250), Watcher: watcher, } go func() { for ch := range watchChans { if ch.Canceled { goto End } for _, event := range ch.Events { etcd.handleKeyChangeEvent(event, keyChangeEventResponse.Event) } } End: log.Println("the watcher lose for key:", key) }() return } // handle the key change event func (etcd *Etcd) handleKeyChangeEvent(event *clientv3.Event, events chan *KeyChangeEvent) { changeEvent := &KeyChangeEvent{ Key: string(event.Kv.Key), } switch event.Type { case mvccpb.PUT: if event.IsCreate() { changeEvent.Type = KeyCreateChangeEvent } else { changeEvent.Type = KeyUpdateChangeEvent } changeEvent.Value = event.Kv.Value case mvccpb.DELETE: changeEvent.Type = KeyDeleteChangeEvent } events <- changeEvent } ``` ##### 5.2.watch一个key前缀 ```go // watch with prefix key func (etcd *Etcd) WatchWithPrefixKey(prefixKey string) (keyChangeEventResponse *WatchKeyChangeResponse) { watcher := clientv3.NewWatcher(etcd.client) watchChans := watcher.Watch(context.Background(), prefixKey, clientv3.WithPrefix()) keyChangeEventResponse = &WatchKeyChangeResponse{ Event: make(chan *KeyChangeEvent, 250), Watcher: watcher, } go func() { for ch := range watchChans { if ch.Canceled { goto End } for _, event := range ch.Events { etcd.handleKeyChangeEvent(event, keyChangeEventResponse.Event) } } End: log.Println("the watcher lose for prefixKey:", prefixKey) }() return } // handle the key change event func (etcd *Etcd) handleKeyChangeEvent(event *clientv3.Event, events chan *KeyChangeEvent) { changeEvent := &KeyChangeEvent{ Key: string(event.Kv.Key), } switch event.Type { case mvccpb.PUT: if event.IsCreate() { changeEvent.Type = KeyCreateChangeEvent } else { changeEvent.Type = KeyUpdateChangeEvent } changeEvent.Value = event.Kv.Value case mvccpb.DELETE: changeEvent.Type = KeyDeleteChangeEvent } events <- changeEvent } ``` #### 6.Lease的使用 ##### 6.1.创建一个指定时间的临时key ```go func (etcd *Etcd) TxWithTTL(key, value string, ttl int64) (txResponse *TxResponse, err error) { var ( txnResponse *clientv3.TxnResponse leaseID clientv3.LeaseID v []byte ) lease := clientv3.NewLease(etcd.client) grantResponse, err := lease.Grant(context.Background(), ttl) leaseID = grantResponse.ID ctx, cancelFunc := context.WithTimeout(context.Background(), etcd.timeout) defer cancelFunc() txn := etcd.client.Txn(ctx) txnResponse, err = txn.If( clientv3.Compare(clientv3.Version(key), "=", 0)). Then(clientv3.OpPut(key, value, clientv3.WithLease(leaseID))).Commit() if err != nil { _ = lease.Close() return } txResponse = &TxResponse{ LeaseID: leaseID, Lease: lease, } if txnResponse.Succeeded { txResponse.Success = true } else { // close the lease _ = lease.Close() v, err = etcd.Get(key) if err != nil { return } txResponse.Success = false txResponse.Key = key txResponse.Value = string(v) } return } ``` ###### 6.2.创建一个不间断续约的临时key ```go func (etcd *Etcd) TxKeepaliveWithTTL(key, value string, ttl int64) (txResponse *TxResponse, err error) { var ( txnResponse *clientv3.TxnResponse leaseID clientv3.LeaseID aliveResponses <-chan *clientv3.LeaseKeepAliveResponse v []byte ) lease := clientv3.NewLease(etcd.client) grantResponse, err := lease.Grant(context.Background(), ttl) leaseID = grantResponse.ID if aliveResponses, err = lease.KeepAlive(context.Background(), leaseID); err != nil { return } go func() { for ch := range aliveResponses { if ch == nil { goto End } } End: log.Printf("the tx keepalive has lose key:%s", key) }() ctx, cancelFunc := context.WithTimeout(context.Background(), etcd.timeout) defer cancelFunc() txn := etcd.client.Txn(ctx) txnResponse, err = txn.If( clientv3.Compare(clientv3.Version(key), "=", 0)). Then(clientv3.OpPut(key, value, clientv3.WithLease(leaseID))). Else( clientv3.OpGet(key), ).Commit() if err != nil { _ = lease.Close() return } txResponse = &TxResponse{ LeaseID: leaseID, Lease: lease, } if txnResponse.Succeeded { txResponse.Success = true } else { // close the lease _ = lease.Close() txResponse.Success = false if v, err = etcd.Get(key); err != nil { return } txResponse.Key = key txResponse.Value = string(v) } return } ```

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1809 次点击  ∙  1 赞  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传