springboot集成MQTT实现消息接收

avatar
作者
猴君
阅读量:2

MQTT介绍

简单来说MQTT是一种协议,用来解决物联网之间的数据传输,它功耗更低,稳定性也不错,现在很多物联网的设备都在使用mqtt。感兴趣可以查看详情中文介绍

SpringBoot集成Mqtt

  1. 引入pom文件
<dependency>             <groupId>org.springframework.integration</groupId>             <artifactId>spring-integration-core</artifactId>         </dependency>         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-integration</artifactId>         </dependency>         <dependency>             <groupId>org.springframework.integration</groupId>             <artifactId>spring-integration-stream</artifactId>         </dependency>         <dependency>             <groupId>org.springframework.integration</groupId>             <artifactId>spring-integration-mqtt</artifactId>         </dependency> 
  1. 编写mqtt的配置类
publish:   mqtt:     host: tcp://你自己的mqtt地址:端口     clientId: mqtt_id     userName: 你的账号名     password: 你的密码     # 这里表示会话不过期     cleanSession: false     # 配置一个默认的主题,加载时不会用到,只能在需要时手动提取     defaultTopic: devops     timeout: 1000     keepAliveInterval: 10     #断线重连方式,自动重新连接与会话不过期配合使用会导致     #断线重新连接后会接收到断线期间的消息。需要更改设置请看password联系我     automaticReconnect: true     connectionTimeout: 3000     topic: topic_0     # 最大链接数     maxInFlight: 100     topics: topic_1,topic_2,topic_3,topic_4 
package com.cshf.receive.common;  import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration;  import java.util.List;  /**  */ @Configuration @ConfigurationProperties(MQTTConfig.PREFIX) @Data public class MQTTConfig { 	 	//配置的名称     public static final String PREFIX = "publish.mqtt";      private String host;      private String clientId;      private String username;      private String password;      private boolean cleanSession;      private String defaultTopic;      private int timeout;      private int keepAliveInterval;      private boolean automaticReconnect;      private int connectionTimeout;      private int maxInFlight;      private String topic;      private List<String> topics;  } 
  1. 编写连接mqtt的配置类
package com.cshf.receive.mqtt;  import com.cshf.receive.common.MQTTConfig; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler;  @Configuration @IntegrationComponentScan @AllArgsConstructor @Slf4j public class MqttSenderConfig {      private final MQTTConfig mqttConfig;      private MqttMessageHandler messageHandler;      @Bean     public MqttConnectOptions getMqttConnectOptions() {         MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();         mqttConnectOptions.setCleanSession(true);         mqttConnectOptions.setConnectionTimeout(10);         mqttConnectOptions.setKeepAliveInterval(90);         mqttConnectOptions.setAutomaticReconnect(true);         mqttConnectOptions.setUserName(mqttConfig.getUsername());         mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray());         mqttConnectOptions.setServerURIs(new String[]{mqttConfig.getHost()});         mqttConnectOptions.setKeepAliveInterval(2);         return mqttConnectOptions;     }      @Bean     public MqttPahoClientFactory mqttClientFactory() {         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();         factory.setConnectionOptions(getMqttConnectOptions());         return factory;     }      @Bean     @ServiceActivator(inputChannel = "mqttOutboundChannel")     public MessageHandler mqttOutbound() {         MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getClientId(), mqttClientFactory());         messageHandler.setAsync(true);         messageHandler.setDefaultTopic(mqttConfig.getTopic());         messageHandler.setDefaultQos(1);         messageHandler.setDefaultRetained(true);         return messageHandler;     }      @Bean     public MessageChannel mqttOutboundChannel() {         return new DirectChannel();     }      /**      * 接收通道      */     @Bean     public MessageChannel mqttInputChannel() {         return new DirectChannel();     }      /**      * 配置client,监听的topic      */     @Bean     public MessageProducer inbound() {         String[] topicsArr = mqttConfig.getTopics().toArray(new String[0]);         MqttPahoMessageDrivenChannelAdapter adapter =                 new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"_inbound", mqttClientFactory(), topicsArr);         adapter.setConverter(new DefaultPahoMessageConverter());         adapter.setQos(1);         adapter.setOutputChannel(mqttInputChannel());         return adapter;     }      /**      * 通过通道获取数据      */     @Bean     @ServiceActivator(inputChannel = "mqttInputChannel")     public MessageHandler handler() {         return messageHandler;     } } 
  1. 编写handler类
package com.cshf.receive.mqtt;  //import com.cvdmp.domain.exception.ConditionException; //import com.cvdmp.service.MqttDrugBoxDataService; import com.cshf.core.exception.ServiceException; import com.cshf.receive.service.NestDataService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.stereotype.Component;  import java.util.ArrayList; import java.util.List;    @Component @Slf4j @AllArgsConstructor public class MqttMessageHandler implements MessageHandler { 	/** 	*接收到的消息和topic  	*/     @Override     public void handleMessage(Message<?> message) throws MessagingException {         String topic = message.getHeaders().get("mqtt_receivedTopic").toString();         log.info("消息主题:{},内容:{}", topic, message.getPayload());              } } 
  1. springboot发送消息给mqtt
package com.cshf.receive.mqtt;  //import com.cvdmp.domain.exception.ConditionException; //import com.cvdmp.service.MqttDrugBoxDataService; import com.cshf.core.exception.ServiceException; import com.cshf.receive.service.NestDataService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; import org.springframework.stereotype.Component;  import java.util.ArrayList; import java.util.List;   /**  * @Program iot-platform  * @ClassName MqttMessageHandler  * @Author 代志华  * @Date 2021/10/18 13:16  * @Description: mqtt处理类  */ @Component @Slf4j @AllArgsConstructor public class MqttMessageHandler implements MessageHandler {       private final NestDataService nestDataService;       @Override     public void handleMessage(Message<?> message) throws MessagingException {         String topic = message.getHeaders().get("mqtt_receivedTopic").toString();         log.debug("消息主题:{},内容:{}", topic, message.getPayload());         nestDataService.handleMqttData(topic,message.getPayload().toString());     } } 
package com.cvdmp.service;  import java.util.Date;   import com.cvdmp.mqtt.MqttGateway; import com.cvdmp.service.util.DateUtil; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service;  import com.alibaba.fastjson.JSONObject;   @Service @Slf4j @AllArgsConstructor public class MqttCommandService{       private final MqttGateway mqttGateway;      public void sendCommondToMqtt(JSONObject jsonObject) {         String content = jsonObject.getString("content");         String topic = jsonObject.getString("topic");         //log.info("sendCommondToMqtt  -----------------------");         mqttGateway.sendToMqtt(content,topic);         log.info("发送mqtt指令成功,topic:{} ,content:{}",topic,content);     }      public static void main(String[] args) {         String date = "2024-06-26 17:46:00";         Date time = DateUtil.paseHour(date);         String recomander = time.getTime()/1000+"-"+(time.getTime()+5*60*1000)/1000+"-"+(time.getTime()+10*60*1000)/1000;         System.out.println(recomander);          Date ddd = new Date(1634629566000L);         System.out.println(DateUtil.dateToStr(ddd,"yyyy-MM-dd HH:mm:ss"));     } } 

总结

总结下来就这几步

  1. 导入依赖
  2. 配置mqtt连接参数(要订阅的topic)
  3. mqtt连接
  4. handle服务获取topic的信息

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!