前言
最近有个延迟执行的任务需求,比如发了一个定时红包,服务器不能相信客户端的一切,所以就得做时间的同步,但是PHP相对来讲不是很适合做这种“XX秒后去执行一个什么样的动作这类的行为”,但是这个功能又是不可缺少的,然后就周末花时间调研了下相关的实现。大致有如下几种:
- 借助Redis的sorted_set和hash结构
- 自己写一个定时器,不断“轮询”触发
- 借助语言的异步库
- 借助消息队列等服务。
下面针对这几点一一做下简单的实现, 然后考虑到可维护性, 数据丢失后怎么恢复,服务监控等一系列问题。最后选择一个场景上来说更合适的吧。
借助Redis实现
在正式使用Redis来实现这一延迟需求之前,我还了解到Redis的key notification事件提醒,可以在某一个key过期的时候触发一个动作,这对于我们做延迟任务来讲,的确是很好的一个契机,但是打开了它就会不可避免的造成效率上的降低,而且线上服务器一般不会再去修改了,因此这个特性,自己了解下,玩玩就行了。具体的实现还是得老老实实设计数据结构了。
结构涉及
我的做法是 QUEUE 加上 CONTAINER。即会有一个根据时间不断往前移动的时间轴作为我们的队列,然后在队列上每一个时间戳,作为一个链表往外散发,保存多个task。
生产者productor.php
guo@Server218:/tmp$ cat productor.php
<?php
$redis = new Redis();
$redis->connect("localhost", 6379);
$redis->select(2);
$QUEUE = "asyncqueue:zset";
$SERIALIZER = "serialize:hash";
// 模拟生产延迟消息
for($index=0; $index<10; $index++) {
// 每秒可能会产生多条数据,但是只要“当秒”有数据,就需要添加到queue中
$ts = time();
$cursecond = rand(0, 9) % 2 == 0;
$tasklength = rand(0, 9) % 3;
if($cursecond == true) {
// 当前秒有task
$redis->zadd($QUEUE, $ts, $ts);
if($tasklength > 0) {
for($i=0; $i<$tasklength;$i++) {
$key = "2614677&".rand(0, 100000);
$redis->hset($SERIALIZER.":".$ts, $key, $key);
echo "[{$ts}] cursecond:{$cursecond}, KEY:{$key}\n";
}
}
}
sleep($tasklength);
}
消费者consumer.php
guo@Server218:/tmp$ cat consumer.php
<?php
$redis = new Redis();
$redis->connect("localhost", 6379);
$redis->select(2);
$QUEUE = "asyncqueue:zset";
$SERIALIZER = "serialize:hash";
$counter = 0;
while(true) {
$ts = 1542596034 + $counter;
$counter++;
$ret = $redis->zrangebyscore($QUEUE, $ts, $ts, array("WITHSCORES"=>true));
// 获取下具体的task并执行
$items = $redis->hgetall($SERIALIZER.":".$ts);
foreach($items as $key=>$member) {
echo "CONSUMER[{$ts}]\t[{$key}]\t{$member}\n";
}
if($counter>=10) {
break;
}
}
测试
先来看看生产的具体内容。
guo@Server218:/tmp$ vim productor.php
[1542596034] cursecond:1, KEY:2614677&46685
[1542596034] cursecond:1, KEY:2614677&99086
[1542596036] cursecond:1, KEY:2614677&38241
[1542596037] cursecond:1, KEY:2614677&74988
[1542596038] cursecond:1, KEY:2614677&69443
[1542596038] cursecond:1, KEY:2614677&25523
[1542596040] cursecond:1, KEY:2614677&29642
[1542596040] cursecond:1, KEY:2614677&15928
[1542596042] cursecond:1, KEY:2614677&91626
[1542596042] cursecond:1, KEY:2614677&7382
Press ENTER or type command to continue
然后看看消费者是否正确消费。
Press ENTER or type command to continue
CONSUMER[1542596034] [2614677&46685] 2614677&46685
CONSUMER[1542596034] [2614677&99086] 2614677&99086
CONSUMER[1542596036] [2614677&38241] 2614677&38241
CONSUMER[1542596037] [2614677&74988] 2614677&74988
CONSUMER[1542596038] [2614677&69443] 2614677&69443
CONSUMER[1542596038] [2614677&25523] 2614677&25523
CONSUMER[1542596040] [2614677&29642] 2614677&29642
CONSUMER[1542596040] [2614677&15928] 2614677&15928
CONSUMER[1542596042] [2614677&91626] 2614677&91626
CONSUMER[1542596042] [2614677&7382] 2614677&7382
Press ENTER or type command to continue
谈谈看法
- 利用Redis来实现,可以看出对Redis服务器的QPS会有一个微幅提升,这个问题可以通过multi管道来稍微优化下,这里就不多说了。
- 数据不会丢,这样即便是服务挂掉也能将未消费的任务进行恢复。
- 服务监控以及可维护性尚佳,基于Redis,稳定性能得到保证。
- 不用切换语言,易于实现,也无需增加额外的中间件,减少了维护工作。
定时器⏲
原理
在网上搜索相关实现的时候,搜到一篇不错的文章。golang实现延迟消息的原理与方法 不错的文章,核心思路就在于下面这张图了。
代码实现
原文代码中有一个bug,就是在执行任务轮询的时候没有做休眠,会导致服务一直全速前进,这不太好。修改后的代码如下:
➜ asyncdemos cat delayring.go
package main
import (
"time"
"errors"
"fmt"
"github.com/kataras/iris"
"net/http"
"bytes"
"log"
"io/ioutil"
"encoding/json"
"github.com/garyburd/redigo/redis"
"strconv"
)
const (
TASK_TYPE_INTERVAL = 1
TASK_TYPE_DELAY = 2
QUEUE_LENGTH = 10
DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=b716e1f9f2f7d4fb93d4bb79db65a117d589f886d1757"
)
//延迟消息
type DelayMessage struct {
//当前下标
curIndex int;
//环形槽
slots [QUEUE_LENGTH]map[string]*Task;
//关闭
closed chan bool;
//任务关闭
taskClose chan bool;
//时间关闭
timeClose chan bool;
//启动时间
startTime time.Time;
}
//执行的任务函数
type TaskFunc func(args ...interface{});
//任务
type Task struct {
//循环次数
cycleNum int;
//执行的函数
exec TaskFunc;
params []interface{};
catagory int
}
//创建一个延迟消息
func NewDelayMessage() *DelayMessage {
dm := &DelayMessage{
curIndex: 0,
closed: make(chan bool),
taskClose: make(chan bool),
timeClose: make(chan bool),
startTime: time.Now(),
};
for i := 0; i < QUEUE_LENGTH; i++ {
dm.slots[i] = make(map[string]*Task);
}
return dm;
}
//启动延迟消息
func (dm *DelayMessage) Start() {
go dm.taskLoop();
go dm.timeLoop();
select {
case <-dm.closed:
{
dm.taskClose <- true;
dm.timeClose <- true;
break;
}
};
}
//关闭延迟消息
func (dm *DelayMessage) Close() {
dm.closed <- true;
}
//处理每1秒的任务
func (dm *DelayMessage) taskLoop() {
defer func() {
fmt.Println("taskLoop exit");
}();
for {
// TODO 看看怎么优化比较合适,要不加这个的话,程序会执行超过一次
time.Sleep(time.Second)
select {
case <-dm.taskClose:
{
return;
}
default:
{
//取出当前的槽的任务
tasks := dm.slots[dm.curIndex];
if len(tasks) > 0 {
//遍历任务,判断任务循环次数等于0,则运行任务
//否则任务循环次数减1
for k, v := range tasks {
if v.cycleNum == 0 {
fmt.Printf("\t\t\t\t\tCURINDEX[%v], key: %v, cyclenum: %v\n", dm.curIndex, k, v.cycleNum)
go v.exec(v.params...);
//删除运行过的任务 对于catagory=1的周期性任务不予删除
if v.catagory != TASK_TYPE_INTERVAL {
delete(tasks, k)
}
} else {
v.cycleNum--;
}
}
}
}
}
}
}
//处理每1秒移动下标
func (dm *DelayMessage) timeLoop() {
defer func() {
fmt.Println("timeLoop exit");
}();
tick := time.NewTicker(time.Second);
for {
select {
case <-dm.timeClose:
{
return;
}
case <-tick.C:
{
fmt.Printf("%v, [%v]\n", time.Now().Format("2006-01-02 15:04:05"), dm.curIndex);
//fmt.Println(dm.slots)
//判断当前下标,如果等于3599则重置为0,否则加1
if dm.curIndex == QUEUE_LENGTH - 1 {
dm.curIndex = 0;
} else {
dm.curIndex++;
}
}
}
}
}
//添加任务
//func (dm *DelayMessage) AddTask(t time.Time, key string, catagory int, exec TaskFunc, params []interface{}) error {
func (dm *DelayMessage) AddTask(seconds int, key string, catagory int, exec TaskFunc, params []interface{}) error {
//if dm.startTime.After(t) {
// return errors.New("时间错误");
//}
//当前时间与指定时间相差秒数
//subSecond := t.Unix() - dm.startTime.Unix();
//subSecond := int(t.Unix() - time.Now().Unix());
subSecond := seconds
//计算循环次数
cycleNum := int(subSecond / QUEUE_LENGTH);
//计算任务所在的slots的下标
ix := (subSecond + dm.curIndex ) % QUEUE_LENGTH ;
fmt.Printf("\t\t\t\t\t key: %v, cycle: %v, index: %v , curIndex: %v, subseconds: %v\n", key, cycleNum, ix, dm.curIndex, subSecond)
//把任务加入tasks中
tasks := dm.slots[ix];
if _, ok := tasks[key]; ok {
return errors.New("该slots中已存在key为" + key + "的任务");
}
tasks[key] = &Task{
cycleNum: cycleNum,
exec: exec,
params: params,
catagory: catagory,
};
// TODO 持久化部分,这样即便中途crash,下次重启也能得到及时的恢复
return nil;
}
func (dm *DelayMessage) DeleteTask(key string) error {
tasks := dm.slots[dm.curIndex]
if _, ok := tasks[key]; ok {
delete(tasks, key)
}
return nil
}
//func main() {
//创建延迟消息
//dm := NewDelayMessage();
////添加任务
//dm.AddTask(time.Now().Add(time.Second*2), "test1", TASK_TYPE_DELAY, func(args ...interface{}) {
// fmt.Println(args...);
//}, []interface{}{1, 2, 3});
//dm.AddTask(time.Now().Add(time.Second*4), "test2", TASK_TYPE_DELAY, func(args ...interface{}) {
// fmt.Println(args...);
//}, []interface{}{4, 5, 6});
//dm.AddTask(time.Now().Add(time.Second*12), "test3", TASK_TYPE_DELAY, func(args ...interface{}) {
// fmt.Println(args...);
//}, []interface{}{"hello", "world", "test"});
//dm.AddTask(time.Now().Add(time.Second), "test4", TASK_TYPE_INTERVAL, func(args ...interface{}) {
// fmt.Printf("操你妈", args...)
//}, []interface{}{1, 2, 3});
//
////40秒后关闭
////time.AfterFunc(time.Second*2, func() {
//// //dm.Close();
////});
//dm.Start();
//}
var mamager DelayMessage
//func publish(manager *DelayMessage, seconds int, key string, exec TaskFunc, params []interface{}) error {
// manager.AddTask(time.Now().Add(time.Second * time.Duration(seconds)), key, TASK_TYPE_DELAY, func(args... interface{}) {
// fmt.Println("key: " + key)
// }, params)
//
// return nil
//}
func httpPost(msg string, webhook string) {
formatter := `{
"msgtype": "text",
"text": {
"content":"%s",
},
"at": {
"atMobiles":[],
"isAtAll": false
}
}
`
content := fmt.Sprintf(formatter, msg + "[" + time.Now().String() + "]")
payload := []byte(content)
resp, err := http.Post(webhook, "application/json", bytes.NewBuffer(payload))
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
fmt.Println(string(body))
}
type Message struct {
Sessionid string
Anchorid string
Msg string
}
func RedisPublish(info string) {
client, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
log.Fatal(err)
return
}
defer client.Close()
resp, err := client.Do("publish", "channel", info)
if err != nil {
// TODO 跳转到监控报警
log.Fatal(err)
return
}
fmt.Println(resp)
}
func main() {
manager := NewDelayMessage()
go manager.Start()
app := iris.New()
app.Get("/hello", func(context iris.Context) {
context.WriteString("pong")
})
app.Get("/publish", func(context iris.Context) {
msg := context.FormValue("msg")
seconds, _ := strconv.Atoi(context.FormValue("seconds"))
catagory, _ := strconv.Atoi(context.FormValue("catagory"))
fmt.Println("get params: " + msg)
if catagory != TASK_TYPE_DELAY || catagory != TASK_TYPE_INTERVAL {
catagory = TASK_TYPE_DELAY
}
manager.AddTask(seconds, "test1", catagory, func(args ...interface{}) {
httpPost(args[0].(string), DINGTALK_WEBHOOK)
}, []interface{}{msg})
context.WriteString("Added Succeed!" + time.Now().String())
})
app.Get("/delay", func(ctx iris.Context) {
message := Message{
Sessionid: ctx.FormValue("sessionid"),
Anchorid: ctx.FormValue("anchorid"),
Msg:ctx.FormValue("msg"),
}
jsondata, err := json.Marshal(&message)
if err != nil {
ctx.WriteString(err.Error())
}
RedisPublish(string(jsondata))
ctx.WriteString(string(jsondata))
})
app.Run(iris.Addr(":8080"))
}% ➜
测试
开启服务go run delayring.go
, 然后在浏览器中访问服务,大致含义是3秒后触发一个timeout事件,触发钉钉机器人消息推送。
➜ asyncdemos go run delayring.go
Now listening on: http://localhost:8080
Application started. Press CMD+C to shut down.
2018-11-19 11:35:24, [0]
get params: 难受
key: test1, cycle: 0, index: 4 , curIndex: 1, subseconds: 3
2018-11-19 11:35:25, [1]
2018-11-19 11:35:26, [2]
2018-11-19 11:35:27, [3]
CURINDEX[4], key: test1, cyclenum: 0
{"errmsg":"ok","errcode":0}
2018-11-19 11:35:28, [4]
2018-11-19 11:35:29, [5]
2018-11-19 11:35:30, [6]
2018-11-19 11:35:31, [7]
^C[ERRO] 2018/11/19 11:35 http: Server closed
谈谈感受
- 仔细看测试结果,发现时间戳和对应执行时间戳还是可以对的上的。但是有一个极大的弊端就是数据。万一服务挂掉了,数据就会全部丢掉,这是不能容忍的。
- 代码可维护性也较低,当然了,代码没做啥涉及,封装的不够完善。
- 引入了额外的服务, 导致整个系统的可维护性降低,增大了服务宕机的危险。
- 语言相关性较强,对非golang的业务程序有一定的门槛。
借助第三方库
python的tornado一向以异步高效率著称,异步对它来说就是个普通的业务。所以我们无需考虑具体的实现细节,专注于业务逻辑即可。那么今天咱也来试试水。
代码实现
很幸运的一下子就搜到了对应的demo,如下:
➜ asyncdemos cat demo.py
#coding: utf8
__author__ = "郭 璞"
__email__ = "marksinoberg@gmail.com"
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
import tornado.httpclient
import tornado.gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
import time
from tornado.options import define, options
define("port", default=8002, help="run on the port", type=int)
class SleepHandler(tornado.web.RequestHandler):
executor = ThreadPoolExecutor(2)
def get(self):
seconds = self.request.arguments.get("seconds", 10)
tornado.ioloop.IOLoop.instance().add_callback(self.sleep, seconds)
self.write("when i sleep")
@run_on_executor
def sleep(self, seconds):
print(time.time())
time.sleep(5)
print("yes", seconds)
print(time.time())
return seconds
if __name__ == "__main__":
# tornado.options.parse_command_line()
app = tornado.web.Application(
handlers=[(r"/sleep", SleepHandler), ])
http_server = tornado.httpserver.HTTPServer(app)
http_server.listen(8002)
tornado.ioloop.IOLoop.instance().start()%
运行服务: python demo.py
, 然后访问服务:
查看下输出结果
这里使用了默认值参数,所以可以看出也是正确的,服务在第三秒后得到了触发并进行了对应的执行操作。
谈谈感受
- 库支持,无需考虑底层细节,专注于业务流程即可。
- 面临着和自己写定时器一样的问题,那就是数据的同步,以及错误恢复等。
- 引入了第三方服务,系统可维护性以及宕机的可能性变大。
借助开源软件
在和周围人的讨论中,发现延迟执行的一个解决方案就是采用消息队列。比如beanstalk和rabbitMQ等。我没去调研rabbitMQ怎么用,这块内容挺大的,光是那一大坨的配置文件就让人头大,所以我倾向于使用beanstalk。
配置环境
用之前进行安装, 启动即可。
# 安装
sudo apt-get install beanstalkd
# 启动, 并后台运行。如果觉得不保险,还可以用nohup的形式
beanstalkd -l 127.0.0.1 -p 12345 &
使用的细节可以参考下面的这篇文章。PHP使用Beanstalkd消息队列
我这里问了方便自己看下原型效果,就用python简单写写了。开始之前记得安装beanstalk的依赖库beanstalkc,
pip install beanstalkc
代码实现
先来看看生产者。
guo@Server218:/tmp$ cat beanstalkdemo.py
#!/usr/bin python
import beanstalkc
import time
conn = beanstalkc.Connection(host="localhost", port=12345)
print(conn.tubes())
print(conn.stats())
conn.use("default")
ts = time.time()
handletime = ts + 10
conn.put("helloworld" + str(ts) + ", handletime:" + str(handletime), 1, 10)
print("putted")
再来看看消费者。
guo@Server218:/tmp$ cat consumer.py
#!/usr/bin python
import beanstalkc
import time
conn = beanstalkc.Connection(host="localhost", port=12345)
conn.use("default")
job = conn.reserve()
print(job.body)
job.delete()
ts = time.time()
print("CONSUME DONE: " + str(ts))
测试
- 先运行生产者。
guo@Server218:/tmp$ python beanstalkdemo.py
['default']
{'current-connections': 1, 'max-job-size': 65535, 'cmd-release': 0, 'cmd-reserve': 0, 'pid': 8384, 'cmd-bury': 0, 'current-producers': 0, 'total-jobs': 0, 'current-jobs-ready': 0, 'cmd-peek-buried': 0, 'current-tubes': 1, 'id': 'b0b7cf3b44c2e296', 'current-jobs-delayed': 0, 'uptime': 2, 'cmd-watch': 0, 'hostname': 'Server218', 'job-timeouts': 0, 'cmd-stats': 1, 'rusage-stime': 0.0, 'version': 1.1, 'current-jobs-reserved': 0, 'current-jobs-buried': 0, 'cmd-reserve-with-timeout': 0, 'cmd-put': 0, 'cmd-pause-tube': 0, 'cmd-list-tubes-watched': 0, 'cmd-list-tubes': 1, 'current-workers': 0, 'cmd-list-tube-used': 0, 'cmd-ignore': 0, 'binlog-records-migrated': 0, 'current-waiting': 0, 'cmd-peek': 0, 'cmd-peek-ready': 0, 'cmd-peek-delayed': 0, 'cmd-touch': 0, 'binlog-oldest-index': 0, 'binlog-current-index': 0, 'cmd-use': 0, 'total-connections': 1, 'cmd-delete': 0, 'binlog-max-size': 10485760, 'cmd-stats-job': 0, 'rusage-utime': 0.0, 'cmd-stats-tube': 0, 'binlog-records-written': 0, 'cmd-kick': 0, 'current-jobs-urgent': 0}
putted
- 跑一下消费者,看看效果。
guo@Server218:/tmp$ python consumer.py
helloworld1542605160.63, handletime:1542605170.63
CONSUME DONE: 1542605172.33
从上面可以看出,延迟执行的目标已经实现了。
谈谈感受
- 引入了第三方服务,造成了维护成本的增加。
- 解耦性比较好,语言无关。
- 数据可较好的保存,不至于丢失数据,容错性好。
总结
调研了这么多,发现每一个都有自己的优缺点吧,没有说哪一个是最好的选择,只能算是合适的场景选择合适的服务。
且行且思
有疑问加站长微信联系(非本文作者)