本章节阐述micro消息订阅和发布相关内容
# 阅读本文前你可能需要进行如下知识储备
+ [golang分布式微服务框架go-micro 入门笔记1:搭建go-micro环境,](https://www.techidea8.com/archives/49)
+ [golang微服务框架go-micro 入门笔记2.1 micro工具之micro api](https://www.techidea8.com/archives/53)
+ [golang微服务框架go-micro 入门笔记2.2 micro工具之micro web](https://www.techidea8.com/archives/58)
# broker代理
微服务之间需要通过broker来传递消息,go-micro支持http/nats/memory三种broker,其中http是默认的broker。
同时,go-micro以强大的插件形式,提供如下几种常见的broker。
```shell
$ls
gocloud/ googlepubsub/ grpc/ kafka/ mqtt/ nats/ nsq/ proxy/ rabbitmq/ redis/ snssqs/ sqs/ stan/ stomp/
```
## http
HTTP Broker 是基于HTTP的异步broker,源代码在`github.com\micro\go-micro@v1.9.1\broker\broker.go`中,默认DefaultBroker为http
```go
var (
DefaultBroker Broker = newHttpBroker()
)
```
httpbroker实际上就是一个结构体
```go
type httpBroker struct {
id string //微服务ID
address string //主机地址
opts Options //一些配置
mux *http.ServeMux //通过这个监听其他端发送的http请求
c *http.Client //通过这个发送请求到其他端
r registry.Registry
sync.RWMutex
subscribers map[string][]*httpSubscriber //订阅
running bool
exit chan chan error
// offline message inbox
mtx sync.RWMutex
inbox map[string][][]byte //数据缓存
}
```
通过`http.Client`发送请求,通过`http.ServeMux`实现请求监听,通过`inbox`存储数据
## redis
redis初始化代码如下
```golang
//main.go
//初始化URL格式redis://密码@主机:端口/
b := redis.NewBroker(
broker.Addrs("redis://user:secret@localhost:6379/"),
)
//初始化
b.Init()
//连接
b.Connect()
// 新建service
service := grpc.NewService(
micro.Name("go.micro.web.config"),
micro.Version("latest"),
micro.Broker(b),
)
//初始化service
service.Init()
//启动,运行,监听
service.Run()
```
启动应用程序需要指定broker为redis
```shell
go run main.go --broker=redis
```
## grpc 初始化
初始化过程如下
```go
//main.go
import (
"github.com/micro/go-plugins/broker/grpc"
)
// 建立连接
b := grpc.NewBroker()
b.Init()
b.Connect()
// 订阅事件
sub, _ := b.Subscribe("events")
defer sub.Unsubscribe()
// 发布事件
b.Publish("events", &broker.Message{
Headers: map[string]string{"type": "event"},
Body: []byte(`an event`),
})
```
启动应用程序需要指定broker为grpc
```shell
go run main.go --broker=grpc
```
## rabbitmq 初始化
初始化过程如下
```go
//main.go
import (
"github.com/micro/go-plugins/broker/grpc"
)
b := rabbitmq.NewBroker(
broker.Addrs("amqp://用户名:密码@主机host:端口port"),
)
b.Init()
b.Connect()
```
启动应用程序需要指定broker为rabbitmq
```shell
go run main.go plugin.go --broker=rabbitmq
```
## mqtt
初始化过程如下
```go
//main.go
import (
"github.com/micro/go-micro"
"github.com/micro/go-plugins/broker/mqtt"
)
func main() {
service := micro.NewService(
micro.Name("my.service"),
micro.Broker(mqtt.NewBroker()),
)
//...
}
```
启动应用程序需要指定broker为mqtt
```shell
go run main.go plugin.go --broker=mqtt
```
## 其他
其他可以阅读代码
```
$GOPATH/src/github.com/micro/go-plugins/broker
```
# 消息订阅和发布
## 通过micro.RegisterSubscriber实现消息订阅
消息订阅主要API接口如下,第一个参数标识消息主题,第二个参数表示服务实例。
```go
// Register Struct as Subscriber
micro.RegisterSubscriber("go.micro.srv.testsrv", service.Server(), new(subscriber.Testsrv))
// Register Function as Subscriber
micro.RegisterSubscriber("go.micro.srv.testsrv", service.Server(), subscriber.Handler)
```
重点注意第三个参数,第三个参数是处理函数,可以是函数,也可以是实现了
`func Handler(ctx context.Context, msg *testsrv.Message) error `方法的结构体,micro内部会根据参数类型自动适配。结构体中可以实现多个`func Handler(ctx context.Context, msg *testsrv.Message) error`类型方法
## 通过broker.Subscribe实现订阅
Broker提供如下接口
```go
type Broker interface {
Init(...Option) error
Options() Options
Address() string
Connect() error
Disconnect() error
Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
String() string
}
```
+ Subscribe 订阅事件,topic代表主题,h事件处理函数
+ Publish 发布事件
## 消息处理函数Handler 定义
在上述涉及到处理函数handler,具体含义如下
```go
type Handler func(Event) error
// Event is given to a subscription handler for processing
type Event interface {
Topic() string
Message() *Message
Ack() error
}
type Message struct {
Header map[string]string
Body []byte
}
```
## 通过broker.Publish实现发布
举例如下
```go
// 建立连接
b := grpc.NewBroker()
b.Init()
b.Connect()
// 订阅事件
sub, _ := b.Subscribe("events")
defer sub.Unsubscribe()
// 发布事件
b.Publish("events", &broker.Message{
Headers: map[string]string{"type": "event"},
Body: []byte(`an event`),
})
```
## 通过micro publish实现发布
举例如下
```shell
micro publish "go.micro.web.config" "hello"
```
# 实战和代码
## 效果
下载代码`broker.zip` 解压到`techidea8.com/microapp/broker`下运行,效果图忑
![效果](https://www.techidea8.com/wp-content/uploads/2019/08/2019082910032374.png)
+ 发布消息需要注意json格式字符串
```shell
micro publish go.micro.srv.broker "{\"say\":\"这是测试消息\"}"
```
## 获得代码
关注公众号`betaidea`回复`micro-broker`即可获得
![公众号betaidea](https://www.techidea8.com/wp-content/uploads/2019/08/2019082910032449.png)
## 推荐阅读
[扫微信二维码实现网站登陆提供体验地址和源代码](https://www.techidea8.com/archives/46)
[开源项目golang go语言后台管理框架restgo-admin](https://www.techidea8.com/archives/34)
[支持手势触摸,可左右滑动的日历插件](https://www.techidea8.com/archives/22)
[你必须知道的18个互联网业务模型](https://www.techidea8.com/archives/31)
有疑问加站长微信联系(非本文作者))