## 前言
我们在使用Go语言构建分布式应用集群时,通常会选择一个分布式协调框架来协调整个集群正常工作(如:服务注册/发现、选主等)。Java生态中有Hadoop体系的zookeeper(不得不说现在zookeeper过于臃肿);而在容器(docker)盛行的时代,k8s采用etcd作为自己的协调框架,越来越多的项目也开始采用etcd作为服务协调框架。Go语言中目前基于Raft算法实现的同类项目有[Etcd](https://github.com/etcd-io/etcd)和[Consul](https://github.com/hashicorp/consul)。不得不说,hashicorp公司自己的产品中使用的是[Etcd](https://github.com/etcd-io/etcd)而不是自家的[Consul](https://github.com/hashicorp/consul)。那我们今天就来了解一下这个大名鼎鼎的[Etcd](https://github.com/etcd-io/etcd)该怎么使用,本篇涉及etcd的KV、Watch、Lease、Txn(事务)等内容。
## 开始
#### 1.初始化
```go
// Etcd is a thin wrapper around an etcd v3 client.
type Etcd struct {
// endpoints is the list of etcd endpoints the client was created with.
endpoints []string
// client is the underlying etcd v3 client.
client *clientv3.Client
// kv is the key-value API handle built from client.
kv clientv3.KV
// timeout is used both as the dial timeout and as the deadline of each request context.
timeout time.Duration
}
// Kinds of key change reported in KeyChangeEvent.Type.
const (
// KeyCreateChangeEvent (0) marks a PUT that created the key.
KeyCreateChangeEvent = iota
// KeyUpdateChangeEvent (1) marks a PUT on a key that already existed.
KeyUpdateChangeEvent
// KeyDeleteChangeEvent (2) marks a DELETE of the key.
KeyDeleteChangeEvent
)
// KeyChangeEvent describes a single change observed on a watched key.
type KeyChangeEvent struct {
// Type is one of the Key*ChangeEvent constants.
Type int
// Key is the key that changed.
Key string
// Value is the new value for create/update events; it is left unset for deletes.
Value []byte
}
// WatchKeyChangeResponse is the handle returned when a watch is started.
type WatchKeyChangeResponse struct {
// Event delivers the translated change events to the consumer.
Event chan *KeyChangeEvent
// CancelFunc is presumably meant to cancel the watch context; check it is
// non-nil before calling — TODO confirm against the function that built this value.
CancelFunc context.CancelFunc
// Watcher is the etcd watcher that backs this watch.
Watcher clientv3.Watcher
}
// TxResponse is the result of a "create key with lease" transaction.
type TxResponse struct {
// Success is true when the transaction created the key.
Success bool
// LeaseID identifies the lease that was granted for the key.
LeaseID clientv3.LeaseID
// Lease is the lease client used to grant LeaseID.
Lease clientv3.Lease
// Key is filled in only when the key already existed (Success == false).
Key string
// Value is the existing value of Key when Success == false.
Value string
}
```
##### 1.1.初始化函数
```go
// NewEtcd connects to the given etcd endpoints and returns an Etcd wrapper.
//
// timeout is used as the client's dial timeout and is also stored as the
// deadline applied to each request made through the wrapper. On failure the
// returned *Etcd is nil and err carries the client creation error.
func NewEtcd(endpoints []string, timeout time.Duration) (etcd *Etcd, err error) {
	cli, dialErr := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: timeout,
	})
	if dialErr != nil {
		return nil, dialErr
	}

	return &Etcd{
		endpoints: endpoints,
		client:    cli,
		kv:        clientv3.NewKV(cli),
		timeout:   timeout,
	}, nil
}
```
#### 2.Get操作
##### 2.1.根据key获取value
```go
// Get returns the value stored under key.
//
// The request is bounded by the wrapper's timeout. When the key has no entry,
// both value and err are nil.
func (etcd *Etcd) Get(key string) (value []byte, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), etcd.timeout)
	defer cancel()

	resp, getErr := etcd.kv.Get(ctx, key)
	if getErr != nil {
		return nil, getErr
	}
	if len(resp.Kvs) > 0 {
		value = resp.Kvs[0].Value
	}
	return value, nil
}
```
##### 2.2.根据key前缀获取value列表
```go
// GetWithPrefixKey returns every key that starts with prefixKey together with
// its value; keys[i] corresponds to values[i].
//
// The request is bounded by the wrapper's timeout. When nothing matches, keys
// and values are both nil and err is nil.
func (etcd *Etcd) GetWithPrefixKey(prefixKey string) (keys [][]byte, values [][]byte, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), etcd.timeout)
	defer cancel()

	resp, err := etcd.kv.Get(ctx, prefixKey, clientv3.WithPrefix())
	if err != nil {
		return nil, nil, err
	}
	if len(resp.Kvs) == 0 {
		return nil, nil, nil
	}

	// The final length is known, so allocate once instead of growing on append.
	keys = make([][]byte, 0, len(resp.Kvs))
	values = make([][]byte, 0, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		keys = append(keys, kv.Key)
		values = append(values, kv.Value)
	}
	return keys, values, nil
}
```
##### 2.3.根据key前缀获取指定条数
```go
// GetWithPrefixKeyLimit behaves like a prefix lookup but passes limit to etcd
// as the maximum number of entries to return; keys[i] corresponds to values[i].
//
// The request is bounded by the wrapper's timeout. When nothing matches, keys
// and values are both nil and err is nil.
func (etcd *Etcd) GetWithPrefixKeyLimit(prefixKey string, limit int64) (keys [][]byte, values [][]byte, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), etcd.timeout)
	defer cancel()

	resp, err := etcd.kv.Get(ctx, prefixKey, clientv3.WithPrefix(), clientv3.WithLimit(limit))
	if err != nil {
		return nil, nil, err
	}
	if len(resp.Kvs) == 0 {
		return nil, nil, nil
	}

	// The final length is known, so allocate once instead of growing on append.
	keys = make([][]byte, 0, len(resp.Kvs))
	values = make([][]byte, 0, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		keys = append(keys, kv.Key)
		values = append(values, kv.Value)
	}
	return keys, values, nil
}
```
#### 3.Put操作
##### 3.1.put一个值
```go
// Put stores value under key, overwriting any existing value.
// The request is bounded by the wrapper's timeout.
func (etcd *Etcd) Put(key, value string) (err error) {
	ctx, cancel := context.WithTimeout(context.Background(), etcd.timeout)
	defer cancel()

	_, err = etcd.kv.Put(ctx, key, value)
	return err
}
```
##### 3.2.Put一个不存在的值
```go
// PutNotExist stores value under key only if the key does not exist yet.
//
// The check and the write run in a single etcd transaction: if the key's
// version is 0 the put is applied and success is true; otherwise the current
// value is read in the same transaction and returned as oldValue with
// success == false. err is non-nil only when the transaction itself fails.
func (etcd *Etcd) PutNotExist(key, value string) (success bool, oldValue []byte, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), etcd.timeout)
	defer cancel()

	resp, err := etcd.client.Txn(ctx).
		If(clientv3.Compare(clientv3.Version(key), "=", 0)).
		Then(clientv3.OpPut(key, value)).
		Else(clientv3.OpGet(key)).
		Commit()
	if err != nil {
		return false, nil, err
	}
	if resp.Succeeded {
		return true, nil, nil
	}

	// Guard the indexing so an unexpectedly empty Else response cannot panic.
	if len(resp.Responses) > 0 {
		if rng := resp.Responses[0].GetResponseRange(); rng != nil && len(rng.Kvs) > 0 {
			oldValue = rng.Kvs[0].Value
		}
	}
	return false, oldValue, nil
}
```
##### 3.3.更新一个已经存在的值
```go
// Update replaces the value of key with value, but only if the value currently
// stored equals oldValue (a compare-and-swap done in one etcd transaction).
//
// success reports whether the swap was applied; err is non-nil only when the
// transaction itself fails.
func (etcd *Etcd) Update(key, value, oldValue string) (success bool, err error) {
	ctx, cancel := context.WithTimeout(context.Background(), etcd.timeout)
	defer cancel()

	unchanged := clientv3.Compare(clientv3.Value(key), "=", oldValue)
	resp, commitErr := etcd.client.Txn(ctx).
		If(unchanged).
		Then(clientv3.OpPut(key, value)).
		Commit()
	if commitErr != nil {
		return false, commitErr
	}
	return resp.Succeeded, nil
}
```
#### 4.Delete操作
##### 4.1.根据key删除
```go
// Delete removes key. The request is bounded by the wrapper's timeout.
func (etcd *Etcd) Delete(key string) (err error) {
	ctx, cancel := context.WithTimeout(context.Background(), etcd.timeout)
	defer cancel()

	if _, delErr := etcd.kv.Delete(ctx, key); delErr != nil {
		return delErr
	}
	return nil
}
```
##### 4.2.根据一个key前缀删除
```go
// DeleteWithPrefixKey removes every key that starts with prefixKey.
// The request is bounded by the wrapper's timeout.
func (etcd *Etcd) DeleteWithPrefixKey(prefixKey string) (err error) {
	ctx, cancel := context.WithTimeout(context.Background(), etcd.timeout)
	defer cancel()

	if _, delErr := etcd.kv.Delete(ctx, prefixKey, clientv3.WithPrefix()); delErr != nil {
		return delErr
	}
	return nil
}
```
#### 5.Watch
##### 5.1.watch 一个key
```go
// Watch starts watching key and returns a handle for consuming its changes.
//
// Translated events are delivered on the returned Event channel (buffered,
// capacity 250). A background goroutine forwards events until the watch
// channel is closed or reports cancellation, then logs and exits. Call the
// returned CancelFunc to stop the watch and let that goroutine finish.
func (etcd *Etcd) Watch(key string) (keyChangeEventResponse *WatchKeyChangeResponse) {
	// A cancellable context gives the caller a way to stop the watch;
	// previously CancelFunc was left nil and the watch could never be cancelled.
	ctx, cancel := context.WithCancel(context.Background())
	watcher := clientv3.NewWatcher(etcd.client)
	watchChan := watcher.Watch(ctx, key)

	events := make(chan *KeyChangeEvent, 250)
	keyChangeEventResponse = &WatchKeyChangeResponse{
		Event:      events,
		CancelFunc: cancel,
		Watcher:    watcher,
	}

	go func() {
		for resp := range watchChan {
			if resp.Canceled {
				break
			}
			for _, event := range resp.Events {
				etcd.handleKeyChangeEvent(event, events)
			}
		}
		log.Println("the watcher lose for key:", key)
	}()
	return keyChangeEventResponse
}
// handleKeyChangeEvent converts a raw etcd watch event into a KeyChangeEvent
// and sends it on events.
//
// A PUT becomes a create or update event (carrying the new value) depending on
// event.IsCreate(); a DELETE becomes a delete event without a value. The send
// blocks while the channel is full.
func (etcd *Etcd) handleKeyChangeEvent(event *clientv3.Event, events chan *KeyChangeEvent) {
	change := new(KeyChangeEvent)
	change.Key = string(event.Kv.Key)

	if event.Type == mvccpb.DELETE {
		change.Type = KeyDeleteChangeEvent
	} else if event.Type == mvccpb.PUT {
		change.Value = event.Kv.Value
		change.Type = KeyUpdateChangeEvent
		if event.IsCreate() {
			change.Type = KeyCreateChangeEvent
		}
	}

	events <- change
}
```
##### 5.2.watch一个key前缀
```go
// WatchWithPrefixKey starts watching every key that begins with prefixKey and
// returns a handle for consuming the changes.
//
// Translated events are delivered on the returned Event channel (buffered,
// capacity 250). A background goroutine forwards events until the watch
// channel is closed or reports cancellation, then logs and exits. Call the
// returned CancelFunc to stop the watch and let that goroutine finish.
func (etcd *Etcd) WatchWithPrefixKey(prefixKey string) (keyChangeEventResponse *WatchKeyChangeResponse) {
	// A cancellable context gives the caller a way to stop the watch;
	// previously CancelFunc was left nil and the watch could never be cancelled.
	ctx, cancel := context.WithCancel(context.Background())
	watcher := clientv3.NewWatcher(etcd.client)
	watchChan := watcher.Watch(ctx, prefixKey, clientv3.WithPrefix())

	events := make(chan *KeyChangeEvent, 250)
	keyChangeEventResponse = &WatchKeyChangeResponse{
		Event:      events,
		CancelFunc: cancel,
		Watcher:    watcher,
	}

	go func() {
		for resp := range watchChan {
			if resp.Canceled {
				break
			}
			for _, event := range resp.Events {
				etcd.handleKeyChangeEvent(event, events)
			}
		}
		log.Println("the watcher lose for prefixKey:", prefixKey)
	}()
	return keyChangeEventResponse
}
// handleKeyChangeEvent converts a raw etcd watch event into a KeyChangeEvent
// and sends it on events. This repeats the helper shown in section 5.1 so the
// snippet reads on its own; keep a single copy when assembling a real package.
//
// A PUT becomes a create or update event (carrying the new value) depending on
// event.IsCreate(); a DELETE becomes a delete event without a value. The send
// blocks while the channel is full.
func (etcd *Etcd) handleKeyChangeEvent(event *clientv3.Event, events chan *KeyChangeEvent) {
	change := new(KeyChangeEvent)
	change.Key = string(event.Kv.Key)

	if event.Type == mvccpb.DELETE {
		change.Type = KeyDeleteChangeEvent
	} else if event.Type == mvccpb.PUT {
		change.Value = event.Kv.Value
		change.Type = KeyUpdateChangeEvent
		if event.IsCreate() {
			change.Type = KeyCreateChangeEvent
		}
	}

	events <- change
}
```
#### 6.Lease的使用
##### 6.1.创建一个指定时间的临时key
```go
// TxWithTTL creates key with value only if the key does not exist yet, and
// attaches a newly granted lease to it; ttl is passed to the lease grant.
//
// On success txResponse.Success is true and LeaseID/Lease identify the lease.
// If the key already exists, the lease client is closed, Success is false and
// Key/Value carry the existing entry as read by a follow-up Get. err is
// non-nil when the lease grant, the transaction, or that follow-up Get fails;
// in the last case txResponse is still returned, without Key/Value.
func (etcd *Etcd) TxWithTTL(key, value string, ttl int64) (txResponse *TxResponse, err error) {
	lease := clientv3.NewLease(etcd.client)
	grantResponse, err := lease.Grant(context.Background(), ttl)
	if err != nil {
		// Without this check a failed grant leaves grantResponse nil and the
		// ID access below panics. Close is best-effort cleanup; its error
		// would only mask the grant error.
		_ = lease.Close()
		return nil, err
	}
	leaseID := grantResponse.ID

	ctx, cancel := context.WithTimeout(context.Background(), etcd.timeout)
	defer cancel()

	txnResponse, err := etcd.client.Txn(ctx).
		If(clientv3.Compare(clientv3.Version(key), "=", 0)).
		Then(clientv3.OpPut(key, value, clientv3.WithLease(leaseID))).
		Commit()
	if err != nil {
		// Best-effort cleanup; the transaction error is the one worth reporting.
		_ = lease.Close()
		return nil, err
	}

	txResponse = &TxResponse{
		LeaseID: leaseID,
		Lease:   lease,
	}
	if txnResponse.Succeeded {
		txResponse.Success = true
		return txResponse, nil
	}

	// The key already exists: release the lease client (best-effort) and
	// report the current value instead.
	_ = lease.Close()
	v, err := etcd.Get(key)
	if err != nil {
		return txResponse, err
	}
	txResponse.Key = key
	txResponse.Value = string(v)
	return txResponse, nil
}
```
##### 6.2.创建一个不间断续约的临时key
```go
// TxKeepaliveWithTTL creates key with value only if the key does not exist
// yet, attaches a newly granted lease to it, and starts keep-alive on that
// lease; ttl is passed to the lease grant.
//
// A background goroutine drains keep-alive responses and logs once the
// keep-alive channel is closed or yields nil. On success txResponse.Success is
// true and LeaseID/Lease identify the lease. If the key already exists, the
// lease client is closed, Success is false and Key/Value carry the existing
// entry as read inside the same transaction. err is non-nil when the lease
// grant, the keep-alive setup, or the transaction fails.
func (etcd *Etcd) TxKeepaliveWithTTL(key, value string, ttl int64) (txResponse *TxResponse, err error) {
	lease := clientv3.NewLease(etcd.client)
	grantResponse, err := lease.Grant(context.Background(), ttl)
	if err != nil {
		// Without this check a failed grant leaves grantResponse nil and the
		// ID access below panics. Close is best-effort cleanup.
		_ = lease.Close()
		return nil, err
	}
	leaseID := grantResponse.ID

	aliveResponses, err := lease.KeepAlive(context.Background(), leaseID)
	if err != nil {
		// Do not leak the lease client when keep-alive cannot be started.
		_ = lease.Close()
		return nil, err
	}
	go func() {
		// Drain responses so the keep-alive stream keeps flowing.
		for resp := range aliveResponses {
			if resp == nil {
				break
			}
		}
		log.Printf("the tx keepalive has lose key:%s", key)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), etcd.timeout)
	defer cancel()

	txnResponse, err := etcd.client.Txn(ctx).
		If(clientv3.Compare(clientv3.Version(key), "=", 0)).
		Then(clientv3.OpPut(key, value, clientv3.WithLease(leaseID))).
		Else(clientv3.OpGet(key)).
		Commit()
	if err != nil {
		// Best-effort cleanup; the transaction error is the one worth reporting.
		_ = lease.Close()
		return nil, err
	}

	txResponse = &TxResponse{
		LeaseID: leaseID,
		Lease:   lease,
	}
	if txnResponse.Succeeded {
		txResponse.Success = true
		return txResponse, nil
	}

	// The key already exists: release the lease client (best-effort), which
	// also ends the keep-alive goroutine above.
	_ = lease.Close()
	txResponse.Key = key
	// Use the value read by the Else branch of the same transaction rather
	// than issuing a second, non-atomic Get.
	if len(txnResponse.Responses) > 0 {
		if rng := txnResponse.Responses[0].GetResponseRange(); rng != nil && len(rng.Kvs) > 0 {
			txResponse.Value = string(rng.Kvs[0].Value)
		}
	}
	return txResponse, nil
}
```
有疑问加站长微信联系(非本文作者)