## 连接客户端
访问etcd首先要创建client,它需要传入一个Config配置.
`Endpoints`:etcd的多个节点服务地址。
`DialTimeout`:创建client的首次连接超时时间,这里传了5秒,如果5秒都没有连接成功就会返回err;一旦client创建成功,不用再关心后续底层连接的状态了,client内部会重连。
```
cli,err := clientv3.New(clientv3.Config{
Endpoints:[]string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
```
返回的 client,它的类型具体如下:
```
type Client struct {
Cluster
KV
Lease
Watcher
Auth
Maintenance
Username string
Password string
}
```
类型中的成员是etcd客户端几何核心功能模块的具体实现:
`Cluster`:向集群里增加删除etcd节点,集群管理
`KV`:K-V键值库
`Lease`:租约相关操作,租约过期会删除授权的key
`Watcher`:观察订阅,从而监听最新的数据变化
`Auth`:管理etcd的用户和权限,属于管理员操作
`Maintenance`:维护etcd,比如主动迁移etcd的leader节点
## KV增删改查
### kv对象
```
type KV interface {
// Put puts a key-value pair into etcd.
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
// Get retrieves keys.
Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error)
// Delete deletes a key, or optionally using WithRange(end), [key, end).
Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error)
// Compact compacts etcd KV history before the given rev.
Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error)
// Do applies a single Op on KV without a transaction.
Do(ctx context.Context, op Op) (OpResponse, error)
// Txn creates a transaction.
Txn(ctx context.Context) Txn
}
```
### KV存储读取删除
1. kv对象实例获取 `kv := clientev3.NewKV(client)`
2. 存储Put
```
putResp, err := kv.Put(context.TODO(),"/key/example", "hello")
```
`Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)`
参数介绍:
`ctx`: Context包对象,是用来跟踪上下文的,列如超时控制
`key`: 存储对象的key
`val`: 存储对象的value
`opts`: 可变参数,额外选项
3. 查询Get
`getResp, err := kv.Get(context.TODO(), "/key/example")`
4. 删除Delete
`delREsp, err := kv.Delete(context.TODO(), "/key/example")`
5. WithPrefix
```
rangeResp, err := kv.Get(context.TODO(), "/key/example", clientv3.WithPrefix())
```
WithPrefix()是指查找以/key/example为前缀的所有key
## 租约lease
### 租约对象
```
type Lease interface {
// Grant creates a new lease. 分配租约
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
// Revoke revokes the given lease. 撤销租约
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
// TimeToLive retrieves the lease information of the given lease ID. 获取ttl剩余舌尖
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
// Leases retrieves all leases. 列举所有租约
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
// KeepAlive keeps the given lease alive forever
// 租约自动定时续期
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
// KeepAliveOnce renews the lease once. In most of the cases, KeepAlive
// should be used instead of KeepAliveOnce.
//续期一次租约
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
// Close releases all resources Lease keeps for efficient communication
// with the etcd server.
Close() error
}
```
### 租约使用
1. 获取租约
2. 设置租约ttl
3. 对keyPut时使用租约,租约到期key自动删除
4. 主动给Lease进行续约
```
lease := clientv3.NewLease(client)
grantResp, err := lease.Grant(context.TODO(), 10)
kv.Put(context.TODO(), "/example/expireme", "lease-go", clientv3.WithLease(grantResp.ID))
keepResp, err := lease.KeepAlive(context.TODO(), grantResp.ID)
// sleep一会...
```
## 事务Tnx
etcd中事务是原子执行的,针对kv存储操作的if … then … else结构.将请求归并到一起放在原子块中.事务可以保护key不受其他并发更新操作的影响.
etcd的实现方式有些不同,它的事务是基于cas方式实现的。在事务执行过程中,client和etcd之间没有维护事务会话,在commit事务时,它的“冲突判断(If)和执行过程Then/Else”一次性提交给etcd,etcd来作为一个原子过程来执行“If-Then-Else”。所以,etcd事务不会发生阻塞,无论成功,还是失败,都会立即返回,需要应用进行失败(发生冲突)重试。因此,这也就要求业务代码是可重试的。
```
type Txn interface {
// If takes a list of comparison. If all comparisons passed in succeed,
// the operations passed into Then() will be executed. Or the operations
// passed into Else() will be executed.
If(cs ...Cmp) Txn
// Then takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() succeed.
Then(ops ...Op) Txn
// Else takes a list of operations. The Ops list will be executed, if the
// comparisons passed in If() fail.
Else(ops ...Op) Txn
// Commit tries to commit the transaction.
Commit() (*TxnResponse, error)
}
```
1. 通过kv对象开启一个事务
2. if then else 书写事务
3. 最后Commit提交整个Txn事务
```
txn := kv.Txn(context.TODO())
txnResp, err := txn.If(clientv3.Compare(clientv3.Value("/hi"), "=","hello")).
Then(clientv3.OpGet("/hi")).
Else(clientv3.OpGet("/test/",clientv3.WithPrefix())).
Commit()
if txnResp.Succeeded { // If = true
fmt.Println("~~~", txnResp.Responses[0].GetResponseRange().Kvs)
} else { // If =false
fmt.Println("!!!", txnResp.Responses[0].GetResponseRange().Kvs)
}
```
这个`Value(“/hi”)`返回的Cmp表达了:“`/hi`这个key对应的value”
利用Compare函数来继续为”主语”增加描述, 形成完整的条件语句:/hi这个key对应的value必须等于"hello"
## 监听watch
watch接口
```
type Watcher interface {
// Watch watches on a key or prefix. The watched events will be returned
// through the returned channel.
// If the watch is slow or the required rev is compacted, the watch request
// might be canceled from the server-side and the chan will be closed.
// 'opts' can be: 'WithRev' and/or 'WithPrefix'.
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
// Close closes the watcher and cancels all watch requests.
Close() error
}
```
Watch 方法返回一个WatchChan 类似的变量,该通道传递WatchResponse类型
```
//创建监听
wc := client.Watch(context.TODO(), "/job/v3/1", clientv3.WithPrevKV())
//range 监听事件
for v := range wc {
for _, e := range v.Events {
log.Printf("type:%v kv:%v val:%v prevKey:%v \n ", e.Type, string(e.Kv.Key),e.Kv.Value, e.PrevKv)
}
}
```
有疑问加站长微信联系(非本文作者))