![Micro In Action](https://s1.ax1x.com/2020/03/17/8a2858.png)
> 本文作者:Che Dan
>
> 原文链接:https://medium.com/@dche423/micro-in-action-pub-sub-cn-ce010bffe1c
本文是[Micro](https://micro.mu/ "Micro")系列文章的第四篇。我们将以实际开发微服务为主线,顺带解析相关功能。从最基本的话题开始,逐步转到高级特性。
接下来谈谈异步消息处理。要构建一个可伸缩、高容错、高并发的系统, 异步消息处理是一个关键技术。这种技术虽然强大, 但开发起来也相当麻烦, 远没有同步请求那样简单直接。
好在 Micro 对这个编程模型作了非常好的抽象与封装,供我们便利地使用。
除此之外, 借助 Micro 的接口抽象, 我们可以透明(或者说几乎透明)地支持各种消息服务器。Micro 默认提供了基于 HTTP 的消息服务器实现。同时也以插件形式提供了多种主流消息服务系统的支持。包括 Kafka,RabbitMQ,Nats,MQTT,NSQ,Amazon SQS 等。你可以到[插件主页](https://github.com/micro/go-plugins/tree/master/broker "插件主页")了解更多详细说明。 这使得我们在因业务需要而切换消息服务时,可以几乎不修改任何业务代码。
Micro 支持以两种不同方式处理异步消息, 一种是[Pub/Sub](https://en.wikipedia.org/wiki/Publish–subscribe_pattern "Pub/Sub"),另一种是使用`micro.Broker`接口进行消息收发。 前者相对简单,后者则能提供更大灵活度。
Micro 内置的 Pub/Sub 功能统一并简化了异步消息的收、发、编码和解码。这把开发者从底层技术细节中解放出来,去专注于创造业务价值。多数情况下我们应优先选择此方式。
下面我们将以实例解析一套 Pub/Sub 系统的开发和运行。
---
## Sub,订阅消息
在本系列[第一篇文章](https://studygolang.com/articles/27111 "第一篇文章")中, 我们创建了一个示例项目,其中已经包含了订阅相关的代码。
首先定义消息处理 Handler, **./subscriber/hello.go** 代码如下:
```go
package subscriberimport (
"context"
"github.com/micro/go-micro/util/log" hello "hello/proto/hello"
)type Hello struct{}func (e *Hello) Handle(ctx context.Context, msg *hello.Message) error {
log.Log("Handler Received message: ", msg.Say)
return nil
}func Handler(ctx context.Context, msg *hello.Message) error {
log.Log("Function Received message: ", msg.Say)
return nil
}
```
接收消息的代码可以是一个函数,也可以是对象的方法, 其签名为 `func(context.Context, v interface{}) error`。
注意在示例中方法的第二个参数是 `*hello.Message`, 此类型在.proto 文件中定义。Micro 框架会自动完成消息的解码。我们在 Handler 中可以直接使用。
准备好消息 Handler 以后, 需要进行注册, **./main.go** 相关代码如下:
```go
...
// Register Struct as Subscriber
micro.RegisterSubscriber("com.foo.srv.hello", service.Server(), new(subscriber.Hello))// Register Function as Subscriber
micro.RegisterSubscriber("com.foo.srv.hello", service.Server(), subscriber.Handler)
...
```
上述代码分别将对象和函数注册为消息处理 Handler, 接收 “**com.foo.srv.hello**” 这个主题(Topic )下的消息。
如果想更详细地控制订阅策略, 需要为`micro.RegisterSubscriber`方法传递更多参数。 我们先们看一下此方法的签名:
```go
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error
```
第一个参数代表订阅的 Topic。 第二个参数是`server.Server`,其实例可从 service 中取得。 第三个参数是消息处理 Handler。
最后一个可选参数, 用于控制订阅行为,它的类型是`server.SubscriberOption`。目前 Micro 内置提供有 4 个选项:
1. **server.DisableAutoAck() SubscriberOption ,** 禁用自动确认。
2. **server.SubscriberContext(ctx context.Context) SubscriberOption**, 指定订阅 Context。
3. **server.InternalSubscriber(b bool) SubscriberOption,**内部订阅, 不把此订阅者信息广播到注册中心。
4. **server.SubscriberQueue(n string) SubscriberOption**,指定队列名。
**注:个人认为框架暴露出来的选项还是太少了**。 如果有稍高要求,就不得不去使用 Broker 接口。例如控制消息的持久化,控制出错重发的策略这些都是比较常见的需求。希望在后续版本中这一点可以得到扩展。
在上述几个选项中,`server.SubscriberQueue` 值得单独说明一下。
我们知道在 Pub/Sub 模型中有 Queue (或 Channel) 的概念, 如果一个 Topic 的多个订阅者各自拥有自己的 Queue, 那么消息会被复制分发到不同 Queue 中, 使得每个订阅者都可以接收到全部消息。
Micro 默认会为每个订阅者实例创建一个全局唯一 Queue。如果想要不同订阅者或单一订阅者的多个实例共享一个队列, 这时就需要用`server.SubscriberQueue`来明确指定队列名称了:
```go
micro.RegisterSubscriber("com.foo.srv.hello", service.Server(), subscriber.Handler, server.SubscriberQueue("foo_bar"))
```
这样当有多个订阅者节点运行时, 大家就会共享一个 Queue。因此消息会被分发到某一个节点进行处理, 避免相同的消息被重复处理。考虑到在分布系统中单一服务多节点运行是很常见的场景, 所以我的建议是: 除非你知道自己在作什么, 否则**永远明确指定队列 — — 哪怕目前只有一个订阅实例**。 最常见的作法是让队列与 Topic 同名。
至此, Pub/Sub 模型中的 Sub 部分就准备好了,下面开始编写 Pub 代码。
---
## Pub,发布消息
我们创建一个发布消息的项目, 其结构如下:
```bash
.
├── main.go
├── plugin.go
├── proto/hello
│ └── hello.proto
│ └── hello.pb.go
│ └── hello.pb.micro.go
├── go.mod
├── go.sum
```
其中除了**main.go** 内容有所不同以外,其它文件的内容与含义均与[《Micro In Action(二)》](https://studygolang.com/articles/27173 "《Micro In Action(二)》")所述一致,此处不再赘述。
**main.go** 文件代码如下:
```go
package main
import (
"context"
"log"
"time"
"github.com/micro/go-micro"
hello "hello/proto/hello"
)
func main() {
// New Service
service := micro.NewService(
micro.Name("com.foo.srv.hello.pub"), // name the client service
)
// Initialise service
service.Init()
// create publisher
pub := micro.NewPublisher("com.foo.srv.hello", service.Client())
// publish message every second
for now := range time.Tick(time.Second) {
if err := pub.Publish(context.TODO(), &hello.Message{Say: now.String()}); err != nil {
log.Fatal("publish err", err)
}
}
}
```
- 首先创建并初始化`micro.Service`实例, 将其命名为**com.foo.srv.hello.pub**。这个名字并没有特殊意义,在真实项目中很可能完全不同。
- 然后指定发送消息的目标 Topic,创建`micro.Publisher`实例。
- 接下来每秒钟送一条消息,消息类型为`*hello.Message`, 框架会自动对消息进行编码。
与订阅功能类似, 发布接口也支持可选选项,此选项可以用来控制发送行为。`Publisher`接口的定义如下:
```go
// Publisher is syntactic sugar for publishing
type Publisher interface {
Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
}
```
目前 Micro 框架仅提供了一个内置的发布选项:
- **client.WithExchange(e string) PublishOption**,它用于控制消息的 Exchange(此概念将在后续文章中展开说明)。
---
## 运行
在 pub 项目准备以后,首先运行 hello server,然后运行 pub 项目。
之后我们将在 hello server 的控制台中看到每秒追加的接收消息日志:
```bash
$ go run main.go plugin.go2020-02-14 14:18:24.368336 I | Transport [http] Listening on [::]:56970
2020-02-14 14:18:24.368429 I | Broker [http] Connected to [::]:56971
2020-02-14 14:18:24.368680 I | Registry [mdns] Registering node: com.foo.srv.hello-14b7ea99-167f-4136-ad11-ae22d45ed302
2020-02-14 14:18:24.370575 I | Subscribing com.foo.srv.hello-14b7ea99-167f-4136-ad11-ae22d45ed302 to topic: com.foo.srv.hello
2020-02-14 14:18:24.370784 I | Subscribing com.foo.srv.hello-14b7ea99-167f-4136-ad11-ae22d45ed302 to topic: com.foo.srv.hello
2020-02-14 14:18:40.415610 I | Handler Received message: 2020-02-14 14:18:40.309255 +0800 CST m=+1.007480205
2020-02-14 14:18:40.415651 I | Function Received message: 2020-02-14 14:18:40.309255 +0800 CST m=+1.007480205
2020-02-14 14:18:41.310969 I | Handler Received message: 2020-02-14 14:18:41.310352 +0800 CST m=+2.008611968
2020-02-14 14:18:41.310999 I | Function Received message: 2020-02-14 14:18:41.310352 +0800 CST m=+2.008611968
...
```
---
## 总结
Micro 对异步消息支持很完备。 既支持高层次的 Pub/Sub 模式, 也支持面向 Broker 接口的底层收发操作。
其中 Pub/Sub 功能极大地简化了异步消息系统的开发, 使得我们可以忽略技术细节更聚焦在业务之上。
开发者只需定义好发送方,接收方以及消息内容, 其它工作全部由框架完成。 再不用考虑异步消息系统中常见的问题, 比如消息的路由、重发、接收确认, 也不用考虑消息内容的编码与解码。
当然这个简化也带来了一些局限, 如果 Pub/Sub 不能满足你的需求, 那么请关注本系列下篇文章:Message Broker。
有疑问加站长微信联系(非本文作者))