Spring Boot 整合物联网(IoT)MQTT 协议

avatar
作者
猴君
阅读量:0

准备工作 (下载EMQX服务端,相关客户端工具)

1. 服务端工具:

https://www.emqx.io/downloads?os=Windows

2. 客户端工具:

https://mqttx.app/zh

 <!-- Web dependencies -->
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-integration</artifactId>
 </dependency>
 <!-- MQTT-related dependencies -->
 <dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-stream</artifactId>
 </dependency>
 <dependency>
     <groupId>org.springframework.integration</groupId>
     <artifactId>spring-integration-mqtt</artifactId>
 </dependency>
 <!-- Lombok: the Java snippets in this article use @Data and @Slf4j -->
 <dependency>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
 </dependency>

自定义yml配置

server:
  port: 8989
# MQTT properties (bound to the MqttConfiguration class via the "mqtt" prefix)
mqtt:
  # uris may list several broker addresses, hence an array
  uris:
    - tcp://127.0.0.1:1883
  clientId: mqtt_test1
  # topics the consumer subscribes to
  topics:
    - demo
    - test
  username: admin
  password: 123456
  timeout: 30
  keepalive: 60
  qos: 1

增加config配置读取yml文件 (使用了Lombok 需要自行添加pom依赖)

package com.huawen.mqtt.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Binds the {@code mqtt.*} entries of the application configuration to typed fields.
 * Accessors are generated by Lombok's {@code @Data}.
 *
 * <p>The numeric settings carry defaults equal to the values used in the sample
 * configuration (30 / 60 / 1). Without them an omitted key leaves the boxed field
 * {@code null}, and the consumer configuration would fail with a
 * {@link NullPointerException} when it unboxes the value.
 *
 * @author xjl
 * @date 2022/5/5 17:27
 */
@Component
@ConfigurationProperties(prefix = "mqtt")
@Data
public class MqttConfiguration {

    /**
     * Broker addresses; an array because more than one URI may be configured.
     */
    private String[] uris;

    /**
     * Base client id. The consumer configuration appends a suffix to it before connecting.
     */
    private String clientId;

    /**
     * Topics to subscribe to.
     */
    private String[] topics;

    /**
     * User name used to authenticate against the broker.
     */
    private String username;

    /**
     * Password used to authenticate against the broker.
     */
    private String password;

    /**
     * Connection timeout handed to the MQTT connect options. Defaults to 30.
     */
    private Integer timeout = 30;

    /**
     * Keep-alive interval handed to the MQTT connect options. Defaults to 60.
     */
    private Integer keepalive = 60;

    /**
     * QoS level. The consumer configuration applies it to the inbound adapter,
     * i.e. it is the subscription QoS. Defaults to 1.
     */
    private Integer qos = 1;
}

消费者配置

package com.huawen.mqtt.config;  import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler;  import javax.annotation.Resource;  /**  * @author:xjl  * @date:2022/5/6 9:06  * @Description: MQTT 消费端的配置  **/ @Configuration @Slf4j public class MqttInBoundConfiguration {     @Resource     private MqttConfiguration mqttProperties;      //==================================== 消费消息==========================================//      /**      * 入站通道      *      * @return 消息通道对象 {@link MessageChannel}      */     @Bean("input")     public MessageChannel mqttInputChannel() {         //直连通道         return new DirectChannel();     }       /**      * 创建MqttPahoClientFactory 设置MQTT的broker的连接属性 如果使用ssl验证 也需要此处设置      *      * @return MQTT客户端工厂 {@link MqttPahoClientFactory}      */     @Bean     public MqttPahoClientFactory inClientFactory() {         //设置连接属性         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();         MqttConnectOptions options = new MqttConnectOptions();         options.setServerURIs(mqttProperties.getUris());         options.setUserName(mqttProperties.getUsername());         
options.setPassword(mqttProperties.getPassword().toCharArray());         options.setConnectionTimeout(mqttProperties.getTimeout());         options.setKeepAliveInterval(mqttProperties.getKeepalive());         // 接受离线消息  告诉代理客户端是否要建立持久会话   false为建立持久会话         options.setCleanSession(false);         //设置断开后重新连接         options.setAutomaticReconnect(true);         factory.setConnectionOptions(options);         return factory;     }       /**      * 入站      *      * @return 消息提供者 {@link MessageProducer}      */     @Bean     public MessageProducer producer() {         // Paho客户端消息驱动通道适配器,主要用来订阅主题  对inboundTopics主题进行监听         //clientId 加后缀 不然会报retrying 不能重复         MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_customer", inClientFactory(), mqttProperties.getTopics());         adapter.setCompletionTimeout(5000);         // Paho消息转换器         DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();         // 按字节接收消息         // defaultPahoMessageConverter.setPayloadAsBytes(true);         adapter.setConverter(defaultPahoMessageConverter);         // 设置QoS         adapter.setQos(mqttProperties.getQos());         adapter.setOutputChannel(mqttInputChannel());         return adapter;     }      /**      * 通过通道获取数据      * ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。      * tips:      * 异步处理      *      * @return 消息处理 {@link MessageHandler}      */     @Bean     @ServiceActivator(inputChannel = "input")     public MessageHandler handler() {         return message -> {             log.info("收到的完整消息为--->{}", message);             log.info("----------------------");             log.info("message:" + message.getPayload());             log.info("Id:" + message.getHeaders().getId());             log.info("receivedQos:" + message.getHeaders().get(MqttHeaders.RECEIVED_QOS));             String topic = (String) 
message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);             log.info("topic:" + topic);             log.info("----------------------");         };     } }  

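生产者配置(注意:下方代码与后文“生产者测试controller”一节的内容完全相同,并不是出站配置;MqttGateway 所引用的 "out" 通道在本文中没有定义,完整的生产者(出站)配置请参考文末链接的原文)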

package com.huawen.mqtt.controller;  import com.huawen.mqtt.bean.MyMessage; import com.huawen.mqtt.inter.MqttGateway; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController;  import javax.annotation.Resource;  /**  * @author:xjl  * @date:2022/5/6 9:17  * @Description: mqtt发布消息controller  **/ @RestController public class MqttPublishController {     @Resource     private MqttGateway mqttGateWay;      @PostMapping("/send")     public String send(@RequestBody MyMessage myMessage) {         // 发送消息到指定主题         mqttGateWay.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());         return "send topic: " + myMessage.getTopic() + ", message : " + myMessage.getContent();     } }  

创建一个通用接口 用于发送数据

package com.huawen.mqtt.inter;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

/**
 * Messaging gateway used to publish MQTT messages. Every call is sent to the
 * channel named {@code "out"}, which has to be declared elsewhere in the application.
 *
 * @author xjl
 * @date 2022/5/6 9:20
 */
@MessagingGateway(defaultRequestChannel = "out")
public interface MqttGateway {

    /**
     * Sends a payload without naming a topic or QoS.
     * NOTE(review): the topic is presumably a default set on the outbound side - confirm.
     *
     * @param payload message body
     */
    void sendToMqtt(String payload);

    /**
     * Sends a payload to the given topic.
     *
     * @param topic   target topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param payload message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends a text payload to the given topic with the given QoS.
     *
     * @param topic   target topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param qos     QoS level, passed as the {@link MqttHeaders#QOS} header
     * @param payload message body as a string
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    @Header(MqttHeaders.QOS) int qos,
                    String payload);

    /**
     * Sends a binary payload to the given topic with the given QoS.
     *
     * @param topic   target topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param qos     QoS level, passed as the {@link MqttHeaders#QOS} header
     * @param payload message body as a byte array
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    @Header(MqttHeaders.QOS) int qos,
                    byte[] payload);
}

生产者测试controller

package com.huawen.mqtt.controller;  import com.huawen.mqtt.bean.MyMessage; import com.huawen.mqtt.inter.MqttGateway; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController;  import javax.annotation.Resource;  /**  * @author:xjl  * @date:2022/5/6 9:17  * @Description: mqtt发布消息controller  **/ @RestController public class MqttPublishController {     @Resource     private MqttGateway mqttGateWay;      @PostMapping("/send")     public String send(@RequestBody MyMessage myMessage) {         // 发送消息到指定主题         mqttGateWay.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());         return "send topic: " + myMessage.getTopic() + ", message : " + myMessage.getContent();     } }  

该文章参考 https://blog.csdn.net/m0_46689235/article/details/124606005

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!