在上一篇文章中,我们介绍了Gmqtt的基本特性以及钩子函数的基本使用方法。本篇我们来详细介绍Gmqtt的插件机制,以及如何编写插件。
前文回顾Gmqtt——Go语言实现的MQTT broker
项目地址:https://github.com/DrmagicE/gmqtt
session的生命周期
根据MQTT协议规范,每个客户端连接都会有一个与之对应的session
,客户端可以指定该session
是否需要持久化。对于一个持久化session
,即使客户端离线,broker也会为其保留订阅信息以及与其订阅匹配的消息,当客户端重新上线后,broker会把这些消息投递给客户端。这使得在网络频繁闪断的环境下,也不会丢失消息。Gmqtt提供了钩子函数注入到session的生命周期中,使得插件可以管理session
生命周期的变化,涉及生命周期的钩子函数有:
- OnBasicAuth——收到
CONNECT
报文后调用,用作基本鉴权。 - OnEnhancedAuth——增强型鉴权(针对V5协议的支持)
- OnConnected——客户端连接成功
- OnSessionCreated——客户端新建session成功
- OnSessionResumed——客户端从session中恢复
- OnSessionTerminated——session终止
- OnClosed——客户端断开
下面我们以基本鉴权为例,图解这些钩子函数在整个session生命周期中的位置。
session建立
session删除
非持久化session
会在连接关闭时删除,而对于持久化session
,一般会设置一个超时时间,当session
超时后,会被删除。
对于V3.1.1协议,客户端通过CONNECT
报文的cleanSession
字段来控制session
是否需要持久化;
而在V5协议中,cleanSession
字段更名成为了cleanStart
字段,客户端通过设置超时时间来控制期望的session
的保留时间。
session
的删除过程如下图所示:
主题订阅/取消订阅流程
主题订阅和取消流程,涉及以下几个钩子函数:
- OnSubscribe——常用钩子函数之一,当收到
SUBSCRIBE
报文后触发,可用作权限控制,主题改写等功能。 - OnSubscribed——当成功订阅一个主题后触发。
- OnUnsubscribe——当收到
UNSUBSCRIBE
报文时触发。可用作权限控制,主题改写等功能。 - OnUnsubscribed——当取消订阅一个主题后触发。
订阅流程
取消订阅流程
消息发布流程
消息发布流程涉及以下几个钩子函数:
- OnMsgArrived——常用钩子函数之一,当收到
PUBLISH
报文后触发,可用作权限控制,消息改写等功能。 - OnDelivered——当消息投递到客户端后触发。(这个投递成功是从broker的角度,并不保证客户端一定收到)
- OnMsgDropped——当消息被丢弃时触发。(可能由于队列满,消息超时等原因)
wrapper
模式
wrapper
模式,也叫做包装器或装饰器模式,是Go语言中非常流行的一种设计模式,常用于实现各样的middleware
中间件。例如下面这个简易的打印日志HTTP中间件:
type HTTPWrapper func(h http.HandlerFunc) http.HandlerFunc
// LogMiddleware 在请求前后打印日志。
func LogMiddleware(h http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
log.Println("开始处理请求")
h(w, req)
log.Println("请求处理完毕")
}
}
func main() {
var hdl http.HandlerFunc = func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(200)
}
http.HandleFunc("/", LogMiddleware(hdl))
http.ListenAndServe(":8080", nil)
}
以上程序的示例输出:
2020/12/20 15:17:03 开始处理请求
2020/12/20 15:17:03 处理请求
2020/12/20 15:17:03 请求处理完毕
在Gmqtt中,所有的钩子函数都有其对应的wrapper
函数。插件需要声明本插件需要使用的wrapper
函数,以鉴权插件为例:
https://github.com/DrmagicE/g...
// HookWrapper 返回Auth插件需要关心的wrapper函数
func (a *Auth) HookWrapper() server.HookWrapper {
return server.HookWrapper{
// Auth 鉴权插件只关心 OnBasicAuthWrapper
OnBasicAuthWrapper: a.OnBasicAuthWrapper,
}
}
func (a *Auth) OnBasicAuthWrapper(pre server.OnBasicAuth) server.OnBasicAuth {
return func(ctx context.Context, client server.Client, req *server.ConnectRequest) (err error) {
// 处理前一个插件的OnBasicAuth逻辑
err = pre(ctx, client, req)
if err != nil {
return err
}
// ... 处理本插件的鉴权逻辑
}
}
一个钩子函数可以被多个插件使用。插件利用wrapper
模式,对钩子函数进行层层包装,最终将一个包装好的钩子函数注入到Gmqtt对应的挂载点上。例如在上述的代码示例中,Auth
插件“包装了”上一个插件的钩子函数(pre server.OnBasicAuth
),Auth
插件选择先执行前一个插件的钩子函数,如果前一个插件返回失败,那么就直接返回失败,跳过本插件的鉴权逻辑。Auth
插件也可以选择先执行本插件的鉴权逻辑:
func (a *Auth) OnBasicAuthWrapper(pre server.OnBasicAuth) server.OnBasicAuth {
return func(ctx context.Context, client server.Client, req *server.ConnectRequest) (err error) {
// ... 处理本插件的鉴权逻辑
// 如果鉴权失败,则返回拒绝连接
// 如果校验通过,再根据前一个插件的鉴权结果决定是否允许连接
return pre(ctx, client, req)
}
}
可以注意到,在这个例子中Auth
掌握着前一个插件对应钩子函数的控制权,Auth
可以自由的在前一个插件的OnBasicAuth
执行前后注入任何逻辑。也就是说,虽然一个钩子函数可以同时被多个插件所使用,但是他们还是有主次之别的,这跟插件的加载顺序息息相关。
插件的加载顺序
插件的加载顺序受配置文件中plugin_order
的控制:
# plugin loading orders
plugin_order:
- auth
- prometheus
- admin
plugin_order
保存的是插件的名称,数组顺序表示的就是插件的加载顺序。在Gmqtt中,越先加载的插件拥有越大的控制权。例如我们有A
,B
,C
三个插件,都使用OnBasicAuth
钩子函数,他们的加载顺序为A->B->C
。 那么,在A
插件的wrapper
函数OnBasicAuthWrapper(pre server.OnBasicAuth) server.OnBasicAuth
中, pre
包装了B
和C
两个插件的OnBasicAuth
实现。B
插件的wrapper
包含了插件C
的OnBasicAuth
实现,而C
中的wrapper
只包含一个由Gmqtt指定的默认OnBasicAuth
实现,A
,B
,C
对OnBasicAuth
的层层包装的关系如下图所示:
借助于wrapper
模式,开发者可以通过多个组合多个插件来完成一系列控制。
如何编写插件
只要实现了Gmqtt的server.Plugin
接口,就是一个Gmqtt的插件。为了简化插件开发,Gmqtt提供了插件模板生成工具,通过命令行可以快速的生成插件模板,令开发者可以更专注于业务实现。
使用gmqctl
命令行工具
安装命令行工具:
$ go install github.com/DrmagicE/gmqtt/cmd/gmqctl
目前,gmqctl
还只有生成插件模板这一个功能,可以通过gmqctl gen plugin --help
查看基本使用方法:
$ gmqctl gen plugin --help
code generator
Usage:
gmqctl gen plugin [flags]
Examples:
The following command will generate a code template for the 'awesome' plugin, which makes use of OnBasicAuth and OnSubscribe hook and enables the configuration in ./plugins directory.
gmqctl gen plugin -n awesome -H OnBasicAuth,OnSubscribe -c true -o ./plugins
Flags:
-c, --config Whether the plugin needs a configuration.
-h, --help help for plugin
-H, --hooks string The hooks use by the plugin, multiple hooks are separated by ','
-n, --name string The plugin name.
-o, --output string The output directory.
我们在Gmqtt的项目根目录下运行:
gmqctl gen plugin -n awesome -H OnBasicAuth,OnSubscribe -c true
上述命令会在plugin
目录下生成如下几个文件:
$ tree ./plugin/awesome
./plugin/awesome
├── awesome.go
├── config.go # 编写配置项相关逻辑
└── hooks.go # 编写钩子函数相关逻辑
我们逐个文件来分析,首先是awesome.go
:
package awesome
import (
"go.uber.org/zap"
"github.com/DrmagicE/gmqtt/config"
"github.com/DrmagicE/gmqtt/server"
)
var _ server.Plugin = (*Awesome)(nil)
const Name = "awesome"
func init() {
// 注册本插件的构造函数。
server.RegisterPlugin(Name, New)
// 由于我们指定了-c true,表示本插件需要配置项。
// 这里注册默认的配置项,当配置文件配置缺省时,启用默认配置。
config.RegisterDefaultPluginConfig(Name, &DefaultConfig)
}
// New 是本插件的构造函数
func New(config config.Config) (server.Plugin, error) {
panic("implement me")
}
var log *zap.Logger
// 实现Plugin接口的结构体。
type Awesome struct {
}
// Load 由Gmqtt按插件的导入顺序,依次执行。
// Load主要的作用就是把server.Server接口传递给插件。
func (a *Awesome) Load(service server.Server) error {
log = server.LoggerWithField(zap.String("plugin", Name))
panic("implement me")
}
// Unload 当broker退出时调用,可以做一些清理操作。
func (a *Awesome) Unload() error {
panic("implement me")
}
func (a *Awesome) Name() string {
return Name
}
然后是config.go
:
package awesome
// Config is the configuration for the awesome plugin.
type Config struct {
// 在这里,添加你需要的配置
}
// Validate validates the configuration, and return an error if it is invalid.
func (c *Config) Validate() error {
// Validate方法用于校验配置是否合法。
// Gmqtt会在导入配置阶段,执行每个插件的Validate方法,
// 如果校验失败,则报错并停止启动。
panic("implement me")
}
// DefaultConfig is the default configuration.
var DefaultConfig = Config{
// 这里定义默认配置,当配置文件缺省时,使用默认配置。
}
func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
// Gmqtt使用yaml作为配置文件,为了实现动态插拔,每个插件都要自定义自己的yaml解析逻辑。
// 具体实现方式,可以参考其他内置插件。
panic("implement me")
}
最后是hook.go
:
package awesome
import (
"github.com/DrmagicE/gmqtt/server"
)
func (a *Awesome) HookWrapper() server.HookWrapper {
return server.HookWrapper{
OnBasicAuthWrapper: a.OnBasicAuthWrapper,
OnSubscribeWrapper: a.OnSubscribeWrapper,
}
}
// 在刚才的命令中,我们声明要使用OnBasicAuth和OnSubscribe两个钩子函数,
// gmqctl已自动生成了模板,开发者自行填入业务逻辑即可。
func (a *Awesome) OnBasicAuthWrapper(pre server.OnBasicAuth) server.OnBasicAuth {
panic("impermanent me")
}
func (a *Awesome) OnSubscribeWrapper(pre server.OnSubscribe) server.OnSubscribe {
panic("impermanent me")
}
import插件并重新编译
新增的插件需要重新编译才能使用。Gmqtt统一的插件import文件: cmd/gmqttd/plugins.go:
package main
import (
// 在这里import所有的插件(为了调用对应的init方法)
_ "github.com/DrmagicE/gmqtt/plugin/admin"
_ "github.com/DrmagicE/gmqtt/plugin/auth"
_ "github.com/DrmagicE/gmqtt/plugin/prometheus"
_ "path/to/your/plugin"
)
修改启动顺序
上文提到,插件的启动顺序收配置文件控制,只有在plugin_order
中添加的插件才会被加载:
# plugin loading orders
plugin_order:
- auth
- prometheus
- admin
- your_plugin
插件配置修改
如果插件声明了使用配置,Gmqtt会从配置文件中为其加载配置,插件的配置存放在配置文件中的plugin.插件名
空间下:
plugins:
prometheus:
path: "/metrics"
listen_address: ":8082"
admin:
http:
enable: true
addr: :8083
grpc:
addr: 8084
auth:
# Password hash type. (plain | md5 | sha256 | bcrypt)
# Default to MD5.
hash: md5
# The file to store password. Default to $HOME/gmqtt_password.yml
# password_file:
至此,一个新的插件就开发完成了。大伙可以参考内置插件,以及项目的example目录获取更多示例。
如果你对本项目感兴趣,欢迎start支持,留言交流。
有疑问加站长微信联系(非本文作者)