前段时间,因为业务需要,实现了单协程、单timer的多定时任务管理器,其特点在于:
1、能够添加一次性、重复性任务,并能在其执行前撤销或频繁更改。
2、支持同一时间点,多个任务提醒。
3、适用于中等密度,大跨度的单次、多次定时任务。
4、支持10万次/秒的定时任务执行、提醒、撤销或添加操作,平均延迟10微秒内
5、支持注册任务的函数调用,及事件通知。
代码比较简单,也分享到github,有兴趣的可以看看:定时器源代码。目前版本还处于维护阶段,接口可能会有点变化,或许还会根据我们的业务调整增加第三方数据的支持,还请理解。
问题背景
在业务中,我们经常需要基于定时任务来触发来实现各种功能。比如TTL会话管理、锁、定时任务(闹钟)或更复杂的状态切换等等。
由于go语言协程非常轻,很容易开启多个并发。如果结合time包,能够非常easy的实现一次或多次的定时提醒。我们只需要几行关键代码,就可以session的变量过期设置。以下为忽略了更新功能的session ttl功能代码演示:
var(
cache=make(map[string]bool)
mut sync.mutex
)
//添加Token,输入时间端将决定当前之后多久,token将会从cache中移除。
func AddToken(token string,timeout time.Duration)bool{
if timeout<=0{
return false
}
mut.Lock()
defer mut.Unlock()
cache[token]=true
//定时注销模块
go func(){
time.Sleep(timeout)
mut.Lock()
delete(cache,token)
mut.Unlock()
}
return true
}
实际业务中的需求,要更加复杂,我们需要:
- 定时执行的任务,分为1次、N次、无数次。第一种允许设置间隔时间(段)或具体的时间点。
- 定时任务的密度不均匀,快的间隔几秒就会执行,慢的通常是2小时,少部分会在、几天甚至几周以后执行。
- 任务执行前,可能会被撤销。
- 大量任务执行后,会通过广播及时提醒客户端,必须尽量准确,最好控制在毫秒级,极限不超过100毫秒。(参考游戏中,玩家基地被攻击的场景)
- 性能至少支持1k QPS,越高越好,但任务容量要能支持500k。
探索
数据存储&性能
最初,直接利用多个go的协程+time.Sleep的方式执行,将其统一交付给系统管理,但创建了20万条任务,就会系统卡死。于是,只有考虑通过单一协程+timer的方式,自己维护定时任务。
简单评估了一下常用的基础数据结构,考虑过list、切片、小堆、甚至外部的redis、sqlite等:
结构 | 描述 | 结论 |
---|---|---|
切片 | 支持有序,但变更、删除时间复杂度为O(n)。 | 不考虑 |
list | 支持有序,但变更、删除时间复杂度为O(n)。 | 不考虑 |
map | 无序,不支持最近的时间点执行,无法满足需求。 | 不考虑 |
redis、sqlite | 功能满足,但需要额外部署。 | 尽量不采用 |
时间轮 | 定时任务最常用的结构,对于频率不确定、时间间隔不确定、且跨度大的任务空间浪费大;更改撤销的效率不高 | 不考虑 |
堆 | 支持有序,获取最小值非常快速。 | 尝试 |
红黑树 | 支持排序、插入、删除。获取最小值方便,数据更新的时候,需要reblance,但性能比较稳定。 | 尝试 |
跳跃表 | 排序方便,支持有序。查询、插入、删除时间复杂度与红黑树一样。内存消耗略多,但删除操作更局部,通常优于红黑树。 | 尝试 |
前期优先考虑堆来存放待执行的任务,主要原因在于有基本库支持。但在业务上比较纠结:
- 内部需要多维护一个属性index,以定位堆中的序号,以在撤销、更改的时候方便使用。
- 如果新增任务执行时间靠后,需要更改大量堆数组的index,且时间复杂度不可控。
- 空间占用上,必须连续,且释放不方便。
最终,综合业务的“频繁更改”这个特性,还是采用了红黑树来存储定时任务,更改后:
-优势
- 更新操作的开销很稳定,时间复杂度可控。
- 工程上,增删改代码简单。
- 劣势
- 任务更改不频繁时,性能大概是15万/秒,比小堆慢一倍。
- 需要借助单独的红黑树实现。
功能&接口
按照背景的描述,功能出来后,发现有些业务除了传递了函数外,还需要事件提醒,考虑到闭包使用不是很方便,因此,参考了go标准库rpc client的结果返回的异步封装方式(go的csp应用的一个亮点就在于此,具体参见之前的文章:Call回调)。我们为任务增加msgChan来进行了支持:
// jobItem implementation of "Job" and "rbtree.Item"
type jobItem struct {
id uint64 //唯一键值,内部由管理器生成,以区分同一时刻的不同任务事件
times uint64 //允许执行的最大次数
count uint64 //计数器,表示已执行(或触发)的次数
intervalTime time.Duration //间隔时间
createTime time.Time //创建时间
actionTime time.Time //计算得出的此次执行时间点,有误差
fn func() //事件函数
msgChan chan Job //消息通道,执行时,控制器通过该通道向外部传递消息
}
jobItem之所以考虑私有,是因为内部存储结构为二叉树,为了屏蔽判等接口实现的方法,所以定义了Job接口,这样在使用的时候就不会被无效方法困扰。接口定义如下:
// Job External access interface for timed tasks
type Job interface {
C() <-chan Job //C Get a Chan,which can get message if Job is executed
Count() uint64 //计数器,表示已执行(或触发)的次数
Times() uint64 //允许执行的最大次数
}
应用示例
有了定时器,再实现上百万容量的支持TTL的会话控制就非常简单。1.首先定义两个结构体,其中Session是容器,也是一个控制器;tokenjob是一个内部元素,起到token-内部定时器的定向作用:
//Session support ttl release
type Session struct {
sync.Mutex
cache map[string]tokenjob
clock *clock.Clock
}
// tokenjob binding clock by internal property "jobid"
type tokenjob struct {
token string
jobid uint64 //<<
}
其中,tokenjob的token是一个字符串,实际业务中是可以为任何类型,更改到不难。由于go还不支持泛型,例子中要写成通用的,则累赘,这里暂时用字符串。
2.然后,为session提供:
- 一个Add方法,通常情况下,添加的token已经存在,则更新其update值。
- 一个移除方法,当时间抵达时,将移除对应的token。
代码中的打印语句是为了后面的执行程序查看运行效果。
// AddToken add token string which can release after seconds
// @interval: TTL seconds
// return:
// @added: if add when inserted successful;else updated release time
// @error: if interval==0
func (s *Session) AddToken(token string, interval uint64) (added bool, err error) {
if interval == 0 {
err = errors.New("interval cannot be zero!")
return
}
s.Lock()
defer s.Unlock()
item, founded := s.cache[token]
if founded {
s.clock.UpdateJobTimeout(item.job, time.Duration(interval)*time.Second)
added = false //update token
} else {
job, _ := s.clock.AddJobWithTimeout(time.Duration(interval)*time.Second, func() { s.RemoveToken(token) })
item := tokenjob{
token: token,
job: job,
}
s.cache[token] = item
added = true
}
return
}
func (s *Session) RemoveToken(token string) {
s.Lock()
defer s.Unlock()
fmt.Println("token:", token, " is removed!@", time.Now().Format("15:04:05:00")) //just for watching
delete(s.cache, token)
}
3.最后,写一个执行代码,做两个非常简单的测试:
- 第一个测试:插入一个token,查看判断指定时间后是否还存在。
- 第二个测试:插入一个token后,在ttl之前更改时间,查看生效。但要确保更改之前,token还存在,否则就会变成新增,而不是更新。
func main() {
session := NewSession()
fmt.Println("test add token,and ttl can action")
session.AddToken("alex023", 3)
for i := 0; i < 3; i++ {
time.Sleep(time.Second * 2)
fmt.Printf("%v|session have %2d tokens,found token=alex023 %v \n", time.Now().Format("15:04:05"), session.GetTokenNum(), session.GetToken("alex023"))
}
fmt.Println()
fmt.Println("test add token and update it")
session.AddToken("alex023_2", 4)
for i := 0; i < 5; i++ {
time.Sleep(time.Second * 1)
if i == 1 {
session.AddToken("alex023_2", 5)
}
fmt.Printf("%v|session have %2d tokens,found token=alex023_2 %v \n", time.Now().Format("15:04:05"), session.GetTokenNum(), session.GetToken("alex023_2"))
}
}
以上代码的完整连接,可以查看github:示例文件
有疑问加站长微信联系(非本文作者)