阅读量:0
准备工作 (下载EMQX服务端,相关客户端工具)
1. 服务端工具:
https://www.emqx.io/downloads?os=Windows
2. 客户端工具:
<!--web依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <!--mqtt相关依赖--> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
自定义yml配置
server: port: 8989 #mqtt properties mqtt: #uris 可以有多个 所以是个数组 uris: - tcp://127.0.0.1:1883 clientId: mqtt_test1 topics: - demo - test username: admin password: 123456 timeout: 30 keepalive: 60 qos: 1
增加config配置读取yml文件 (使用了Lombok 需要自行添加pom依赖)
package com.huawen.mqtt.config; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; /** * @author:xjl * @date:2022/5/5 17:27 * @Description: MQTT的配置类 **/ @Component @ConfigurationProperties(prefix = "mqtt") @Data public class MqttConfiguration { /** * uris 服务器地址配置 */ private String[] uris; /** * clientId */ private String clientId; /** * 话题 */ private String[] topics; /** * 用户名 */ private String username; /** * 密码 */ private String password; /** * 连接超时时长 */ private Integer timeout; /** * keep Alive时间 */ private Integer keepalive; /** * 遗嘱消息 QoS */ private Integer qos; }
消费者配置
package com.huawen.mqtt.config; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import javax.annotation.Resource; /** * @author:xjl * @date:2022/5/6 9:06 * @Description: MQTT 消费端的配置 **/ @Configuration @Slf4j public class MqttInBoundConfiguration { @Resource private MqttConfiguration mqttProperties; //==================================== 消费消息==========================================// /** * 入站通道 * * @return 消息通道对象 {@link MessageChannel} */ @Bean("input") public MessageChannel mqttInputChannel() { //直连通道 return new DirectChannel(); } /** * 创建MqttPahoClientFactory 设置MQTT的broker的连接属性 如果使用ssl验证 也需要此处设置 * * @return MQTT客户端工厂 {@link MqttPahoClientFactory} */ @Bean public MqttPahoClientFactory inClientFactory() { //设置连接属性 DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(mqttProperties.getUris()); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepalive()); // 接受离线消息 告诉代理客户端是否要建立持久会话 false为建立持久会话 options.setCleanSession(false); //设置断开后重新连接 options.setAutomaticReconnect(true); factory.setConnectionOptions(options); return factory; } /** * 入站 * * @return 消息提供者 {@link MessageProducer} */ @Bean public MessageProducer producer() { // Paho客户端消息驱动通道适配器,主要用来订阅主题 对inboundTopics主题进行监听 //clientId 加后缀 不然会报retrying 不能重复 MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_customer", inClientFactory(), mqttProperties.getTopics()); adapter.setCompletionTimeout(5000); // Paho消息转换器 DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); // 按字节接收消息 // defaultPahoMessageConverter.setPayloadAsBytes(true); adapter.setConverter(defaultPahoMessageConverter); // 设置QoS adapter.setQos(mqttProperties.getQos()); adapter.setOutputChannel(mqttInputChannel()); return adapter; } /** * 通过通道获取数据 * ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。 * tips: * 异步处理 * * @return 消息处理 {@link MessageHandler} */ @Bean @ServiceActivator(inputChannel = "input") public MessageHandler handler() { return message -> { log.info("收到的完整消息为--->{}", message); log.info("----------------------"); log.info("message:" + message.getPayload()); log.info("Id:" + message.getHeaders().getId()); log.info("receivedQos:" + message.getHeaders().get(MqttHeaders.RECEIVED_QOS)); String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC); log.info("topic:" + topic); log.info("----------------------"); }; } }
生产者配置
package com.huawen.mqtt.controller; import com.huawen.mqtt.bean.MyMessage; import com.huawen.mqtt.inter.MqttGateway; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * @author:xjl * @date:2022/5/6 9:17 * @Description: mqtt发布消息controller **/ @RestController public class MqttPublishController { @Resource private MqttGateway mqttGateWay; @PostMapping("/send") public String send(@RequestBody MyMessage myMessage) { // 发送消息到指定主题 mqttGateWay.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent()); return "send topic: " + myMessage.getTopic() + ", message : " + myMessage.getContent(); } }
创建一个通用接口 用于发送数据
package com.huawen.mqtt.inter; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; /** * @author:xjl * @date:2022/5/6 9:20 * @Description: 接口MqttGateway **/ @MessagingGateway(defaultRequestChannel = "out") public interface MqttGateway { /** * 定义重载方法,用于消息发送 * * @param payload 负载 */ void sendToMqtt(String payload); /** * 指定topic进行消息发送 * * @param topic topic话题 * @param payload 负载 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); /** * 指定topic和qos进行消息发送 * * @param topic topic话题 * @param qos qos * @param payload 负载 (字符串类型) */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); /** * 指定topic和qos进行消息发送 * * @param topic topic话题 * @param qos qos * @param payload 负载 (字节数组类型) */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); }
生产者测试controller
package com.huawen.mqtt.controller; import com.huawen.mqtt.bean.MyMessage; import com.huawen.mqtt.inter.MqttGateway; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; /** * @author:xjl * @date:2022/5/6 9:17 * @Description: mqtt发布消息controller **/ @RestController public class MqttPublishController { @Resource private MqttGateway mqttGateWay; @PostMapping("/send") public String send(@RequestBody MyMessage myMessage) { // 发送消息到指定主题 mqttGateWay.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent()); return "send topic: " + myMessage.getTopic() + ", message : " + myMessage.getContent(); } }
该文章参考 https://blog.csdn.net/m0_46689235/article/details/124606005