EMQX(MQTT协议)服务和测试工具安装及整合SringBoot后的使用

avatar
作者
猴君
阅读量:32

文章目录


一、WINDOWS下搭建MQTT服务EMQX

1、下载服务

官网地址:

https://www.emqx.io/downloads

进入官网后下载window压缩包
在这里插入图片描述

2、 启动服务

解压zip文件,修改白名单。在etc目录下找到acl.conf文件。
在这里插入图片描述
在回到上一级目录,进入bin目录,在终端输入pushd + bin目录地址,例:

pushd D:\Environment\emqx\bin 

启动服务,终端输入:

emqx.cmd start 

打开浏览器输入:

localhost:18083

出现如下界面便是EMQX服务已经启动。
在这里插入图片描述
EMQX初始的用户名:admin 密码:public 。登录后设置中文页面。

在这里插入图片描述
在这里插入图片描述

二、测试工具

1、安装

官网地址:

https://mqttx.app/zh/downloads

安装过于简单,自行下载安装。

2、汉化及使用

在这里插入图片描述

添加一个连接
在这里插入图片描述
刷新管理端页面
在这里插入图片描述

三、整合SpringBoot

1、导入Maven依赖

        <!-- mqtt依赖 start -->         <dependency>             <groupId>org.springframework.boot</groupId>             <artifactId>spring-boot-starter-integration</artifactId>         </dependency>         <dependency>             <groupId>org.springframework.integration</groupId>             <artifactId>spring-integration-stream</artifactId>         </dependency>         <dependency>             <groupId>org.springframework.integration</groupId>             <artifactId>spring-integration-mqtt</artifactId>         </dependency>         <!-- mqtt依赖 end --> 

2、配置

2.1、配置文件

# MQTT配置 mqtt:   # emqx服务器地址   host: tcp://127.0.0.1:1883   # 用户名   username: ces   # 密码   password: ces1   # 客户端id   clientId: service_${random.uuid}   # 定义连接超时时间 默认为10秒 如果未在属性中指定 则使用默认值   connectionTimeout: 10   # 消息服务质量   qos: 2   # 连接保持检查周期 秒   keepAliveInterval: 20   # 开启自动重连   automaticReconnect: true   # 是否清除会话session   cleanSession: true   # 默认订阅主题   defaultSubscribeTopic: washingMachine/online/+,airConditioner/online/+   # 是否保留发布消息   retained: false 

2.2、常量

package net.rakan.distributedservice.common.constant;  import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration;  /**  * MQTT常量  * @author LiChangRui on 2024/4/21 16:53  */ @Data @Slf4j @Configuration @ConfigurationProperties(prefix = "mqtt") public class MqttConstants {     /**      * emqx服务器地址      */     private String host;     /**      * 用户名      */     private String username;     /**      * 密码      */     private String password;     /**      * 客户端id      */     private String clientId;     /**      * 定义连接超时时间 默认为10秒 如果未在属性中指定 则使用默认值      */     private int connectionTimeout;     /**      * 消息服务质量      */     private int qos;     /**      * 连接保持检查周期 秒      */     private int keepAliveInterval;     /**      * 开启自动重连      */     private Boolean automaticReconnect;     /**      * 是否清除会话session      */     private Boolean cleanSession;     /**      * 默认订阅主题      */     private String defaultSubscribeTopic;     /**      * 是否保留发布消息      */     private Boolean retained; } 

3、MQTT服务类

package net.rakan.distributedservice.common.service;  import lombok.extern.slf4j.Slf4j; import net.rakan.distributedservice.common.callback.MqttCallback; import net.rakan.distributedservice.common.constant.MqttConstants; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;  import java.util.Arrays;  /**  * MQTT服务  * @author LiChangRui on 2024/4/21 17:03  */ @Slf4j @Component public class MqttService {      /**      * MQTT常量      */     @Autowired     private MqttConstants mqttConstants;     /**      * MQTT连接      */     private MqttClient mqttClient;       private void setMqttClient(MqttClient mqttClient) {         this.mqttClient = mqttClient;     }      /**      * 推送消息      * @author LiChangRui on 2024/4/21 18:02      */     public void publish(String topic, String msg) {         try {             mqttClient.publish(topic, msg.getBytes(), mqttConstants.getQos(), mqttConstants.getRetained());         } catch (MqttException e) {             log.error("MQTT推送消息失败!");         }     }      /**      * 订阅消息      * @author LiChangRui on 2024/4/21 21:54      */     public void subscribe(String topic) {         log.info("开始订阅主题:" + topic + "。");         try {             mqttClient.subscribe(topic, mqttConstants.getQos());         } catch (MqttException e) {             log.error("MQTT订阅主题失败!");         }     }      /**      * 订阅消息      * @author LiChangRui on 2024/4/21 21:54      */     public void subscribe(String[] topic) {         int[] qos = new int[topic.length];         Arrays.fill(qos, mqttConstants.getQos());         log.info("开始订阅主题:" + String.join(",", topic) + "。");         try {             mqttClient.subscribe(topic, qos);         } catch (MqttException e) {             log.error("MQTT订阅主题失败!");         }     }      /**      * MQTT连接      * @author LiChangRui on 2024/4/22 9:48      */     public void connect(String host, String clientId, MqttCallback mqttCallback, MqttConnectOptions options) {         MqttClient mqttClient;         try {             // 设置连接参数             mqttClient = new MqttClient(host, clientId);             // 设置回调             mqttClient.setCallback(mqttCallback);             // 连接             mqttClient.connect(options);         } catch (MqttException e) {             log.error("连接失败:" + e.getMessage() + "!");             return;         }         this.setMqttClient(mqttClient);         log.info("连接成功!");     } } 

4、MQTT服务回调类

package net.rakan.distributedservice.common.callback;  import lombok.extern.slf4j.Slf4j; import net.rakan.distributedservice.common.constant.MqttConstants; import net.rakan.distributedservice.common.service.MqttService; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration;  /**  * MQTT回调  * @author LiChangRui on 2024/4/21 17:03  */ @Slf4j @Configuration public class MqttCallback implements MqttCallbackExtended {      /**      * MQTT常量      */     @Autowired     private MqttConstants mqttConstants;     /**      * MQTT服务      */     @Autowired     private MqttService mqttService;       /**      * 客户端断开后触发      * @author LiChangRui on 2024/4/21 18:19      */     @Override     public void connectionLost(Throwable throwable) {         log.info("客户端连接断开!");         // 已经设置断线重新连接 所以这里不用写重连的逻辑 只需要写断开连接的业务逻辑     }      /**      * 客户端收到消息触发      * @author LiChangRui on 2024/4/21 18:21      */     @Override     public void messageArrived(String topic, MqttMessage mqttMessage) {         log.info("接收消息主题 : " + topic);         log.info("接收消息Qos : " + mqttMessage.getQos());         log.info("接收消息内容 : " + new String(mqttMessage.getPayload()));     }      /**      * 发布消息成功      * @author LiChangRui on 2024/4/21 18:23      */     @Override     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {         String[] topics = iMqttDeliveryToken.getTopics();         for (String topic : topics) {             log.info("向主题:" + topic + "发送消息成功!");         } //        try { //            // 在消息被传递之前,正在传递的消息将被返回。一旦消息被传递,将返回null //            MqttMessage message = iMqttDeliveryToken.getMessage(); //            byte[] payload = message.getPayload(); //            String s = new String(payload, StandardCharsets.UTF_8); //            log.info("消息的内容是:" + s + "。"); //        } catch (MqttException e) { //            log.error("获取发送消息内容失败!"); //        }     }      /**      * 客户端连接成功      * @author LiChangRui on 2024/4/21 18:23      */     @Override     public void connectComplete(boolean reconnect, String serverURI) {         log.info("客户端连接成功!");         String[] topic = mqttConstants.getDefaultSubscribeTopic().split(",");         mqttService.subscribe(topic);     } } 

5、MQTT配置类

package net.rakan.distributedservice.common.config;  import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import net.rakan.distributedservice.common.callback.MqttCallback; import net.rakan.distributedservice.common.constant.MqttConstants; import net.rakan.distributedservice.common.service.MqttService; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration;  /**  * MQTT配置  * @author LiChangRui on 2024/4/21 22:26  */ @Slf4j @Configuration public class MqttConfig {      /**      * MQTT常量      */     @Autowired     private MqttConstants mqttConstants;     /**      * MQTT服务      */     @Autowired     private MqttService mqttService;     /**      * MQTT回调      */     @Autowired     private MqttCallback mqttCallback;      /**      * MQTT连接      * @author LiChangRui on 2024/4/22 9:51      */     @PostConstruct     public void mqttClient() {         MqttConnectOptions options = new MqttConnectOptions();         // 用户名         options.setUserName(mqttConstants.getUsername());         // 密码         options.setPassword(mqttConstants.getPassword().toCharArray());         // 设置连接超时时间         options.setConnectionTimeout(mqttConstants.getConnectionTimeout());         // 开启自动重连         options.setAutomaticReconnect(mqttConstants.getAutomaticReconnect());         // 是否清除会话session         options.setCleanSession(mqttConstants.getCleanSession());         // 设置心跳间隔时间         options.setKeepAliveInterval(mqttConstants.getKeepAliveInterval());         mqttService.connect(mqttConstants.getHost(), mqttConstants.getClientId(), mqttCallback, options);     } } 

6、测试类

package net.rakan.distributedservice.testserver.controller;  import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import net.rakan.distributedservice.common.dto.CancelOrderDTO; import net.rakan.distributedservice.common.dto.PublishDTO; import net.rakan.distributedservice.common.dto.SendMailDTO; import net.rakan.distributedservice.common.service.MqttService; import net.rakan.distributedservice.common.service.RabbitMqService; import net.rakan.distributedservice.common.vo.Result; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;  /**  * 测试  * @author LiChangRui on 2024/3/15 10:14  */ @Tag(name = "测试") @RestController @RequestMapping("/test") public class TestController {      @Autowired     private MqttService mqttService;      /**      * 推送消息      * @author LiChangRui on 2024/3/15 13:59      */     @Operation(summary = "推送消息")     @PostMapping("/publish")     public Result<?> publish(@RequestBody @Validated PublishDTO dto) {          mqttService.publish(dto.getTopic(), dto.getMsg());          return Result.ok();     }  }  

在这里插入图片描述
控制台消息:
在这里插入图片描述

总结

如果您发现错误,还望及时提醒,共同进步。

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!