golang实现延时队列基本原理

成木2016 · · 925 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

主要参考了文章:
https://www.cnblogs.com/jkko123/p/7239420.html
更进一步的设计需要参考(未完成)
https://tech.youzan.com/queuing_delay/

package main

import (
    "errors"
    "fmt"
    "time"
)

const cicleSectionNum = 10

type TaskFunc func(args ...interface{})

//任务
type Task struct {
    runTime  time.Time //初次运行时间
    cycleNum int       //需要第几圈
    curIndex int       //当前运行到第几格
    //执行的函数
    exec   TaskFunc
    params []interface{}
}

type DelayMessage struct {
    cycleNum  int //当前运行到第几圈了
    curIndex  int //当前运行到第几格
    slots     [cicleSectionNum]map[string]*Task
    closed    chan bool
    taskClose chan bool
    timeClose chan bool
    startTime time.Time
}

func NewDelayMessage() *DelayMessage {
    dm := &DelayMessage{
        cycleNum:  0,
        curIndex:  0,
        closed:    make(chan bool),
        taskClose: make(chan bool),
        timeClose: make(chan bool),
        startTime: time.Now(),
    }
    for i := 0; i < cicleSectionNum; i++ {
        dm.slots[i] = make(map[string]*Task)
    }
    return dm
}

func (dm *DelayMessage) Start() {
    go dm.taskLoop()
    go dm.timeLoop()
    select {
    case <-dm.closed:
        dm.taskClose <- true
        dm.timeClose <- true
        break
    }
}

func (dm *DelayMessage) Stop() {
    dm.closed <- true
}

func (dm *DelayMessage) taskLoop() {
    defer func() {
        fmt.Println("任务遍历结束!")
    }()
    for {
        select {
        case <-dm.taskClose:
            return
        default:
            {
                tasks := dm.slots[dm.curIndex]
                if len(tasks) > 0 {
                    for k, v := range tasks {
                        if v.cycleNum == dm.cycleNum {
                            go v.exec(v.params...)
                            delete(tasks, k)
                        }
                    }
                }
            }
        }

    }
}

func (dm *DelayMessage) timeLoop() {
    defer func() {
        fmt.Println("时间遍历结束!")
    }()
    tick := time.NewTicker(time.Second)
    for {
        select {
        case <-dm.timeClose:
            return
        case <-tick.C:
            fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
            dm.curIndex = (dm.curIndex + 1) % cicleSectionNum
            if dm.curIndex == 0 {
                dm.cycleNum += 1
            }
            fmt.Println("当前循环时间", dm.cycleNum, dm.curIndex)
        }
    }

}

//添加任务
func (dm *DelayMessage) AddTask(t time.Time, key string, exec TaskFunc, params []interface{}) error {
    if dm.startTime.After(t) {
        return errors.New("时间错误")
    }
    //当前时间与指定时间相差秒数
    subSecond := t.Unix() - dm.startTime.Unix()
    //计算循环次数
    cycleNum := int(subSecond / cicleSectionNum)
    //计算任务所在的slots的下标
    ix := subSecond % cicleSectionNum
    //把任务加入tasks中
    tasks := dm.slots[ix]
    if _, ok := tasks[key]; ok {
        return errors.New("该slots中已存在key为" + key + "的任务")
    }
    tasks[key] = &Task{
        runTime:  t,
        cycleNum: cycleNum,
        curIndex: int(ix),
        exec:     exec,
        params:   params,
    }
    return nil
}

func main() {
    fmt.Println("abc")
    dm := NewDelayMessage()
    //添加任务
    dm.AddTask(time.Now().Add(time.Second*3), "test1", func(args ...interface{}) {
        fmt.Println(args...)
    }, []interface{}{1, 2, 3})

    dm.AddTask(time.Now().Add(time.Second*3), "test1", func(args ...interface{}) {
        fmt.Println(args...)
    }, []interface{}{2, 2, 3})

    dm.AddTask(time.Now().Add(time.Second*3), "test2", func(args ...interface{}) {
        fmt.Println(args...)
    }, []interface{}{3, 2, 3})

    dm.AddTask(time.Now().Add(time.Second*11), "test11", func(args ...interface{}) {
        fmt.Println(args...)
    }, []interface{}{11, 2, 3})

    dm.AddTask(time.Now().Add(time.Second*12), "test11", func(args ...interface{}) {
        fmt.Println(args...)
    }, []interface{}{11, 2, 3})

    //40秒后关闭
    time.AfterFunc(time.Second*40, func() {
        dm.Stop()
    })
    dm.Start()
}

运行测试:


image.png

有疑问加站长微信联系(非本文作者)

本文来自:简书

感谢作者:成木2016

查看原文:golang实现延时队列基本原理

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

925 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传