Micro In Action(四):Pub/Sub

polaris · · 2247 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

![Micro In Action](https://s1.ax1x.com/2020/03/17/8a2858.png) > 本文作者:Che Dan > > 原文链接:https://medium.com/@dche423/micro-in-action-pub-sub-cn-ce010bffe1c 本文是[Micro](https://micro.mu/ "Micro")系列文章的第四篇。我们将以实际开发微服务为主线,顺带解析相关功能。从最基本的话题开始,逐步转到高级特性。 接下来谈谈异步消息处理。要构建一个可伸缩、高容错、高并发的系统, 异步消息处理是一个关键技术。这种技术虽然强大, 但开发起来也相当麻烦, 远没有同步请求那样简单直接。 好在 Micro 对这个编程模型作了非常好的抽象与封装,供我们便利地使用。 除此之外, 借助 Micro 的接口抽象, 我们可以透明(或者说几乎透明)地支持各种消息服务器。Micro 默认提供了基于 HTTP 的消息服务器实现。同时也以插件形式提供了多种主流消息服务系统的支持。包括 Kafka,RabbitMQ,Nats,MQTT,NSQ,Amazon SQS 等。你可以到[插件主页](https://github.com/micro/go-plugins/tree/master/broker "插件主页")了解更多详细说明。 这使得我们在因业务需要而切换消息服务时,可以几乎不修改任何业务代码。 Micro 支持以两种不同方式处理异步消息, 一种是[Pub/Sub](https://en.wikipedia.org/wiki/Publish–subscribe_pattern "Pub/Sub"),另一种是使用`micro.Broker`接口进行消息收发。 前者相对简单,后者则能提供更大灵活度。 Micro 内置的 Pub/Sub 功能统一并简化了异步消息的收、发、编码和解码。这把开发者从底层技术细节中解放出来,去专注于创造业务价值。多数情况下我们应优先选择此方式。 下面我们将以实例解析一套 Pub/Sub 系统的开发和运行。 --- ## Sub,订阅消息 在本系列[第一篇文章](https://studygolang.com/articles/27111 "第一篇文章")中, 我们创建了一个示例项目,其中已经包含了订阅相关的代码。 首先定义消息处理 Handler, **./subscriber/hello.go** 代码如下: ```go package subscriberimport ( "context" "github.com/micro/go-micro/util/log" hello "hello/proto/hello" )type Hello struct{}func (e *Hello) Handle(ctx context.Context, msg *hello.Message) error { log.Log("Handler Received message: ", msg.Say) return nil }func Handler(ctx context.Context, msg *hello.Message) error { log.Log("Function Received message: ", msg.Say) return nil } ``` 接收消息的代码可以是一个函数,也可以是对象的方法, 其签名为 `func(context.Context, v interface{}) error`。 注意在示例中方法的第二个参数是 `*hello.Message`, 此类型在.proto 文件中定义。Micro 框架会自动完成消息的解码。我们在 Handler 中可以直接使用。 准备好消息 Handler 以后, 需要进行注册, **./main.go** 相关代码如下: ```go ... // Register Struct as Subscriber micro.RegisterSubscriber("com.foo.srv.hello", service.Server(), new(subscriber.Hello))// Register Function as Subscriber micro.RegisterSubscriber("com.foo.srv.hello", service.Server(), subscriber.Handler) ... ``` 上述代码分别将对象和函数注册为消息处理 Handler, 接收 “**com.foo.srv.hello**” 这个主题(Topic )下的消息。 如果想更详细地控制订阅策略, 需要为`micro.RegisterSubscriber`方法传递更多参数。 我们先们看一下此方法的签名: ```go func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error ``` 第一个参数代表订阅的 Topic。 第二个参数是`server.Server`,其实例可从 service 中取得。 第三个参数是消息处理 Handler。 最后一个可选参数, 用于控制订阅行为,它的类型是`server.SubscriberOption`。目前 Micro 内置提供有 4 个选项: 1. **server.DisableAutoAck() SubscriberOption ,** 禁用自动确认。 2. **server.SubscriberContext(ctx context.Context) SubscriberOption**, 指定订阅 Context。 3. **server.InternalSubscriber(b bool) SubscriberOption,**内部订阅, 不把此订阅者信息广播到注册中心。 4. **server.SubscriberQueue(n string) SubscriberOption**,指定队列名。 **注:个人认为框架暴露出来的选项还是太少了**。 如果有稍高要求,就不得不去使用 Broker 接口。例如控制消息的持久化,控制出错重发的策略这些都是比较常见的需求。希望在后续版本中这一点可以得到扩展。 在上述几个选项中,`server.SubscriberQueue` 值得单独说明一下。 我们知道在 Pub/Sub 模型中有 Queue (或 Channel) 的概念, 如果一个 Topic 的多个订阅者各自拥有自己的 Queue, 那么消息会被复制分发到不同 Queue 中, 使得每个订阅者都可以接收到全部消息。 Micro 默认会为每个订阅者实例创建一个全局唯一 Queue。如果想要不同订阅者或单一订阅者的多个实例共享一个队列, 这时就需要用`server.SubscriberQueue`来明确指定队列名称了: ```go micro.RegisterSubscriber("com.foo.srv.hello", service.Server(), subscriber.Handler, server.SubscriberQueue("foo_bar")) ``` 这样当有多个订阅者节点运行时, 大家就会共享一个 Queue。因此消息会被分发到某一个节点进行处理, 避免相同的消息被重复处理。考虑到在分布系统中单一服务多节点运行是很常见的场景, 所以我的建议是: 除非你知道自己在作什么, 否则**永远明确指定队列 — — 哪怕目前只有一个订阅实例**。 最常见的作法是让队列与 Topic 同名。 至此, Pub/Sub 模型中的 Sub 部分就准备好了,下面开始编写 Pub 代码。 --- ## Pub,发布消息 我们创建一个发布消息的项目, 其结构如下: ```bash . ├── main.go ├── plugin.go ├── proto/hello │ └── hello.proto │ └── hello.pb.go │ └── hello.pb.micro.go ├── go.mod ├── go.sum ``` 其中除了**main.go** 内容有所不同以外,其它文件的内容与含义均与[《Micro In Action(二)》](https://studygolang.com/articles/27173 "《Micro In Action(二)》")所述一致,此处不再赘述。 **main.go** 文件代码如下: ```go package main import ( "context" "log" "time" "github.com/micro/go-micro" hello "hello/proto/hello" ) func main() { // New Service service := micro.NewService( micro.Name("com.foo.srv.hello.pub"), // name the client service ) // Initialise service service.Init() // create publisher pub := micro.NewPublisher("com.foo.srv.hello", service.Client()) // publish message every second for now := range time.Tick(time.Second) { if err := pub.Publish(context.TODO(), &hello.Message{Say: now.String()}); err != nil { log.Fatal("publish err", err) } } } ``` - 首先创建并初始化`micro.Service`实例, 将其命名为**com.foo.srv.hello.pub**。这个名字并没有特殊意义,在真实项目中很可能完全不同。 - 然后指定发送消息的目标 Topic,创建`micro.Publisher`实例。 - 接下来每秒钟送一条消息,消息类型为`*hello.Message`, 框架会自动对消息进行编码。 与订阅功能类似, 发布接口也支持可选选项,此选项可以用来控制发送行为。`Publisher`接口的定义如下: ```go // Publisher is syntactic sugar for publishing type Publisher interface { Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error } ``` 目前 Micro 框架仅提供了一个内置的发布选项: - **client.WithExchange(e string) PublishOption**,它用于控制消息的 Exchange(此概念将在后续文章中展开说明)。 --- ## 运行 在 pub 项目准备以后,首先运行 hello server,然后运行 pub 项目。 之后我们将在 hello server 的控制台中看到每秒追加的接收消息日志: ```bash $ go run main.go plugin.go2020-02-14 14:18:24.368336 I | Transport [http] Listening on [::]:56970 2020-02-14 14:18:24.368429 I | Broker [http] Connected to [::]:56971 2020-02-14 14:18:24.368680 I | Registry [mdns] Registering node: com.foo.srv.hello-14b7ea99-167f-4136-ad11-ae22d45ed302 2020-02-14 14:18:24.370575 I | Subscribing com.foo.srv.hello-14b7ea99-167f-4136-ad11-ae22d45ed302 to topic: com.foo.srv.hello 2020-02-14 14:18:24.370784 I | Subscribing com.foo.srv.hello-14b7ea99-167f-4136-ad11-ae22d45ed302 to topic: com.foo.srv.hello 2020-02-14 14:18:40.415610 I | Handler Received message: 2020-02-14 14:18:40.309255 +0800 CST m=+1.007480205 2020-02-14 14:18:40.415651 I | Function Received message: 2020-02-14 14:18:40.309255 +0800 CST m=+1.007480205 2020-02-14 14:18:41.310969 I | Handler Received message: 2020-02-14 14:18:41.310352 +0800 CST m=+2.008611968 2020-02-14 14:18:41.310999 I | Function Received message: 2020-02-14 14:18:41.310352 +0800 CST m=+2.008611968 ... ``` --- ## 总结 Micro 对异步消息支持很完备。 既支持高层次的 Pub/Sub 模式, 也支持面向 Broker 接口的底层收发操作。 其中 Pub/Sub 功能极大地简化了异步消息系统的开发, 使得我们可以忽略技术细节更聚焦在业务之上。 开发者只需定义好发送方,接收方以及消息内容, 其它工作全部由框架完成。 再不用考虑异步消息系统中常见的问题, 比如消息的路由、重发、接收确认, 也不用考虑消息内容的编码与解码。 当然这个简化也带来了一些局限, 如果 Pub/Sub 不能满足你的需求, 那么请关注本系列下篇文章:Message Broker。

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

2247 次点击  ∙  1 赞  
加入收藏 微博
被以下专栏收入,发现更多相似内容
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传