EMQX
MQTT协议介绍
MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的消息传输协议。它特别适用于物联网(IoT)设备之间的通信,因其轻量、高效、低带宽需求等特性,广泛应用于需要高实时性和高可靠性的场景中。
EMQX介绍
EMQX(原名EMQ)是一个开源的、高性能、分布式的MQTT消息中间件。它作为一个MQTT Broker,负责接收客户端设备发布的消息,并将这些消息转发给订阅了相应主题的其他客户端。EMQX支持大规模的并发连接和消息处理,非常适合物联网场景中的大规模设备通信需求 。
MQTTX介绍
MQTTX:客户端工具,用于连接EMQX服务器,模拟收纳消息。
部署启动EMQX
emqx文件夹:使用bin里面的emqx.md:(windows)
启动:emqx start
进入页面:localhos:18083
账号:admin,密码:public
如果连不上在bin目录下重置密码;
emqx start emqx ctl admins passwd admin admin123
这里改为admin123
加密选择md5(举例)
连上后测试:
数据库查询:(存在mqtt_user表)
update mqtt_user set password = md5(‘123456’)
在MQTTX中连接测试。
springboot整合mqtt
springboot中实现客户端,和MQTTX中创建的客户端功能一致
项目依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
配置类:MQTTX连接属性+ip+端口号(1883)+ 超时+是否自动重连+是否清理会话
注意:http协议是18083,mqtt协议是1883
@Data @ConfigurationProperties(prefix = "qf.emqx.mqtt") public class MqttProperties { private String host = "localhost"; //EMQX服务器主机地址 private int port = 1883; //EMQX服务器端口 private String username; //连接EMQX服务器的账号 private String password; //连接EMQX服务器的密码 private String clientId = "qf_java2402"; //连接EMQX服务器的客户端ID private String clientName = "client_test"; /** * 连接保持的时间 */ private int keepAlive = 60; /** * 连接超时的时间 */ private int timeout = 30; /** * 是否自动重连 */ private boolean reconnect = false; /** * 是否清理会话 */ private boolean cleanSession = true; }
@Configuration @EnableConfigurationProperties(MqttProperties.class) public class MqttConfig { @Bean public MqttMsgClient mqttAcceptClient(MqttProperties properties) throws MqttException { MqttMsgClient client = new MqttMsgClient(properties); client.subscribe("/qf/java2402", 1); return client; } }
application.yml文件:
qf: emqx: mqtt: username: qf password: java2402
接收消息和发送消息的客户端:
/** * @Description : MQTT接受服务的客户端 */ public class MqttMsgClient { private final MqttClient client; public MqttMsgClient(MqttProperties properties) throws MqttException { //MQTT协议使用的是tcp地址 String address = "tcp://" +properties.getHost() +":" + properties.getPort(); MqttClient client = new MqttClient(address, properties.getClientId(), new MemoryPersistence()); //Mqtt连接选项配置 MqttConnectOptions options = new MqttConnectOptions(); //设置连接使用的账号 options.setUserName(properties.getUsername()); //设置连接使用的密码 options.setPassword(properties.getPassword().toCharArray()); //设置连接超时的时间 options.setConnectionTimeout(properties.getTimeout()); options.setKeepAliveInterval(properties.getKeepAlive()); options.setAutomaticReconnect(properties.isReconnect()); options.setCleanSession(properties.isCleanSession()); //设置客户端接收到信息时处理的回调 client.setCallback(new MsgProcessor()); this.client = client; //客户端建立连接 client.connect(options); MqttMessage msg = new MqttMessage("这是Java代码发出来的消息".getBytes(StandardCharsets.UTF_8)); msg.setQos(1); msg.setRetained(true); client.publish("/qf/java2403", msg); } /** * 订阅某个主题 * * @param topic 主题 * @param qos 连接方式 */ public void subscribe(String topic, int qos) { try { client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 取消订阅某个主题 * * @param topic */ public void unsubscribe(String topic) { try { client.unsubscribe(topic); } catch (MqttException e) { e.printStackTrace(); } } }
处理消息的接口:
public class MsgProcessor implements MqttCallback { @Override public void connectionLost(Throwable cause) { System.out.println(cause.getMessage()); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { System.out.println("当前主题:" +topic); byte[] payload = message.getPayload();//获取MQTT消息的载荷部分,也就是消息体 System.out.println(new String(payload)); } @Override public void deliveryComplete(IMqttDeliveryToken token) { } }
原理:客户端订阅主题,向EMQX里这个主题里发送消息,只要其他的客户端订阅了这个主题就能获得消息。
Qos(Quality of Service):消息服务质量
0:最多分发一次,分发依赖于网络能力
1:至少分发一次,得到应答后不会再发,如果没有得到应答就不断发送直到得到应答为止
2:只分发一次,类似于隔离级别为序列化,对性能消耗很大
MQTT基础概念:
会话:一次连接
订阅:客户端可以订阅多个主题,减少客户端连接开销
主题名:主题的名称
载荷:消息体