阅读量:2
MQTT介绍
简单来说,MQTT 是一种轻量级的发布/订阅消息协议,用来解决物联网设备之间的数据传输问题。它功耗低、稳定性也不错,现在很多物联网设备都在使用 MQTT。感兴趣的话可以查看 MQTT 的中文介绍了解详情。
SpringBoot集成Mqtt
- 在 pom 文件中引入依赖
<!-- Spring Integration dependencies used for the MQTT integration. -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<!-- MQTT channel adapters -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
- 编写mqtt的配置类
# MQTT connection settings, bound to the "publish.mqtt" prefix.
publish:
  mqtt:
    host: tcp://你自己的mqtt地址:端口
    clientId: mqtt_id
    userName: 你的账号名
    password: 你的密码
    # false means the session does not expire
    cleanSession: false
    # A default topic; it is not used at load time and can only be read manually when needed
    defaultTopic: devops
    timeout: 1000
    keepAliveInterval: 10
    # Reconnect behaviour: automatic reconnect combined with a non-expiring session
    # means that messages published while the client was offline are delivered
    # after it reconnects. Change automaticReconnect / cleanSession if that is not wanted.
    automaticReconnect: true
    connectionTimeout: 3000
    topic: topic_0
    # Maximum number of in-flight messages
    maxInFlight: 100
    topics: topic_1,topic_2,topic_3,topic_4
package com.cshf.receive.common;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * Holder for the MQTT settings bound from the {@value #PREFIX} configuration prefix.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}.
 */
@Data
@Configuration
@ConfigurationProperties(MQTTConfig.PREFIX)
public class MQTTConfig {

    /** Configuration prefix under which all properties of this class are looked up. */
    public static final String PREFIX = "publish.mqtt";

    /** Broker address. */
    private String host;

    /** Client identifier. */
    private String clientId;

    /** Account name used to authenticate. */
    private String username;

    /** Password used to authenticate. */
    private String password;

    /** Clean-session flag. */
    private boolean cleanSession;

    /** Default topic name. */
    private String defaultTopic;

    /** Timeout value; NOTE(review): unit and consumer are not visible here - confirm. */
    private int timeout;

    /** Keep-alive interval; NOTE(review): unit not visible here - confirm. */
    private int keepAliveInterval;

    /** Automatic-reconnect flag. */
    private boolean automaticReconnect;

    /** Connection timeout; NOTE(review): unit not visible here - confirm. */
    private int connectionTimeout;

    /** In-flight limit; presumably the cap on unacknowledged messages - confirm. */
    private int maxInFlight;

    /** Single topic name. */
    private String topic;

    /** List of topic names. */
    private List<String> topics;
}
- 编写连接mqtt的配置类
package com.cshf.receive.mqtt; import com.cshf.receive.common.MQTTConfig; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @Configuration @IntegrationComponentScan @AllArgsConstructor @Slf4j public class MqttSenderConfig { private final MQTTConfig mqttConfig; private MqttMessageHandler messageHandler; @Bean public MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setKeepAliveInterval(90); mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(mqttConfig.getUsername()); mqttConnectOptions.setPassword(mqttConfig.getPassword().toCharArray()); mqttConnectOptions.setServerURIs(new String[]{mqttConfig.getHost()}); mqttConnectOptions.setKeepAliveInterval(2); return mqttConnectOptions; } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); 
factory.setConnectionOptions(getMqttConnectOptions()); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getClientId(), mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttConfig.getTopic()); messageHandler.setDefaultQos(1); messageHandler.setDefaultRetained(true); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } /** * 接收通道 */ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * 配置client,监听的topic */ @Bean public MessageProducer inbound() { String[] topicsArr = mqttConfig.getTopics().toArray(new String[0]); MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId()+"_inbound", mqttClientFactory(), topicsArr); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * 通过通道获取数据 */ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return messageHandler; } }
- 编写handler类
package com.cshf.receive.mqtt;

import com.cshf.core.exception.ServiceException;
import com.cshf.receive.service.NestDataService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 * Handler for messages received from MQTT; logs the topic and payload of each message.
 */
@Component
@Slf4j
@AllArgsConstructor
public class MqttMessageHandler implements MessageHandler {

    /** Name of the message header that is read to obtain the topic. */
    private static final String RECEIVED_TOPIC_HEADER = "mqtt_receivedTopic";

    /**
     * Logs the received message together with its topic.
     *
     * @param message the received message
     * @throws MessagingException if the message does not carry the received-topic header
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        Object topicHeader = message.getHeaders().get(RECEIVED_TOPIC_HEADER);
        if (topicHeader == null) {
            // Fail with a descriptive error instead of a bare NullPointerException.
            throw new MessagingException(message,
                    "Missing '" + RECEIVED_TOPIC_HEADER + "' header on received message");
        }
        String topic = topicHeader.toString();
        log.info("消息主题:{},内容:{}", topic, message.getPayload());
    }
}
- springboot发送消息给mqtt
package com.cshf.receive.mqtt;

import com.cshf.core.exception.ServiceException;
import com.cshf.receive.service.NestDataService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 * MQTT handling class (program: iot-platform, created 2021/10/18 13:16).
 *
 * <p>Passes the topic and payload of every received message to {@link NestDataService}.
 *
 * @author 代志华
 */
@Component
@Slf4j
@AllArgsConstructor
public class MqttMessageHandler implements MessageHandler {

    /** Name of the message header that is read to obtain the topic. */
    private static final String RECEIVED_TOPIC_HEADER = "mqtt_receivedTopic";

    private final NestDataService nestDataService;

    /**
     * Logs the received message at debug level and hands its topic and payload (as a string)
     * to {@code nestDataService.handleMqttData}.
     *
     * @param message the received message
     * @throws MessagingException if the message does not carry the received-topic header
     */
    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        Object topicHeader = message.getHeaders().get(RECEIVED_TOPIC_HEADER);
        if (topicHeader == null) {
            // Fail with a descriptive error instead of a bare NullPointerException.
            throw new MessagingException(message,
                    "Missing '" + RECEIVED_TOPIC_HEADER + "' header on received message");
        }
        String topic = topicHeader.toString();
        log.debug("消息主题:{},内容:{}", topic, message.getPayload());
        nestDataService.handleMqttData(topic, message.getPayload().toString());
    }
}
package com.cvdmp.service;

import java.util.Date;

import com.cvdmp.mqtt.MqttGateway;
import com.cvdmp.service.util.DateUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONObject;

/**
 * Service that sends commands to MQTT through {@link MqttGateway}.
 */
@Service
@Slf4j
@AllArgsConstructor
public class MqttCommandService {

    private final MqttGateway mqttGateway;

    /**
     * Reads the {@code content} and {@code topic} entries of the given JSON object, passes them
     * to {@code mqttGateway.sendToMqtt} and logs the call.
     *
     * @param jsonObject object holding the {@code content} and {@code topic} string entries
     */
    public void sendCommondToMqtt(JSONObject jsonObject) {
        final String payload = jsonObject.getString("content");
        final String targetTopic = jsonObject.getString("topic");

        mqttGateway.sendToMqtt(payload, targetTopic);
        log.info("发送mqtt指令成功,topic:{} ,content:{}", targetTopic, payload);
    }

    /**
     * Ad-hoc check of the date helpers: prints three epoch-second values (the parsed time, plus
     * five minutes, plus ten minutes) joined by {@code -}, then a fixed timestamp formatted as
     * {@code yyyy-MM-dd HH:mm:ss}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // NOTE(review): presumably paseHour parses the full date-time string - confirm in DateUtil.
        final Date start = DateUtil.paseHour("2024-06-26 17:46:00");

        final long startSeconds = start.getTime() / 1000;
        final long plusFiveMinutes = (start.getTime() + 5 * 60 * 1000) / 1000;
        final long plusTenMinutes = (start.getTime() + 10 * 60 * 1000) / 1000;
        final String joined = startSeconds + "-" + plusFiveMinutes + "-" + plusTenMinutes;
        System.out.println(joined);

        final Date fixed = new Date(1634629566000L);
        System.out.println(DateUtil.dateToStr(fixed, "yyyy-MM-dd HH:mm:ss"));
    }
}
总结
总结下来就这几步
- 导入依赖
- 配置mqtt连接参数(要订阅的topic)
- mqtt连接
- 编写 handler 服务,接收并处理所订阅 topic 的消息