物联网消息队列客户端-MQTT-基本功能实现

网友投稿 280 2022-11-28

物联网消息队列客户端-MQTT-基本功能实现

1. 主要实现功能

快速开始

4.1 引入依赖

org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.5

4.2 配置

spring: mqtt: emq: client: # 多数据源客户端名称,默认default default: # broker地址 host: 127.0.0.1 # 端口 port: 31883 # 用户名 username: admin # 密码 password: 123456

更多配置如下

spring: mqtt: emq: client: # 多数据源客户端名称,默认default default: # broker地址 host: 127.0.0.1 # 端口 port: 31883 # 用户名 username: admin # 密码 password: 123456 # 客户端标识,需保持全局唯一 client-id: parking_server # 是否清除session clean-session: false # 连接超时时间,单位秒 connection-timeout: 10 # 心跳间隔时间,单位秒 keep-alive-interval: 10 # 全局消息质量 global-qos: 1 # 重新连接之间等待的最长时间 maxReconnect-delay: 128000 # 是否自动重新连接 automatic-reconnect: true # 最大消息并发数量,超过此数量并发时可能会丢消息 maxInflight: 1000

4.3 开启自动配置

在启动类上增加​​@EnableRabbitMqAutoConfiguration​​注解

import com.demo.mqttclient.anno.EnableEmqAutoConfiguration;@SpringBootApplication@EnableEmqAutoConfigurationpublic class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); }}

4.4 发布消息

在生产者的业务程序中,注入​​MQTTClient​​

import com.demo.mqttclient.MQTTClient;@Resourceprivate MQTTClient defaultMQTTClient;

为了兼容第三方及优化内部使用逻辑,所以内置提供了两种消息发送方式。

4.4.1 第三方消息发送

public String publishHeartbeatReply() { HeartbeatReplyMessage heartbeatReplyMessage = new HeartbeatReplyMessage(); heartbeatReplyMessage.setCmd(32896); heartbeatReplyMessage.setExpire(1605252875L); heartbeatReplyMessage.setDevid("095437323930030130523933"); heartbeatReplyMessage.setServer_time("1605252875"); defaultMQTTClient.publish2ThirdParty("npt/park/type1/dev/095437323930030130523933", 1, heartbeatReplyMessage); return "success";}

4.4.2 内部消息发送

消息实体实现​​Message​​接口

package com.example.test.message;import com.demo.mqttclient.MQTTMessage;import lombok.Data;import java.util.UUID;@Datapublic class demoMessage implements MQTTMessage { private String msgId = UUID.randomUUID().toString(); private String name; private String gender; @Override public String getMsgId() { return this.msgId; } }

然后直接调用该类的​​publish​​方法发送即可

@GetMapping("demo/publish")public String demoPublish() { demoMessage demoMessage = new demoMessage(); demoMessage.setName("点都"); demoMessage.setGender("xx"); defaultMQTTClient.publish("demo/topic", demoMessage); return "success";}

其中存在多个重载的方法。

package com.demo.mqttclient;import com.demo.mqttclient.enums.ShareModelEnum;import com.demo.plugin.core.lang.json.JSONUtil;import org.eclipse.paho.client.mqttv3.MqttMessage;/** * mqtt客户端 * * @author zhangliuyang * @date 2022/07/18 * @since 1.0.0 */public interface MQTTClient { /** * 启动客户端 */ void start(); /** * 关闭客户端 */ void close(); /** * 发布 * * @param topic 主题 * @param message 消息 */ default void publish(String topic, T message) { this.publish(topic, message, 1); } /** * 发布 * * @param topic 主题 * @param message 消息 * @param qos 消息质量 */ default void publish(String topic, T message, int qos) { this.publish(topic, message, qos, 0); } /** * 发布 * * @param topic 主题 * @param message 消息 * @param qos 消息质量 * @param delay 延迟时间[unit:s, max:4294967s, condition: > 0] */ default void publish(String topic, T message, int qos, long delay) { this.publish(topic, message, qos, delay, false); } /** * 发布 * * @param topic 主题 * @param message 消息 * @param qos 消息质量 * @param delay 延迟时间[unit:s, max:4294967s, condition: > 0] * @param retained 保留消息 */ default void publish(String topic, T message, int qos, long delay, boolean retained) { MQTTMessageContext mqttMessageContext = new MQTTMessageContext(); mqttMessageContext.setId(message.getMsgId()); mqttMessageContext.setPayload(JSONUtil.write(message)); mqttMessageContext.setQos(qos); mqttMessageContext.setDelay(delay); mqttMessageContext.setRetained(retained); mqttMessageContext.setTimestamp(System.currentTimeMillis()); this.publish(topic, mqttMessageContext); } /** * 发布 * * @param topic 主题 * @param messageContext 消息上下文 */ void publish(String topic, MQTTMessageContext messageContext); /** * 发送到第三方 * * @param topic 主题 * @param message 消息 */ default void publish2ThirdParty(String topic, Object message) { this.publish2ThirdParty(topic, 1, message); } /** * 发送到第三方 * * @param topic 主题 * @param qos 消息质量 * @param message 消息 */ default void publish2ThirdParty(String topic, int qos, Object message) { this.publish2ThirdParty(topic, qos, message, Constant.DEFAULT_CHARSET.name()); } /** * 发送到第三方 * * @param topic 主题 * @param qos 消息质量 * @param message 消息 * @param charsetName 字符集名称 */ default void publish2ThirdParty(String topic, int qos, Object message, String charsetName) { this.publish2ThirdParty(topic, qos, message, charsetName, 0); } /** * publish2第三方 * * @param topic 主题 * @param qos qos * @param message 消息 * @param charsetName 字符集名称 * @param delay 延迟时间 */ default void publish2ThirdParty(String topic, int qos, Object message, String charsetName, long delay) { this.publish2ThirdParty(topic, qos, message, charsetName, delay, false); } /** * publish2第三方 * * @param topic 主题 * @param qos qos * @param message 消息 * @param charsetName 字符集名称 * @param delay 延迟时间 * @param retained 是否保留消息 */ default void publish2ThirdParty(String topic, int qos, Object message, String charsetName, long delay, boolean retained) { this.publish2ThirdParty(topic, qos, JSONUtil.toString(message).getBytes(charsetName), delay, retained); } /** * 发送到第三方 * * @param topic 主题 * @param payload 有效载荷 */ default void publish2ThirdParty(String topic, byte[] payload) { this.publish2ThirdParty(topic, 1, payload, 0); } /** * 发送到第三方 * * @param topic 主题 * @param qos 消息质量 * @param payload 有效载荷 */ default void publish2ThirdParty(String topic, int qos, byte[] payload) { this.publish2ThirdParty(topic, qos, payload, 0); } /** * 发送到第三方 * * @param topic 主题 * @param qos 消息质量 * @param payload 有效载荷 * @param delay 延迟时间 */ default void publish2ThirdParty(String topic, int qos, byte[] payload, long delay) { this.publish2ThirdParty(topic, qos, payload, delay, false); } /** * 发送到第三方 * * @param topic 主题 * @param qos 消息质量 * @param payload 有效载荷 * @param delay 延迟时间 * @param retained 是否保留消息 */ default void publish2ThirdParty(String topic, int qos, byte[] payload, long delay, boolean retained) { MqttMessage mqttMessage = new MqttMessage(); mqttMessage.setPayload(payload); mqttMessage.setQos(qos); mqttMessage.setRetained(retained); this.publish2ThirdParty(topic, delay, mqttMessage); } /** * 发送到第三方 * * @param topic 主题 * @param mqttMessage mqtt消息 * @param delay 延迟时间 */ void publish2ThirdParty(String topic, long delay, MqttMessage mqttMessage); /** * 订阅 * * @param topic 主题 * @param qos 消息质量 * @param shareModel 共享模型 * @param groupName 分组名称 * @param exclusive 排它 * @param messageHandler 消息处理程序 */ void subscribe(String topic, int qos, ShareModelEnum shareModel, String groupName, boolean exclusive, MessageHandler messageHandler);}

4.5 接收消息

消费者需要在消息处理类上添加​​@MQTTSubscriber(topics = {"npt/park/type1/server/10010"})​​​注解,指定要监听​​topics​​​和客户端名称即可。如果没有显示的指定客户端名称,则使用​​defaultMQTTClient​​​,使用​​qos​​​执行订阅消息质量。当消息处理类中有多个​​​public​​​方法时,需要​​@MQTTConsumerMethod​​标记具体消费方法

package com.example.test.handler;import com.demo.mqttclient.MessageHandler;import com.demo.mqttclient.anno.MQTTConsumerMethod;import com.demo.mqttclient.anno.MQTTSubscriber;import com.demo.mqttclient.enums.ShareModelEnum;import com.demo.plugin.core.lang.json.JSONUtil;import com.example.test.message.DeviceStartMessage;import com.example.test.message.PlateRecognitionReportMessage;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttMessage;import java.util.Map;@Slf4j@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.GROUP_SHARE)public class ParkingMessageHandler { public static final String TOPIC = "npt/park/type1/server/10010"; public static final String CMD_KEY = "cmd"; @MQTTConsumerMethod public void handle(Map message) { log.info("handle:{}", message); if (message.containsKey(CMD_KEY)) { Integer cmd = (Integer) message.get(CMD_KEY); switch (cmd) { case 129: handleDeviceStartMessage(message); break; case 140: handlePlateRecognitionReportMessage(message); break; default: log.warn("不支持此cmd:[{}]", cmd); break; } } else { log.warn("消息消费异常"); } } private void handleDeviceStartMessage(Map message) { DeviceStartMessage deviceStartMessage = JSONUtil.toObject(JSONUtil.toString(message), DeviceStartMessage.class); log.info("接收到设备启动消息:{}", deviceStartMessage); } private void handlePlateRecognitionReportMessage(Map message) { PlateRecognitionReportMessage plateRecognitionReportMessage = JSONUtil.toObject(JSONUtil.toString(message), PlateRecognitionReportMessage.class); log.info("接收到车牌上报识别消息:{}", plateRecognitionReportMessage); }}

4.6 发送延迟消息

要发送延迟消息,需要先开启​​emq​​​延迟发布配置。发送延时消息的方式相比之前,仅仅增加一个延时时间。其中延时时长的单位为​​​秒​​​,最大为​​4294967​​秒

//发送一个延时时长为10s的消息defaultMQTTClient.publish2ThirdParty("npt/park/type1/dev/095437323930030130523933", 1, heartbeatReplyMessage, 10);

4.7 多数据源

spring: mqtt: emq: client: # 多数据源客户端名称,默认default default: # broker地址 host: 127.0.0.1 # 端口 port: 31883 # 用户名 username: admin # 密码 password: 123456 # 多数据源客户端名称 parking: # broker地址 host: 127.0.0.1 # 端口 port: 31883 # 用户名 username: admin # 密码 password: 123456

4.7.1 发布消息

首先注入​​MQTTClient​​​,与单数据源的唯一区别就是​​bean​​​的名称。默认向​​Spring​​​容器中添加的实现类名称为​​“${数据源名称}MQTTClient”​​​以上面的配置文件为例,默认的​​bean​​名称为 ​​defaultMQTTClient​​ 和 ​​billMQTTClient​​

import com.demo.mqttclient.MQTTClient;@Resourceprivate MQTTClient defaultMQTTClient;@Resourceprivate MQTTClient billMQTTClient;

其他操作同单数据源

4.7.2 接收消息

接收消息与单数据源基本一致,唯一的区别是在​​@MQTTSubscriber​​​中指定​​clientName​​属性,指定当前从哪个数据源进行消费。

import com.demo.mqttclient.anno.MQTTSubscriber;@MQTTSubscriber(topics = {"npt/park/type1/server/10010"}, clientName = "parking")public class ParkingMessageHandler {}

4.8 分组共享订阅

系统默认使用​​spring.application.name​​​作为分组名称,用户可在消息消费类上指定​​@MQTTSubscriber​​​属性中​​groupName = "group_name"​​即可

import com.demo.mqttclient.anno.MQTTSubscriber;@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.GROUP_SHARE, groupName = "group_name")public class ParkingMessageHandler {}

4.9 不分组共享订阅

只需要在消息消费类上指定​​@MQTTSubscriber​​​属性中​​share = ShareModelEnum.UN_GROUP_SHARE​​即可。

import com.demo.mqttclient.anno.MQTTSubscriber;@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.UN_GROUP_SHARE)public class ParkingMessageHandler {}

4.10 排它订阅

只需要在消息消费类上指定​​@MQTTSubscriber​​​属性中​​exclusive = true​​即可,开启排它订阅时,默认关闭共享订阅。

import com.demo.mqttclient.anno.MQTTSubscriber;@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, exclusive = true)public class ParkingMessageHandler {}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:python【模块】xml.etree.ElementTree 解析 xml
下一篇:W5100学习之引脚定义、引脚布局
相关文章

 发表评论

暂时没有评论,来抢沙发吧~