![Micro In Action](https://s1.ax1x.com/2020/03/18/8Da5pd.png)
> 本文作者:Che Dan
>
> 原文链接:https://medium.com/@dche423/micro-in-action-5-message-broker-d975c2f28a55
本文是[Micro](https://micro.mu/)系列文章的第五篇。我们将从最基本的话题开始,逐步转到高级特性。今天的话题是 Message Broker。
[上篇文章](https://studygolang.com/articles/27282)我们谈到了如何在Micro中使用Pub/Sub,它的优点是简单, 缺点是牺牲了一些灵活性。如果想控制收发消息的底层细节, 则需要使用`github.com/micro/go-micro/broker.Broker` 接口。
此接口是Micro中异步消息处理的核心,事实上Pub/Sub 功能也是用此接口构建起来的。
这是一个优雅的设计。每一个好的框架应有这样的特质: **既提供高层次抽象,又允许底层API访问**。高层次抽象适用于多数情况,可以大幅度简化问题处理。而底层API用于少数前者无法满足的场景,不对开发者作过多限制。
------
我们先来看一个用`broker.Broker` 收发消息的实例。
```go
package main
import (
"fmt"
"log"
"time"
"github.com/micro/go-micro"
"github.com/micro/go-micro/broker"
)
var (
topic = "com.foo.topic"
)
func pub(brk broker.Broker) {
i := 0
for range time.Tick(time.Second) {
// build a message
msg := &broker.Message{
Header: map[string]string{
"id": fmt.Sprintf("%d", i),
},
Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
}
// publish it
if err := brk.Publish(topic, msg); err != nil {
log.Printf("[pub] failed: %v", err)
} else {
fmt.Println("[pub] pubbed message:", string(msg.Body))
}
i++
}
}
func sub(brk broker.Broker) {
// subscribe a topic with queue specified
_, err := brk.Subscribe(topic, func(p broker.Event) error {
fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
return nil
}, broker.Queue(topic))
if err != nil {
fmt.Println(err)
}
}
func main() {
// New Service
service := micro.NewService(
micro.Name("com.foo.broker.example"), // name the client service
)
// Initialise service
service.Init(micro.AfterStart(func() error {
brk := service.Options().Broker
go sub(brk)
go pub(brk)
return nil
}))
service.Run()
}
```
下面我们把代码的核心关键部分加以说明。
## main函数
```go
func main() {
// New Service
service := micro.NewService(
micro.Name("com.foo.broker.example"), // name the client service
) // Initialise service
service.Init(micro.AfterStart(func() error {
brk := service.Options().Broker
go sub(brk)
go pub(brk)
return nil
}))
service.Run()
}
```
- 首先创建`micro.Service`实例, 将其命名为**com.foo.broker.example**
- 用`micro.AfterStart` 选项初始化 `service`实例, 确保在服务启动后执行回调函数。 使用这个选项的原因是, 只有`service`启动以后, 才能确保`broker`已经准备好了。
- 在回调函数中首先获取`Broker`接口实例, 然后将它作为参数传递给sub和pub两个函数,分别用于收接消息和发消息。
## sub函数, 用于订阅
```go
func sub(brk broker.Broker) {
// subscribe a topic with queue specified
_, err := brk.Subscribe(topic, func(p broker.Event) error {
fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
return nil
}, broker.Queue(topic))
if err != nil {
fmt.Println(err)
}
}
```
传入`broker.Broker` 接口实例`brk`,并调用接口的 `Subscribe`方法。这个方法签名如下**:**
```go
type Event interface {
Topic() string
Message() *Message
Ack() error
}type Handler func(Event) errortype Broker interface {
... Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error)
...
}
```
第一个参数是订阅主题。
第二个参数是事件处理Handler,它是一个回调函数, 每当收到新事件时, 此函数会被执行。 回调函数的参数是Event接口。从Event接口中可以获取主题和消息对象 `*broker.Message`。 另外,Event接口提供了Ack方法, 用于手工确认消息处理完成(一般与`DisableAutoAck`选项一起使用,下一段会进一步解释)。
第三个参数是 `broker.SubscribeOption` ,代表订阅选项。 它的含义与[前文]https://studygolang.com/articles/27282)提到的 `server.SubscriberOption`一致。框架内置的三个选项 `broker.DisableAutoAck , broker.Queue , broker.SubscribeContext`的作用也与前文提到的选项相同。 不同的是,broker插件可以实现自己的 `broker.SubscribeOption`,扩展插件功能。
例如 [RabbitMQ 插件](https://github.com/micro/go-plugins/tree/master/broker/rabbitmq)就提供了DurableQueue 选项, 用以控制队列的持久性:
```go
...
import "github.com/micro/go-plugins/broker/rabbitmq"
..._, err := broker.Subscribe(topic, func(p broker.Event) error {
...
p.Ack()
return nil
}, broker.Queue(topic), broker.DisableAutoAck(), rabbitmq.DurableQueue(),)
```
注意当使用了`DisableAutoAck`选项以后, 我们在代码中调用了`Ack`方法,以便完成手工确认。
[前面文章](https://studygolang.com/articles/27282)提到, 除了RabbitMQ,还有Kafka,NSQ等多种broker插件可用。 使用也方式大同小异。
在本例中我们遵循[前文](https://studygolang.com/articles/27282)提到的最佳实践: 永远为队列显式命名。 所以我们用`broker.Queue(topic)`作为方法的第三个参数。
至此`sub` 函数完成了消息订阅。 当接收到新消息时, 会输出消息头和消息体的日志。
## **pub 函数, 用于消息发送**
```go
func pub(brk broker.Broker) {
i := 0
for range time.Tick(time.Second) {
// build a message
msg := &broker.Message{
Header: map[string]string{
"id": fmt.Sprintf("%d", i),
},
Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
}
// publish it
if err := brk.Publish(topic, msg); err != nil {
log.Printf("[pub] failed: %v", err)
} else {
fmt.Println("[pub] pubbed message:", string(msg.Body))
}
i++
}
}
```
这段代码里的核心函数是 `brk.Publish`**,其签名如下:**
```go
type Message struct {
Header map[string]string
Body []byte
}
type Broker interface {
... Publish(topic string, msg *Message, opts ...PublishOption) error ...
}
```
第一个参数代表消息发送的目标主题。
第二个参数代表消息对象,类型为 `*broker.Message。`每条消息都可以设置字符串类型的消息头,消息体是二进制数据。
第三个参数是可选参数,类型为 `borker.PublishOption`。它代表可在发送时提供的额外选项。 框架没有内置选项,但broker插件实现自己的选项。这些自定义选项与特定插件相关。
还是以RabbitMQ 插件为例,它提供了控制消息持久化的选项 `DeliveryMode` ,其中mode取值的含义来自于RabbitMQ。 我们可以用如下的方式使用它:
```go
...
import "github.com/micro/go-plugins/broker/rabbitmq"
...const mode = 2 // Transient (0 or 1) or Persistent (2)
if err := broker.Publish(topic, msg, rabbitmq.DeliveryMode(mode)); err != nil {
log.Printf("[pub] failed: %v", err)
} else {
fmt.Println("[pub] pubbed message:", string(msg.Body))
}
...
```
其它broker插件的自定义发布选项与此类似。
解释了 `broker.Publish` **以后,** `pub` **函数就很好理解了。**
利用Ticker每秒钟循环执行一次。 每次执行时先创建 `*broker.Message` ,为其设置名为**id**的头信息,把当前时间作为消息内容。然后发送出去。
## 运行
准备好代码以后, 运行 `go run main.go`,将看到如下日志,每秒追加新日志:
```bash
$ go run main.go
[pub] pubbed message: 0: 2020-02-16 19:26:50.006125 +0800 CST m=+1.004985784
[sub] received message: 0: 2020-02-16 19:26:50.006125 +0800 CST m=+1.004985784 header map[id:0]
[pub] pubbed message: 1: 2020-02-16 19:26:51.010234 +0800 CST m=+2.009129265
[sub] received message: 1: 2020-02-16 19:26:51.010234 +0800 CST m=+2.009129265 header map[id:1]
...
```
虽然本例中 pub 与 sub 在一个进程中运行, **但这不是必须的,**只是为了演示方便**。**Micro 已经将底层的跨进程通讯作了封装, 我们完全可以把发送和接收放在两个程序中运行而不用考虑通讯细节。
注: 本示例修改自[官方示例](https://github.com/micro/examples/blob/master/broker/main.go)。之所以对其进行修改,是因为我认为**官方示例并不恰当**,会给开发者带来误导。 与其对比你会发现, 我们的版本是更一致、更简单的: 任何时候以都`micro.Service`为核心, 不必考虑命令行解析问题,不必单独处理broker的连接和初始化。
## 结论
当Micro提供的Pub/Sub功能无法满足业务需求时, 我们可以直接使用`broker.Broker`接口完成底层API访问。
所谓“底层”, 在消息收发的过程中, 可以针对不同的消息服务器更多细节控制。`broker.Broker` 接口由不同broker插件实现,适配不同消息服务器,这为我们提供了非常大的灵活性。
`broker.Broker`接口和Pub/Sub共同为Micro开发者提供了完备的异步消息开发框架。
有疑问加站长微信联系(非本文作者))