微服务实战Go Micro v3 系列(四)- 事件驱动(Pub/Sub)

celverbamboo · · 1426 次点击 · 开始浏览    置顶
这是一个创建于 的主题,其中的信息可能已经有所发展或是发生改变。

事件驱动架构 理解起来比较简单,普遍认为好的软件架构都是解耦的,微服务之间不应该相互耦合或依赖。举个例子,我们在代码中调用微服务 go.srv.user-service 的函数,会先通过服务发现找到微服务的地址再调用,我们的代码与该微服务有了直接性的调用交互,并不算是完全的解耦。 <!--more--> ## 源码地址 * [源码地址](https://github.com/CleverBamboo/go-micro-examples) * [爱租房](https://github.com/CleverBamboo/renting) ## 系列文章 * [微服务实战Go Micro v3 系列(一)- 基础篇](https://cleverbamboo.github.io/2021/04/27/GO/微服务实战Go-Micro-v3-系列(一)-基础篇/#more) * [微服务实战Go Micro v3 系列(二)- HelloWorld](https://cleverbamboo.github.io/2021/04/27/GO/微服务实战Go-Micro-v3-系列(二)-HelloWorld/#more) * [微服务实战Go-Micro v3 系列(三)- 启动HTTP服务](https://cleverbamboo.github.io/2021/04/28/GO/微服务实战Go-Micro-v3-系列(三)-启动HTTP服务/#more) * [微服务实战Go Micro v3 系列(四)- 事件驱动(Pub/Sub)](https://cleverbamboo.github.io/2021/05/12/GO/微服务实战Go-Micro-v3-系列(四)-事件驱动-Pub-Sub/#more) * [微服务实战Go Micro v3 系列(五)- 注册和配置中心](https://cleverbamboo.github.io/2021/06/02/GO/%E5%BE%AE%E6%9C%8D%E5%8A%A1%E5%AE%9E%E6%88%98Go-Micro-v3-%E7%B3%BB%E5%88%97%EF%BC%88%E4%BA%94%EF%BC%89-%E6%B3%A8%E5%86%8C%E5%92%8C%E9%85%8D%E7%BD%AE%E4%B8%AD%E5%BF%83/#more) * [微服务实战Go Micro v3 系列(六)- 综合篇(爱租房项目)](https://cleverbamboo.github.io/2021/06/08/GO/%E5%BE%AE%E6%9C%8D%E5%8A%A1%E5%AE%9E%E6%88%98Go-Micro-v3-%E7%B3%BB%E5%88%97%EF%BC%88%E5%85%AD%EF%BC%89-%E7%BB%BC%E5%90%88%E7%AF%87%EF%BC%88%E7%88%B1%E7%A7%9F%E6%88%BF%E9%A1%B9%E7%9B%AE%EF%BC%89/#more) ## 发布与订阅模式 为了理解事件驱动架构为何能使代码完全解耦,先了解事件的发布、订阅流程。微服务 X 完成任务 x 后通知消息系统说 “x 已完成”,它并不关心有哪些微服务正在监听这个事件、事件发生后会产生哪些影响。如果系统发生了某个事件,随之其他微服务都要做出动作是很容易的。 举个例子,user-service 创建了一个新用户,email-service 要给该用户发一封注册成功的邮件,message-service 要给网站管理员发一条用户注册的通知短信。 ### 一般实现 在 user-service 的代码中实例化另两个微服务 Client 后,调用函数发邮件和短信,代码耦合度很高。如下图: ![](https://z3.ax1x.com/2021/05/12/gw54BQ.png) ### 事件驱动 在事件驱动的架构下,user-service 只需向消息系统发布一条 topic 为 “user.created” 的消息,其他两个订阅了此 topic 的 service 能知道有用户注册了,拿到用户信息后他们自行发邮件、发短信。如下图: ![](https://z3.ax1x.com/2021/05/12/gw5OjU.png) ## 代码实现 ### Publish 事件发布 ```go package main import ( "context" "github.com/asim/go-micro/v3" "github.com/asim/go-micro/v3/metadata" "github.com/asim/go-micro/v3/server" "github.com/asim/go-micro/v3/util/log" proto "go-micro-examples/pubsub/proto" ) // Sub All methods of Sub will be executed when // a message is received type Sub struct{} // Process Method can be of any name func (s *Sub) Process(ctx context.Context, event *proto.Event) error { md, _ := metadata.FromContext(ctx) log.Logf("[pubsub.1] Received event %+v with metadata %+v\n", event, md) // do something with event return nil } // Alternatively a function can be used func subEv(ctx context.Context, event *proto.Event) error { md, _ := metadata.FromContext(ctx) log.Logf("[pubsub.2] Received event %+v with metadata %+v\n", event, md) // do something with event return nil } func main() { // create a service service := micro.NewService( micro.Name("go.micro.srv.pubsub"), ) // parse command line service.Init() // register subscriber if err := micro.RegisterSubscriber("example.topic,pubsub.1", service.Server(), new(Sub)); err != nil { log.Fatal(err) } // register subscriber with queue, each message is delivered to a unique subscriber if err := micro.RegisterSubscriber("example.topic.pubsub.2", service.Server(), subEv, server.SubscriberQueue("queue.pubsub")); err != nil { log.Fatal(err) } if err := service.Run(); err != nil { log.Fatal(err) } } ``` 运行效果如下: ![](https://z3.ax1x.com/2021/05/12/gwIw80.png) 可以看到,直接使用 go-micro 的 **NewEvent** 订阅即可,成功订阅 **example.topic.pubsub.1**、**example.topic.pubsub.2** ### Subscribe 事件订阅 事件既然有订阅,那么当然会有订阅 ```go package main import ( "context" "fmt" "github.com/asim/go-micro/v3" "github.com/asim/go-micro/v3/util/log" "github.com/pborman/uuid" proto "go-micro-examples/pubsub/proto" "time" ) // send events using the publisher func sendEv(topic string, p micro.Publisher) { t := time.NewTimer(time.Second) for _ = range t.C { // crate new event ev := &proto.Event{ Id: uuid.NewUUID().String(), Timestamp: time.Now().Unix(), Message: fmt.Sprintf("Messaging you all day on %s", topic), } log.Logf("publishing %+v\n", ev) // publish an event if err := p.Publish(context.Background(), ev); err != nil { log.Logf("error publishing %v", err) } } } func main() { // create a service service := micro.NewService( micro.Name("go.micro.cli.pubsub"), ) // parse command line service.Init() // create publisher pub1 := micro.NewEvent("example.topic.pubsub.1", service.Client()) pub2 := micro.NewEvent("example.topic.pubsub.2", service.Client()) // pub to topic 1 go sendEv("example.topic.pubsub.1", pub1) // pub to topic 2 go sendEv("example.topic.pubsub.2", pub2) // block forever select {} } ``` 运行效果如下: ![](https://z3.ax1x.com/2021/05/12/gwomsU.png) 从图中可以看出,成功订阅事件并调用成功

有疑问加站长微信联系(非本文作者)

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

1426 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传