MQTT协议

avatar
作者
筋斗云
阅读量:0

EMQX

MQTT协议介绍

MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的消息传输协议。它特别适用于物联网(IoT)设备之间的通信,因其轻量、高效、低带宽需求等特性,广泛应用于需要高实时性和高可靠性的场景中。

EMQX介绍

EMQX(原名EMQ)是一个开源的、高性能、分布式的MQTT消息中间件。它作为一个MQTT Broker,负责接收客户端设备发布的消息,并将这些消息转发给订阅了相应主题的其他客户端。EMQX支持大规模的并发连接和消息处理,非常适合物联网场景中的大规模设备通信需求 。

MQTTX介绍

MQTTX:客户端工具,用于连接EMQX服务器,模拟收纳消息。

部署启动EMQX

emqx文件夹:使用bin里面的emqx.md:(windows)

启动:emqx start

进入页面:localhos:18083

账号:admin,密码:public

如果连不上在bin目录下重置密码;

emqx start emqx ctl admins passwd admin admin123 

这里改为admin123

加密选择md5(举例)

连上后测试:

数据库查询:(存在mqtt_user表)

update mqtt_user set password = md5(‘123456’)

在MQTTX中连接测试。

springboot整合mqtt

springboot中实现客户端,和MQTTX中创建的客户端功能一致

项目依赖:

<dependency>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-integration</artifactId> </dependency>  <dependency>     <groupId>org.springframework.integration</groupId>     <artifactId>spring-integration-mqtt</artifactId> </dependency> 

配置类:MQTTX连接属性+ip+端口号(1883)+ 超时+是否自动重连+是否清理会话

注意:http协议是18083,mqtt协议是1883

@Data @ConfigurationProperties(prefix = "qf.emqx.mqtt") public class MqttProperties {  private String host = "localhost"; //EMQX服务器主机地址  private int port = 1883; //EMQX服务器端口  private String username; //连接EMQX服务器的账号  private String password; //连接EMQX服务器的密码  private String clientId = "qf_java2402"; //连接EMQX服务器的客户端ID  private String clientName = "client_test"; /** * 连接保持的时间 */ private int keepAlive = 60; /** * 连接超时的时间 */ private int timeout = 30; /** * 是否自动重连 */ private boolean reconnect = false; /** * 是否清理会话 */ private boolean cleanSession = true; }  
@Configuration @EnableConfigurationProperties(MqttProperties.class) public class MqttConfig {      @Bean     public MqttMsgClient mqttAcceptClient(MqttProperties properties) throws MqttException {         MqttMsgClient client =  new MqttMsgClient(properties);         client.subscribe("/qf/java2402", 1);         return client;     } } 

application.yml文件:

qf:   emqx:     mqtt:       username: qf       password: java2402 

接收消息和发送消息的客户端:

/**  * @Description : MQTT接受服务的客户端  */  public class MqttMsgClient {       private final MqttClient client;      public MqttMsgClient(MqttProperties properties) throws MqttException {         //MQTT协议使用的是tcp地址         String address = "tcp://" +properties.getHost() +":" + properties.getPort();         MqttClient client = new MqttClient(address, properties.getClientId(), new MemoryPersistence());         //Mqtt连接选项配置         MqttConnectOptions options = new MqttConnectOptions();         //设置连接使用的账号         options.setUserName(properties.getUsername());         //设置连接使用的密码         options.setPassword(properties.getPassword().toCharArray());         //设置连接超时的时间         options.setConnectionTimeout(properties.getTimeout());         options.setKeepAliveInterval(properties.getKeepAlive());         options.setAutomaticReconnect(properties.isReconnect());         options.setCleanSession(properties.isCleanSession());         //设置客户端接收到信息时处理的回调         client.setCallback(new MsgProcessor());         this.client = client;         //客户端建立连接         client.connect(options);         MqttMessage msg = new MqttMessage("这是Java代码发出来的消息".getBytes(StandardCharsets.UTF_8));         msg.setQos(1);         msg.setRetained(true);         client.publish("/qf/java2403", msg);     }      /**      * 订阅某个主题      *      * @param topic 主题      * @param qos   连接方式      */     public void subscribe(String topic, int qos) {         try {             client.subscribe(topic, qos);         } catch (MqttException e) {             e.printStackTrace();         }     }     /**      * 取消订阅某个主题      *      * @param topic      */     public void unsubscribe(String topic) {         try {             client.unsubscribe(topic);         } catch (MqttException e) {             e.printStackTrace();         }     } } 

处理消息的接口:

public class MsgProcessor implements MqttCallback {      @Override     public void connectionLost(Throwable cause) {         System.out.println(cause.getMessage());     }      @Override     public void messageArrived(String topic, MqttMessage message) throws Exception {         System.out.println("当前主题:" +topic);         byte[] payload = message.getPayload();//获取MQTT消息的载荷部分,也就是消息体         System.out.println(new String(payload));     }      @Override     public void deliveryComplete(IMqttDeliveryToken token) {      } } 

原理:客户端订阅主题,向EMQX里这个主题里发送消息,只要其他的客户端订阅了这个主题就能获得消息。

Qos(Quality of Service):消息服务质量

0:最多分发一次,分发依赖于网络能力

1:至少分发一次,得到应答后不会再发,如果没有得到应答就不断发送直到得到应答为止

2:只分发一次,类似于隔离级别为序列化,对性能消耗很大

MQTT基础概念:

会话:一次连接

订阅:客户端可以订阅多个主题,减少客户端连接开销

主题名:主题的名称

载荷:消息体

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!