首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

Web技术(七):如何使用并实现MQTT 消息订阅-发布模型?

  • 24-03-03 17:01
  • 3373
  • 9996
blog.csdn.net

文章目录

  • 一、什么是发布-订阅消息模型?
  • 二、订阅-发布消息模型有哪些应用?
    • 2.1 应用于IP 物联网络中的消息传递
    • 2.2 应用于操作系统进程间的消息传递
    • 2.3 应用于MESH 自组网中的消息传递
  • 三、MQTT 如何实现订阅-发布消息模型?
    • 3.1 如何在本机实践MQTT 通信并抓包分析?
    • 3.2 MQTT 报文格式是怎样的?
    • 3.3 MQTT 如何管理连接?
    • 3.4 MQTT 如何传递消息?
  • 更多文章:

HTTP 超文本传输协议最开始是为人们互相分享HTML 超文本文件而设计的,使用Client / Server 模型交互数据,Client 向特定Server 请求获取或者提交网页数据(HTTP 的Get、Post、Put、Delete 方法),后来的HTTP/2 和QUIC 也为该目的进行了大量优化,大幅提高了有效数据的吞吐率。

到了物联网时代,需要接入网络的设备规模大幅增加,为了让物联网设备组成智能服务网络,资源受限的嵌入式物联网设备之间也需要相互通信,设备与设备之间并不需要传输像HTML 这么大的数据量,只需要传输指令、状态等少量数据。设备与设备之间的连接通信比人与设备的连接通信更加复杂,而且下发指令上传状态需要异步的双向通信协议,HTTP 就难以胜任这种应用场景了,该为物联网应用场景设计怎样的通信模型或协议呢?

一、什么是发布-订阅消息模型?

首先,我们考虑一个物联网设备既可以向某些设备发布消息(这里的消息可以是控制指令或状态数据),也可以从某几个设备中接收订阅的消息,如果消息的订阅者和发布者比较多,它们之间的耦合关系就比较复杂,后续再想增减变更订阅者或发布者会比较麻烦,HTTP 这种传统的Request / Response 模型就显得难以胜任了,我们该采用怎样的通信模型呢?

如果你熟悉Web 开发,回顾下MVVM(Model-View-ViewModel) 框架引入ViewModel 实现了View 与Model 的完全解耦,让View 和Model 的开发相互独立且更简单。如果你熟悉Linux 驱动开发,回顾下总线设备驱动模型,引入platform_bus 实现driver 和 device 的完全解耦,不仅简化了驱动开发而且大幅减少了无意义的冗余代码。

我们可以借鉴这个思路,为消息的订阅者Subscriber 和发布者Publisher 增设一个专门用来处理转发消息的代理者Broker,这就实现了订阅者和发布者的完全解耦,可以胜任比较复杂的物联网通信场景。目前比较流行的,采用订阅发布模型的物联网通信协议是MQTT,该协议的通信模型如下:
MQTT 订阅发布模型
上图中有三个角色:

  • Publisher(Client):消息发布者连接到消息代理Broker,并向其发布指定主题topic 的应用消息,每个消息都附加一个主题Topic(属于URL,可供其它设备订阅),发布者可以是传感器或开关设备等;
  • Subscriber(Client):消息订阅者连接到消息代理Broker,并向代理Broker 订阅感兴趣的主题topic,当Broker 收到订阅主题的消息后会将其分发给该topic 的订阅者,订阅者可以是执行器或显示器等;
  • Broker(Server):消息代理者同时连接订阅者和发布者,管理该网络中每个主题topic 的订阅列表,接收Publisher 发送过来的特定主题的消息,根据该消息的topic 查找其订阅列表,并将该消息发送给订阅列表中的所有Subscriber。

一个物联网设备可能同时支持向其它设备发送控制指令,且从其它设备接收状态数据,也即该设备同时承担Publisher 和Subscriber 两个角色。如果网络规模比较大,MQTT 代理Broker 需要处理的数据量比较大,可以将其放到服务端集中处理消息的接收和转发,因此Broker 可称为Server,相应的Publisher 和Subscriber 则可称为Client。

可以类比微信订阅号,每个订阅号都是一个topic,每篇文章都是一个消息,订阅号的作者是Publisher,订阅号的订阅者是Subscriber,微信后端服务器是Broker,作者在其订阅号发布一篇文章,微信服务端Broker 会将其分发给所有订阅该号的读者,作者和读者都是微信的客户。

消息发布-订阅模型中的Publisher 相当于消息的生产者,Subscriber 相当于消息的消费者,Broker 作为消息的存储分发者有点类似于总线的作用,因此这种消息发布-订阅模型也可称为消息总线,是一种很常用的消息传递模型。说到消息传递,我们很容易想到消息队列,也即消息的队列模型。消息的发布-订阅模型是为了满足更复杂的消息传递需求,在此基础上扩展而来的,二者的主要区别如下:

  • 消息的队列模型:队列这种先进先出的数据结构作为存放消息的容器(消息进出按照既定的顺序),它允许多个生产者往同一个队列发送消息(也即从队尾入队),但如果有多个消费者,某个消息只能被其中一个消费者接收(也即从队头出队),多个消费者之间是相互竞争关系。队列模型的应用比较广泛,比如任务阻塞队列、数据缓冲队列、线程安全的并发队列、线程或进程间通信的消息队列等;
    消息的队列模型
  • 消息的订阅-发布模型:存放消息的容器变成了一个个主题topic 的集合,每个topic 相当于一个消息队列,订阅者在接收消息之前需要先订阅一系列感兴趣的主题(每个topic 都生成并维护一个订阅者列表),当发布者向某个topic 发布消息后,消息代理broker 会将该消息分发给该主题的所有订阅者,同一topic 的订阅者接收到的消息是完全一样的,也即一份消息数据可以被多次消费。发布-订阅模型的应用也比较广泛,比如大量物联网设备和物联网云平台之间的消息传递、MESH 网络中各节点之间的消息传递、进程或服务之间的复杂消息传递等。
    消息的发布-订阅模型

二、订阅-发布消息模型有哪些应用?

消息的发布-订阅模型相比队列模型可以实现更复杂通信网络中消息的传递分发,该模型最大的特点就是借助Broker 实现Publisher 和Subscriber 的完全解耦,可以很方便的扩展通信网络中的节点数量。比如用于大规模物联网设备间的消息传递、MESH 自组网设备间的消息传递、嵌入式设备内进程间的消息传递等,下面逐个简单介绍。

2.1 应用于IP 物联网络中的消息传递

通过TCP/IP 协议接入互联网的嵌入式设备可以直接远程访问性能强大的服务器(现在兴起的云计算或云服务平台相当于是将服务器集群整合起来对外提供统一的计算资源或服务),消息发布-订阅模型中的Broker 需要强大的性能来传递分发大规模物联网设备之间的消息,这些云平台正好可以担任MQTT Broker 的角色,物联网设备则作为MQTT Client 可以订阅或发布特定主题的消息。

物联网设备的规模比先前移动互联网设备的规模大得多,各大互联网公司都为物联网设备的接入、管理、消息分发、数据分析等提供了相应的云服务平台(比如国内的阿里云、百度云、中国移动OneNET 等),借助IoT Cloud 和MQTT 管理物联网设备间消息传递分发的模型如下:
MQTT 用于IP网络的消息传递协议
上图左边就是作为MQTT Client 的各种物联网设备(MQTT-SN 是为Sensor Network 设计的运行在UDP 协议之上的MQTT 版本),可以订阅或发布特定主题的消息。中间是IoT Cloud,主要提供设备接入、消息分发(MQTT Broker)、数据管理等服务,为了方便开发者将物联网设备接入IoT Cloud,这些云服务商也提供了相应的SDK,将其整合进我们开发的工程代码中,配置好设备接入和身份认证所需的信息,就可以让物联网设备通过TCP/IP 接入IoT 云平台。我们可以通过PC 或手机登录IoT 云平台账户,管理我们接入的设备、构建我们的监控网络、统计分析监测数据等。

2.2 应用于操作系统进程间的消息传递

操作系统进程间的消息传递通常有消息队列、共享内存、命名管道、Socket 通信等,这几种进程间通信方式对比如下:

  • 共享内存:使用共享内存传递消息需要使用互斥锁或读写锁,保证多个进程对共享内存的互斥访问。如果有访问顺序要求,还需要使用信号量保证多个进程的有序访问;
  • 消息队列:消息队列出队入队是在临界区内进行的,因此是线程安全的,使用起来比较简单。但消息队列入队出队采用内存复制的方法,如果数据量较大会因内存复制降低数据吞吐率;
  • Socket 通信:Socket 通信需要TCP/IP 协议栈支持,还需要自己管理连接、对数据进行分割重组等,使用起来有点麻烦。Socket 通信不仅可以用于本主机内多进程间的通信,还可以跨主机、跨语言通信,比如RPC(Remote Procedure Call) 是消息队列和共享内存无法胜任的。

Socket 通信虽然可以跨主机进行RPC,但消息传递也是基于队列模型的。如果我们开发的嵌入式系统多个进程间的数据传递比较复杂,采用队列模型耦合度较高,就可以采用发布-订阅模型,让消息的生产者/发布者和消费者/订阅者完全解耦,专门有一个进程负责消息传递分发(也即MQTT Broker / 消息总线),可以很方便的支撑多进程间的复杂消息传递,通信模型如下:
嵌入式系统内多进程消息发布-订阅
在嵌入式系统内运行一个MQTT Broker 服务进程,其它需要相互传递消息的进程都只需要跟MQTT Broker 进程通信就可以了,MQTT Broker 起到了消息总线的作用,负责各进程间消息的传递分发。比如进程1 订阅主题topic1,进程2 订阅主题topic2,如果进程2 想向进程1 发送消息,只需要将消息发布到主题topic1(消息附上发布主题topic1 后发送给消息总线进程MQTT Broker),MQTT Broker 接收到该消息并将其传递给订阅该主题topic1 的进程1。

举个例子,假如需要开发一个采集大气中PM2.5、PM10 等污染物参数的环境监测系统,需要实现数据记录、UI 监控界面等功能。大气污染物数据采样硬件模块是第三方提供的,我们只需要通过该模块提供的串口协议去控制采样设备、接收采样数据即可。我们创建采样控制进程负责与采样模块的交互,UI 监控和数据记录设计成独立的进程,降低系统的复杂性,通信模型如下:
嵌入式系统多进程间消息发布-订阅示例
需要向外发送消息的进程都需要设定发布主题topic,比如采样控制进程发布主题topic_PM、UI 进程发布主题topic_UI,数据库进程和UI 进程都需要接收采样数据,只需要向MQTT Broker 进程订阅主题topic_PM 即可,UI 进程需要向采样控制进程发送指令数据,只需要让采样控制进程向MQTT Broker 进程订阅主题topic_UI 即可,UI 进程向主题topic_UI 发布指令消息,采用控制进程由于订阅了主题topic_UI 就可以顺利接收到该指令消息。

这个发布-订阅消息模型最大的好处就是解耦了消息的生产者与消费者后,可扩展性很强。假如后续需要增加一个新功能,采集大气中NO、SO2 等气体参数,只需要再增加一个第三方气体模块,创建一个气体参数进程负责跟气体模块的交互,采集的气体数据附上主题名topic_gas 发送给MQTT Broker 消息总线即可。数据库进程和UI 进程只需要新增一个订阅主题topic_gas,气体参数进程新增一个订阅主题topic_UI,即可完成新功能的添加。
嵌入式系统多进程间消息发布-订阅示例扩展
发布-订阅消息模型让消息生产者和消费者完全解耦,需要进行消息交互的进程只需要跟消息总线MQTT Broker 交互即可,这些进程间相互独立,可以由不同的人员并行开发,而且调试也更加方便。如果对通信安全要求较高,还可以配置哪些进程可接入消息总线MQTT Broker,拒绝无关进程连接上消息总线。

上面开发的环境监测设备大概率是需要接入IoT 云平台的,大量监测设备采集的数据汇总起来才更有价值,本地设备如何把数据上传到 IoT Cloud,并从云端接收控制指令呢?

只需要再创建一个进程MQTT Bridge 专门处理本地消息总线MQTT Broker 和 IoT Cloud 消息传递协议MQTT Broker 的数据交互即可,MQTT Bridge 进程的消息发布主题为topic_local,向 IoT Cloud MQTT Broker 订阅主题topic_cloud,就可以接收云端发来的指令消息;IoT Cloud 订阅主题topic_local,就可以接收本地MQTT Bridge 进程上传的采样数据。通信模型如下:
嵌入式系统内多进程消息发布-订阅扩展

2.3 应用于MESH 自组网中的消息传递

物联网时代,除了IP 组网之外,MESH 自组网也越来越流行。发布-订阅消息模型很适合应用于扩展性强的网络通信模型中,MESH 自组网也需要比较方便的新增、移除、变更通信节点,所以也可以采用发布-订阅模型来传递分发消息。

以BLE MESH 为例,MESH 组网与IP 组网的主要区别如下:

  • IP 组网:将物联网设备都接入IoT Cloud,构成中心化的星型网络,中心节点IoT Cloud 的处理能力很强,可以负责整个网络消息的传递分发;
  • MESH 组网:BLE MESH 组成的网络虽然有中继节点Relay node 转发消息,但各节点的地位差不多,构成的是网状网络,并没有性能强大的中心节点。

BLE MESH 组网缺少性能强大的中心节点,如何采用发布-订阅模型来传递分发消息呢?由哪些节点承担Broker 的角色呢?
BLE MESH 网络拓扑结构
BLE MESH 是基于广播通信的,每个节点向外广播经过加密的广播报文,中继节点Relay node 的作用是让整个MESH 网络中的节点都可以接收到广播报文(单个BLE 设备的广播距离有限)。

既然BLE MESH 是向全网广播的,并不具备消息路由的功能,自然不需要Broker 这个角色。每个节点node 包含几个元素element,每个元素包含几个模型model,每个模型都有一个发布地址(类比前面介绍的主题topic,这个发布地址也是全网唯一的),所以每个model 都可以向全网广播包含自己发布地址的消息报文。该MESH 网络中的其它节点,可以订阅自己感兴趣的发布地址,若接收到的消息报文中包含的发布地址正好是自己订阅的发布地址,则接收并处理该消息报文。

举个例子,下图展示了智能家居照明场景中的Switch node 和Light node 之间的BLE MESH 网络,Hallway门厅的两个灯节点订阅门厅开关节点的发布地址,当Hallway 开关发布开关指令消息后,订阅其发布地址的门厅灯节点会接收到开关指令并执行相应的动作,该场景的通信模型如下:
MESH 网络中消息的订阅与发布示例

三、MQTT 如何实现订阅-发布消息模型?

MQTT(Message Queuing Telemetry Transport) 最初由 IBM 于 20 世纪 90 年代末设计和开发,当时主要是为了解决如何远程监控输油管道泄露或损坏的问题。由此可见,MQTT 是为大量硬件性能低下的远程设备、在网络状态比较糟糕的环境中正常工作而设计的,支持各方异步通信的消息传递协议。这个异步消息传递协议采用发布-订阅消息模型将消息的发布者和订阅者完全解耦,可以方便物联网设备(也即消息发布者或订阅者)在不可靠的网络环境中扩展规模。

MQTT 跟HTTP 同属于应用层协议,工作在TCP/IP 协议簇之上(通常工作在TCP 协议之上,MQTT-SN 版本则工作在UDP 协议之上),若对网络安全要求较高,也可以在MQTT 与TCP 协议之间增加TLS 协议,借助TLS 实现设备身份认证和数据通信安全(运行在TLS 协议之上的MQTT 协议可称为MQTTS 协议,MQTT port 为1883,MQTTS port 为8883):
MQTT 协议层级
MQTT 的主要目标是最大限度的减少设备和网络资源占用,因此相比HTTP 协议算是轻量级协议,MQTT 的报文比HTTP 更短小。为了在资源受限的嵌入式设备上使用MQTT 协议,可以选用轻量级的TCP/IP 协议簇比如 lwip,轻量级的TLS 协议比如mbedTLS 等。

MQTT 协议在2014 年末发布了MQTT v3.1.1 版本 ,从该版本成为OASIS 开放标准,在2018 年发布了MQTT v5.0 版本,新增了会话/消息延时、原因码、主题别名、in-flight流控、属性、共享订阅、用于增强认证的AUTH报文等功能,目前MQTT 已经成为各大物联网平台(比如Amazon AWS、Microsoft Azure、Ali IoT Kit 等)的主流消息传递协议。

MQTT 分为Broker(Server) 和Client 两部分,MQTT Broker 一般运行在云端服务器中,MQTT Client 则运行在我们开发的物联网设备中。第一个开源的MQTT Broker 是发布于2008年的Mosquitto,在2014年成为Eclipse Mosquitto project。2012年,Eclipse Paho project 为C/C++、Java、JavaScript、Python 等各种编程语言提供了开源的MQTT Client 库,自此MQTT Client 的实现列表不算增多(MQTT Broker 和Client 的主要实现版本可参阅https://mqtt.org/software/或)。
MQTT 不同的实现版本对比

3.1 如何在本机实践MQTT 通信并抓包分析?

学习一种通信协议,最快的方式是通信实践和抓包分析,本文以Mosquitto 为例展示MQTT 通信过程,下篇文章使用Paho MQTT Client 让物联网设备接入 IoT Cloud 实现远程监控。

Mosquitto 是跨平台的,既可以安装在linux 上,也可以安装在windows 上,考虑到MQTT Broker 一般运行在linux 系统中,本文选择在ubuntu 上安装Mosquitto。如果不喜欢命令行交互方式,也可以安装使用MQTT.fx Desktop Client 菜单式交互工具。Ubuntu 安装mosquitto 的命令如下:

# 添加mosquitto 仓库,以便安装最新版本(若不添加该仓库则安装的为旧版本)
sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
# 更新软件包
sudo apt-get update
# 安装mosquitto
sudo apt-get install mosquitto mosquitto-clients
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Mosquitto 可以在本机上启动一个MQTT Broker 服务,然后通过 mosquitto_sub 和mosquitto_pub 命令订阅发布主题topic 消息,此时MQTT Broker 和MQTT Client 都在本地主机上运行,通过回环接口loopback 通信(前面介绍的嵌入式系统内MQTT Broker 作为消息总线,以发布-订阅方式在多进程间传递分发消息也是使用loopback 回环接口)。命令mosquitto_sub 和mosquitto_pub 也可以连接远程的MQTT Broker,比如test.mosquitto.org(其它公共broker 可参阅public_brokers)。在本地主机启动一个MQTT Broker 服务的命令如下:
启动mosquitto 服务
再打开几个命令行终端窗口,每个窗口可以作为一个MQTT Client,需要订阅某个topic 的Client 使用mosquitto_sub 命令订阅感兴趣的topic 即可,需要向某个topic 发布消息的Client 使用mosquitto_pub 命令向指定topic 发送任意的消息即可,最简单的订阅发布命令如下:

# MQTT Client 1
> mosquitto_sub -t "mqtt/test"
Hello, IoT!

# MQTT Client 2
> mosquitto_pub -t "mqtt/test" -m "Hello, IoT!"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

使用Wireshark 抓包查看MQTT Broker 和MQTT Client 交互的报文(选择捕获loopback 回环接口上的报文,可以设置过滤条件为"mqtt"),可以发现MQTT 通信在发出command / request 后一般都有ack / response 报文,主要分为连接报文Connect / Disconnect、订阅主题报文Subscribe、发布消息报文Publish、保活心跳报文Ping 等四类:
Wireshark 抓去MQTT 数据包示例1
从上图也可以看出,mosquitto 默认使用的MQTT 版本为v3.1.1(如果想使用MQTT v5 版本传输报文,只需要添加参数" -V mqttv5" 即可),源地址和目的地址都是本地地址。如果我们想连接远程MQTT Broker(比如 test.mosquitto.org),可以使用如下命令(命令mosquitto_sub 和mosquitto_pub 支持的参数可通过" --help" 查看):

# MQTT Client 1
> mosquitto_sub -t "mqtt/test" -h test.mosquitto.org -p 1883
Hello, IoT!

# MQTT Client 2
> mosquitto_pub -t "mqtt/test" -m "Hello, IoT!" -h test.mosquitto.org -p 1883
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

使用Wireshark 抓取的MQTT 报文源地址和目的地址显示(记得切换捕获网卡设备),确实是连接了远程MQTT Broker:
Wireshark 抓去MQTT 数据包示例2
互联网安全问题比较重要,比如HTTP/2 默认使用TLS 保证通信安全。MQTT 协议作为物联网的主流协议,通信安全尤为重要,如果我们想借助TLS 协议对通信内容进行加密传输,可以使用如下的命令(证书文件mosquitto.org.crt 从 https://test.mosquitto.org/ 下载获得):

# MQTT Client 1
> mosquitto_sub -t "mqtt/test" -h test.mosquitto.org -p 8883 --cafile mosquitto.org.crt
Hello, IoT!

# MQTT Client 2
> mosquitto_pub -t "mqtt/test" -m "Hello, IoT!" -h test.mosquitto.org -p 8883 --cafile mosquitto.org.crt
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

使用Wireshark 抓取的MQTTS 报文显示,确实是TLS 报文,端口号为8883,TLS 报文显示其应用层协议为MQTT:
Wireshark 抓去MQTT 数据包示例3
TLS 不仅可以对MQTT 报文进行加密和完整性校验,还可以借助X.509 数字证书来相互验证对方的身份(Client 可以验证Broker 的证书,Broker 也可以验证Client 的证书)。但很多物联网设备硬件资源很有限,即便是mbedTLS 占用的计算存储资源也不容忽视,所以很多物联网设备并没有使用TLS 和X.509 证书来保证信息安全和身份验证,MQTT 报文就相当于明文传输,很容易泄露信息或被攻击。如果因为硬件资源受限,无法使用TLS 协议,该如何保证通信安全和身份验证呢?

在应用层使用对称加密算法可以解决明文传输不安全的问题,使用消息认证码可以验证消息的完整性,不过应用层只能对MQTT 报文中的payload 加密,MQTT 头部字段依然是明文传输,安全性弱一些。如何解决客户端身份认证问题呢?

回顾HTTPS 协议的身份认证机制,除了TLS 客户端证书认证外,还有基于表单的验证,也就是用户名密码认证,MQTT 报文也是支持用户名和密码认证机制的。MQTT Broker 可以通过配置,要求MQTT Client 在连接之前提供有效的用户名和密码(这些信息需要加密传输),限制非预期设备的连接,实现对待连接MQTT Client 的身份验证。MQTT Broker 配置用户名密码身份认证的命令如下:

# 通过mosquitto_passwd 命令创建一个新的密码文件,并新增用户名“admin”,密码“password”
> sudo mosquitto_passwd -c /etc/mosquitto/passwd admin
Password:
Reenter password:

# 创建并打开一个默认配置文件default.conf,该配置文件会被mosquitto 调用
> sudo nano /etc/mosquitto/conf.d/default.conf

# 在配置文件default.conf 中新增如下配置项:不允许匿名连接,并配置允许连接的Client 密码文件。保存后退出
allow_anonymous false
password_file /etc/mosquitto/passwd

# 重新启动mosquitto 服务,让我们修改的配置文件生效
> sudo systemctl restart mosquitto
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

重启mosquitto 服务,让配置文件的修改生效,我们再次尝试订阅主题、发布消息,若不指定用户名和密码,会提示连接认证失败,使用刚才我们配置的用户名密码可以成功连接并订阅主题发布消息,命令如下:

# MQTT Client 1
> mosquitto_sub -t "mqtt/test"
Connection error: Connection Refused: not authorised.

# MQTT Client 2
> mosquitto_pub -t "mqtt/test" -m "Hello, IoT!"
Connection error: Connection Refused: not authorised.
Error: The connection was refused.

# MQTT Client 3
> mosquitto_sub -t "mqtt/test" -u "admin" -P "password"
Hello, IoT!

# MQTT Client 4
> mosquitto_pub -t "mqtt/test" -m "Hello, IoT!" -u "admin" -P "password"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

物联网系统的安全性,不仅要求对通信内容进行加密传输、对请求连接的客户端进行身份验证,还需要对客户端能访问的数据进行访问授权管理,让连接的客户端只能访问被允许的数据。MQTT Broker 通过配置password_file 可以只允许指定的MQTT Client 连接,通过配置acl_file 则可以控制哪些主题topic 可以被哪些Client 订阅或发布,从而限制每个连接的MQTT Client 只能订阅或发布被允许的主题topic。

3.2 MQTT 报文格式是怎样的?

前面通过mosquitto_sub 和mosquitto_pub 命令展示了MQTT 协议的工作机制,也通过Wireshark 抓取了MQTT Broker 和Client 之间交互的数据包,想继续深入了解MQTT 的工作原理,需要深入到MQTT 报文结构中了。

MQTT 的报文主要分为三部分:
MQTT Packet structure

  • Fixed header:固定头部是每个MQTT 报文都有的,主要包括长度为 1 字节的控制头部和长度为 1~4 字节的剩余报文长度字段(也即Variable header 和Payload 的总字节数),Control Header 包括MQTT 报文类型和相应的标识位两部分构成,MQTT 支持的报文类型如下(除了PUBLISH 报文外,其余的报文标识位均为Reserved):
    MQTT Fixed Header
  • Variable header:变长头部是可选的,包含哪些字段取决于消息类型,比如跟订阅发布消息有关的报文常包含Packet Identifier 字段,用于管理连接和报文传输的Properties 字段(不同报文支持的属性组合不同,因此还需要Property Length 字段),用于应答或响应的报文常包含Reason Code 字段(MQTT v5 新增的)等(不同报文支持哪些Properties 或Reason Code 可参阅 MQTT_v5_procotol);
  • Payload:有效数据载荷也是可选的,一般连接报文、订阅/取消订阅报文、发布报文等包含payload 部分,比如身份认证信息、遗嘱信息、需要发布的应用数据、订阅或取消订阅的主题列表等。受Fixed header 部分Packet Length 字段最长4 字节的限制,payload 理论上的最大载荷是256 MB,一般MQTT Broker 会设置更小的载荷限制。

MQTT 一共支持15 种控制报文,可以分为连接管理报文CONNECT / CONNACK / DISCONNECT / AUTH、心跳保活报文PINGREQ / PINGRESP、订阅管理报文SUBSCRIBE / SUBACK / UNSUBSCRIBE / UNSUBACK、发布消息报文PUBLISH / PUBACK / PUBREC / PUBREL / PUBCOMP 等几大类,下面分别从MQTT 协议功能角度分析各报文的作用和格式。

3.3 MQTT 如何管理连接?

MQTT 是工作在TCP / TLS 协议之上的,是基于连接的,因此在进行主题订阅或消息发布之前,需要先建立连接。MQTT Broker 为了保证网络安全,一般会对请求连接的Client 进行身份验证,因此CONNECT 报文可以携带身份验证信息。MQTT Client 与MQTT Broker 建立连接的过程如下:
MQTT Client 与Broker 建立连接过程
前面抓取的是MQTT v3.1.1 的数据包,下面改为抓取MQTT v5 的数据报文,同时增加身份认证和遗嘱设置选项,命令如下:

# MQTT Client 1
> mosquitto_sub -t "mqtt/test" -h test.mosquitto.org -p 1883 -V mqttv5 -u admin -P admin --will-topic "mqtt/test" --will-payload "Goodbye!"
Hello, IoT!

# MQTT Client 2
> mosquitto_pub -t "mqtt/test" -m "Hello, IoT!" -h test.mosquitto.org -p 1883 -V mqttv5 -u admin -P admin --will-topic "mqtt/test" --will-payload "Goodbye!"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

使用Wireshark 抓取的MQTT CONNECT 报文格式如下:
MQTT CONNECT 抓包示例1
MQTT Client 在建立连接时,可以设置遗嘱消息,也即当MQTT Broker 发现该 Client 断开连接时,会将其设置的遗嘱消息发送给设定的主题topic,让订阅该topic 的客户端及时知道该Client 连接断开的通知。CONNECT 报文中比较常用的参数字段如下:

CONNECT packet parametersDescription
Client IDMQTT Broker 辨识MQTT Client 的标识符,连接到Broker 的每个Client 的ID 应该是唯一的。
User Name / PasswordMQTT Broker 对请求连接的Client 进行身份验证和授权的依据。
Clean Session默认情况下MQTT Client 会与Broker 建立一个干净的会话,也即没有任何历史订阅信息的会话。若该项设置为0 且Broker 上存在相同Client ID 的会话,则会继续使用该Client ID 的订阅信息建立连接,相当于恢复已经存在的会话。
Will Topic / PayloadMQTT Client 可以在建立连接时设置遗嘱消息,当Broker 检测到该Client 意外断开连接时,会将Will Payload 转发给订阅Will Topic 的Client,通过DISCONNECT 正常断开连接时Broker 不会发送遗嘱消息。
Keep AliveMQTT Client 发送心跳报文PING 的时间间隔,默认为60 秒,若Broker 超过该时间间隔未收到PING 报文,则判断该Client 连接断开。若该Client 设置了遗嘱消息,Broker 则会按照设置转发遗嘱消息。

CONNACK 报文中常用的参数字段如下:

CONNACK packet parametersDescription
Session Present会话是否存在标识位,MQTT Client 请求建立连接时有个Clean Session 字段设置是否建立一个干净的会话,MQTT Broker 通过该标识位通知Client 是建立了一个干净的会话(该位值为0),还是恢复了一个存在的会话(该位值为1)。
Reason CodeMQTT Broker 收到来自Client 的CONNECT 连接报文后,会通过该响应码通知该Client 连接建立情况,若返回0 则表示连接成功,若连接失败也可以从该响应码获知连接失败的原因。

MQTT Client 在订阅主题等待接收消息的过程中,需要保持长连接状态,TCP 服务器为了合理管理客户端占用的资源,需要通过保活探测报文了解客户端的运行状况(保活探测报文间隔周期默认为两个小时)。MQTT 是为网络状况糟糕的通信环境设计的,MQTT Broker 需要及时知道主题订阅者的连接状态,因此也需要Client 周期性发送心跳保活报文,若Broker 超过保活间隔时间未收到某个Client 的心跳报文,则判断其已经断开连接。

MQTT 心跳保活报文PINGREQ 和PINGRESP 比较简单,没有变长头部和有效载荷payload,只有固定头部,没什么好介绍的。

MQTT Client 因为糟糕的网络状况断开连接后,也可以设置为自动重连。如果想恢复先前的会话,可以在CONNECT 报文中的Clean Session 字段设置为0 且Client ID 保持不变,若MQTT Broker 中该Client ID 的会话还存在,即可恢复先前的会话。

3.4 MQTT 如何传递消息?

MQTT 传递消息使用的发布-订阅模型,前面已经介绍过了,这里主要介绍订阅主题和消息发布的过程。MQTT Client 发送或者接收消息都是基于主题topic 的,什么是topic 呢?

主题Topic 类似于URL,也是一种资源定位符,比较常见的是类似于文件目录的形式,每个主题至少包含一个UTF-8 字符(区分大小写),不同的层级之间使用"/" 作为分隔符。为方便订阅一类主题,主题名也可以使用通配符“+” 和“#” 的,比较常见的几种topic 如下:

  • Topics beginning with $:MQTT Broker 常将以$ 开头的主题用作其它目的,比如$SYS 常被用来发布跟Broker 有关的消息,MQTT Client 不能使用以$ 开头的主题名发布消息;
  • Topic Names:主题名不能使用通配符,主题名至少包含一个UTF-8 字符(包括空格),大小写敏感,可以使用层级分隔符“/” ,比如"/"、“/house”、“sport/tennis/player1” 等都是合法的主题名;
  • Topic Filters:主题过滤器常包含通配符,方便客户端一次订阅多个主题。单层通配符“+” 只能用于单个层级主题的匹配,比如“sport/+”可以匹配“sport/player1”和“sport/player2”,但是不匹配“sport/player1/ranking” 或“sport”。多层通配符“#” 可以用于任意层级主题的匹配,但“#”必须是主题过滤器的最后一个字符,比如“sport/#” 可以匹配“sport” 、“sport/player1/ranking”、“sport/tennis/#” 等,“#” 表示订阅所有主题。

MQTT 传递消息是基于订阅-发布模型的,Client 与Broker 建立连接后,如果想接收消息,需要先订阅自己感兴趣的主题topic,一个Client 可以订阅多个主题topic。MQTT Client 如果想对外发布消息,消息都需要指定主题topic,Broker 需要通过topic 确定要将该消息分发给哪些Client(MQTT Broker 为每个topic 维护一个订阅者列表)。MQTT Client 向Broker 订阅感兴趣主题topic 的过程如下:
MQTT Client 向Broker 订阅感兴趣主题的流程
MQTT 为了适应多种应用场景,支持不同的消息传递服务等级QoS,有些不重要的消息即便对方没有收到也影响不大,有些重要的消息则需要确认对方接收到了。还有些特殊场景,比如支付场景,不仅要求对方确认收到该消息,而且要求该消息的传递是幂等的,也即相同的消息发送多次跟发送一次是一样的结果,不至于出现重复支付的情况。

MQTT Client 订阅主题时,可以设置Broker 向其发送消息的最大QoS 等级,我们为了抓取并分析不同QoS 等级消息的传递报文,在mosquitto_sub 和mosquitto_pub 命令中新增QoS 参数后的命令如下:

# MQTT Client 1,以QoS 等级 1 订阅主题"mqtt/test1"
> mosquitto_sub -t "mqtt/test1" -q 1 -h test.mosquitto.org -p 1883 -V mqttv5 -u admin -P admin --will-topic "mqtt/test1" --will-payload "Goodbye!"
Hello, IoT!

# MQTT Client 2,,以QoS 等级 2 订阅主题"mqtt/test2",随后取消订阅该主题
> mosquitto_sub -t "mqtt/test2" -q 2 -h test.mosquitto.org -p 1883 -V mqttv5 -u admin -P admin --will-topic "mqtt/test2" --will-payload "Goodbye!"
Hello, IoT!
> mosquitto_sub -t "mqtt/test2" -q 2 -h test.mosquitto.org -p 1883 -V mqttv5 -u admin -P admin -U "mqtt/test2"

# MQTT Client 3,分别以QoS 等级 1 向主题"mqtt/test1" 发布消息,以QoS 等级 2 向主题"mqtt/test2" 发布消息
> mosquitto_pub -t "mqtt/test1" -m "Hello, IoT!" -q 1 -h test.mosquitto.org -p 1883 -V mqttv5 -u admin -P admin --will-topic "mqtt/test1" --will-payload "Goodbye!"
> mosquitto_pub -t "mqtt/test2" -m "Hello, IoT!" -q 2 -h test.mosquitto.org -p 1883 -V mqttv5 -u admin -P admin --will-topic "mqtt/test2" --will-payload "Goodbye!"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

通过Wireshark 抓取到的SUBSCRIBE 订阅报文格式如下:
MQTT Client 订阅主题报文示例2
SUBSCRIBE 报文中比较常用的参数字段如下:

SUBSCRIBE packet parametersDescription
TopicMQTT Client 向Broker 订阅的主题名或主题过滤器,可以使用通配符一次订阅多个主题。所订阅的主题有消息发布时,Broker 会将发布到该主题的消息转发给该客户端。
QoS告诉MQTT Broker,当所订阅主题有消息发布时,最大以哪个级别的消息服务质量向该Client 发送所订阅主题的消息。

MQTT Client 向Broker 订阅主题时,都会收到一个订阅确认报文SUBACK,Broker 通过该报文通知该Client 订阅主题的处理结果,报文比较简单,主要回复一个响应码:

SUBACK packet parametersDescription
Reason Code如果MQTT Client 订阅的主题被Broker 接受,则会返回Broker 发送所订阅主题消息到该Client 使用的最大QoS(MQTT 支持的QoS 有 0、1、2 三种),如果订阅主题失败,从该响应码可获知订阅失败的原因。

MQTT Client 取消订阅不感兴趣的主题时,通过UNSUBSCRIBE 报文告知Broker 要取消订阅哪些主题,报文中比较常用的参数是topic。Broker 也会通过UNSUBACK 报文通知Client 取消订阅主题的结果,报文中比较常用的参数是Reason Code。取消订阅的过程和报文格式跟订阅过程类似,这里就不赘述了。

MQTT Client 订阅感兴趣的主题后,如果其它Client 在该topic 上发布消息时,Broker 就可以将该主题上的消息转发给它的订阅者。MQTT Client 在指定主题上发布消息的过程如下:
MQTT Client 向Broker 发布某主题的消息
通过Wireshark 抓取到的PUBLISH 订阅报文格式如下:
MQTT Client 发布某个topic 的消息
MQTT Client 向指定主题发布消息时,根据应用场景的不同,需要设置消息发布的服务等级QoS,以便控制发布到指定主题的消息发送到Broker 的服务质量。PUBLISH 报文中比较常用的参数字段如下:

PUBLISH packet parametersDescription
Topic NameMQTT Client 要将应用消息发布到的主题名(相当于一个数据通道),这里的主题名不能包含通配符。Broker 接收到发布者发送的消息后,会根据消息主题将其发送给该主题的所有订阅者。
QoSMQTT Publisher 设置的将消息发送给Broker 的服务等级,可以根据消息的重要程度和应用场景设置,比如是否保证消息送达、多次送达的消息是否幂等。
Retain保留消息标识位,Publisher 告知Broker 是否将该消息设置为保留消息,若Retain 设置为1 则Broker 会存储该消息,以便将其发送给未来订阅该主题的Subscriber。比如低功耗长间隔不定期发布状态的场景中,未来新的订阅者可以在订阅该主题时及时获知发布者的当前状态。
Payload要发布的实际应用数据,数据格式由应用程序定义,可以是UTF-8 字符 或者二进制数据。

前面介绍MQTT Client 订阅主题时也指定了QoS,Broker 实际发送给订阅者的消息服务等级还要看消息发布者将消息发送给Broker 的服务等级。也即,Publisher 到Subscribers 传递消息的服务等级等于发布报文中的QoS 和订阅报文中QoS 的最小值。

某个主题的消息从Publisher 经Broker 到Subscribers,采用不同的消息QoS 服务等级,传递消息的报文交互也不同,MQTT 支持的三种QoS 服务等级下,报文的交互过程对比如下:
MQTT Quality of Service
MQTT 在Publisher 和Subscriber 之间传递消息的三种QoS 服务区别如下:

  • QoS 0 (At most once):Publisher / Broker 只管发布消息,不关心对方是否收到(类似于UDP 协议,没有确认重传机制),QoS 0 的消息发送速率比较高,但消息可能会丢失。常用于对消息丢失不敏感的场景,比如传感器发布状态数据,中间几次数据丢失没关系;
  • QoS 1 (At least once):Publisher / Broker 通过PUBACK 报文确认对方收到了发布的消息,若一段时间内未收到对方的PUBACK 报文则继续重发该消息(重发的消息DUP 标识位设置为1,Packet Identifier 字段值不变),QoS 0 的消息保证对方接收到,但消息可能会重复(Broker / Subscribers 发送PUBACK 报文后再收到相同Packet Identifier 的消息也是按新消息处理的),比如下发给执行器的命令,需要确认命令被响应了;
  • QoS 2 (Exactly once):Publisher / Broker 不仅要通过PUBREC 报文确认对方接收到了发布的消息,还要通过报文PUBREL 和PUBCOMP 的二次交互保证对方接收的消息不重复。这要求接收者Broker / Subscribers 先暂存该消息,等接收到了PUBREL 报文后再将消息递交给上层或转发(消息接收者会忽略PUBLISH 报文中相同Packet Identifier 的消息),QoS 0 的消息对硬件计算存储资源的要求较高、消息延迟也较高,比如在计费支付系统中,每笔订单需要且只能处理一次。

消息接收者Broker / Subscribers 在接收QoS 等级大于0(也即QoS 1 或QoS 2)的消息时,需要在本地暂存该消息(QoS 0 的消息本地不暂存,直接转发或递交给上层应用),QoS 1 的消息需要在本地存储到发送PUBACK 报文后,QoS 2 的消息需要在本地存储到接收PUBREL 报文后。

为了应对网络拥塞的情况,MQTT v5 也是支持流量控制机制的,在CONNECCT 报文和CONNACK 报文中有一个Receive Maximum 的属性字段(类似于TCP 中的接收窗口)。该字段主要是控制QoS 1 或QoS 2 消息流量的,MQTT Client 和Broker 建立连接时,Broker 愿意为Client 同时处理的QoS 1 或QoS 2 最大消息数量(也即消息发送者在接收到PUBACK 报文或PUBREL 报文前,可重复发送的PUBLISH 报文最大数量,当达到最大数量时不能再继续发送该报文)。

MQTT 传递消息的安全性前面已经介绍过了,MQTT Broker 和Client 之间可以使用TLS 协议实现对通信数据的加密、完整性校验、通信对端身份认证等功能。MQTT 应用层也可以消息载荷进行加密和完整性校验。

MQTT Broker 也可以通过用户名密码或身份证书验证连接的MQTT Client 的身份,拒绝未经允许的Client 连接。建立连接后,MQTT Client 也不能随意订阅任何主题,Broker 可以配置哪些Client 可以以怎样的权限访问哪些topic,保证网络中的数据安全。

更多文章:

  • 《IOT-OS之RT-Thread(十八)— 如何使用MQTT 协议实现OneNET 远程监控?》
  • 《Web技术(六):QUIC 是如何解决TCP 性能瓶颈的?》
  • 《Web技术(五):HTTP/2 是如何解决HTTP/1.1 性能瓶颈的?》
  • 《Web技术(四):TLS 握手过程与性能优化》
  • 《Web技术(三):TLS 1.2/1.3 加密原理》
  • 《MQTT协议5.0中文版》
  • 《iot-mqtt-why-good-for-iot》
  • 《mqtt-protocol-iot-devices》
  • 《通俗理解MQTT协议的实现原理和异步方式》
  • 《我最喜欢的进程之间通信方式-消息总线》
  • 《物联网网关开发:基于MQTT消息总线的设计过程(上)》
  • 《物联网网关开发:基于MQTT消息总线的设计过程(下)》
注:本文转载自blog.csdn.net的流云IoT的文章"https://blog.csdn.net/m0_37621078/article/details/116246058"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top