本人最近新学GO,为了能够深入了解GO,于是计划使用GO实现一个高并发、IO密集操作的ETL程序。闲话不多说,今天完成了ETL中最基本的要素,定时器。
首先了解到GO中的time.Ticker不适用到高精度的定时器,因为ETL任务周期设定是1分钟,故此处无需高精度的定时器(对于高精度的定时器,在网上看到有人使用时间轮实现timewheel,我也看了实现,并且也按其思路实现了一版,此处略去)。
封闭的类型:TimeTicker,使用示例代码如下:
func main() {
var tt *timeticker.TimeTicker = timeticker.New(3*time.Second) //创建一个3秒钟执行一次任务的定时器
e := tt.AddJob(job, true) //向此定时器中增加一个任务,返回此任务的指针(此指标在后续删除任务时使用)
go tt.Start() //开一个协程,让此定时器单独去执行任务
time.Sleep(6*time.Second)
//主线程睡6秒后再加一个任务到定时器的任务队列,之后每周期会执行两个任务
tt.AddJob(test_job, true)
time.Sleep(10*time.Second)
tt.RemoveTask(e) //主线程睡10秒后,清除掉第一个job,任务队列就只剩一个任务了
time.Sleep(10*time.Second)
tt.Stop() //停止此定时器,此后会发现定时器不再执行
time.Sleep(5*time.Second) //睡五秒后退出
}
func test_job(...interface{}) {
fmt.Print("test_job is call ...")
fmt.Println(time.Now())
}
func job(...interface{}) {
fmt.Print("job is call ...")
fmt.Println(time.Now())
}
程序执行结果:
job is call ...2018-04-11 22:21:42.0179853 +0800 CST m=+0.005000301
job is call ...2018-04-11 22:21:45.0181569 +0800 CST m=+3.005171901
test_job is call ...2018-04-11 22:21:48.0183285 +0800 CST m=+6.005343501
job is call ...2018-04-11 22:21:48.0183285 +0800 CST m=+6.005343501
job is call ...2018-04-11 22:21:51.0185001 +0800 CST m=+9.005515101
test_job is call ...2018-04-11 22:21:51.0185001 +0800 CST m=+9.005515101
test_job is call ...2018-04-11 22:21:54.0186717 +0800 CST m=+12.005686701
job is call ...2018-04-11 22:21:54.0186717 +0800 CST m=+12.005686701
test_job is call ...2018-04-11 22:21:57.0188433 +0800 CST m=+15.005858301
job is call ...2018-04-11 22:21:57.0188433 +0800 CST m=+15.005858301
test_job is call ...2018-04-11 22:22:00.0180149 +0800 CST m=+18.005029901
test_job is call ...2018-04-11 22:22:03.0181865 +0800 CST m=+21.005201501
test_job is call ...2018-04-11 22:22:06.0183581 +0800 CST m=+24.005373101
从上述运行结果可以看出,此定时器到秒级还算精准,但时间精度再小就会有偏差了,这就是开头说的不适用于高精度的定时器。上面的使用示例代码中的注释已经说明的很清楚了,下面献上封装的定时器的代码(特意加了部分注释,方便需要的同学理解):
//************************code begin********************************
package timeticker
import (
"fmt"
"time"
//"reflect"
"container/list"
)
type Task struct {
interval time.Duration //当前任务延迟多长时间后执行
job func(...interface{}) //回调函数(任务的具体执行方法)
isCycle bool //任务是否只执行一次(true,执行一次后就会从执行队列中清除掉
}
//定时器类型
type TimeTicker struct {
interval time.Duration //定时器间隔多长时间执行一次
ticker *time.Ticker
taskQueue *list.List
closeChan chan bool //向此通道中输入true值,则会关闭此定时器
}
func New(interval time.Duration) *TimeTicker {
if interval <= 0 {
return nil
}
tt := &TimeTicker {
interval: interval,
taskQueue: list.New(),
closeChan: make(chan bool),
}
return tt
}
//向定时器的任务队列中增加任务
//参数:task--增加的任务
// isCycle--是否循环执行, true,每次执行完后依然保留在任务队列中,false,执行完一次后就会被清除掉
//考虑增加一组任务的方法
func (tt *TimeTicker) AddTask(task Task, isCycle bool) {
tt.taskQueue.PushBack(task)
}
func (tt *TimeTicker) AddTaskList(task Task, isCycle bool) {
}
//增加一个执行任务 参数:job--具体执行的函数
// isCycle--是否循环执行,true:循环执行,每个周期均执行一次, false:只执行一次,然后从任务队列中清除
func (tt *TimeTicker) AddJob(job func(...interface{}), isCycle bool) *list.Element {
task := &Task {
interval: 0,
job: job,
isCycle: isCycle,
}
e := tt.taskQueue.PushBack(task)
return e
}
func (tt *TimeTicker) RemoveTask(e *list.Element) {
tt.taskQueue.Remove(e)
//fmt.Println(tt.taskQueue.Len())
}
func (tt *TimeTicker) Stop() {
tt.ticker.Stop()
tt.closeChan <- true
}
//启动定时器,如果不传参数表示立刻启动,否则delay时间后启动
func (tt *TimeTicker) Start(delay ...time.Duration) {
if len(delay) > 0 { //延迟delay[0]时间后启动
} else { //立刻启动
tt.ticker = time.NewTicker(tt.interval)
tt.executeTaskQueue()
for {
select {
case <- tt.ticker.C:
tt.executeTaskQueue()
case <- tt.closeChan:
return
}
}
}
}
func (tt *TimeTicker) executeTaskQueue() {
if tt.taskQueue.Len() == 0 {
fmt.Println("任务队列为空,没有可执行的任务......")
return
}
for iter := tt.taskQueue.Front(); iter != nil; iter = iter.Next() {
t := iter.Value
task,ok := t.(*Task)
if ok {
go task.job()
if !task.isCycle {
iter_tmp := iter
iter = iter.Prev()
tt.taskQueue.Remove(iter_tmp)
if iter == nil {
return
}
}
}
}
}
//设定超时机制,当设定此函数后,定时器会从任务启动时间开始计时,
//如果达到此时间后,任务队列中的任务达仍然未被执行完,则中止中剩余的任务,并发出警告
func (tt *TimeTicker) SetTimeOut(setTime time.Duration) {
}
//************************code end********************************
大部分函数前都加了很详细的注释,方便大家阅读。还有一些设计好的函数并未实现,但主体功能都已经出来了,并不影响使用,且最主要的原因是ETL中没有这些(未实现的函数)功能的需求,就偷个赖了。
有疑问加站长微信联系(非本文作者)