1. MQTT简介
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是基于“订阅/发布”模式的轻量级通信协议,该协议基于TCP/IP,能以极低的带宽为海量(百万级)跨域设备提供可靠的消息服务,因此在物联网、小型移动终端、边缘计算方面有广泛应用。
所谓可靠的消息传输,体现为可配置消息的服务质量(QoS),有三种服务质量可选:
- 至多一次:
消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。应用场景如环境传感器的数据采集,丢失一次记录无所谓,因为不久后还会有第二次发送。 - 至少一次:
确保消息送达订阅者,但消息可能重复,适用于幂等性操作。 - 只有一次:
最严格的消息服务质量,确保消息到达且仅到达一次订阅者。应用场景如计费系统等。
MQTT协议中存在三种身份:消息总线(Broker)、发布者(Publish)和订阅者(Subscribe),其中消息总线属于服务器,后两者都属于客户端。发布者和订阅者可以是各种物联网设备和小型终端,消息发布者可以同时也是消息订阅者,如下图所示。
MQTT有一个底层覆盖网:它将建立客户端到服务器的连接,提供两者之间有序、无损、基于字节流的双向传输。当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。
客户端与消息总线建立连接后就是一个会话(Session),它们之间有周期性的状态交互,且可跨越多个连续的网络连接。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:
- Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
- payload,可以理解为消息的内容,是指订阅者具体要使用的内容。
订阅消息时,可以在订阅表达式中使用通配符筛选器对主题进行筛选,可同时订阅所匹配的多个主题。
MQTT协议中主要有以下5个方法:
- connect:客户端建立与服务器的连接
- disconnect:等待客户端完成工作后,端口与总线的会话
- subscribe:客户端向消息总线注册订阅主题
- unsubscribe:客户端等待消息总线取消所注册的订阅
- publish:客户端向消息总线发送某主题的消息
2. 消息总线EMQX
EMQX(Erlang/Enterprise/Elastic MQTT Broker),是基于Erlang语言开发的开源物联网MQTT消息总线。其是一款由前华为员工开发的开源软件,官网地址在此,配套有丰富的文档和使用说明。我们可以使用docker来快速部署,官网也有简洁明了的安装说明,这里不再冗述。
docker部署后,绑定了1883端口和18083端口。其中1883端口是服务总线对外提供服务的端口,客户端通过该端口与消息总线通讯。18083端口是网页版后台监控端口,打开localhost:18083,默认的管理员名为Admin,密码为public。监控界面可以看到所有连接到总线的客户节点、注册的主题、传递消息的统计数据等都一目了然,还提供了测试工具,可以测试连接消息总线并订阅主题,也可向总线发送主题消息测试客户端是否能收到订阅内容。
Docker部署后,系统已具备基本使用条件。关于如何使用插件、配置安全认证等,在官方文档有详细步骤说明,可按图索翼深入研究,这里不再展开,以后有需要再行探索。
3. 客户端Paho
Paho是Eclipse基金会支持的MQTT客户端开源实现,官网地址在此。其提供了如下语言库:java、Python、JavaScript、GoLang、C/C++/C#、Rust、Android Service等。
3.1 最简搭建
这里,我选择用Python基于Paho快速搭建一个最简单的MQTT客户端。
- 安装Paho模块
先利用virtualenv创建python虚拟环境,在该虚拟环境下使用pip安装Paho:
pip install paho-mqtt
- 编写一个publish客户端:publish.py
import paho.mqtt.client as mqtt
client=mqtt.Client()
client.connect('127.0.0.1',1883,600)
client.loop_start()
while True:
topic=input('Enter the topic name: ')
message=input('Enter the message to send: ')
client.publish(topic,payload=message,qos=0)
- 编写一个subscribe客户端:subscribe.py
import paho.mqtt.client as mqtt
def on_message(client,userdata,msg):
print(msg.topic+" "+str(msg.payload))
client=mqtt.Client()
client.on_message=on_message
client.connect('127.0.0.1',1883,600)
topic=input('Enter the topic you want to subscribe: ')
client.subscribe(topic,qos=0)
client.loop_forever()
先运行subscribe.py,进行持续监听。再另起一个终端,进入虚拟环境,并运行publish.py,输入对应的主题和消息内容,之前的客户端就能收到消息了。
也可以到EMQX的监控后台中,通过测试工具发送消息和订阅消息,也可在监控视图中查看到当前连接的客户端、消息统计数据等。
3.2 常用方法
Paho for python的官方使用手册在此,其中列出了详细说明。此处列出其中的常用方法,并简要说明。
- username_pw_set(username, password=None)
在 connect()之前设置client的用户名和密码,依据MQTT配置的mqtt_acl与mqtt_user表中的ACL规则与用户信息进行用户验证。若MQTT开启了ACL验证,就必须登录验证。 - connect( host, port=1883, keepalive=60 bind_address="" )
以阻塞的方式进行连接
host:消息总线的主机名或IP
port:端口
keepalive:无通信的最长时间(秒), 如果没有交换其他消息,则控制客户端向总线发送ping的速率
bind_address:多个接口绑定本地ip地址 - connect_async( host, port=1883, keepalive=60, bind_address="" )
以异步非阻塞的方式进行连接 - disconnect()
断开连接,将不会等待所有排队的消息被发送。如果想确保所有消息在断开连接前都已发送,则需要使用wait_for_publish()函数来发送消息,使用方法同publis()函数一致。 - publish(topic, payload=None, qos=0, retain=False)
向客户端代理发送消息
topic:消息的主题
payload:消息内容,字节流形式,若想发送int/float等类型数据,可通过struct.pack()方法来创建payload,并在接收端通过struct.unpack()方法恢复数据,关于struct的使用方法见文末。
qos:服务质量(0,1,2分别对应本文开头所说的三种质量类型)
retain:若设置为True,则该条消息为保留消息,消息总线会保存每个Topic的最后一条保留消息及其QoS,当订阅该Topic的客户新上线,则总线会将该消息递送给它。类似于微信公众号新关注者收到一条默认消息。删除该保留消息的方法是发送空消息体的保留消息或发送最新的保留消息进行覆盖。 - subscribe( topic, qos=0 )
订阅topic,可以使用元组数组的方式订阅多个主题:subscribe([("my/topic1",0),("my/topic2",0)...]) - unsubscribe(topic)
取消订阅 - loop( timeout=1.0, max_packets=1 )
定期调用网络处理
while True:
client.loop()
- loop_start() / loop_stop()
实现网络循环的线程接口, 可以控制线程的启动和结束。loop_start()可以在connect()之前或之后调用,调用后会启动一个后台线程,自动执行loop函数,从而将主线程从上文的while True死循环中释放出来。
client.connect("mqtt.eclipse.org")
client.loop_start()
while True:
temperature = sensor.blocking_read()
client.publish("paho/temperature", temperature)
- loop_forever()
网络循环的阻塞形式,在调用 disconnect() 之前不会停止。 - on_connect(client, userdata, flags, rc)
回调函数,当总线响应我们连接请求时被调用。这是一个被@property和@on_connect.setter修饰的访问器。
client:回调该函数的client实例
userdata:在 Client()\user_data_set()中设置的私有用户数据
flags:总线的响应标志,是一个 dict 字典,flags['session present'],当客户端重新连接到它先前已连接的总线时,此标志指示总线是否仍然具有该客户端的会话信息。 如果为1,则会话仍然存在。
rc:连接状况
0:连接成功
1:连接拒绝-不正确的协议版本
2:连接拒绝-无效客户标识符
3:连接拒绝-服务器不可用
4:连接拒绝-错误用户名或密码
5:连接拒绝-未授权
6-255:当前未使用。
def on_connect(client, userdata, flags, rc):
print("Connection returned result: "+connack_string(rc))
mqttc.on_connect = on_connect
...
- on_disconnect(client, userdata, rc)
当客户端断开连接时使用
rc:断开状态, 如果是 0 , 则是调用 disconnect()断开的, 如果是其他任何值, 则表示意外断开
def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection.")
client.on_disconnect = on_disconnect
-
on_message(client, userdata, message)
当在订阅的主题上收到了消息, 并且与现在的主题筛选器回调不匹配时使用。使用message_callback_add()可以定义一个特殊主题筛选器的回调函数,on_message()在筛选器无匹配时被调用。
message 是一个 MQTTMessage实例, 它的属性有topic, payload,qos,retain。
def on_message(client, userdata, message):
print("Received message '" + str(message.payload) + "' on topic '"
+ message.topic + "' with QoS " + str(message.qos))
client.on_message = on_message
-
message_callback_add(sub, callback)
定义具有特殊主题筛选器的回调函数,可以使用通配符匹配多个主题,比如sensors/#可以匹配sensors/temperature和sensors/humidity。
sub:过滤器,一个过滤器字符串只能对应一个callback。若能匹配上,则on_message不会再被调用。若多个匹配器匹配,则这多个匹配器对应的callback都会被调用。
callback:与on_message函数格式一致。 - message_callback_remove(sub)
移除过滤器 - on_publish(client, userdata, mid)
发布的回调函数 - on_subscribe(client, userdata, mid, granted_qos)
订阅的回调函数 - on_unsubscribe(client, userdata, mid)
取消订阅的回调函数 - on_log(client, userdata, level, buf)
客户端有日志信息时调用, 能与Python Log同时使用
level等级包括:MQTT_LOG_INFO, MQTT_LOG_NOTICE, MQTT_LOG_WARNING, MQTT_LOG_ERR, MQTT_LOG_DEBUG
4. 附录:struct用法
python中struct是用来处理结构数据的,可将结构数据转换为字节流,再将字节流恢复为结构数据。在转换过程中,需要使用一个格式化字符串,该字符串中的格式需与所处理的结构数据格式一一对应。
- struct.pack(format,v1,v2,...)
- struct.unpack(format,string)
其中格式字符串由一个或多个格式字符组成,格式字符参照如下:
字节流存在大小端问题,在格式字符串首位,有一个可选字符来决定是大端还是小端,参考如下。默认为@,即使用本机字符顺序。
这里有个示例:
import struct
buffer1 = struct.pack( "ihb" , 1 , 2 , 3 )
buffer2 = struct.unpack( "ihb" , buffer )
data = [ 1 , 2 , 3 ]
buffer3 = struct.pack( "!ihb" , *data)
buffer4 = struct.unpack( "!ihb" , buffer3 )
有疑问加站长微信联系(非本文作者)