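**`MQTT`** (Message Queuing Telemetry Transport) is a lightweight messaging protocol based on the **publish/subscribe** pattern, originally released by IBM.

----
[TOC]

----

## Overview

* MQTT can be described as a low-overhead, low-bandwidth instant messaging protocol. With very little code and bandwidth it provides real-time, reliable messaging for remote devices, which makes it suitable for low-powered hardware and poor network conditions. As a result, MQTT is widely used in IoT (Internet of Things), small-device, and mobile applications.
* To operate, IoT devices must be connected to the Internet so that they can cooperate with each other and with back-end services. The underlying protocol of the Internet is TCP/IP, and MQTT is built on top of the TCP/IP stack, so it has gradually become a standard for IoT communication.

**MQTT features at a glance**

MQTT is a lightweight, broker-based publish/subscribe messaging protocol designed to be open, simple, lightweight, and easy to implement. These characteristics make it suitable for constrained environments. Its features include:

* The publish/subscribe message pattern, which provides one-to-many message distribution and decouples applications.
* A messaging transport that is agnostic to the content of the payload.
* TCP/IP for network connectivity.
* A small transport overhead (the fixed-length header is 2 bytes) and minimized protocol exchanges to reduce network traffic.
* The Last Will and Testament feature, a mechanism for notifying interested parties when a client disconnects abnormally.

There are three qualities of service for message delivery:

* "At most once": messages are delivered on a best-effort basis by the underlying TCP/IP network, so message loss can occur. This level suits cases such as ambient sensor data, where losing a single reading does not matter because the next one will be published soon after.
* "At least once": messages are assured to arrive, but duplicates may occur.
* "Exactly once": messages are assured to arrive exactly once. This level suits cases such as billing systems, where duplicate or lost messages would lead to incorrect results.

[[Official website]](http://mqtt.org/)
[[github-wiki]](https://github.com/mqtt/mqtt.github.io/wiki)
[[github-Brokers/servers]](https://github.com/mqtt/mqtt.github.io/wiki/servers)
[[github-Client libraries]](https://github.com/mqtt/mqtt.github.io/wiki/libraries)

connect/publish/subscribe

## Structure

:-: ![](https://img.kancloud.cn/1e/8f/1e8fc5520847f88d2d5fa3958b2d256d_211x381.jpg)

There are three roles in the `MQTT` protocol: `Publisher`, `Broker` (the server), and `Subscriber`.

Both the `publisher` and the `subscriber` of a message are clients, the message `broker` is the server, and a `publisher` can be a `subscriber` at the same time.

## broker

### ActiveMQ / Apollo

[[Official website]](https://activemq.apache.org/)
[[Github-activemq]](https://github.com/apache/activemq)
[[Download]](http://activemq.apache.org/apollo/download.html)

Apache ActiveMQ is a high performance Apache 2.0 licensed Message Broker and JMS 1.1 implementation.

### Mosquitto

[[Official website-mosquitto]](http://mosquitto.org/)

Eclipse Mosquitto is an open source (EPL/EDL licensed) message broker that implements the MQTT protocol versions 5.0, 3.1.1 and 3.1. The Mosquitto project also provides a C library for implementing MQTT clients, and the very popular mosquitto\_pub and mosquitto\_sub command line MQTT clients.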
Mosquitto is part of the [Eclipse Foundation](https://eclipse.org/), is an [iot.eclipse.org](https://iot.eclipse.org/) project and is sponsored by [cedalo.com](https://cedalo.com/).

Sizing guidance:

* With no more than 100,000 connected devices, running Mosquitto on a cloud host with 8 GB of memory is sufficient.
* With a few hundred thousand devices, consider a load-balanced Mosquitto cluster.
* With many hundreds of thousands of devices, or a million and more, you need a professional team or dedicated investment to maintain the deployment.

**libmosquitto**: MQTT version 5.0/3.1.1 client library

[[Official documentation-api]](https://mosquitto.org/api/)

Author: Roger Light `<[roger@atchoo.org](mailto:roger@atchoo.org)>`

### EMQ X

[[Official website-emqx.io]](https://emqx.io)
[[Official documentation-docs]](https://docs.emqx.io/)
[[Official guide-tutorial]](https://docs.emqx.io/tutorial/v4/en/)
[[Github-emqx]](https://github.com/emqx/emqx)

![](https://img.kancloud.cn/24/09/24093e433dc8a5b494a4dd5c9e1c32a4_731x401.jpg)

Scalable and Reliable Real-time MQTT 5.0 Message Broker for IoT in 5G Era.

![](https://img.kancloud.cn/05/60/056052b78add3e63b50cd52ad533930b_1152x512.png)

* [EMQ X Broker download](https://www.emqx.io/downloads#broker)
* [EMQ X Enterprise download](https://www.emqx.io/downloads#enterprise)
* [EMQ X Edge download](https://www.emqx.io/downloads#edge)
* [EMQ X Kuiper download](https://www.emqx.io/downloads#kuiper)

| | Broker | Enterprise | Edge | Kuiper |
| --- | --- | --- | --- | --- |
| Feature Overview | <li>Fully Open Source</li><li>MQTT v5.0 Support</li><li>Distributed Architecture</li><li>High Concurrency</li><li>Low Latency</li><li>Highly Extensible</li><li>Running Anywhere</li> | <li>Million-level Device Connections</li><li>Multiple IoT Protocols Support</li><li>Powerful SQL-based Rule Engine</li><li>Bridging Messages to Kafka</li><li>Various Databases Persistence</li><li>Management & Monitoring Center</li><li>7x24 Global Support Team</li> | <li>Fully Open Source</li><li>x86 and ARM support</li><li>Bridge to Multi-Cloud</li><li>Offline Data Storage</li><li>SQL-based Rule Engine</li><li>Web-based Management</li> | <li>Fully Open Source</li><li>Lightweight and cross-platform</li><li>Real-time streaming processing</li><li>Rich time windows support</li><li>Rich SQL functions support</li><li>CLI and RESTful API tools</li> |

#### Installation

[[Official documentation]](https://docs.emqx.io/broker/latest/en/getting-started/installation.html)

- Package manager installation (Linux)

**CentOS**

1. Install the required dependencies

~~~
$ sudo yum install -y yum-utils device-mapper-persistent-data lvm2
~~~

2. Set up the stable repository, taking CentOS 7 as an example.

~~~
$ sudo yum-config-manager --add-repo https://repos.emqx.io/emqx-ce/redhat/centos/7/emqx-ce.repo
~~~

3. Install the latest version of EMQ X

~~~
$ sudo yum install emqx
~~~

If prompted to accept the GPG key, confirm that the fingerprint matches fc84 1ba6 3775 5ca8 487b 1e3c c0b4 0946 3e64 0d53 and accept it.

4. Install a specific version of EMQ X

    i. Query the available versions

~~~
$ yum list emqx --showduplicates | sort -r

emqx.x86_64                     4.0.0-1.el7                        emqx-stable
emqx.x86_64                     3.0.1-1.el7                        emqx-stable
emqx.x86_64                     3.0.0-1.el7                        emqx-stable
~~~

    ii. Install a specific version based on the version string in the second column, such as 4.0.0

~~~
$ sudo yum install emqx-4.0.0
~~~

5. Start EMQ X

* Start directly

~~~
$ emqx start
emqx 4.0.0 is started successfully!

$ emqx_ctl status
Node 'emqx@127.0.0.1' is started
emqx v4.0.0 is running
~~~

* Start with systemctl

~~~
$ sudo systemctl start emqx
~~~

* Start with service

~~~
$ sudo service emqx start
~~~

6. Stop EMQ X Broker

~~~
$ emqx stop
ok
~~~

7. Remove EMQ X Broker

~~~
$ sudo yum remove emqx
~~~

#### ZIP (Linux, macOS, Windows)

1. Download the zip package of the EMQ X Broker version to be installed from [emqx.io](https://www.emqx.io/downloads/broker?osType=Linux) or [Github](https://github.com/emqx/emqx/releases).

2. Unzip the installation file:

~~~
$ unzip emqx-ubuntu18.04-v4.0.0.zip
~~~

3. Start EMQ X Broker

~~~
$ ./bin/emqx start
emqx 4.0.0 is started successfully!

$ ./bin/emqx_ctl status
Node 'emqx@127.0.0.1' is started
emqx v4.0.0 is running
~~~

4. Stop EMQ X Broker

~~~
$ ./bin/emqx stop
ok
~~~

5. Remove EMQ X Broker

Simply delete the EMQ X Broker directory.

#### Download and Start EMQ X in Five Minutes

Once the package is downloaded and installed (or unzipped), EMQ X is ready to start. Taking the zip package for Mac as an example:

~~~
unzip emqx-macosx-v3.2.0.zip && cd emqx

# Start emqx
./bin/emqx start

# Check the running status
./bin/emqx_ctl status

# Stop emqx
./bin/emqx stop
~~~

After EMQ X is started, MQTT clients can connect to it through port `1883`. By default, runtime logs are written to the `log/` directory.

EMQ X loads the Dashboard plugin and launches the web management console by default. Users can check the broker running status, statistics, connections, sessions, topics, subscriptions, and plugins through the web console.

Console address: `http://127.0.0.1:18083/`, default username: admin, password: public

A **Listener** configures the listening port and related parameters of a protocol. EMQ X Broker supports configuring multiple Listeners to listen on multiple protocols or ports at the same time. The following Listeners are supported:

| Listener | Description |
| --- | --- |
| TCP Listener | A listener for MQTT which uses TCP |
| SSL Listener | A secure listener for MQTT which uses TLS |
| Websocket Listener | A listener for MQTT over WebSockets |
| Secure Websocket Listener | A secure listener for MQTT over secure WebSockets (TLS) |

:-: **The default TCP ports used by the EMQ X message server**

| Port | Description |
| --- | --- |
| 1883 | MQTT protocol port |
| 8883 | MQTT/SSL port |
| 8083 | MQTT/WebSocket port |
| 8081 | HTTP API port |
| 18083 | Dashboard Management Console Port |

Each EMQ X version is released as installation packages or zip packages for various operating systems and platforms, including CentOS, Ubuntu, Debian, FreeBSD, macOS, Windows, and others. It is also available as a Docker image.
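To check which of the default listeners in the table above are reachable after startup, a plain TCP probe is usually enough. The following is a minimal Python sketch, not part of EMQ X: the helper names `probe` and `check_emqx_ports` are hypothetical, and the port list is simply copied from the table, so it assumes an unmodified default configuration.

```python
import socket

# Default ports copied from the table above; adjust if the configuration was changed.
DEFAULT_PORTS = {
    1883: "MQTT protocol port",
    8883: "MQTT/SSL port",
    8083: "MQTT/WebSocket port",
    8081: "HTTP API port",
    18083: "Dashboard Management Console Port",
}


def probe(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


def check_emqx_ports(host: str = "127.0.0.1") -> dict:
    """Probe every default port and return a mapping of port -> reachable."""
    return {port: probe(host, port) for port in DEFAULT_PORTS}
```

Calling `check_emqx_ports()` on the broker host returns one boolean per port. An open port only shows that something is listening there; it does not prove that the listener speaks MQTT.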
* EMQ X configuration file: `/etc/emqx/emqx.conf`
* Plugin configuration files: `/etc/emqx/plugins/*.conf`
* Environment configuration file: `/bin/emqx_env`

~~~
#!/bin/sh
[ "x" = "x$EMQX_NODE_NAME" ] && EMQX_NODE_NAME=emqx@127.0.0.1
[ "x" = "x$EMQX_NODE_COOKIE" ] && EMQX_NODE_COOKIE=emqxsecretcookie
[ "x" = "x$EMQX_MAX_PACKET_SIZE" ] && EMQX_MAX_PACKET_SIZE=64KB
[ "x" = "x$EMQX_MAX_PORTS" ] && EMQX_MAX_PORTS=65536
[ "x" = "x$EMQX_TCP_PORT" ] && EMQX_TCP_PORT=1883
[ "x" = "x$EMQX_SSL_PORT" ] && EMQX_SSL_PORT=8883
[ "x" = "x$EMQX_WS_PORT" ] && EMQX_WS_PORT=8083
[ "x" = "x$EMQX_WSS_PORT" ] && EMQX_WSS_PORT=8084
~~~

* Log directory: `/var/log/emqx`
* Data directory: `/var/lib/emqx/`

Start, stop, or restart: `systemctl start|stop|restart emqx.service`

`ws` is supported out of the box after installation. To support `wss` as well, obtain a certificate, place it on your server, and update the following settings in `emqx.conf`:

`listener.wss.external.keyfile = <your key file>`
`listener.wss.external.certfile = <your pem file>`

Then restart EMQ X. Note that clients connecting from a WeChat Mini Program must use `wss`.

~~~bash
$ emqx_ctl listeners
listener on mqtt:ssl:8883
  acceptors       : 16
  max_conns       : 102400
  current_conn    : 0
  shutdown_count  : []
listener on mqtt:tcp:0.0.0.0:1883
  acceptors       : 8
  max_conns       : 1024000
  current_conn    : 0
  shutdown_count  : [{function_clause,36}]
listener on mqtt:tcp:127.0.0.1:11883
  acceptors       : 4
  max_conns       : 1024000
  current_conn    : 0
  shutdown_count  : []
listener on http:dashboard:18083
  acceptors       : 4
  max_conns       : 512
  current_conn    : 0
  shutdown_count  : []
listener on http:management:8081
  acceptors       : 2
  max_conns       : 512
  current_conn    : 0
  shutdown_count  : []
listener on mqtt:ws:8083:8083
  acceptors       : 4
  max_conns       : 102400
  current_conn    : 1
  shutdown_count  : []
listener on mqtt:wss:8084:8084
  acceptors       : 4
  max_conns       : 16
  current_conn    : 0
  shutdown_count  : []
~~~

#### Quick Start

EMQ X uses port `8083` for `WebSocket` connections and port `8084` for `WebSocket with SSL`.
The local connection URL is: `ws://localhost:8083/mqtt`

The connection URL breaks down as follows:

| `ws:` | `localhost` | `8083` | `/mqtt` |
| --- | --- | --- | --- |
| protocol | domain | port | path |

Connection code:

#### Open Source MQTT Client Project

GitHub: [https://github.com/emqtt](https://github.com/emqtt)

| Name | Description |
| --- | --- |
| emqttc | Erlang MQTT client library |
| CocoaMQTT | Swift Language MQTT Client Library |
| QMQTT | QT framework MQTT client library |
| emqtt_benchmark | MQTT benchmark tool |

#### EMQX\_AUTH\_HTTP plugin usage guide

[[Source]](https://www.cnblogs.com/emqx/p/11400545.html)

#### How do I start EMQ X at boot?

**Windows 10**

1. Start `powershell` as administrator
2. Change to the `emqx/bin` directory of the unzipped `emqx` package
3. Run `emqx install` to install the Windows service

A `bat` file can run several cmd commands in sequence (registering and starting the EMQ X service):

~~~
REM Do not echo the commands themselves
@echo off
REM Register the EMQ X service ("call" returns control to this script afterwards)
call D:\softwareDev\tools\EMQX\emqx-windows-v4.0.3\emqx\bin\emqx install
REM Start the EMQ X service
call D:\softwareDev\tools\EMQX\emqx-windows-v4.0.3\emqx\bin\emqx start
REM Query the EMQ X service status
call D:\softwareDev\tools\EMQX\emqx-windows-v4.0.3\emqx\bin\emqx_ctl status
REM Close the cmd window
exit
~~~

#### Rule Engine

[[Official documentation]](https://docs.emqx.io/tutorial/latest/en/rule_engine/rule_engine.html)

![](https://img.kancloud.cn/21/d3/21d338bebf4e83949b6d372bf2770ce4_1522x754.png)

#### HTTP Publish

[[Official documentation 1]](https://docs.emqx.io/broker/v3/en/guide.html#http-publish-api)

The EMQ X message server provides an HTTP publish interface through which an application server or web server can publish MQTT messages:

~~~
HTTP POST http://host:8080/api/v3/mqtt/publish
~~~

Web servers such as PHP/Java/Python/NodeJS or Ruby on Rails can publish MQTT messages via HTTP POST requests:

~~~
curl -v --basic -u user:passwd -H "Content-Type: application/json" -d \
'{"qos":1, "retain": false, "topic":"world", "payload":"test" , "client_id": "C_1492145414740"}' \
-k http://localhost:8080/api/v3/mqtt/publish
~~~

HTTP interface parameters:

| parameter | description |
| --- | --- |
| client\_id | MQTT client ID |
| qos | QoS: 0, 1 or 2 |
| retain | Retain: true or false |
| topic | Topic |
| payload | message payload |

>[info] Note: the HTTP publish interface uses [Basic](https://en.wikipedia.org/wiki/basic_access_authentication) authentication. The user and password in the example above are the AppId and password from Applications in the Dashboard.

[[Official documentation 2]](https://docs.emqx.io/broker/latest/en/advanced/http-api.html#endpoint-publish)

`POST /api/v4/mqtt/publish`

Publish an MQTT message.

**Parameters (json):**

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| topic | String | Optional | | At least one of `topic` and `topics` must be specified |
| topics | String | Optional | | Multiple topics separated by `,`. This field is used to publish a message to several topics at the same time |
| clientid | String | Required | | Client identifier |
| payload | String | Required | | Message body |
| encoding | String | Optional | plain | The encoding used in the message body. Currently only plain and base64 are supported. |
| qos | Integer | Optional | 0 | QoS level |
| retain | Boolean | Optional | false | Whether it is a retained message |

**Success Response Body (JSON):**

| Name | Type | Description |
| --- | --- | --- |
| code | Integer | 0 |

**Examples:**

~~~
$ curl -i --basic -u admin:public -X POST "http://localhost:8081/api/v4/mqtt/publish" -d '{"topic":"a/b/c","payload":"Hello World","qos":1,"retain":false,"clientid":"example"}'

{"code":0}
~~~

#### MQTT WebSocket Connection

EMQ X also supports WebSocket connections, so web browsers or applications can connect directly to the broker via WebSocket:

| | |
| --- | --- |
| WebSocket URI: | `ws://host:8083/mqtt` (or `wss://host:8084/mqtt` with the default SSL listener) |
| Sec-WebSocket-Protocol: | 'mqttv3.1' or 'mqttv3.1.1' |

The Dashboard plugin provides a test tool for MQTT WebSocket connections:

~~~
http://127.0.0.1:18083/#/websocket
~~~

#### HTTP API

[[Official documentation]](https://docs.emqx.io/broker/latest/en/advanced/http-api.html#endpoint-brokers)

EMQ X Broker provides HTTP APIs for integration with external systems, such as querying client information, publishing messages, and creating rules.

EMQ X Broker's HTTP API service listens on port 8081 by default. You can change the listening port, or enable HTTPS, in the configuration file `etc/plugins/emqx_management.conf`. Since EMQ X Broker 4.0.0, all API paths start with `api/v4`.

EMQ X Broker's HTTP API uses [Basic Authentication](https://en.wikipedia.org/wiki/Basic_access_authentication). The `id` and `password` must be filled with the `AppID` and `AppSecret` respectively. The default AppID and AppSecret are `admin/public`. You can modify and add AppID/AppSecret pairs in the left menu bar of the `Dashboard` by selecting "Manage" -> "Apps".

#### Hooks

[[Official documentation]](https://docs.emqx.io/broker/latest/en/advanced/hooks.html)

**Hooks** are a mechanism provided by EMQ X Broker that modifies or extends system functions by intercepting function calls, message passing, and event passing between modules.

In simple terms, the purpose of this mechanism is to make the system more extensible, to ease integration with third-party systems, and to allow the default behavior of the system to be changed. For example:

![](https://img.kancloud.cn/2c/ff/2cff044ee677662a90ae482c3f732293_1168x536.png)

- **HookPoint**

Based on the key activities of a client during its life cycle, EMQ X Broker presets a large number of **HookPoints**.
The preset hook points in the system are:

| Name | Description | Execution Timing |
| --- | --- | --- |
| client.connect | Process connection packet | When the server receives the connection packet from the client |
| client.connack | Issue connection response | When the server is ready to issue a connection response message |
| client.connected | Connection succeeded | After client authentication is completed and the client is successfully connected to the system |
| client.disconnected | Disconnect | When the connection layer of the client is about to close |
| client.authenticate | Connection authentication | After `client.connect` is executed |
| client.check\_acl | ACL authentication | Before a publish/subscribe operation is executed |
| client.subscribe | Subscribe to topic | After receiving the subscription message, and before executing `client.check_acl` |
| client.unsubscribe | Unsubscribe | After receiving the unsubscribe packet |
| session.created | Session creation | When `client.connected` is completed and a new session is created |
| session.subscribed | Session subscription topics | After the subscription operation is completed |
| session.unsubscribed | Session unsubscription | After the unsubscription operation is completed |
| session.resumed | Session resume | When `client.connected` is executed and the old session information is successfully resumed |
| session.discarded | Session discarded | After the session was terminated because it was discarded |
| session.takeovered | Session taken over | After the session was terminated because it was taken over |
| session.terminated | Session terminated | After the session was terminated for another reason |
| message.publish | Message published | Before the server publishes (routes) the message |
| message.delivered | Message delivered | When the message is about to be delivered to the client |
| message.acked | Message acked | After the message ACK is received from the client |
| message.dropped | Message dropped | After the published message is discarded |

>[info] * **The session is discarded:** When the client logs in with `clean session`, if the client's session already exists on the server, the old session will be discarded.
> * **The session is taken over:** When the client logs in with a persistent (reserved) session, if the client's session already exists on the server, the old session will be taken over by the new connection.

- **Callback function**

The input parameters and returned value of each callback function are shown in the following table:

(For the parameter data structures, see [emqx\_types.erl](https://github.com/emqx/emqx/blob/master/src/emqx_types.erl))

| Name | Input parameters | Returned value |
| --- | --- | --- |
| client.connect | `ConnInfo`: Client connection layer parameters <br> `Props`: Properties of MQTT v5.0 connection packets | New `Props` |
| client.connack | `ConnInfo`: Client connection layer parameters <br> `Rc`: Returned code <br> `Props`: Properties of MQTT v5.0 connection response packets | New `Props` |
| client.connected | `ClientInfo`: Client information parameters <br> `ConnInfo`: Client connection layer parameters | \- |
| client.disconnected | `ClientInfo`: Client information parameters <br> `ConnInfo`: Client connection layer parameters <br> `ReasonCode`: Reason code | \- |
| client.authenticate | `ClientInfo`: Client information parameters <br> `AuthResult`: Authentication results | New `AuthResult` |
| client.check\_acl | `ClientInfo`: Client information parameters <br> `Topic`: Publish/subscribe topic <br> `PubSub`: Publish/subscribe <br> `ACLResult`: Authentication result | New `ACLResult` |
| client.subscribe | `ClientInfo`: Client information parameters <br> `Props`: Properties of MQTT v5.0 subscription messages <br> `TopicFilters`: List of topics to subscribe to | New `TopicFilters` |
| client.unsubscribe | `ClientInfo`: Client information parameters <br> `Props`: Properties of MQTT v5.0 unsubscription messages <br> `TopicFilters`: List of topics to unsubscribe from | New `TopicFilters` |
| session.created | `ClientInfo`: Client information parameters <br> `SessInfo`: Session information | \- |
| session.subscribed | `ClientInfo`: Client information parameters <br> `Topic`: Subscribed topic <br> `SubOpts`: Configuration options for subscribe operations | \- |
| session.unsubscribed | `ClientInfo`: Client information parameters <br> `Topic`: Unsubscribed topic <br> `SubOpts`: Configuration options for unsubscribe operations | \- |
| session.resumed | `ClientInfo`: Client information parameters <br> `SessInfo`: Session information | \- |
| session.discarded | `ClientInfo`: Client information parameters <br> `SessInfo`: Session information | \- |
| session.takeovered | `ClientInfo`: Client information parameters <br> `SessInfo`: Session information | \- |
| session.terminated | `ClientInfo`: Client information parameters <br> `Reason`: Termination reason <br> `SessInfo`: Session information | \- |
| message.publish | `Message`: Message object | New `Message` |
| message.delivered | `ClientInfo`: Client information parameters <br> `Message`: Message object | New `Message` |
| message.acked | `ClientInfo`: Client information parameters <br> `Message`: Message object | \- |
| message.dropped | `Message`: Message object <br> `By`: Dropped by <br> `Reason`: Drop reason | \- |

For the application of these hooks, see: [emqx\_plugin\_template](https://github.com/emqx/emqx-plugin-template)

#### WebHooks

[[Official documentation]](https://docs.emqx.io/broker/latest/en/advanced/webhooks.html)

WebHook is a feature provided by the [emqx\_web\_hook](https://github.com/emqx/emqx-web-hook) plugin that notifies a web service of hook events in EMQ X Broker.

The internal implementation of WebHook is based on `hooks`, but it sits closer to the top level. It obtains the various events in EMQ X Broker through callback functions mounted on the hooks and forwards them to the web server configured in `emqx_web_hook`.
Taking the `client.connected` event as an example, the event delivery process is as follows:

~~~
    Client      |    EMQ X     |  emqx_web_hook |   HTTP       +------------+
  =============>| - - - - - - -> - - - - - - - ->===========>  | Web Server |
                |    Broker    |                |  Request     +------------+
~~~

>[info] **WebHook** processes events in a one-way pattern. It only pushes events from EMQ X Broker to web services and does not care about what the web services return.
With WebHooks you can implement features such as device online/offline tracking, subscription and message storage, and message delivery confirmation.

- **Trigger rule**

Trigger rules can be configured in `etc/plugins/emqx_web_hook.conf`. The configuration format is as follows:

~~~
## Format example
web.hook.rule.<Event>.<Number> = <Rule>

## Example
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}
~~~

- **Trigger event**

The following events are currently supported:

| Name | Description | Execution timing |
| --- | --- | --- |
| client.connect | Processing connection packets | When the server receives the client's connection packet |
| client.connack | Issue connection acknowledgement | When the server is ready to send the connack packet |
| client.connected | Connected | After client authentication is completed and the client is successfully connected to the system |
| client.disconnected | Disconnected | When the client connection layer is about to close |
| client.subscribe | Subscribe | After receiving the subscription message, and before executing `client.check_acl` authentication |
| client.unsubscribe | Unsubscribe | After receiving the unsubscription message |
| session.subscribed | Session subscribed | After completing the subscription operation |
| session.unsubscribed | Session unsubscribed | After completing the unsubscription operation |
| message.publish | Message published | Before the server publishes (routes) the message |
| message.delivered | Message delivered | When the message is about to be delivered to the client |
| message.acked | Message acknowledged | After the server receives the message ACK from the client |
| message.dropped | Message dropped | After the published message is dropped |

#### CLI

[[Official documentation]](https://docs.emqx.io/broker/latest/en/advanced/cli.html)

EMQ X Broker provides the `./bin/emqx_ctl` management command line for users to manage, configure and query EMQ X Broker.

Example:

~~~
$ ./bin/emqx_ctl status
Node 'emqx@127.0.0.1' is started
emqx v4.0.0 is running
~~~

| Type | Command | Description |
| --- | --- | --- |
| The `mgmt` command manages applications | `mgmt list` | List applications |
| | `mgmt insert <AppId> <Name>` | Add an application that can access the HTTP API |
| | `mgmt update <AppId> <status>` | Update an application that can access the HTTP API |
| | `mgmt lookup <AppId>` | Get the details of an application that can access the HTTP API |
| | `mgmt delete <AppId>` | Remove an application that can access the HTTP API |

#### Cluster setup

[[Source]](https://zhuanlan.zhihu.com/p/47649383)

### mosca

[[Official website-mosca]](http://www.mosca.io/)

Mosca is a node.js mqtt broker, which can be used:

* Standalone
* Embedded in another Node.js application

**Features**

* MQTT 3.1 and 3.1.1 compliant.
* QoS 0 and QoS 1.
* Various storage options for QoS 1 offline packets, and subscriptions.
* Usable inside ANY other Node.js app.
* version 2.0.0+ targets node v6, v4 and v0.12
* version 1.0.0+ targets node v6, v5, v4 and v0.12, with partial support for node v0.10.

## client

### MQTT Client life cycle

[[Reference]](https://docs.emqx.io/broker/latest/en/development/client.html)

The behavior of an MQTT client throughout its life cycle can be summarized as establishing a connection, subscribing to a topic, receiving and processing messages, publishing messages to a specified topic, unsubscribing, and disconnecting.
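The life cycle can be walked through without a network. The sketch below is a toy, in-memory stand-in for a broker written in Python; `ToyBroker`, `topic_matches` and their methods are hypothetical names made up for illustration, not the API of EMQ X or of any MQTT client library. It models only topic routing with the `+` and `#` wildcards and leaves out QoS, sessions, authentication, and the special handling of topics that start with `$`.

```python
from collections import defaultdict
from typing import Callable, Dict, Set

Handler = Callable[[str, bytes], None]


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Match a topic name against a filter that may contain '+' and '#'."""
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    for i, level in enumerate(f_levels):
        if level == "#":
            # '#' is only valid as the last level; it matches the parent
            # level and everything below it.
            return i == len(f_levels) - 1
        if i >= len(t_levels):
            return False
        if level != "+" and level != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


class ToyBroker:
    """In-memory illustration of connect/subscribe/publish/unsubscribe/disconnect."""

    def __init__(self) -> None:
        self._clients: Dict[str, Handler] = {}
        self._subs: Dict[str, Set[str]] = defaultdict(set)

    def connect(self, client_id: str, on_message: Handler) -> None:
        self._clients[client_id] = on_message

    def subscribe(self, client_id: str, topic_filter: str) -> None:
        self._subs[client_id].add(topic_filter)

    def unsubscribe(self, client_id: str, topic_filter: str) -> None:
        self._subs[client_id].discard(topic_filter)

    def publish(self, topic: str, payload: bytes) -> int:
        """Route a message; return the number of clients it was delivered to."""
        if "+" in topic or "#" in topic:
            raise ValueError("a publish topic must not contain wildcards")
        delivered = 0
        for client_id, filters in list(self._subs.items()):
            handler = self._clients.get(client_id)
            if handler and any(topic_matches(f, topic) for f in filters):
                handler(topic, payload)
                delivered += 1
        return delivered

    def disconnect(self, client_id: str) -> None:
        self._clients.pop(client_id, None)
        self._subs.pop(client_id, None)


received = []
broker = ToyBroker()
broker.connect("ui", lambda topic, payload: received.append((topic, payload)))
broker.subscribe("ui", "home/+/temperature")
broker.publish("home/kitchen/temperature", b"21.5")  # matches the filter
broker.publish("home/kitchen/humidity", b"40")       # no matching filter
broker.unsubscribe("ui", "home/+/temperature")
broker.disconnect("ui")
print(received)
```

A real client performs the same sequence over TCP or WebSocket, with the message handler registered at connection time.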
Taking a client that connects, publishes, and processes messages as an example, the steps generally required at each stage are:

* **Create a connection:**
    * Specify the basic MQTT Broker information: access address and port
    * Specify whether the transport is TCP or MQTT over WebSocket
    * If TLS is enabled, select the protocol version and supply the corresponding certificate
    * If the Broker has authentication enabled, the client must supply the corresponding MQTT username and password
    * Configure client parameters such as keepalive duration, clean session flag, callbacks, retain flag, MQTT protocol version, and will message (LWT)
* **Subscribe to a topic:** Once the connection is established, you can subscribe to a topic by specifying the topic information
    * Specify the topic filter; the wildcards `+` and `#` are supported when subscribing
    * Specify the QoS; QoS 0, 1, or 2 can be chosen depending on what the client library and the broker support. Note that some brokers and cloud service providers do not support every QoS level. For example, AWS IoT, Alibaba Cloud IoT Suite, and Azure IoT Hub do not support QoS 2
    * A subscription may fail because of network problems or ACL rules on the Broker side
* **Receive messages and process:**
    * Generally, the handler function is specified at connection time. How this is done differs slightly depending on the client library and the network programming model of the platform.
* **Publish a message:** Publish a message to a specified topic
    * Specify the target topic. Note that the topic cannot contain the wildcards `+` or `#`. If it does, publishing may fail and the client may be disconnected (depending on the Broker and client library implementation)
    * Specify the message QoS level. The supported QoS levels also differ between Brokers and platforms. For example, if a client publishes a QoS 2 message to Azure IoT Hub, its connection is closed.
    * Specify the message payload, which cannot exceed the maximum message size set by the Broker
    * Specify the retained message flag
* **Unsubscribe:**
    * Just specify the target topic
* **Disconnect:**
    * The client disconnects actively. If the connection is instead lost abnormally, the server publishes the client's will message (LWT)

### MQTT.js

[[Github-MQTT.js]](https://github.com/mqttjs/MQTT.js)

A client library for the `MQTT` protocol, written in JavaScript for node.js and the browser.

As a **breaking change**, the `encoding` option in the old client is removed, and now everything is UTF-8 with the exception of the `password` in the CONNECT message and `payload` in the PUBLISH message, which are `Buffer`.

Another **breaking change** is that MQTT.js now defaults to MQTT v3.1.1, so to support old brokers, please read the [client options doc](https://github.com/mqttjs/MQTT.js#client).

#### Installation

~~~shell
npm install mqtt --save
~~~

MQTT.js supports `WebSocket` protocol connections in `browser` environments and `TCP (SSL/TLS)` protocol in `Node.js` environments.
Choose the appropriate connection mode according to your usage scenario:

* Use the connect() function to connect; it returns a client instance
* Use callback functions on client events to handle the related logic:
    * connect: connection success event
    * reconnect: connection error, abnormal disconnection and reconnection events
    * error: connection error and connection termination events
    * message: event for receiving a message from a subscription
* The client has several basic functions:
    * subscribe(): Subscribe to a topic or topics
    * publish(): Publish a message to a topic
    * end(): Close the client

#### API

[[Github-API]](https://github.com/mqttjs/MQTT.js)

- `mqtt.`
    - **`connect([url], options)`**
    - **`Client(streamBuilder, options)`**
        - `on('eventName', callbackFn)`
        - `publish(topic, message, [options], [callback])`
        - `subscribe(topic/topic array/topic object, [options], [callback])`
        - `unsubscribe(topic/topic array, [options], [callback])`
        - `end([force], [options], [cb])`
        - `removeOutgoingMessage(mid)`
        - `reconnect()`
        - `handleMessage(packet, callback)`
        - `getLastMessageId()`
        - `connected`
        - `reconnecting`
    - **`store(streamBuilder, options)`**
        - `put(packet, callback)`
        - `del(packet, callback)`
        - `createStream()`
        - `close(callback)`

:-: `eventName` vs `callbackFn`

| Event name | Callback function | Emitted | Note |
| --- | --- | --- | --- |
| `connect` | function (connack) {} | on successful (re)connection (i.e. connack rc=0) | `connack` is the received connack packet. When the `clean` connection option is `false` and the server has a previous session for the `clientId` connection option, the `connack.sessionPresent` flag is `true`. In that case, you may rely on the stored session and prefer not to send subscribe commands for the client. |
| `reconnect` | function () {} | when a reconnect starts. | - |
| `close` | function () {} | after a disconnection | - |
| `disconnect` | function (packet) {} | after receiving a disconnect packet from the broker. MQTT 5.0 feature. | - |
| `offline` | function () {} | when the client goes offline. | - |
| `error` | function (error) {} | when the client cannot connect (i.e. connack rc != 0) or when a parsing error occurs. | - |
| `end` | function () {} | when `mqtt.Client#end()` is called. If a callback was passed to `mqtt.Client#end()`, this event is emitted once the callback returns. | - |
| `message` | function (topic, message, packet) {} | when the client receives a publish packet | <li>`topic` topic of the received packet</li> <li>`message` payload of the received packet</li> <li>`packet` received packet, as defined in `mqtt-packet`</li> |
| `packetsend` | function (packet) {} | when the client sends any packet. This includes `.publish()` packets as well as packets used by MQTT for managing subscriptions and connections | `packet` the sent packet, as defined in `mqtt-packet` |
| `packetreceive` | function (packet) {} | when the client receives any packet. This includes packets from subscribed topics as well as packets used by MQTT for managing subscriptions and connections | `packet` received packet, as defined in `mqtt-packet` |

#### About QoS

Here is how QoS works:

* QoS 0 : received **at most once**: The packet is sent, and that's it. There is no validation about whether it has been received.
* QoS 1 : received **at least once**: The packet is sent and stored as long as the client has not received a confirmation from the server. MQTT ensures that it *will* be received, but there can be duplicates.
* QoS 2 : received **exactly once**: Same as QoS 1, but there are no duplicates.

In terms of data consumption, QoS 2 > QoS 1 > QoS 0, if that is a concern to you.
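The difference between the three levels can be made concrete with a small simulation. The following Python sketch is not how MQTT.js or any broker implements QoS; all names (`Receiver`, `send`, `simulate`) are hypothetical. The link is scripted: each attempt is `"ok"`, `"lost"` (the publish never arrives) or `"ack_lost"` (the publish arrives but the sender never sees the confirmation). QoS 2 is reduced to de-duplication by packet id, which stands in for the real four-packet handshake.

```python
from typing import Iterable, List, Set


class Receiver:
    """Collects the payloads that are handed to the application."""

    def __init__(self, dedupe: bool) -> None:
        self.dedupe = dedupe
        self.seen_ids: Set[int] = set()
        self.delivered: List[str] = []

    def on_publish(self, packet_id: int, payload: str) -> None:
        if self.dedupe and packet_id in self.seen_ids:
            return  # duplicate of a packet that was already delivered
        self.seen_ids.add(packet_id)
        self.delivered.append(payload)


def send(payload: str, packet_id: int, receiver: Receiver,
         outcomes: Iterable[str], retry: bool) -> int:
    """Send one message over the scripted link; return the number of attempts."""
    attempts = 0
    for outcome in outcomes:
        attempts += 1
        if outcome != "lost":
            receiver.on_publish(packet_id, payload)
        if outcome == "ok" or not retry:
            break  # confirmed, or fire-and-forget
    return attempts


def simulate(qos: int, outcomes: List[str]) -> List[str]:
    """Deliver a single message at the given QoS and return what the app saw."""
    receiver = Receiver(dedupe=(qos == 2))
    send("reading-1", 1, receiver, outcomes, retry=(qos > 0))
    return receiver.delivered


link = ["lost", "ack_lost", "ok"]
for qos in (0, 1, 2):
    print(qos, simulate(qos, link))
```

With this scripted link, QoS 0 gives up after the first lost attempt, QoS 1 retries until it is confirmed and therefore hands the application a duplicate, and QoS 2 retries as well but suppresses the duplicate.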
----

### phpMQTT

[[github, phpMQTT]](https://github.com/bluerhinos/phpMQTT)

A simple PHP class to connect/publish/subscribe to an MQTT broker.

### Mosquitto-PHP

[[github, Mosquitto-PHP]](https://github.com/mgdm/Mosquitto-PHP)
[[Official documentation]](https://mosquitto-php.readthedocs.io/en/latest/overview.html)

A wrapper for the Eclipse Mosquitto™ MQTT client library for PHP.

## References

### A brief look at MQTT, with examples

[[Source]](https://www.jianshu.com/p/abc6f5a8face)

### IoT communication protocols compared: MQTT, DDS, AMQP, XMPP, JMS, REST, CoAP

[[Source]](https://www.cnblogs.com/saryli/p/9742709.html)