一、分布式 id 生成器
在高并发场景中,通常需要类似 MySQL 自增 id 一样不断增长且不会重复的 id。
比如某电商双 11 时,在 0:00 开始,会有千万到亿级的订单涌入,每秒要处理 10w+ 的订单。在将订单插入数据库前,我们需要给订单一个唯一的 id 再插入数据库内。也正因为订单量大,一个无意义的纯数字 id 在对数据库进行增删改查时不能起到优化作用。此 id 内应该包含一些时间信息,这样即使后端的系统对消息进行了分库分表,也能够以时间顺序对这些消息进行排序。
Twitter 的 snowflake 算法是这种场景下的一个典型解法,原理如图:
首先确定的是,id 数值长度是64位,int64 类型,被分为四个部分(不含开头的符号 / unused ):
- 41 位来表示收到请求时的时间戳,单位为毫秒
- 5 位表示数据中心的 id
- 5 位表求机器的实例 id
- 12 位为循环自增 id
- 到达
1111,1111,1111
后归0
- 到达
这样的机制可以支持一台机器在一毫秒内能够产生4096
条消息。一秒共 409.6w 条消息。从值域上来讲完全够用。
数据中心 id 加上实例 id 共有 10 位,每个数据中心可以部署 32 台实例,搭建 32 个数据中心,所以可以一共部署 1024 台实例。
41 位的时间戳(毫秒为单位)能够使用 69 年。
1 worker_id 分配
timestamp
,datacenter_id
,worker_id
和sequence_id
这四个字段中,timestamp
和sequence_id
是由程序在运行期生成的。但datacenter_id
和worker_id
需要在部署阶段就能够获取得到,并且一旦程序启动之后,就是不可更改的了。如果可以随意更改,可能被不慎修改,造成最终生成的 id 有冲突。
一般不同数据中心的机器,会提供对应的获取数据中心 id 的 API,所以datacenter_id
我们可以在部署阶段轻松地获取到。而 worker_id 是我们逻辑上给机器分配的一个 id,这个要怎么办呢?比较简单的想法是由能够提供这种自增 id 功能的工具来支持,比如 MySQL:
mysql> insert into a (ip) values("10.1.2.101");
Query OK, 1 row affected (0.00 sec)
mysql> select last_insert_id();
+------------------+
| last_insert_id() |
+------------------+
| 2 |
+------------------+
1 row in set (0.00 sec)
从 MySQL 中获取到worker_id
之后,就把这个worker_id
直接持久化到本地,以避免每次上线时都需要获取新的worker_id
。让单实例的worker_id
可以始终保持不变。
当然,使用 MySQL 相当于给我们简单的 id 生成服务增加了一个外部依赖。依赖越多,我们的服务的可运维性就越差。
考虑到集群中即使有单个id生成服务的实例挂了,也就是损失一段时间的一部分 id,所以我们也可以更简单暴力一些,把worker_id
直接写在 worker 的配置中,上线时,由部署脚本完成worker_id
字段替换。
2 开源实例
2.1 标准 snowflake
github.com/bwmarrin/snowflake
是一个相当轻量化的 snowflake 的 Go 实现。其文档对各位使用的定义如下图所示:
此库和标准的 snowflake 实现方式全完一致,使用也比较简单:
package main
import (
// 示例代码只导入提供核心功能的package,其他内置package自行导入,下同
"github.com/bwmarrin/snowflake"
)
func main() {
node, err := snowflake.NewNode(1)
if err != nil {
println(err.Error())
os.Exit(1)
}
for i := 0; i < 20; i++ {
id := node.Generate()
fmt.Printf("Int64 ID: %d\n", id)
fmt.Printf("String ID: %s\n", id)
fmt.Printf("Base2 ID: %s\n", id.Base2())
fmt.Printf("Base64 ID: %s\n", id.Base64())
fmt.Printf("ID Time : %d\n", id.Time())
fmt.Printf("ID Node : %d\n", id.Node())
fmt.Printf("ID Step : %d\n", id.Step())
fmt.Println("---------------------------------------------")
}
}
这个库因为是单文件,所以也方便定制,其源码本身就预留了一些可定制字段:
var (
// Epoch is set to the twitter snowflake epoch of Nov 04 2010 01:42:54 UTC in milliseconds
// You may customize this to set a different epoch for your application.
Epoch int64 = 1288834974657
// NodeBits holds the number of bits to use for Node
// Remember, you have a total 22 bits to share between Node/Step
NodeBits uint8 = 10
// StepBits holds the number of bits to use for Step
// Remember, you have a total 22 bits to share between Node/Step
StepBits uint8 = 12
)
Epoch
为起始时间,NodeBits
是实例 id 的位长,StepBits
是自增 id 的位长。
2.2 sonyflake
sonyflake 同样是受 Twitter 的 snowflake 启发而来,但 sonyflake 侧重于多主机/实例的生命周期和性能,所以与 snowflake 使用不同的位分配:
sonyflake 的优点和缺点:
- 比 snowflake 更长的生命周期,174年 > 69年
- 能运行在更多的实例上, >
- 生成 ID 速度比 snowflake 慢,10 毫秒内最多生成 个(snowflake 1 毫秒内最多生成 个)
sonyflake 在启动阶段需要配置参数:
func NewSonyflake(st Settings) *Sonyflake
Settings
数据结构如下:
type Settings struct {
StartTime time.Time
MachineID func() (uint16, error)
CheckMachineID func(uint16) bool
}
-
StartTime
起始时间,默认值为2014-09-01 00:00:00 +0000 UTC
-
MachineID
是一个返回实例 ID 的函数,如果不定义此函外,默认用本机 ip 的低16位 -
CheckMachineID
验证实例 ID / 计算机 ID 的唯一性,返回true
时才创建
使用示例:
package main
import (
"github.com/sony/sonyflake"
)
func getMachineID() (uint16, error) {
var machineID uint16 = 6
return machineID, nil
}
func checkMachineID(machineID uint16) bool {
existsMachines := []uint16{1, 2, 3, 4, 5}
for _, v := range existsMachines {
if v == machineID {
return false
}
}
return true
}
func main() {
t, _ := time.Parse("2006-01-02", "2021-01-01")
settings := sonyflake.Settings{
StartTime: t,
MachineID: getMachineID,
CheckMachineID: checkMachineID,
}
sf := sonyflake.NewSonyflake(settings)
for i := 0; i < 10; i++ {
id, err := sf.NextID()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
fmt.Println(id)
}
}
二、分布式锁
单机程序并发或并行修改全局变量时,需要对修改行为加锁以创造临界区。如果不加锁,得到的结果将不准确,如:
package main
import (
"sync"
)
// 全局变量
var counter int
func main() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++
}()
}
wg.Wait()
println(counter)
}
多次运行结果不同:
➜ go run main.go
884
➜ go run main.go
957
➜ go run main.go
923
1 进程内加锁
想要得到正确的结果,要把计数器的操作代码部分加上锁:
var wg sync.WaitGroup
var lock sync.Mutex
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
lock.Lock()
counter++
lock.Unlock()
}()
}
wg.Wait()
println(counter)
这样能够得到正确结果:
➜ go run main.go
1000
2 trylock
在某些场景,我们只希望一个任务有单一的执行者,而不像计数器一样,所有的 Goroutine 都成功执行。后续的 Goroutine 在抢锁失败后,需要放弃执行,这时候就需要尝试加锁 / trylock。
尝试加锁,在加锁成功后执行后续流程,失败时不可以阻塞,而是直接返回加锁的结果。
在 Go 语言中可以用大小为 1 的 Channel 来模拟 trylock:
// Lock try lock
type Lock struct {
c chan struct{}
}
// Lock try lock, return lock result
func (l Lock) Lock() bool {
result := false
select {
case <-l.c:
result = true
default:
}
return result
}
// Unlock the try lock
func (l Lock) Unlock() {
l.c <- struct{}{}
}
// NewLock generate a try lock
func NewLock() Lock {
var l Lock
l.c = make(chan struct{}, 1)
l.c <- struct{}{}
return l
}
func main() {
var lock = NewLock()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if !lock.Lock() {
println("lock failed")
return
}
counter++
println("current counter: ", counter)
lock.Unlock()
}()
}
wg.Wait()
}
每个 Goruntine 只有成功执行了Lock()
才会继续执行后续代码,因此在Unlock()
时可以保证 Lock
结构体里的 Channel 一定是空的,所以不会阻塞也不会失败。
在单机系统中,trylock 并不是一个好选择,因为大量的 Goruntine 抢锁会无意义地占用 cpu 资源,这就是活锁。
活锁指是的程序看起来在正常执行,但 cpu 周期被浪费在抢锁而非执行任务上,从而程序整体的执行效率低下。活锁的问题定位起来很麻烦,所以在单机场景下,不建议使用这种锁。
3 基于 Redis 的 setnx
在分布式场景中,也需要“抢占”的逻辑,可以用 Redis 的setnx
实现:
package main
import (
"github.com/go-redis/redis"
)
func setnx() {
client := redis.NewClient(&redis.Options{})
var lockKey = "counter_lock"
var counterKey = "counter"
// lock
resp := client.SetNX(lockKey, 1, time.Second*5)
lockStatus, err := resp.Result()
if err != nil || !lockStatus {
println("lock failed")
return
}
// counter++
getResp := client.Get(counterKey)
cntValue, err := getResp.Int64()
if err == nil || err == redis.Nil {
cntValue++
resp := client.Set(counterKey, cntValue, 0)
_, err := resp.Result()
if err != nil {
println(err)
}
}
println("current counter is ", cntValue)
// unlock
delResp := client.Del(lockKey)
unlockStatus, err := delResp.Result()
if err == nil && unlockStatus > 0 {
println("unlock success")
} else {
println("unlock failed", err)
}
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
setnx()
}()
}
wg.Wait()
}
运行结果:
➜ go run main.go
lock failed
lock failed
lock failed
lock failed
lock failed
lock failed
lock failed
lock failed
current counter is 34
lock failed
unlock success
通过代码和执行结果可以看到,远程调用setnx
运行流程上和单机的 trolock 非常相似,如果获取锁失败,那么相关的任务逻辑就不会继续向后执行。
setnx
很适合高并发场景下用来争抢一些“唯一”的资源。比如交易摄合系统中卖家发起订单,多个买家会对其进行并发争抢。这种场景我们没有办法依赖具体的时间来判断先后,因为不同设备的时间不能保证使用的是统一的时间,也就不能保证时序。
所以,我们需要依赖于这些请求到达 redis 节点的顺序来做正确的抢锁操作。
如果用户的网络环境比较差,是必然抢不到的。
4 基于 ZooKeeper
基于 ZooKeeper 的锁与基于 Redis 的锁不同之处在于 Lock 成功之前会一直阻塞,这与单机场景中的mutex.Lock
很相似。
package main
import (
"github.com/go-zookeeper/zk"
)
func main() {
c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second)
if err != nil {
panic(err)
}
l := zk.NewLock(c, "/lock", zk.WorldACL(zk.PermAll))
err = l.Lock()
if err != nil {
panic(err)
}
println("lock success, do your business logic")
time.Sleep(time.Second * 10) // 模拟业务处理
l.Unlock()
println("unlock success, finish business logic")
}
其原理也是基于临时 Sequence 节点和 watch API,例如我们这里使用的是/lock
节点。Lock会在该节点下的节点列表中插入自己的值,只要节点下的子节点发生变化,就会通知所有 watch 该节点的程序。这时候程序会检查当前节点下最小的子节点的 id 是否与自己的一致。如果一致,说明加锁成功了。
这种分布式的阻塞锁比较适合分布式任务调度场景,但不适合高频次持锁时间短的抢锁场景。按照 Google 的 Chubby 论文里的阐述,基于强一致协议的锁适用于粗粒度
的加锁操作。这里的粗粒度指锁占用时间较长。我们在使用时也应思考在自己的业务场景中使用是否合适。
5 基于 etcd
etcd 是与 ZooKeeper 类似的分布式组件,也能实现与 ZooKeeper 锁相似的功能:
package main
import (
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
func main() {
c := make(chan os.Signal)
signal.Notify(c)
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(err)
}
defer client.Close()
lockKey := "/lock"
go func() {
session, err := concurrency.NewSession(client)
if err != nil {
panic(err)
}
m := concurrency.NewMutex(session, lockKey)
if err := m.Lock(context.Background()); err != nil {
panic("go1 get mutex failed " + err.Error())
}
fmt.Printf("go1 get mutex sucess\n")
fmt.Println(m)
time.Sleep(time.Duration(10) * time.Second)
m.Unlock(context.Background())
fmt.Printf("go1 release lock\n")
}()
go func() {
time.Sleep(time.Duration(2) * time.Second)
session, err := concurrency.NewSession(client)
if err != nil {
panic(err)
}
m := concurrency.NewMutex(session, lockKey)
if err := m.Lock(context.Background()); err != nil {
panic("go2 get mutex failed " + err.Error())
}
fmt.Printf("go2 get mutex sucess\n")
fmt.Println(m)
time.Sleep(time.Duration(2) * time.Second)
m.Unlock(context.Background())
fmt.Printf("go2 release lock\n")
}()
<-c
}
三、延时任务系统
实时处理并反馈给用户是系统的主要内容,但有时也会遇到非实时的任务需求,比如在确定的时间点发布重要公告,或者需要在用户做了一件事情的几分钟后对其作出特定动作,比如发个优惠券。
如果业务规模比较小,有时也可以通过数据库配置轮询来对这种任务进行简单处理,但一旦上了规模,就要寻找更好的解决方案。
一般有两种思路来解决这个问题:
- 实现一套类似 crontab 的分布式定时任务管理系统
- 实现一个支持定时发送消息的消息队列
两种思路衍生出了一些不同的系统,但本质都差不多,都是需要实现一个定时器(timer)。
在单机的场景下定时器其实并不少见,如在和网络数据库连接时会调用SetReadDeadline()
函数,就是在本地创建一个定时器,在到达指定的时间后,我们会接收到定时器的通知,告诉我们时间已到,如果此时读取未完成,就可以认为是发生了网络问题,从而中断读取。
下面从定时器开始, 探究延时任务系统的实现。
1 定时器的实现
定时器(timer)的实现在工业界已经是有解的问题了,常见的就是时间堆(堆排序)和时间轮。
1.1 时间堆
堆是一种经过排序的完全二叉树,其中任一非终端节点的数据值均不大于(或不小于)其左子节点和右子节点的值。
最常见的时间堆一般用最小堆实现。
最小堆:根节点的键值是所有堆节点键值中最小者的堆
如下图:
对于定时器来说,如果堆顶元素比当前的时间还要大,说明堆项内所有元素都比当前时间大,进而说明这个时刻我们还没有必要对时间堆进行任何处理。定时检查的时间复杂度是 。
当我们发现堆项的元素小于当前时间时,说明可能已经有一批事件过期了,这时进行正常的弹出的堆调操作就好。每次堆调整的时间复杂度是 。
Go 自身的内置定时器就是用时间堆实现的,不过使用的不是二叉树堆,而是扁平一些的四叉堆:
性质:父节点比其4个子节点都小,子节点之间没有特别的大小关系要求。
四叉堆中元素超时和堆调整与二叉堆没有本质区别。
1.2 时间轮
用时间轮来实现定时器时,需要定义每一个格子的“刻度”,可以将时间想象成一个时钟,中心有秒针顺时针转动,每次转动到一个刻度时,需要去查看该刻度挂载的任务列表是否已经有到期的任务。
从结构上来讲,时间轮和哈希表很相似,如果把哈希算法定义为:触发时间 % 时间轮元素大小,那么这就是一个简单的哈希表。在哈希冲突时,采用链表挂载哈希冲突的定时器。
除了这种单层时间轮,还有一些时间轮采用多层实现。
用 Go 实现的时间轮项目不多,下面这个是其中性能较好的时间轮的延时任务示例:
package main
import (
"github.com/antlabs/timer"
)
// 一次性定时器
func after(tm timer.Timer) {
var wg sync.WaitGroup
wg.Add(2)
defer wg.Wait()
go func() {
defer wg.Done()
tm.AfterFunc(1*time.Second, func() {
log.Printf("after 1 second\n")
})
}()
go func() {
defer wg.Done()
tm.AfterFunc(10*time.Second, func() {
log.Printf("after 10 seconds\n")
})
}()
go func() {
defer wg.Done()
tm.AfterFunc(30*time.Second, func() {
log.Printf("after 30 seconds\n")
})
}()
go func() {
defer wg.Done()
tm.AfterFunc(time.Minute, func() {
log.Printf("after 1 minute\n")
})
}()
go func() {
defer wg.Done()
tm.AfterFunc(time.Minute+30*time.Second, func() {
log.Printf("after 1 minute and 30 seconds\n")
})
}()
go func() {
defer wg.Done()
tm.AfterFunc(2*time.Minute+45*time.Second, func() {
log.Printf("after 2 minutes and 45 seconds\n")
})
}()
}
// 周期性定时器
func schedule(tm timer.Timer) {
tm.ScheduleFunc(500*time.Millisecond, func() {
log.Printf("schedule 500 milliseconds\n")
})
tm.ScheduleFunc(time.Second, func() {
log.Printf("schedule 1 second\n")
})
tm.ScheduleFunc(20*time.Second, func() {
log.Printf("schedule 20 seconds\n")
})
tm.ScheduleFunc(1*time.Minute, func() {
log.Printf("schedule 1 minute\n")
})
}
func main() {
tm := timer.NewTimer()
defer tm.Stop()
// go after(tm)
go schedule(tm)
go func() {
time.Sleep(2*time.Minute + 50*time.Second)
tm.Stop()
}()
tm.Run()
}
2 任务分发
有了基本的定时器实现方案,如果我们开发的是单机系统,就可以开始干活了,但如果是分布式,还需要把“定时”或“延时”的任务分发出去。
示例图:
每个实例每隔一个小时,会从数据库里获取下一个小时需要处理的定时任务,获取的时候只取符合task_id % shard_count = shard_id
的任务。
当这些定时任务被触发之后需要通知用户侧,有两种思路实现:
- 将任务被触发的信息封装成一条消息,发往消息队列,由用户侧对消息队列进行监听
- 对用户预先配置的回调函数进行调用
两种方案各有优缺点,如果采用1
,那么如果消息队列出现故障会导致整个系统不可用,当然,现在的队列消息一般都是高可用,大多数时候不用担心这个问题。其次,一般业务流程中间走消息队列的话会导致延时增加,定时任务若必须在触发后的几十毫秒到几百毫秒内完成的话,采取消息队列就有一定的风险。
如果采用2
,会加重定时任务系统的负担。单机的定时器执行时最害怕的就是回调函数执行时间过长,这样会阻塞后续的任务执行。在分布式场景下,这种忧虑依然存在——一个不严谨的业务回调可能会直接拖垮整个定时任务系统。所以我们还要考虑在回调的基础上增加经过测试的超时时间设置,并且对由用户填入的超时时间做慎重的审核。
3 数据再平衡的幂等考量
当任务执行集群的机器故障时,需要对任务进行重新分配。按照之前的求模策略,对这台机器还没有处理的任务进行重新分配就比较麻烦,如果实际运行的线上系统,还要在故障时的任务平衡方面花更多的心思。
参考 Elasticsearch 的数据分布设计,每份任务数据都有多个副本,这里假设两副本,如下图所示:
一份数据虽然有两个持有者同,但持有者持有的副本会进行区分,比如持有的是主副本还是非主副本。主副本边框加粗,非主副本是正常线条。
一个任务只会在持有主副本的节点上被执行。
当有机器故障时,任务数据需要进行数据再平衡的工作,比如节点1挂了:
节点 1 的数据会被迁移到节点 2 和节点 3 上。
当然,也可以用稍微复杂一点的思路,比如对集群中的节点进行角色划分,由协调节点来做这种故障时的任务重新分配工作,考虑到高可用,协调节点可能也需要有 1 至 2 个备用节点以防不测。
使用消息队列时,很多队列不支持 exactly once 语义,这种情况下需要让用户自己来负责消息的去重或者消费的幂等处理。
四、分布式搜索引擎
MySQL 很脆弱,数据库系统本身要保证实时和强一致性,所以其功能设计上都是为了满足这种一致性需求。比如 write ahead log 的设计,基于 B+ 树实现的索引和数据组织,以及基于 MVCC 实现的事务等等。
关系型数据库一般被用于实现 OLTP 系统:
联机事务处理(OLTP, Online transaction processing)是指透过信息系统、电脑网络及数据库,以在线交易的方式处理一般即时性的作业资料,和更早期传统数据库系统大量批量的作业方式并不相同。OLTP通常被运用于自动化的资料处理工作,如订单输入、金融业务…等反复性的日常性交易活动。 和其相对的是属于决策分析层次的联机分析处理(OLAP)。
在互联网的业务场景中,也有一些实时性要求不高(可以接受多秒的延迟),但是查询复杂性却很高的场景。比如,在电商的 WMS 系统中,或者在大多数业务场景丰富的 CRM 或者客服系统中,可能需要提供几十个字段的随意组合查询功能。这种系统的数据维度天生众多,比如一个电商的 WMS 中对一件货物的描述,可能有下面这些字段:
仓库id,入库时间,库位分区id,储存货架id,入库操作员id,出库操作员id,库存数量,过期时间,SKU类型,产品品牌,产品分类,内件数量
除了上述信息,如果商品在仓库内有流转。可能还有关联的流程 id,当前的流转状态等。
如果经营一个大型电商,每天千万级别的订单,那么在这个数据库中查询和建立合适的索引都是一件非常难的事情。
在 CRM 或客服类系统中,常常有根据关键字进行搜索的需求,大型互联网公司每天接收数以万计的用户请求。考虑到事件溯源,用户的请求至少要保存 2~3 年,这些数据是千万级甚至上亿级的数据,如果根据关键字进行一次 like 查询,可能整个 MySQL 就崩溃了。
这时,就需要搜索引擎解决。
1 搜索引擎
Elasticsearch
是开源分布式搜索引擎的霸主,其依赖于 Lucene 实现,在部署和运维方面做了很多优化。当今建立一个分布式搜索引擎比起 Sphinx 的时代已经容易很多了,只需要简单配置客户端 IP 和端口就行。
1.1 倒排列表
虽然Elasticsearch
是针对搜索场景来定制的,但如前文所言,实际应用中常常用Elasticsearch
来作为数据库使用,就是因为倒排列表的特性。可以用比较朴素的观点来理解倒排索引:
对Elasticsearch
中的数据进行查询时,本质就是求多个排好序的序列求交集。非数值类型字段涉及到分词问题,大多数内部使用场景下,可以直接使用默认的 bi-gram 分词:
即将所有Ti
和T(i+1)
组成一个词(在Elasticsearch
中叫 term),然后再编排其倒排列表,这样倒排列表大概就是这样的:
当用户搜索“天气很好”时,其实就是求:天气、气很、很好三组倒排列表的交集。但这里的相等判断逻辑有些特殊,用伪代码表求:
func equal() {
if postEntry.docID of '天气' == postEntry.docID of '气很' &&
postEntry.offset + 1 of '天气' == postEntry.offset of '气很' {
return true
}
if postEntry.docID of '气很' == postEntry.docID of '很好' &&
postEntry.offset + 1 of '气很' == postEntry.offset of '很好' {
return true
}
if postEntry.docID of '天气' == postEntry.docID of '很好' &&
postEntry.offset + 2 of '天气' == postEntry.offset of '很好' {
return true
}
return false
}
多个有序列表求交集的时间复杂度是 , 为给定列表中元素数最小的集合, 为给定列表的个数。
在整个算法中起决定作用的一是最短的倒排列表的长度,其次是词数总和,一般词数不会很大,所以起决定性作用的,一般是所有倒排列表中最短的那一个的长度。
因此,文档总数很多的情况下,搜索词的倒排列表最短的那一个不长时,搜索速度也会很快。如果用关系型数据库,那就需要按照索引来慢慢扫描。
2 查询 DSL
Elasticsearch
定义了一套查询 DSL,当我们把Elasticsearch
当数据库使用时,需要用到其 bool 查询,如:
{
"query": {
"bool": {
"must": [
{
"match": {
"field_1": {
"query": "1",
"type": "phrase"
}
}
},
{
"match": {
"field_2": {
"query": "2",
"type": "phrase"
}
}
},
{
"match": {
"field_3": {
"query": "3",
"type": "phrase"
}
}
},
{
"match": {
"field_4": {
"query": "4",
"type": "phrase"
}
}
}
]
}
},
"from": 0,
"size": 1
}
看起来比较麻烦,但表达的意思很简单:
if field_1 == 1 && field_2 == 2 && field_3 == 3 && field_4 == 4 {
return true
}
用 bool should query 可以表示 or 的逻辑:
{
"query": {
"bool": {
"should": [
{
"match": {
"field_1": {
"query": "1",
"type": "phrase"
}
}
},
{
"match": {
"field_2": {
"query": "3",
"type": "phrase"
}
}
}
]
}
},
"from": 0,
"size": 1
}
这里表示的是类似:
if field_1 == 1 || field_2 == 2 {
return true
}
这些 Go 代码里 if 后面跟着的表达式在编程语言中有专有名词来表达Boolean Expression
:
4 > 1
5 == 2
3 < i && x > 10
Elasticsearch
的Bool Query
方案,就是用 json 来表达了这种程序语言中的 Boolean Expression,为什么可以这么做呢?因为 json 本身是可以表达树形结构的,程序代码在被编绎器 parse 后,也会变成 AST,而 AST 抽象语法树就是树形结构。理论上 json 能够完备地表达一段程序代码被parse 之后的结果。这里的 Boolean Expression 被编绎器 parse 后也会生成差不多的树形结构,而且只是整个编绎器实现的一个很小的子集。
3 基于client SDK开发
初始化:
// 选用 elastic 版本时
// 注意与自己使用的 elasticsearch 要对应
import (
elastic "gopkg.in/olivere/elastic.v3"
)
var esClient *elastic.Client
func initElasticsearchClient(host string, port string) {
var err error
esClient, err = elastic.NewClient(
elastic.SetURL(fmt.Sprintf("http://%s:%s", host, port)),
elastic.SetMaxRetries(3),
)
if err != nil {
// log error
}
}
插入:
func insertDocument(db string, table string, obj map[string]interface{}) {
id := obj["id"]
var indexName, typeName string
// 数据库中的 database/table 概念,可以简单映射到 es 的 index 和 type
// 不过需要注意,因为 es 中的 _type 本质上只是 document 的一个字段
// 所以单个 index 内容过多会导致性能问题
// 在新版本中 type 已经废弃
// 为了让不同表的数据落入不同的 index,这里我们用 table+name 作为 index 的名字
indexName = fmt.Sprintf("%v_%v", db, table)
typeName = table
// 正常情况
res, err := esClient.Index().Index(indexName).Type(typeName).Id(id).BodyJson(obj).Do()
if err != nil {
// handle error
} else {
// insert success
}
}
获取:
func query(indexName string, typeName string) (*elastic.SearchResult, error) {
// 通过 bool must 和 bool should 添加 bool 查询条件
q := elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("id", 1),
elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("male", "m")))
q = q.Should(
elastic.NewMatchPhraseQuery("name", "alex"),
elastic.NewMatchPhraseQuery("name", "xargin"),
)
searchService := esClient.Search(indexName).Type(typeName)
res, err := searchService.Query(q).Do()
if err != nil {
// log error
return nil, err
}
return res, nil
}
删除:
func deleteDocument(
indexName string, typeName string, obj map[string]interface{},
) {
id := obj["id"]
res, err := esClient.Delete().Index(indexName).Type(typeName).Id(id).Do()
if err != nil {
// handle error
} else {
// delete success
}
}
因为 Lucene 的性质,本质上搜索引擎内的数据是不可变的,所以如果要对文档进行更新,Lucene 内部是按照 id 进行完全覆盖(本质是取同一 id 最新的 segment 中的数据)的操作,所以与插入的情况是一样的。
使用Elasticsearch
作为数据库使用时,需要注意,因为Elasticsearch
有索引合并的操作,所以从数据插入到Elasticsearch
中到可以查询需要一段时间(由Elasticsearch
的 refresh_interval 决定)。所以千万不要把Elasticsearch
当成强一致的关系型数据库使用。
4 将 sql 转换为 DSL
比如一段 bool 表达式user_id = 1 and (product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1)
,写成 SQL 是如下形式:
select * from xxx
where user_id = 1 and
(
product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1
)
写成Elasticsearch
的 DSL 是如下形式:
{
"query": {
"bool": {
"must": [
{
"match": {
"user_id": {
"query": "1",
"type": "phrase"
}
}
},
{
"match": {
"product_id": {
"query": "1",
"type": "phrase"
}
}
},
{
"bool": {
"should": [
{
"match": {
"star_num": {
"query": "4",
"type": "phrase"
}
}
},
{
"match": {
"star_num": {
"query": "5",
"type": "phrase"
}
}
}
]
}
},
{
"match": {
"banned": {
"query": "1",
"type": "phrase"
}
}
}
]
}
},
"from": 0,
"size": 1
}
Elasticsearch
的 DSL 虽然很好理解,但是手写起来非常费劲。前面提供了基于 SDK 的方式来写,但也不足够灵活。
SQL 的 where 部分就是 boolean expression。这种 bool 表达式被解析后,和Elasticsearch
的 DSL 结构长得差不多,所以可以直接将 SQL 转换为 DSL。
SQL 的 where 被 parse 之后的结构和Elasticsearch
的 DSL 结构对比:
既然结构上完全一致,逻辑上就可以相互转换。以广度优先对 AST 树进行遍历,然后将二元表达式转换成 json 字符串,再拼装起来就可以了,参考:github.com/cch123/elasticsql
5 异构数据同步
在实际应用中,很少直接向搜索引擎中写入数据,更为常见方式是,将 MySQL 或其他关系型数据中的数据同步到搜索引擎中,而搜索引擎的使用方式只能对数据进行查询,无法进行修改和删除。
常见的同步方案有两种。
5.1 通过时间戳进行增量数据同步
这种同步方式与业务强绑定,倒如 WMS 系统中的出库单,并不需要非常实时,稍微有延迟也可以接受,每分钟从 MySQL 的出库单表中,把最近十分钟创建的所有出库单取出,批量存入Elasticsearch
中,取数据的操作需要执行的逻辑可以表达为下面的 SQL:
select * from wms_orders where update_time >= date_sub(now(), interval 10 minute);
当然,考虑到边界情况,可以让这个时间段的数据与前一次的有一些重叠:
select * from wms_orders where update_time >= date_sub(
now(), interval 11 minute
);
取最近 11 分钟有变动的数据覆盖更新到Elasticsearch
中,这种方案的缺点显而易见,必须要求业务数据严格遵守一定的规范。比如必须有update_time 字段,并且每次创建和更新都要保证该字段有正确的时间值,否则同步逻辑就会丢失数据。
5.2 通过 binlog 进行数据同步
业界使用较多的是阿里开源的 Canal 来进行 binlog 解析与同步。canal 会伪装成 MySQL 的从库,然后解析好行格式的 binlog,再以更容易解析的格式(如 json)发送到消息队列。
由下游的 kafka 消费者负责把上游数据表的自增主键作为Elasticsearch
的文档的 id 进行写入,这样可以保证每次接收到 binlog 时,对应 id 的数据都被覆盖更新为最新。MySQL 的 Row 格式的 binlog 会将每条记录的所有字段都提供给下游,所以向异构数据目标同步数据时,不需要考虑数据是插入还是更新,只要一律按 id 进行覆盖即可。
这种模式同样需要业务遵守一条数据表规范,即表中必须有唯一主键 id 来保证我们进入Elasticsearch
的数据不会发生重复。一旦不遵守该规范,那么就会在同步时导致数据重复。当然,你也可以为每一张需要的表去定制消费者的逻辑,但这不是通用系统讨论的范畴。
五、负载均衡
1 常见的负载均衡思路
如果不考虑均衡的话,现在有 n 个服务节点,完成业务流程只需要从这 n 个中挑选其中的一个,有几种思路:
- 按顺序挑:例如上次选了第一台,这次就选第二台,下次选第三台,依此类推。如果已经到了最后一台,那么下一次从第一台开始。这种情况下可以把服务节点信息都存储在数组中,每次请求完成下游之后,将一个索引后移即可,在移到尽头时再移回数组开头。
- 随机挑一个:每次都随机挑,真随机伪随机均可。假设选第 x 台机器,那么 x 可描述为
rand.Intn() % n
。 - 根据某种权重,对下游节点进行排序,选择权重最大或最小的那一个。
当然,实际场景不可能无脑轮询或者无脑随机,如果对下游请求失败了,还需要某种机制来进行重试。如果纯粹随机,存在一定的可能性让下一次选择的节点仍是问题节点。
2 基于洗牌算法的负载均衡
如果随机选取每次发送请求的节点,在遇到下游出问题时换其他节点重试,需要设计一个大小和节点数组大小一致的索引数组,每次来新的请求,对索引数组做洗牌,然后取第一个元素作为选中的服务节点。如果请求失败,那么选择下一个节点重试,依此类推:
var endpoints = []string {
"100.69.62.1:3232",
"100.69.62.32:3232",
"100.69.62.42:3232",
"100.69.62.81:3232",
"100.69.62.11:3232",
"100.69.62.113:3232",
"100.69.62.101:3232",
}
// 重点在这个 shuffle
func shuffle(slice []int) {
for i := 0; i < len(slice); i++ {
a := rand.Intn(len(slice))
b := rand.Intn(len(slice))
slice[a], slice[b] = slice[b], slice[a]
}
}
func request(params map[string]interface{}) error {
var indexes = []int {0,1,2,3,4,5,6}
var err error
shuffle(indexes)
maxRetryTimes := 3
idx := 0
for i := 0; i < maxRetryTimes; i++ {
err = apiRequest(params, indexes[idx])
if err == nil {
break
}
idx++
}
if err != nil {
// logging
return err
}
return nil
}
循环一遍slice,随机生成两个索引,交换这两个索引对应的值,完成洗牌,看起来没有什么问题。
2.1 错误的洗牌导致的负载不均衡
但其实上述洗牌是有问题的,存在两个问题:
- 没有随机种子。在没有随机种子的情况下,
rand.Intn()
返回的伪随机数序列是固定的。 - 洗牌不均匀。不均匀会导致整个数组第一个节点有大概率被选中,所以多个节点的负载分布不均衡
第一个问题不用多说,简单但容易犯,特别注意即可规避。
第二个问题用概率简单解释。如果每次挑选都是真随机,那么这种洗牌的结果是 ,但我们需要的是每次选中的数字的概率相等,则应该有 种结果,823543 明显不是 5040 的倍数,所以肯定有某个数字以比其他数字更高的概率被选择,所以这种代码是错误的,计算概率的过程很复杂,直接上结论:索引为 0 的元素被选中的概率约为 22%,其他元素被选中的概率约为 13%。
2.2 修正洗牌算法
从数学上得到过证明的还是经典 fisher-yates 算法,主要思路为每次随机挑选一个值,放到数组末尾。然后在 n-1 个元素的数组中再随机挑选一个值放在数组末尾,依此类推。
func shuffle(indexes []int) {
for i:=len(indexes); i>0; i-- {
lastIdx := i - 1
idx := rand.Intn(i)
indexes[lastIdx], indexes[idx] = indexes[idx], indexes[lastIdx]
}
}
Go标准库中内置了该算法:
func shuffle(n int) []int {
b := rand.Perm(n)
return b
}
在当前的场景下,只要用rand.Perm
就可以得到想要的索引数组了。
3 ZooKeeper集群的随机节点挑选问题
本节中的场景是从 N 个节点中选择一个节点发送请求,初始请求结束之后,后续请求会重新对数组洗牌,所以每两个请求之间没有关联。因此本节最开始的洗牌算法,不初始化随机库的种子理论上也没有什么问题。
但在一些特殊的场景下,例如使用 ZooKeeper 时,客户端初始化从多个服务节点中挑选一个节点后,是会向该节点建立长连接的。之后客户端请求都会发往该节点,直到该节点不可用才会在节点列表中挑选下一个节点。在这种场景下,初始连接节点的选择就必须是真随机了,否则,所有客户端启动时,都会去连接同一个 ZooKeeper 的实例,根本无法起到负载均衡的目的。
如果在日常开发中,当前业务也是类似的场景,就必须考虑一下是否会发生类似的情况。
rand
设置种子:
rand.Seed(time.Now().UnixNano())
4 负载均衡算法效果验证
不考虑加权负载均衡的情况,对上述原shuffle
函数和改良过的shuffle
函数进行简单对比:
package main
import (
"fmt"
"math/rand"
"time"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
func shuffle1(slice []int) {
for i := 0; i < len(slice); i++ {
a := rand.Intn(len(slice))
b := rand.Intn(len(slice))
slice[a], slice[b] = slice[b], slice[a]
}
}
func shuffle2(indexes []int) {
for i := len(indexes); i > 0; i-- {
lastIdx := i - 1
idx := rand.Intn(i)
indexes[lastIdx], indexes[idx] = indexes[idx], indexes[lastIdx]
}
}
func main() {
var cnt1 = map[int]int{}
for i := 0; i < 1000000; i++ {
var sl = []int{0, 1, 2, 3, 4, 5, 6}
shuffle1(sl)
cnt1[sl[0]]++
}
var cnt2 = map[int]int{}
for i := 0; i < 1000000; i++ {
var sl = []int{0, 1, 2, 3, 4, 5, 6}
shuffle2(sl)
cnt2[sl[0]]++
}
fmt.Println(cnt1, "\n", cnt2)
}
结果:
map[0:223847 1:128878 2:129454 3:129201 4:129353 5:129609 6:129658]
map[0:142507 1:142003 2:143590 3:142715 4:142487 5:143064 6:143634]
六 分布式配置管理
在分布式系统中,常困扰我们的还有上线问题。
虽然目前有一些优雅的重启方案,但实际应用中可能受限于系统内部的运行情况而无法做到真正的优雅。比如为了对去下游的流量进行限制,在内存中堆积一些数据,并对堆积设定时间或总量的阈值。在任意阈值达到之后将数据纺一发送给下游,以避免频繁的请求超出下游的承载能力让下游崩溃。这种情况下重启要做到优雅就比较难了。
所以应该尽量避免采用或绕过上线的方式对线上程序做一些修改。比较典型的修改内容就是程序的配置项。
1 场景举例
1.1 报表系统
在一些偏 OLAP 或者离线的数据平台中,经过长期的迭代开发,整个系统的功能模块已经渐渐稳定,可变动的项只出现在数据层,而数据层的变动大多可以认为是 SQL 的变动。架构师们自然而然地会想着把这些变动项抽离到系统外部。
当业务提出了新的需求时,需要将新的 SQL 录入到系统内部,或者简单修改一个老的 SQL,不对系统进行上线,就可以直接完成修改。
1.2 业务配置
大公司的平台部门服务众多业务线,在平台内为各业务线分配唯一 id。平台本身也由多个模块组成,这些模块需要共享相同的业务线定义。当公司新开产品线时,需要能够在短时间内打通所有平台系统的流程,这时每个系统都走上线流程肯定来不及。另外需要对这种公共配置进行统一管理,同时对其增减逻辑也做统一管理。这些信息变更时,需要自动通知业务方的系统,而不需要人力介入或只需简单介入。
除业务管理外,很多互联网公司会按照城市来铺展自己的业务。在某个城市未开城之前,理论上所有模块都应该认为带有该城市 id 的数据都是脏数据并自动过滤。如果业务开城,就应该将这个新的城市 id 加入到白名单中,这样业务流程便可以自动运转。
互联网公司的运营系统中会有各种类型的运营活动,有些运营活动推出后可能出现超出预期的事件(比如公关危机),需要紧急将系统下线。这时会用到一些开关来快速关闭相应的功能,或者快速将想要剔除的活动 id 从白名单中剔除。
2 使用 etcd 实现配置更新
使用 etcd 实现一个简单的配置读取和动态更新流程,以此来了解线上的配置更新流程。
2.1 配置定义
简单的配置,可以将内容完全存储在 etcd 中,如:
etcdctl put /configs/remote_config.json -- '{"addr":"127.0.0.1:1080","aes_key":"01B345B7A9ABC00F0123456789ABCDAF","https":false,"secret":"","private_key_path":"","cert_file_path":""}'
etcdctl get /configs/remote_config.json
{
"addr" : "127.0.0.1:1080",
"aes_key" : "01B345B7A9ABC00F0123456789ABCDAF",
"https" : false,
"secret" : "",
"private_key_path" : "",
"cert_file_path" : ""
}
2.2 新建 etcd client
client, err = clientv3.New(clientv3.Config{
Endpoints: []string{"http://127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(err)
}
2.3 获取配置
var resp *clientv3.GetResponse
resp, err = client.Get(context.Background(), "/configs/remote_config.json")
if err != nil {
log.Fatalln(err)
}
log.Println(resp)
client
结构体已完成kv
接口,可以直接调用Get()
方法获取值。
2.4 更新订阅
watch := client.Watch(context.Background(), "/configs/remote_config.json")
for wresp := range watch {
for _, ev := range wresp.Events {
log.Println("new values is ", string(ev.Kv.Value))
err = json.Unmarshal(ev.Kv.Value, &appConfig)
if err != nil {
log.Fatalln(err)
}
}
}
通过订阅 config 路径的变动事件,在该路径下内容发生变化时,客户端侧可以收到变动通知,并收到变动后的字符串值。
2.5 完整代码
导包时也只保留了核心包。
package main
import (
clientv3 "go.etcd.io/etcd/client/v3"
)
type Config struct {
Addr string `json:"addr"`
AesKey string `json:"aes_key"`
HTTPS bool `json:"https"`
Secret string `json:"secret"`
PrivateKeyPath string `json:"private_key_path"`
CertFilePath string `json:"cert_file_path"`
}
var (
appConfig Config
configPath = `/configs/remote_config.json`
client *clientv3.Client
err error
)
func init() {
client, err = clientv3.New(clientv3.Config{
Endpoints: []string{"http://127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
panic(err)
}
initConfig()
}
func watchAndUpdate() {
watch := client.Watch(context.Background(), configPath)
for wresp := range watch {
for _, ev := range wresp.Events {
log.Println("new values is ", string(ev.Kv.Value))
err = json.Unmarshal(ev.Kv.Value, &appConfig)
if err != nil {
log.Fatalln(err)
}
}
}
}
func initConfig() {
var resp *clientv3.GetResponse
resp, err = client.Get(context.Background(), configPath)
if err != nil {
panic(err)
}
json.Unmarshal(resp.Kvs[0].Value, &appConfig)
}
func getConfig() Config {
return appConfig
}
func main() {
defer client.Close()
watchAndUpdate()
}
上述代码在更新配置时进行了一系列操作:watch 响应、json 解析,这些操作都不具备原子性。当单个业务请求流程中多次获取 config 时,有可能因为中途 config 发生变化而导致单个请求前后逻辑不一致。因此,在使用类似方法进行更新配置时,需要在单个请求的生命周期内使用相同的配置。具体实现方式可以是只在请求开始时获取一次配置,然后依次向下透传。当然具体情况需要具体分析。
3 配置膨胀
随着业务的发展,配置系统本身所承载的压力可能也会越来越大,配置文件可能成千上万,客户端也可能成千上万,此时,将配置内容存储在 etcd 内部就不合适了。
随着配置文件数量的膨胀,除了存储系统本身的吞吐量问题,还有配置信息的管理问题。
需要对相应的配置进行权限管理,需要根据业务量进行配置存储的集群划分。如果客户端太多,导致配置存储系统无法承受瞬时大量的 QPS,可能还需要在客户端侧进行缓存优化等等。
这也是为什么大公司都会针对自己的业务额外开发一套复杂配置系统的原因。
4 配置版本管理
在配置管理过程中,难免出现用户误操作的情况,例如在更新配置时,输入了无法解析的配置。此时可以通过配置校验来解决。
有时错误的配置可能不是格式上的问题,而是逻辑上的问题。比如写 SQL 时少 select 了一个字段,更新配置时,不小心丢掉了 json 字符串里的一个 field 而导致程序无法理解新的配置而运行异常。为了快速止损,最快且最有效的办法就是进行版本管理,并支持按版本回滚。
在配置进行更新时,要为每份配置的新内容赋予一个版本号,并将修改前的内容和版本号记录下来,当发现新配置出现问题时,能够及时回滚。
常见的做法是,使用 MySQL 来存储配置文件或配置字符串的不同版本内容,在需要回滚时,只需进行简单查询即可。
5 客户端容错
在业务系统的配置被剥离到配置中心后,并不意味着系统就可以高枕无忧了。
当配置中心本身宕机时,也需要一定的容错能力,至少保证在其宕机期间,业务仍然可以运转。这要求系统能够在配置中心宕机时,也能够拿到需要的配置信息,即使这些信息不是最新版本。
具体来讲,在给业务提供配置读取的 SDK 时,最好能够将拿到的配置在业务机器的磁盘上也缓存一份。这样远程配置中心不可用时,可以直接用硬盘上的内容继续运行。当重新连接上配置中心时,再把相应的内容进行更新。
加入缓存之后,必须考虑数据一致性的问题。当有个别业务机器因为网络错误而与其他机器配置不一致时,应该能够从监控系统中知晓。
使用一种手段可能解决配置更新的痛点,但同时此手段也可能带来新的问题。实际开发中,要对每一步决策多多思考,以使自己能够在发生问题时不会手足无措。
有疑问加站长微信联系(非本文作者)