etcd API使用

bytemode · · 2615 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

## 连接客户端 访问etcd首先要创建client,它需要传入一个Config配置. `Endpoints`:etcd的多个节点服务地址。 `DialTimeout`:创建client的首次连接超时时间,这里传了5秒,如果5秒都没有连接成功就会返回err;一旦client创建成功,不用再关心后续底层连接的状态了,client内部会重连。 ``` cli,err := clientv3.New(clientv3.Config{ Endpoints:[]string{"localhost:2379"}, DialTimeout: 5 * time.Second, }) ``` 返回的 client,它的类型具体如下: ``` type Client struct { Cluster KV Lease Watcher Auth Maintenance Username string Password string } ``` 类型中的成员是etcd客户端几何核心功能模块的具体实现: `Cluster`:向集群里增加删除etcd节点,集群管理 `KV`:K-V键值库 `Lease`:租约相关操作,租约过期会删除授权的key `Watcher`:观察订阅,从而监听最新的数据变化 `Auth`:管理etcd的用户和权限,属于管理员操作 `Maintenance`:维护etcd,比如主动迁移etcd的leader节点 ## KV增删改查 ### kv对象 ``` type KV interface { // Put puts a key-value pair into etcd. Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) // Get retrieves keys. Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) // Delete deletes a key, or optionally using WithRange(end), [key, end). Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) // Compact compacts etcd KV history before the given rev. Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) // Do applies a single Op on KV without a transaction. Do(ctx context.Context, op Op) (OpResponse, error) // Txn creates a transaction. Txn(ctx context.Context) Txn } ``` ### KV存储读取删除 1. kv对象实例获取 `kv := clientev3.NewKV(client)` 2. 存储Put ``` putResp, err := kv.Put(context.TODO(),"/key/example", "hello") ``` `Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)` 参数介绍: `ctx`: Context包对象,是用来跟踪上下文的,列如超时控制 `key`: 存储对象的key `val`: 存储对象的value `opts`:  可变参数,额外选项 3. 查询Get `getResp, err := kv.Get(context.TODO(), "/key/example")` 4. 删除Delete `delREsp, err := kv.Delete(context.TODO(), "/key/example")` 5. WithPrefix ``` rangeResp, err := kv.Get(context.TODO(), "/key/example", clientv3.WithPrefix()) ``` WithPrefix()是指查找以/key/example为前缀的所有key ## 租约lease ### 租约对象 ``` type Lease interface { // Grant creates a new lease. 分配租约 Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) // Revoke revokes the given lease. 撤销租约 Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) // TimeToLive retrieves the lease information of the given lease ID. 获取ttl剩余舌尖 TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) // Leases retrieves all leases. 列举所有租约 Leases(ctx context.Context) (*LeaseLeasesResponse, error) // KeepAlive keeps the given lease alive forever // 租约自动定时续期 KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) // KeepAliveOnce renews the lease once. In most of the cases, KeepAlive // should be used instead of KeepAliveOnce. //续期一次租约 KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) // Close releases all resources Lease keeps for efficient communication // with the etcd server. Close() error } ``` ### 租约使用 1. 获取租约 2. 设置租约ttl 3. 对keyPut时使用租约,租约到期key自动删除 4. 主动给Lease进行续约 ``` lease := clientv3.NewLease(client) grantResp, err := lease.Grant(context.TODO(), 10) kv.Put(context.TODO(), "/example/expireme", "lease-go", clientv3.WithLease(grantResp.ID)) keepResp, err := lease.KeepAlive(context.TODO(), grantResp.ID) // sleep一会... ``` ## 事务Tnx etcd中事务是原子执行的,针对kv存储操作的if … then … else结构.将请求归并到一起放在原子块中.事务可以保护key不受其他并发更新操作的影响. etcd的实现方式有些不同,它的事务是基于cas方式实现的。在事务执行过程中,client和etcd之间没有维护事务会话,在commit事务时,它的“冲突判断(If)和执行过程Then/Else”一次性提交给etcd,etcd来作为一个原子过程来执行“If-Then-Else”。所以,etcd事务不会发生阻塞,无论成功,还是失败,都会立即返回,需要应用进行失败(发生冲突)重试。因此,这也就要求业务代码是可重试的。 ``` type Txn interface { // If takes a list of comparison. If all comparisons passed in succeed, // the operations passed into Then() will be executed. Or the operations // passed into Else() will be executed. If(cs ...Cmp) Txn // Then takes a list of operations. The Ops list will be executed, if the // comparisons passed in If() succeed. Then(ops ...Op) Txn // Else takes a list of operations. The Ops list will be executed, if the // comparisons passed in If() fail. Else(ops ...Op) Txn // Commit tries to commit the transaction. Commit() (*TxnResponse, error) } ``` 1. 通过kv对象开启一个事务 2. if then else 书写事务 3. 最后Commit提交整个Txn事务 ``` txn := kv.Txn(context.TODO()) txnResp, err := txn.If(clientv3.Compare(clientv3.Value("/hi"), "=","hello")). Then(clientv3.OpGet("/hi")). Else(clientv3.OpGet("/test/",clientv3.WithPrefix())). Commit() if txnResp.Succeeded { // If = true fmt.Println("~~~", txnResp.Responses[0].GetResponseRange().Kvs) } else { // If =false fmt.Println("!!!", txnResp.Responses[0].GetResponseRange().Kvs) } ``` 这个`Value(“/hi”)`返回的Cmp表达了:“`/hi`这个key对应的value” 利用Compare函数来继续为”主语”增加描述, 形成完整的条件语句:/hi这个key对应的value必须等于"hello" ## 监听watch watch接口 ``` type Watcher interface { // Watch watches on a key or prefix. The watched events will be returned // through the returned channel. // If the watch is slow or the required rev is compacted, the watch request // might be canceled from the server-side and the chan will be closed. // 'opts' can be: 'WithRev' and/or 'WithPrefix'. Watch(ctx context.Context, key string, opts ...OpOption) WatchChan // Close closes the watcher and cancels all watch requests. Close() error } ``` Watch 方法返回一个WatchChan 类似的变量,该通道传递WatchResponse类型 ``` //创建监听 wc := client.Watch(context.TODO(), "/job/v3/1", clientv3.WithPrevKV()) //range 监听事件 for v := range wc { for _, e := range v.Events { log.Printf("type:%v kv:%v val:%v prevKey:%v \n ", e.Type, string(e.Kv.Key),e.Kv.Value, e.PrevKv) } } ```

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2615 次点击  
加入收藏 微博
被以下专栏收入,发现更多相似内容
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传