需要在Go应用程序中异步处理任务? Asynq,简单高效的任务队列实现。
最近发现了一个很好的Go简单高效的异步任务处理库:[Asyqn](https://github.com/hibiken/asynq), 开发自谷歌员工。
* * *
#### 安装
要安装asynq库和asynqmon命令行工具,请运行以下命令:
```go
go get -u github.com/hibiken/asynq
go get -u github.com/hibiken/asynq/tools/asynqmon
```
#### 入门
在本asynq教程中,我们将创建两个程序。
`producer.go `将创建并定时要由consumer异步处理的任务。
`consumer.go` 将处理producer创建的任务。
**假定在上运行Redis服务器localhost:6379**。在开始之前,请确保已安装并运行Redis。
我们需要做的第一件事是创建两个主文件:
```go
mkdir producer consumer
touch producer/producer.go consumer/consumer.go
```
导入`asynq`两个文件:
```go
import "github.com/hibiken/asynq"
```
Asynq使用Redis作为消息代理。使用一种`RedisConnOpt`类型来指定如何连接到Redis。我们这里将使用`RedisClientOpt`:
```go
// both in producer.go and consumer.go
var redis = &asynq.RedisClientOpt{
Addr: "localhost:6379",
// Omit if no password is required
Password: "mypassword",
// Use a dedicated db number for asynq.
// By default, Redis offers 16 databases (0..15)
DB: 0,
}
```
在`producer.go`,我们将创建一个`Client`实例来创建和定时任务。
在`asynq`,要执行的工作单元被封装在称为的结构中Task。其中有两个字段:`Type`和`Payload`。
```go
// Task represents a task to be performed.
type Task struct {
// Type indicates the type of task to be performed.
Type string
// Payload holds data needed to perform the task.
Payload Payload
}
```
要创建任务,请使用`NewTask`函数,并为任务传递类型和有效负载。
可以通过`Client.Schedule`传入任务和需要处理的时间来计划任务。
```go
// producer.go
func main() {
client := asynq.NewClient(redis)
// Create a task with typename and payload.
t1 := asynq.NewTask(
"send_welcome_email",
map[string]interface{}{"user_id": 42})
t2 := asynq.NewTask(
"send_reminder_email",
map[string]interface{}{"user_id": 42})
// Process the task immediately.
err := client.Schedule(t1, time.Now())
if err != nil {
log.Fatal(err)
}
// Process the task 24 hours later.
err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
if err != nil {
log.Fatal(err)
}
}
```
在`consumer.go`,创建一个`Background`例来处理任务。
`NewBackground`函数需要`RedisConnOp`t和`Config`。
您可以查看有关文档,Config以查看可用的选项。
在此示例中,我们仅指定并发。
```go
// consumer.go
func main() {
bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10,
})
bg.Run(handler)
}
```
参数t`(*asynq.Background).Run`是`asynq.Handler`具有一种方法的接口`ProcessTask`。
```go
// ProcessTask should return nil if the processing of a task
// is successful.
//
// If ProcessTask return a non-nil error or panics, the task
// will be retried.
type Handler interface {
ProcessTask(*Task) error
}
```
实现处理程序的最简单方法是定义一个具有相同`type`的函数,并`asynq.HandlerFunc`在将其传递给时使用适配器类型`Run`。
```go
func handler(t *asynq.Task) error {
switch t.Type {
case "send_welcome_email":
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
case "send_reminder_email":
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Reminder Email to User %d\n", id)
default:
return fmt.Errorf("unexpected task type: %s", t.Type)
}
return nil
}
func main() {
bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10,
})
// Use asynq.HandlerFunc adapter for a handler function
bg.Run(asynq.HandlerFunc(handler))
}
```
我们可以继续向该处理函数添加案例,但是在实际应用中,在单独的函数中为每种案例定义逻辑很方便。为了重构我们的代码,让我们创建一个简单的调度程序,将任务类型映射到其处理程序:
```go
// consumer.go
// Dispatcher is used to dispatch tasks to registered handlers.
type Dispatcher struct {
mapping map[string]asynq.HandlerFunc
}
// HandleFunc registers a task handler
func (d *Dispatcher) HandleFunc(taskType string, fn asynq.HandlerFunc) {
d.mapping[taskType] = fn
}
// ProcessTask processes a task.
//
// NOTE: Dispatcher satisfies asynq.Handler interface.
func (d *Dispatcher) ProcessTask(task *asynq.Task) error {
fn, ok := d.mapping[task.Type]
if !ok {
return fmt.Errorf("no handler registered for %q", task.Type)
}
return fn(task)
}
func main() {
d := &Dispatcher{mapping: make(map[string]asynq.HandlerFunc)}
d.HandleFunc("send_welcome_email", sendWelcomeEmail)
d.HandleFunc("send_reminder_email", sendReminderEmail)
bg := asynq.NewBackground(redis, &asynq.Config{
Concurrency: 10,
})
bg.Run(d)
}
func sendWelcomeEmail(t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
return nil
}
func sendReminderEmail(t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send Welcome Email to User %d\n", id)
return nil
}
```
现在我们既有任务生产者又有消费者,我们可以运行这两个程序。
```go
go run producer.go
```
这将创建两项任务:一项应立即处理,另一项将在24小时后处理。
让我们使用`asynqmon`工具检查任务。
```go
asynqmon stats
```
你应该能看到,有一个任务**Enqueued**状态,另一个在**Scheduled**状态。
注意:如需了解每种状态的含义,请参阅Wiki页面上[Life of Task](https://github.com/hibiken/asynq/wiki/Life-of-a-Task)。
让我们运行`asynqmon`与`watch`命令,以便我们能够连续运行的命令看到的变化。
```go
watch -n 3 asynqmon stats # Runs `asynqmon stats` every 3 seconds
```
最后,让我们启动consumer程序来处理定时的任务。
```go
go run consumer.go
```
**注意**:在您发送信号终止程序之前,此操作不会退出。有关如何安全终止后台处理的最佳实践,请参见[Signal Wiki页面](https://github.com/hibiken/asynq/wiki/Signals)。
您应该能够看到在终端上打印的文本,表明该任务已成功处理。
这是一次asynq基础的快速教程。要了解有关其所有功能(如**[优先级队列](https://github.com/hibiken/asynq/wiki/Priority-Queues)**和**[自定义重试](https://github.com/hibiken/asynq/wiki/Task-Retry)**)的更多[信息](https://github.com/hibiken/asynq/wiki),请参见的[Wiki页面](https://github.com/hibiken/asynq/wiki)。
#### **命令行工具**
Asynq附带了一个命令行工具来检查队列和任务的状态。
要安装,请运行以下命令:
```go
go get github.com/hibiken/asynq/tools/asynqmon
```
完成!
例图:![asynqmon_stats.gif](https://upload-images.jianshu.io/upload_images/20999185-513004b2b31192ef.gif?imageMogr2/auto-orient/strip)
详情请参考:[Asyqn-https://github.com/hibiken/asynq](https://github.com/hibiken/asynq)
交流群:[https://gitter.im/go-asynq/community](https://gitter.im/go-asynq/community)
有疑问加站长微信联系(非本文作者))