Emqx入门系列【3】客户端认证方式之-内置用户认证

avatar
作者
筋斗云
阅读量:0

一、EMQX 认证方式

EMQ X 是一个高度可扩展的 MQTT 消息服务器,它支持多种认证和授权机制来确保安全性。以下是 EMQ X 支持的一些常见认证方式:

  1. 内置用户认证

    • EMQ X 拥有一个内置的用户数据库,可以存储用户名和密码,进行基本的认证。
  2. HTTP 认证

    • 通过 HTTP 接口进行认证,可以集成外部的用户系统。
  3. 数据库认证

    • 支持 MySQL、PostgreSQL、SQLite 等多种数据库,通过数据库查询进行用户认证。
  4. LDAP 认证

    • 支持与 LDAP 服务器集成,进行目录服务认证。
  5. JWT(JSON Web Tokens):

    • 支持 JWT 作为认证机制,适用于分布式系统和微服务架构。
  6. OAuth 2.0

    • 支持 OAuth 2.0 授权框架,可以与各种 OAuth 服务提供商集成。
  7. API 密钥

    • 通过 API 密钥进行认证,适用于不需要用户身份验证的简单场景。
  8. MQTT 证书认证

    • 支持 SSL/TLS 证书认证,适用于需要加密传输的场合。
  9. 访问控制列表(ACL)

    • 使用 ACL 管理用户或客户端的权限,控制对特定主题的访问。
  10. 外部脚本认证

    • 允许使用外部脚本进行复杂的认证逻辑处理。
  11. Kerberos 认证

    • 对于需要 Kerberos 认证的企业环境,EMQ X 也提供了相应的支持。
  12. Nest 认证

    • 支持 Nest 协议进行设备认证,适用于物联网设备。

每种认证方式都有其适用场景和配置方法。例如,数据库认证适用于需要集中管理用户信息的大型系统,而 MQTT 证书认证适用于需要加密通信的金融或医疗行业。

要配置认证方式,通常需要在 EMQ X 的配置文件 emqx.conf 中进行相应的设置,或者通过 EMQ X Dashboard 进行配置

 

二、EMQX常用认证方式(EMQX 界面配置)

1.内置用户认证

1.1 创建内置用户认证客户端认证

af9349c3bd384fd0ac29c1a98b0e16d8.png

1.2 选择数据库类型

ad8d2c68f8ba4cc082f8ee31fe841f8e.png

1.3 选择账号类型 这里选择username 

ff41cd13c291427aaf8bca2d46949bd8.png

1.4 创建用户

fdf6969d69f741f8a1564890045ac5a9.png

d94187024dc24e7bb19d97671b441bf5.png

1.5 java实现emqt集成

1.5.1 pom.xml 中引入 mqtt相关jar

<!-- mqtt --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>

1.5.2  application.yml 配置mq连接信息

369f0f819076413dbbc0fb88b382d7ba.png

1.5.3  编写实现代码

MqttConfiguration.java
package com.chopin.mqtt.mqtt.config;  import com.chopin.mqtt.mqtt.client.MyMQTTClient; import com.chopin.mqtt.mqtt.utils.JwtUtil; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttException; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;  /**  * @class MqttConfiguration  * @description  * @copyright Copyright: 2024-2030  * @creator chopin  * @create-time 2024/8/08 14:38  **/ @Configuration @Slf4j @Data public class MqttConfiguration {      @Value("${mqtt.host}")     String host;     @Value("${mqtt.username}")     String username;     @Value("${mqtt.password}")     String password;     @Value("${mqtt.clientId}")     String clientId;     @Value("${mqtt.timeout}")     int timeOut;     @Value("${mqtt.keepalive}")     int keepAlive;     @Value("${mqtt.topic1}")     public String topic1;     @Value("${mqtt.topic2}")     public String topic2;     @Value("${mqtt.topic3}")     public String topic3;     @Value("${mqtt.topic4}")     public String topic4;      @Bean//注入spring     public MyMQTTClient myMQTTClient() {         MyMQTTClient myMQTTClient = new MyMQTTClient(host, username, password, clientId, timeOut, keepAlive);          try {             myMQTTClient.connect();             //不同的主题             //   myMQTTClient.subscribe(topic1, 1);             //   myMQTTClient.subscribe(topic2, 1);             //   myMQTTClient.subscribe(topic3, 1);             //   myMQTTClient.subscribe(topic4, 1);          } catch (MqttException e) {             log.error("MQTT connect exception:{} ", e.getMessage());         }          return myMQTTClient;     }  } 
MyMQTTClient.java
package com.chopin.mqtt.mqtt.client;  import com.chopin.mqtt.mqtt.callback.MyMQTTCallback; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;  /**  * @class MyMQTTClient  * @description  * @copyright Copyright: 2024-2030  * @creator chopin  * @create-time 2024/8/08 14:41  **/ @Slf4j public class MyMQTTClient {     private static MqttClient client;     private String host;     private String username;     private String password;     private String clientId;     private int timeout;     private int keepalive;      public MyMQTTClient(String host, String username, String password, String clientId, int timeOut, int keepAlive) {         this.host = host;         this.username = username;         this.password = password;         this.clientId = clientId;         this.timeout = timeOut;         this.keepalive = keepAlive;     }      public static MqttClient getClient() {         return client;     }      public static void setClient(MqttClient client) {         MyMQTTClient.client = client;     }      /**      * 设置mqtt连接参数      *      * @param username      * @param password      * @param timeout      * @param keepalive      * @return      */     public MqttConnectOptions setMqttConnectOptions(String username, String password, int timeout, int keepalive) {         MqttConnectOptions options = new MqttConnectOptions();         options.setUserName(username);         options.setPassword(password.toCharArray());         options.setConnectionTimeout(timeout);         options.setKeepAliveInterval(keepalive);         options.setCleanSession(true);         options.setAutomaticReconnect(true);         return options;     }      /**      * 连接mqtt服务端,得到MqttClient连接对象      */     public void connect() throws MqttException {         if (client == null) {             client = new MqttClient(host, clientId, new MemoryPersistence());             client.setCallback(new MyMQTTCallback(MyMQTTClient.this));         }         MqttConnectOptions mqttConnectOptions = setMqttConnectOptions(username, password, timeout, keepalive);         if (!client.isConnected()) {             client.connect(mqttConnectOptions);         } else {             client.disconnect();             client.connect(mqttConnectOptions);         }         log.info("MQTT connect success");//未发生异常,则连接成功     }      /**      * 发布,默认qos为0,非持久化      *      * @param pushMessage      * @param topic      */     public void publish(String pushMessage, String topic) {         publish(pushMessage, topic, 0, false);     }      /**      * 发布消息      *      * @param pushMessage      * @param topic      * @param qos      * @param retained:留存      */     public void publish(String pushMessage, String topic, int qos, boolean retained) {         MqttMessage message = new MqttMessage();         message.setPayload(pushMessage.getBytes());         message.setQos(qos);         message.setRetained(retained);         MqttTopic mqttTopic = MyMQTTClient.getClient().getTopic(topic);         if (null == mqttTopic) {             log.error("topic is not exist");         }         MqttDeliveryToken token;//Delivery:配送         synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充             try {                 token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件                 token.waitForCompletion(1000L);             } catch (Exception e) {                 e.printStackTrace();             }         }     }      /**      * 订阅某个主题      *      * @param topic      * @param qos      */     public void subscribe(String topic, int qos) {         try {             MyMQTTClient.getClient().subscribe(topic, qos);         } catch (MqttException e) {             e.printStackTrace();         }     }      /**      * 取消订阅主题      *      * @param topic 主题名称      */     public void cleanTopic(String topic) {         if (client != null && client.isConnected()) {             try {                 client.unsubscribe(topic);             } catch (MqttException e) {                 e.printStackTrace();             }         } else {             System.out.println("取消订阅失败!");         }     } }  
MyMQTTCallback.java 
package com.chopin.mqtt.mqtt.callback;  import com.chopin.mqtt.mqtt.client.MyMQTTClient; import com.chopin.mqtt.mqtt.config.MqttConfiguration; import com.chopin.mqtt.mqtt.dto.MqttMsg; import com.chopin.mqtt.mqtt.utils.SpringUtils; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import com.alibaba.fastjson.JSON; import java.util.Map;  /**  * @class MyMQTTCallback  * @description  * @copyright Copyright: 2024-2030  * @creator chopin  * @create-time 2024/8/08 14:43  **/ @Slf4j public class MyMQTTCallback implements MqttCallbackExtended {     //手动注入     private MqttConfiguration mqttConfiguration = SpringUtils.getBean(MqttConfiguration.class);     private MyMQTTClient myMQTTClient;     public MyMQTTCallback(MyMQTTClient myMQTTClient) {         this.myMQTTClient = myMQTTClient;     }     /**      * 丢失连接,可在这里做重连      * 只会调用一次      *      * @param throwable      */     @Override     public void connectionLost(Throwable throwable) {         log.error("mqtt connectionLost 连接断开,5S之后尝试重连: {}", throwable.getMessage());         long reconnectTimes = 1;         while (true) {             try {                 if (MyMQTTClient.getClient().isConnected()) {                     //判断已经重新连接成功  需要重新订阅主题 可以在这个if里面订阅主题  或者 connectComplete(方法里面)  看你们自己选择                     log.warn("mqtt reconnect success end  重新连接  重新订阅成功");                     return;                 }                 reconnectTimes+=1;                 log.warn("mqtt reconnect times = {} try again...  mqtt重新连接时间 {}", reconnectTimes, reconnectTimes);                 MyMQTTClient.getClient().reconnect();             } catch (MqttException e) {                 log.error("mqtt断连异常", e);             }             try {                 Thread.sleep(5000);             } catch (InterruptedException e1) {                 e1.printStackTrace();             }         }     }     /**      * @param topic      * @param mqttMessage      * @throws Exception      * subscribe后得到的消息会执行到这里面      */     @Override     public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {         log.info("接收消息主题 : {},接收消息内容 : {}", topic, new String(mqttMessage.getPayload()));         //发布消息主题         if (topic.contains("B/pick/warn/")){             MqttMsg mqttMsg =  JSON.parseObject(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8), MqttMsg.class);             //你自己的业务接口             log.info("接收消息主题 : {},接收消息内容 : {}", topic, JSON.toJSONString(mqttMsg));          }         if (topic.equals("embed/resp")){             Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8));             //你自己的业务接口          }         //接收报警主题         if (topic.equals("embed/warn")){             Map maps = (Map) JSON.parse(new String(mqttMessage.getPayload(), CharsetUtil.UTF_8));             //你自己的业务接口         }     }          /**      *连接成功后的回调 可以在这个方法执行 订阅主题  生成Bean的 MqttConfiguration方法中订阅主题 出现bug      *重新连接后  主题也需要再次订阅  将重新订阅主题放在连接成功后的回调 比较合理      * @param reconnect      * @param serverURI      */     @Override     public  void  connectComplete(boolean reconnect,String serverURI){         log.info("MQTT 连接成功,连接方式:{}",reconnect?"重连":"直连");         //订阅主题         myMQTTClient.subscribe(mqttConfiguration.topic1, 1);         myMQTTClient.subscribe("B/pick/warn/#", 1);         myMQTTClient.subscribe(mqttConfiguration.topic3, 1);         myMQTTClient.subscribe(mqttConfiguration.topic4, 1);     }     /**      * 消息到达后      * subscribe后,执行的回调函数      *      * @param s      * @param mqttMessage      * @throws Exception      */     /**      * publish后,配送完成后回调的方法      *      * @param iMqttDeliveryToken      */     @Override     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {         log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());     } } 

运行测试:

f73dca31ea364f209cc9d69ae2aac8c0.png

 

emqx 面板

a7561bd1e75d49e196e9a8499a202526.png

ws 数据订阅

5408079f23a747b9986e0aeccf23c7bf.png

c48f661f70874bd99b6732784fc4a262.png

 

    广告一刻

    为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!