基于time的ticker封装的定时器

duy_2018 · · 1760 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

本人最近新学GO,为了能够深入了解GO,于是计划使用GO实现一个高并发、IO密集操作的ETL程序。闲话不多说,今天完成了ETL中最基本的要素,定时器。

首先了解到GO中的time.Ticker不适用到高精度的定时器,因为ETL任务周期设定是1分钟,故此处无需高精度的定时器(对于高精度的定时器,在网上看到有人使用时间轮实现timewheel,我也看了实现,并且也按其思路实现了一版,此处略去)。

封闭的类型:TimeTicker,使用示例代码如下:

func main() {
    var tt *timeticker.TimeTicker = timeticker.New(3*time.Second)  //创建一个3秒钟执行一次任务的定时器
    e := tt.AddJob(job, true)    //向此定时器中增加一个任务,返回此任务的指针(此指标在后续删除任务时使用)
    go tt.Start()    //开一个协程,让此定时器单独去执行任务
    time.Sleep(6*time.Second)

    //主线程睡6秒后再加一个任务到定时器的任务队列,之后每周期会执行两个任务
    tt.AddJob(test_job, true) 
    time.Sleep(10*time.Second)
    tt.RemoveTask(e)  //主线程睡10秒后,清除掉第一个job,任务队列就只剩一个任务了
    time.Sleep(10*time.Second)
    tt.Stop()    //停止此定时器,此后会发现定时器不再执行
    time.Sleep(5*time.Second)    //睡五秒后退出
}

func test_job(...interface{}) {
    fmt.Print("test_job is call ...")
    fmt.Println(time.Now())
}
func job(...interface{}) {
    fmt.Print("job is call ...")
    fmt.Println(time.Now())
}

程序执行结果:

job is call ...2018-04-11 22:21:42.0179853 +0800 CST m=+0.005000301
job is call ...2018-04-11 22:21:45.0181569 +0800 CST m=+3.005171901
test_job is call ...2018-04-11 22:21:48.0183285 +0800 CST m=+6.005343501
job is call ...2018-04-11 22:21:48.0183285 +0800 CST m=+6.005343501
job is call ...2018-04-11 22:21:51.0185001 +0800 CST m=+9.005515101
test_job is call ...2018-04-11 22:21:51.0185001 +0800 CST m=+9.005515101
test_job is call ...2018-04-11 22:21:54.0186717 +0800 CST m=+12.005686701
job is call ...2018-04-11 22:21:54.0186717 +0800 CST m=+12.005686701
test_job is call ...2018-04-11 22:21:57.0188433 +0800 CST m=+15.005858301
job is call ...2018-04-11 22:21:57.0188433 +0800 CST m=+15.005858301
test_job is call ...2018-04-11 22:22:00.0180149 +0800 CST m=+18.005029901
test_job is call ...2018-04-11 22:22:03.0181865 +0800 CST m=+21.005201501
test_job is call ...2018-04-11 22:22:06.0183581 +0800 CST m=+24.005373101
 

从上述运行结果可以看出,此定时器到秒级还算精准,但时间精度再小就会有偏差了,这就是开头说的不适用于高精度的定时器。上面的使用示例代码中的注释已经说明的很清楚了,下面献上封装的定时器的代码(特意加了部分注释,方便需要的同学理解):

//************************code begin********************************

package timeticker

import (
    "fmt"
    "time"
    //"reflect"
    "container/list" 
)

type Task struct {
    interval     time.Duration        //当前任务延迟多长时间后执行
    job            func(...interface{})                //回调函数(任务的具体执行方法)
    isCycle        bool            //任务是否只执行一次(true,执行一次后就会从执行队列中清除掉
}

//定时器类型
type TimeTicker struct {
    interval     time.Duration        //定时器间隔多长时间执行一次
    ticker       *time.Ticker
    taskQueue    *list.List
    closeChan    chan bool        //向此通道中输入true值,则会关闭此定时器
}

func New(interval time.Duration) *TimeTicker {
    if interval <= 0 {
        return nil
    }
    tt := &TimeTicker {
        interval:        interval,
        taskQueue:     list.New(),
        closeChan:      make(chan bool),
    }
    return tt
}

//向定时器的任务队列中增加任务
//参数:task--增加的任务
//            isCycle--是否循环执行, true,每次执行完后依然保留在任务队列中,false,执行完一次后就会被清除掉
//考虑增加一组任务的方法
func (tt *TimeTicker) AddTask(task Task, isCycle bool) {
    tt.taskQueue.PushBack(task)
}

func (tt *TimeTicker) AddTaskList(task Task, isCycle bool) {
    
}

//增加一个执行任务    参数:job--具体执行的函数      
//         isCycle--是否循环执行,true:循环执行,每个周期均执行一次,    false:只执行一次,然后从任务队列中清除
func (tt *TimeTicker) AddJob(job func(...interface{}), isCycle bool) *list.Element {
    task := &Task {
        interval:    0,
        job:        job,
        isCycle:    isCycle,
    }
    e := tt.taskQueue.PushBack(task)
    return e
}

func (tt *TimeTicker) RemoveTask(e *list.Element) {
    tt.taskQueue.Remove(e)
    //fmt.Println(tt.taskQueue.Len())
}

func (tt *TimeTicker) Stop() {
    tt.ticker.Stop()
    tt.closeChan <- true
}
//启动定时器,如果不传参数表示立刻启动,否则delay时间后启动
func (tt *TimeTicker) Start(delay ...time.Duration) {
    
    if len(delay) > 0 {        //延迟delay[0]时间后启动
        
    } else {            //立刻启动
        tt.ticker = time.NewTicker(tt.interval)    
        tt.executeTaskQueue()
        for {
            select {
                case <- tt.ticker.C:
                    tt.executeTaskQueue()
                case <- tt.closeChan:
                    return
            }
        }
        
    }
}

func (tt *TimeTicker) executeTaskQueue() {
    
    if tt.taskQueue.Len() == 0 {
        fmt.Println("任务队列为空,没有可执行的任务......")
        return
    }
    
    for iter := tt.taskQueue.Front(); iter != nil; iter = iter.Next() {
        t := iter.Value
        task,ok := t.(*Task)
        if ok {
            go task.job()
            if !task.isCycle {
                iter_tmp := iter
                iter = iter.Prev()
                tt.taskQueue.Remove(iter_tmp)
                if iter == nil {
                    return
                }
            }
        }
    }
}

//设定超时机制,当设定此函数后,定时器会从任务启动时间开始计时,
//如果达到此时间后,任务队列中的任务达仍然未被执行完,则中止中剩余的任务,并发出警告
func (tt *TimeTicker) SetTimeOut(setTime time.Duration) {
    
}

//************************code end********************************

大部分函数前都加了很详细的注释,方便大家阅读。还有一些设计好的函数并未实现,但主体功能都已经出来了,并不影响使用,且最主要的原因是ETL中没有这些(未实现的函数)功能的需求,就偷个赖了。

 

 


有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1760 次点击  ∙  1 赞  
加入收藏 微博
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传