linux怎么查看本机内存大小
280
2022-11-28
物联网消息队列客户端-MQTT-基本功能实现
1. 主要实现功能
快速开始
4.1 引入依赖
4.2 配置
spring: mqtt: emq: client: # 多数据源客户端名称,默认default default: # broker地址 host: 127.0.0.1 # 端口 port: 31883 # 用户名 username: admin # 密码 password: 123456
更多配置如下
spring: mqtt: emq: client: # 多数据源客户端名称,默认default default: # broker地址 host: 127.0.0.1 # 端口 port: 31883 # 用户名 username: admin # 密码 password: 123456 # 客户端标识,需保持全局唯一 client-id: parking_server # 是否清除session clean-session: false # 连接超时时间,单位秒 connection-timeout: 10 # 心跳间隔时间,单位秒 keep-alive-interval: 10 # 全局消息质量 global-qos: 1 # 重新连接之间等待的最长时间 maxReconnect-delay: 128000 # 是否自动重新连接 automatic-reconnect: true # 最大消息并发数量,超过此数量并发时可能会丢消息 maxInflight: 1000
4.3 开启自动配置
在启动类上增加@EnableRabbitMqAutoConfiguration注解
import com.demo.mqttclient.anno.EnableEmqAutoConfiguration;@SpringBootApplication@EnableEmqAutoConfigurationpublic class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); }}
4.4 发布消息
在生产者的业务程序中,注入MQTTClient
import com.demo.mqttclient.MQTTClient;@Resourceprivate MQTTClient defaultMQTTClient;
为了兼容第三方及优化内部使用逻辑,所以内置提供了两种消息发送方式。
4.4.1 第三方消息发送
public String publishHeartbeatReply() { HeartbeatReplyMessage heartbeatReplyMessage = new HeartbeatReplyMessage(); heartbeatReplyMessage.setCmd(32896); heartbeatReplyMessage.setExpire(1605252875L); heartbeatReplyMessage.setDevid("095437323930030130523933"); heartbeatReplyMessage.setServer_time("1605252875"); defaultMQTTClient.publish2ThirdParty("npt/park/type1/dev/095437323930030130523933", 1, heartbeatReplyMessage); return "success";}
4.4.2 内部消息发送
消息实体实现Message接口
package com.example.test.message;import com.demo.mqttclient.MQTTMessage;import lombok.Data;import java.util.UUID;@Datapublic class demoMessage implements MQTTMessage { private String msgId = UUID.randomUUID().toString(); private String name; private String gender; @Override public String getMsgId() { return this.msgId; } }
然后直接调用该类的publish方法发送即可
@GetMapping("demo/publish")public String demoPublish() { demoMessage demoMessage = new demoMessage(); demoMessage.setName("点都"); demoMessage.setGender("xx"); defaultMQTTClient.publish("demo/topic", demoMessage); return "success";}
其中存在多个重载的方法。
package com.demo.mqttclient;import com.demo.mqttclient.enums.ShareModelEnum;import com.demo.plugin.core.lang.json.JSONUtil;import org.eclipse.paho.client.mqttv3.MqttMessage;/** * mqtt客户端 * * @author zhangliuyang * @date 2022/07/18 * @since 1.0.0 */public interface MQTTClient { /** * 启动客户端 */ void start(); /** * 关闭客户端 */ void close(); /** * 发布 * * @param topic 主题 * @param message 消息 */ default
4.5 接收消息
消费者需要在消息处理类上添加@MQTTSubscriber(topics = {"npt/park/type1/server/10010"})注解,指定要监听topics和客户端名称即可。如果没有显示的指定客户端名称,则使用defaultMQTTClient,使用qos执行订阅消息质量。当消息处理类中有多个public方法时,需要@MQTTConsumerMethod标记具体消费方法
package com.example.test.handler;import com.demo.mqttclient.MessageHandler;import com.demo.mqttclient.anno.MQTTConsumerMethod;import com.demo.mqttclient.anno.MQTTSubscriber;import com.demo.mqttclient.enums.ShareModelEnum;import com.demo.plugin.core.lang.json.JSONUtil;import com.example.test.message.DeviceStartMessage;import com.example.test.message.PlateRecognitionReportMessage;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttMessage;import java.util.Map;@Slf4j@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.GROUP_SHARE)public class ParkingMessageHandler { public static final String TOPIC = "npt/park/type1/server/10010"; public static final String CMD_KEY = "cmd"; @MQTTConsumerMethod public void handle(Map
4.6 发送延迟消息
要发送延迟消息,需要先开启emq延迟发布配置。发送延时消息的方式相比之前,仅仅增加一个延时时间。其中延时时长的单位为秒,最大为4294967秒
//发送一个延时时长为10s的消息defaultMQTTClient.publish2ThirdParty("npt/park/type1/dev/095437323930030130523933", 1, heartbeatReplyMessage, 10);
4.7 多数据源
spring: mqtt: emq: client: # 多数据源客户端名称,默认default default: # broker地址 host: 127.0.0.1 # 端口 port: 31883 # 用户名 username: admin # 密码 password: 123456 # 多数据源客户端名称 parking: # broker地址 host: 127.0.0.1 # 端口 port: 31883 # 用户名 username: admin # 密码 password: 123456
4.7.1 发布消息
首先注入MQTTClient,与单数据源的唯一区别就是bean的名称。默认向Spring容器中添加的实现类名称为“${数据源名称}MQTTClient”以上面的配置文件为例,默认的bean名称为 defaultMQTTClient 和 billMQTTClient
import com.demo.mqttclient.MQTTClient;@Resourceprivate MQTTClient defaultMQTTClient;@Resourceprivate MQTTClient billMQTTClient;
其他操作同单数据源
4.7.2 接收消息
接收消息与单数据源基本一致,唯一的区别是在@MQTTSubscriber中指定clientName属性,指定当前从哪个数据源进行消费。
import com.demo.mqttclient.anno.MQTTSubscriber;@MQTTSubscriber(topics = {"npt/park/type1/server/10010"}, clientName = "parking")public class ParkingMessageHandler {}
4.8 分组共享订阅
系统默认使用spring.application.name作为分组名称,用户可在消息消费类上指定@MQTTSubscriber属性中groupName = "group_name"即可
import com.demo.mqttclient.anno.MQTTSubscriber;@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.GROUP_SHARE, groupName = "group_name")public class ParkingMessageHandler {}
4.9 不分组共享订阅
只需要在消息消费类上指定@MQTTSubscriber属性中share = ShareModelEnum.UN_GROUP_SHARE即可。
import com.demo.mqttclient.anno.MQTTSubscriber;@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.UN_GROUP_SHARE)public class ParkingMessageHandler {}
4.10 排它订阅
只需要在消息消费类上指定@MQTTSubscriber属性中exclusive = true即可,开启排它订阅时,默认关闭共享订阅。
import com.demo.mqttclient.anno.MQTTSubscriber;@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, exclusive = true)public class ParkingMessageHandler {}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~