教程:Asynq 实现Go中异步定时任务处理

paigel · · 3592 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

需要在Go应用程序中异步处理任务? Asynq,简单高效的任务队列实现。 最近发现了一个很好的Go简单高效的异步任务处理库:[Asyqn](https://github.com/hibiken/asynq), 开发自谷歌员工。 * * * #### 安装 要安装asynq库和asynqmon命令行工具,请运行以下命令: ```go go get -u github.com/hibiken/asynq go get -u github.com/hibiken/asynq/tools/asynqmon ``` #### 入门 在本asynq教程中,我们将创建两个程序。 `producer.go `将创建并定时要由consumer异步处理的任务。 `consumer.go` 将处理producer创建的任务。 **假定在上运行Redis服务器localhost:6379**。在开始之前,请确保已安装并运行Redis。 我们需要做的第一件事是创建两个主文件: ```go mkdir producer consumer touch producer/producer.go consumer/consumer.go ``` 导入`asynq`两个文件: ```go import "github.com/hibiken/asynq" ``` Asynq使用Redis作为消息代理。使用一种`RedisConnOpt`类型来指定如何连接到Redis。我们这里将使用`RedisClientOpt`: ```go // both in producer.go and consumer.go var redis = &asynq.RedisClientOpt{ Addr: "localhost:6379", // Omit if no password is required Password: "mypassword", // Use a dedicated db number for asynq. // By default, Redis offers 16 databases (0..15) DB: 0, } ``` 在`producer.go`,我们将创建一个`Client`实例来创建和定时任务。 在`asynq`,要执行的工作单元被封装在称为的结构中Task。其中有两个字段:`Type`和`Payload`。 ```go // Task represents a task to be performed. type Task struct { // Type indicates the type of task to be performed. Type string // Payload holds data needed to perform the task. Payload Payload } ``` 要创建任务,请使用`NewTask`函数,并为任务传递类型和有效负载。 可以通过`Client.Schedule`传入任务和需要处理的时间来计划任务。 ```go // producer.go func main() { client := asynq.NewClient(redis) // Create a task with typename and payload. t1 := asynq.NewTask( "send_welcome_email", map[string]interface{}{"user_id": 42}) t2 := asynq.NewTask( "send_reminder_email", map[string]interface{}{"user_id": 42}) // Process the task immediately. err := client.Schedule(t1, time.Now()) if err != nil { log.Fatal(err) } // Process the task 24 hours later. err = client.Schedule(t2, time.Now().Add(24 * time.Hour)) if err != nil { log.Fatal(err) } } ``` 在`consumer.go`,创建一个`Background`例来处理任务。 `NewBackground`函数需要`RedisConnOp`t和`Config`。 您可以查看有关文档,Config以查看可用的选项。 在此示例中,我们仅指定并发。 ```go // consumer.go func main() { bg := asynq.NewBackground(redis, &asynq.Config{ Concurrency: 10, }) bg.Run(handler) } ``` 参数t`(*asynq.Background).Run`是`asynq.Handler`具有一种方法的接口`ProcessTask`。 ```go // ProcessTask should return nil if the processing of a task // is successful. // // If ProcessTask return a non-nil error or panics, the task // will be retried. type Handler interface { ProcessTask(*Task) error } ``` 实现处理程序的最简单方法是定义一个具有相同`type`的函数,并`asynq.HandlerFunc`在将其传递给时使用适配器类型`Run`。 ```go func handler(t *asynq.Task) error { switch t.Type { case "send_welcome_email": id, err := t.Payload.GetInt("user_id") if err != nil { return err } fmt.Printf("Send Welcome Email to User %d\n", id) case "send_reminder_email": id, err := t.Payload.GetInt("user_id") if err != nil { return err } fmt.Printf("Send Reminder Email to User %d\n", id) default: return fmt.Errorf("unexpected task type: %s", t.Type) } return nil } func main() { bg := asynq.NewBackground(redis, &asynq.Config{ Concurrency: 10, }) // Use asynq.HandlerFunc adapter for a handler function bg.Run(asynq.HandlerFunc(handler)) } ``` 我们可以继续向该处理函数添加案例,但是在实际应用中,在单独的函数中为每种案例定义逻辑很方便。为了重构我们的代码,让我们创建一个简单的调度程序,将任务类型映射到其处理程序: ```go // consumer.go // Dispatcher is used to dispatch tasks to registered handlers. type Dispatcher struct { mapping map[string]asynq.HandlerFunc } // HandleFunc registers a task handler func (d *Dispatcher) HandleFunc(taskType string, fn asynq.HandlerFunc) { d.mapping[taskType] = fn } // ProcessTask processes a task. // // NOTE: Dispatcher satisfies asynq.Handler interface. func (d *Dispatcher) ProcessTask(task *asynq.Task) error { fn, ok := d.mapping[task.Type] if !ok { return fmt.Errorf("no handler registered for %q", task.Type) } return fn(task) } func main() { d := &Dispatcher{mapping: make(map[string]asynq.HandlerFunc)} d.HandleFunc("send_welcome_email", sendWelcomeEmail) d.HandleFunc("send_reminder_email", sendReminderEmail) bg := asynq.NewBackground(redis, &asynq.Config{ Concurrency: 10, }) bg.Run(d) } func sendWelcomeEmail(t *asynq.Task) error { id, err := t.Payload.GetInt("user_id") if err != nil { return err } fmt.Printf("Send Welcome Email to User %d\n", id) return nil } func sendReminderEmail(t *asynq.Task) error { id, err := t.Payload.GetInt("user_id") if err != nil { return err } fmt.Printf("Send Welcome Email to User %d\n", id) return nil } ``` 现在我们既有任务生产者又有消费者,我们可以运行这两个程序。 ```go go run producer.go ``` 这将创建两项任务:一项应立即处理,另一项将在24小时后处理。 让我们使用`asynqmon`工具检查任务。 ```go asynqmon stats ``` 你应该能看到,有一个任务**Enqueued**状态,另一个在**Scheduled**状态。 注意:如需了解每种状态的含义,请参阅Wiki页面上[Life of Task](https://github.com/hibiken/asynq/wiki/Life-of-a-Task)。 让我们运行`asynqmon`与`watch`命令,以便我们能够连续运行的命令看到的变化。 ```go watch -n 3 asynqmon stats # Runs `asynqmon stats` every 3 seconds ``` 最后,让我们启动consumer程序来处理定时的任务。 ```go go run consumer.go ``` **注意**:在您发送信号终止程序之前,此操作不会退出。有关如何安全终止后台处理的最佳实践,请参见[Signal Wiki页面](https://github.com/hibiken/asynq/wiki/Signals)。 您应该能够看到在终端上打印的文本,表明该任务已成功处理。 这是一次asynq基础的快速教程。要了解有关其所有功能(如**[优先级队列](https://github.com/hibiken/asynq/wiki/Priority-Queues)**和**[自定义重试](https://github.com/hibiken/asynq/wiki/Task-Retry)**)的更多[信息](https://github.com/hibiken/asynq/wiki),请参见的[Wiki页面](https://github.com/hibiken/asynq/wiki)。 #### **命令行工具** Asynq附带了一个命令行工具来检查队列和任务的状态。 要安装,请运行以下命令: ```go go get github.com/hibiken/asynq/tools/asynqmon ``` 完成! 例图:![asynqmon_stats.gif](https://upload-images.jianshu.io/upload_images/20999185-513004b2b31192ef.gif?imageMogr2/auto-orient/strip) 详情请参考:[Asyqn-https://github.com/hibiken/asynq](https://github.com/hibiken/asynq) 交流群:[https://gitter.im/go-asynq/community](https://gitter.im/go-asynq/community)

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

3592 次点击  ∙  1 赞  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传