The Technical Principles Behind Building an IoT MQTT Gateway

Author: 筋斗云

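Preface

The Internet of Things (IoT) is a popular area of software right now. Many IoT vendors run their own IoT platforms, and one of the core modules of such a platform is the MQTT gateway. This article walks you step by step through writing an MQTT gateway that uses Kafka or Pulsar as its back-end store and supports MQTT connect, disconnect, publish, and subscribe. The technology choices are:

  • Netty: one of the most widely used networking frameworks for Java
  • netty-codec-mqtt: a Netty subproject that provides the MQTT codec
  • Pulsar/Kafka: popular message middleware, used as the back-end store

The core pom dependencies are as follows: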

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-codec-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-common</artifactId>
</dependency>
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-transport</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-original</artifactId>
    <version>${pulsar.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>${mqtt-client.version}</version>
    <scope>test</scope>
</dependency>

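Configuration parameter design

Configuration parameters are commonplace in software. A complex open-source project can have more than a hundred of them, with configuration files running to thousands of lines. The configuration we need is:

The port MqttServer listens on

Making the listening port configurable is essential even in a demo, and it is used all the time in unit tests. After a unit test finishes, the operating system does not release the port immediately even if the network server has been closed, so being able to pick a random port in unit tests is critical. In Java, we can obtain a free port with a utility class like the one below. If no port is configured, we use the default MQTT port, 1883.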

package io.github.protocol.mqtt.broker.util;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class SocketUtil {

    public static int getFreePort() {
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            return serverSocket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

}
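As a usage sketch, a test could take the port from a helper like the one above and bind its server to it. The class name FreePortDemo and the inlined copy of getFreePort are illustrative assumptions so that the example compiles on its own. Note that another process could in principle claim the port between the probe and the real bind, so this is a convenience for tests rather than a guarantee.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical demo class; getFreePort mirrors the SocketUtil helper above.
final class FreePortDemo {

    // Ask the OS for an ephemeral port by binding to port 0, then release it.
    static int getFreePort() {
        try (ServerSocket probe = new ServerSocket(0)) {
            return probe.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Bind a real server socket to the probed port and report the bound port.
    static int bindOnFreePort() throws IOException {
        int port = getFreePort();
        try (ServerSocket server = new ServerSocket(port)) {
            return server.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("bound test server on port " + bindOnFreePort());
    }
}
```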

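Back-end storage configuration

Our MQTT gateway has no reliable storage of its own; it relies on the back-end message middleware for persistence. The plan is to support two back ends, Pulsar and Kafka. The enum is defined as follows: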

public enum ProcessorType {
    KAFKA,
    PULSAR,
}

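The corresponding KafkaProcessorConfig and PulsarProcessorConfig classes are simple and only need to contain the basic connection addresses. More configuration options will be needed here later for performance tuning and security.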

@Setter
@Getter
public class KafkaProcessorConfig {

    private String bootstrapServers = "localhost:9092";

    public KafkaProcessorConfig() {
    }
}

@Setter
@Getter
public class PulsarProcessorConfig {

    private String httpUrl = "http://localhost:8080";

    private String serviceUrl = "pulsar://localhost:6650";

    public PulsarProcessorConfig() {
    }
}
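The server code later in this article reads its settings from an MqttServerConfig object that is not listed here. A minimal sketch of what it might look like is shown below, written with plain getters and setters instead of Lombok so that it compiles on its own. The accessor names are inferred from the calls used later in the article (getPort, setPort, getProcessorType, getKafkaProcessorConfig, getPulsarProcessorConfig). The default processor type and the nested stand-in config classes are assumptions.

```java
// Hypothetical sketch; the real MqttServerConfig is not shown in the article.
final class MqttServerConfigSketch {

    enum ProcessorType { KAFKA, PULSAR }

    // Stand-ins for the KafkaProcessorConfig / PulsarProcessorConfig classes above.
    static final class KafkaConfig {
        String bootstrapServers = "localhost:9092";
    }

    static final class PulsarConfig {
        String httpUrl = "http://localhost:8080";
        String serviceUrl = "pulsar://localhost:6650";
    }

    // 1883 is the default MQTT port; 0 means "pick a free port at startup".
    private int port = 1883;
    // Assumed default; the article does not say which back end is the default.
    private ProcessorType processorType = ProcessorType.KAFKA;
    private final KafkaConfig kafkaProcessorConfig = new KafkaConfig();
    private final PulsarConfig pulsarProcessorConfig = new PulsarConfig();

    int getPort() { return port; }
    void setPort(int port) { this.port = port; }
    ProcessorType getProcessorType() { return processorType; }
    void setProcessorType(ProcessorType type) { this.processorType = type; }
    KafkaConfig getKafkaProcessorConfig() { return kafkaProcessorConfig; }
    PulsarConfig getPulsarProcessorConfig() { return pulsarProcessorConfig; }

    public static void main(String[] args) {
        MqttServerConfigSketch config = new MqttServerConfigSketch();
        config.setProcessorType(ProcessorType.PULSAR);
        System.out.println(config.getPort() + " " + config.getProcessorType());
    }
}
```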

Starting the Netty MqttServer

We start an MqttServer with Netty and add the MQTT decoder and encoder to the pipeline:

package io.github.protocol.mqtt.broker;

import io.github.protocol.mqtt.broker.processor.KafkaProcessor;
import io.github.protocol.mqtt.broker.processor.KafkaProcessorConfig;
import io.github.protocol.mqtt.broker.processor.MqttProcessor;
import io.github.protocol.mqtt.broker.processor.PulsarProcessor;
import io.github.protocol.mqtt.broker.processor.PulsarProcessorConfig;
import io.github.protocol.mqtt.broker.util.SocketUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqttServer {

    private final MqttServerConfig mqttServerConfig;

    public MqttServer() {
        this(new MqttServerConfig());
    }

    public MqttServer(MqttServerConfig mqttServerConfig) {
        this.mqttServerConfig = mqttServerConfig;
        if (mqttServerConfig.getPort() == 0) {
            mqttServerConfig.setPort(SocketUtil.getFreePort());
        }
    }

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // decoder
                            p.addLast(new MqttDecoder());
                            p.addLast(MqttEncoder.INSTANCE);
                        }
                    });

            // Start the server.
            ChannelFuture f = b.bind(mqttServerConfig.getPort()).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private MqttProcessor processor(MqttServerConfig config) {
        return switch (config.getProcessorType()) {
            case KAFKA -> new KafkaProcessor(config.getMqttAuth(), config.getKafkaProcessorConfig());
            case PULSAR -> new PulsarProcessor(config.getMqttAuth(), config.getPulsarProcessorConfig());
        };
    }

    public int getPort() {
        return mqttServerConfig.getPort();
    }

}

MqttServerStarter.java

We write a simple main function to start MqttServer, which is convenient for debugging:

package io.github.protocol.mqtt.broker;

public class MqttServerStarter {

    public static void main(String[] args) throws Exception {
        new MqttServer().start();
    }

}

On the client side, we test with the Eclipse Paho MQTT client:

package io.github.protocol.mqtt;

import lombok.extern.log4j.Log4j2;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

@Log4j2
public class MqttClientPublishExample {

    public static void main(String[] args) throws Exception {
        String topic = "MQTT Examples";
        String content = "Message from MqttPublishExample";
        int qos = 2;
        String broker = "tcp://127.0.0.1:1883";
        String clientId = "JavaSample";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            log.info("Connecting to broker: {}", broker);
            sampleClient.connect(connOpts);
            log.info("Connected");
            log.info("Publishing message: {}", content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            log.info("Message published");
            sampleClient.disconnect();
            log.info("Disconnected");
            System.exit(0);
        } catch (MqttException me) {
            log.error("reason {} msg {}", me.getReasonCode(), me.getMessage(), me);
        }
    }

}

We run MqttServer first and then MqttClient, and find that MqttClient hangs after printing:

Connecting to broker: tcp://127.0.0.1:1883 

Why is that? A packet capture shows that only the client sent an MQTT CONNECT packet; the server never responded.


However, according to the MQTT specification, a CONNECT packet must be answered with a CONNACK.
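To make the expected response concrete, here is a small sketch of the CONNACK packet as it appears on the wire in MQTT 3.1.1: a fixed header byte of 0x20, a remaining length of 2, an acknowledge-flags byte whose lowest bit is the session-present flag, and a return code where 0 means the connection was accepted. The class and method names are illustrative; in the gateway itself the Netty encoder produces these bytes.

```java
// Illustrative only: builds the 4 bytes of an MQTT 3.1.1 CONNACK packet by hand.
final class ConnAckBytes {

    static final int CONNACK_TYPE = 2;          // MQTT control packet type for CONNACK
    static final int RETURN_CODE_ACCEPTED = 0;  // 0x00 = connection accepted

    static byte[] connAck(boolean sessionPresent, int returnCode) {
        return new byte[] {
            (byte) (CONNACK_TYPE << 4),         // fixed header: type in the high nibble, flags 0
            (byte) 2,                           // remaining length: always 2 for CONNACK
            (byte) (sessionPresent ? 1 : 0),    // connect acknowledge flags
            (byte) returnCode                   // connect return code
        };
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : connAck(false, RETURN_CODE_ACCEPTED)) {
            hex.append(String.format("%02x ", b & 0xff));
        }
        System.out.println(hex.toString().trim()); // prints "20 02 00 00"
    }
}
```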


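So after receiving a CONNECT we need to return a CONNACK message. We create an MqttHandler that extends ChannelInboundHandlerAdapter to take over the messages once MqttDecoder has decoded them. The key methods to override are channelRead, which receives each decoded message, and channelInactive, which is where resources are released when the connection drops.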

package com.github.shoothzj.mqtt;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
    }

}

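Then we add this handler to Netty's handler chain (the channel pipeline), placing it after the codec handlers, for example with p.addLast(new MqttHandler()) following the decoder and encoder in initChannel.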


Next, we insert our code into the MQTT handler:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        if (msg instanceof MqttConnectMessage) {
            handleConnect(ctx, (MqttConnectMessage) msg);
        } else {
            log.error("Unsupported type msg [{}]", msg);
        }
    }

    private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
        log.info("connect msg is [{}]", connectMessage);
    }

The printed connectMessage looks like this:

[MqttConnectMessage[fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=22], variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=60], payload=MqttConnectPayload[clientIdentifier=JavaSample, willTopic=null, willMessage=null, userName=null, password=null]]] 
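The remainingLength=22 in this log can be checked by hand. For an MQTT 3.1.1 CONNECT without a will, username, or password, the remaining length is the 10-byte variable header (a length-prefixed protocol name "MQTT", one byte for the protocol level, one for the connect flags, and two for keep-alive) plus the length-prefixed client identifier. The helper below is an illustrative sketch with hypothetical names, not part of the gateway code.

```java
import java.nio.charset.StandardCharsets;

// Illustrative helper: remaining length of a CONNECT that carries only a client id.
final class ConnectLength {

    // An MQTT UTF-8 string is a 2-byte length prefix followed by the encoded bytes.
    static int utf8FieldLength(String s) {
        return 2 + s.getBytes(StandardCharsets.UTF_8).length;
    }

    static int remainingLength(String clientId) {
        int variableHeader = utf8FieldLength("MQTT") // protocol name: 2 + 4
                + 1                                  // protocol level (4 for MQTT 3.1.1)
                + 1                                  // connect flags
                + 2;                                 // keep-alive in seconds
        int payload = utf8FieldLength(clientId);     // client identifier only
        return variableHeader + payload;
    }

    public static void main(String[] args) {
        System.out.println(remainingLength("JavaSample")); // prints 22
    }
}
```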

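Typically, an MQTT CONNECT message carries information such as QoS, a username, and a password. Because we started the client without a username or password, these fields are all null here. For now we skip validating them and simply return a CONNACK to the client to indicate that the connection succeeded: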

        final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
        ctx.channel().writeAndFlush(ackMessage);

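We run the server and client again and can see that the connection has passed the CONNECT stage and moved on to publishing a message. Next, we implement the remaining scenarios.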


The MqttHandler code at this stage is shown below:

package com.github.shoothzj.mqtt;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import lombok.extern.slf4j.Slf4j;

import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;

@Slf4j
public class MqttHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        if (msg instanceof MqttConnectMessage) {
            handleConnect(ctx, (MqttConnectMessage) msg);
        } else {
            log.error("Unsupported type msg [{}]", msg);
        }
    }

    private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) {
        log.info("connect msg is [{}]", connectMessage);
        final MqttFixedHeader fixedHeader = connectMessage.fixedHeader();
        final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
        final MqttConnectPayload connectPayload = connectMessage.payload();
        final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
        ctx.channel().writeAndFlush(ackMessage);
    }

}

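At the moment all the logic lives in MqttHandler, which makes later extension awkward. We therefore extract an MqttProcessor interface to handle the individual requests, while MqttHandler is responsible for determining the type of each MqttMessage and dispatching it. The MqttProcessor interface is designed as follows: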

package io.github.protocol.mqtt.broker.processor;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;

public interface MqttProcessor {

    void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception;

    void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception;

    void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception;

    void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception;

    void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception;

    void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception;

    void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception;

    void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception;

    void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

    void processDisconnect(ChannelHandlerContext ctx) throws Exception;

    void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception;

}

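We allow these methods to throw exceptions. When a fault that is very hard to handle occurs (a back-end storage failure, for example), we drop the MQTT connection and wait for the client to reconnect.

MqttHandler calls MqttProcessor; the relevant MqttHandler code is as follows: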

        Preconditions.checkArgument(message instanceof MqttMessage);
        MqttMessage msg = (MqttMessage) message;
        try {
            if (msg.decoderResult().isFailure()) {
                Throwable cause = msg.decoderResult().cause();
                if (cause instanceof MqttUnacceptableProtocolVersionException) {
                    // Unsupported protocol version
                    MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                            new MqttFixedHeader(MqttMessageType.CONNACK,
                                    false, MqttQoS.AT_MOST_ONCE, false, 0),
                            new MqttConnAckVariableHeader(
                                    MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION,
                                    false), null);
                    ctx.writeAndFlush(connAckMessage);
                    log.error("connection refused due to invalid protocol, client address [{}]",
                            ctx.channel().remoteAddress());
                    ctx.close();
                    return;
                } else if (cause instanceof MqttIdentifierRejectedException) {
                    // ineligible clientId
                    MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                            new MqttFixedHeader(MqttMessageType.CONNACK,
                                    false, MqttQoS.AT_MOST_ONCE, false, 0),
                            new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
                                    false), null);
                    ctx.writeAndFlush(connAckMessage);
                    log.error("ineligible clientId, client address [{}]", ctx.channel().remoteAddress());
                    ctx.close();
                    return;
                }
                throw new IllegalStateException(msg.decoderResult().cause().getMessage());
            }
            MqttMessageType messageType = msg.fixedHeader().messageType();
            if (log.isDebugEnabled()) {
                log.debug("Processing MQTT Inbound handler message, type={}", messageType);
            }
            switch (messageType) {
                case CONNECT:
                    Preconditions.checkArgument(msg instanceof MqttConnectMessage);
                    processor.processConnect(ctx, (MqttConnectMessage) msg);
                    break;
                case CONNACK:
                    Preconditions.checkArgument(msg instanceof MqttConnAckMessage);
                    processor.processConnAck(ctx, (MqttConnAckMessage) msg);
                    break;
                case PUBLISH:
                    Preconditions.checkArgument(msg instanceof MqttPublishMessage);
                    processor.processPublish(ctx, (MqttPublishMessage) msg);
                    break;
                case PUBACK:
                    Preconditions.checkArgument(msg instanceof MqttPubAckMessage);
                    processor.processPubAck(ctx, (MqttPubAckMessage) msg);
                    break;
                case PUBREC:
                    processor.processPubRec(ctx, msg);
                    break;
                case PUBREL:
                    processor.processPubRel(ctx, msg);
                    break;
                case PUBCOMP:
                    processor.processPubComp(ctx, msg);
                    break;
                case SUBSCRIBE:
                    Preconditions.checkArgument(msg instanceof MqttSubscribeMessage);
                    processor.processSubscribe(ctx, (MqttSubscribeMessage) msg);
                    break;
                case SUBACK:
                    Preconditions.checkArgument(msg instanceof MqttSubAckMessage);
                    processor.processSubAck(ctx, (MqttSubAckMessage) msg);
                    break;
                case UNSUBSCRIBE:
                    Preconditions.checkArgument(msg instanceof MqttUnsubscribeMessage);
                    processor.processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg);
                    break;
                case UNSUBACK:
                    Preconditions.checkArgument(msg instanceof MqttUnsubAckMessage);
                    processor.processUnsubAck(ctx, (MqttUnsubAckMessage) msg);
                    break;
                case PINGREQ:
                    processor.processPingReq(ctx, msg);
                    break;
                case PINGRESP:
                    processor.processPingResp(ctx, msg);
                    break;
                case DISCONNECT:
                    processor.processDisconnect(ctx);
                    break;
                case AUTH:
                    processor.processAuth(ctx, msg);
                    break;
                default:
                    throw new UnsupportedOperationException("Unknown MessageType: " + messageType);
            }
        } catch (Throwable ex) {
            ReferenceCountUtil.safeRelease(msg);
            log.error("Exception was caught while processing MQTT message, ", ex);
            ctx.close();
        }

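This code mainly calls different MqttProcessor methods for the different MqttMessage types. Two points are worth mentioning:

  • Some decoding errors are checked up front, so we fail fast
  • Exceptions are caught globally and handled by dropping the connection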
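The two points above can be sketched independently of Netty. In the sketch below, Connection, PacketType, and Dispatcher are hypothetical stand-ins for the channel, MqttMessageType, and MqttHandler: a packet that failed to decode is rejected before dispatch, and any exception thrown by a handler closes the connection so that the client can reconnect.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-ins; none of these types come from Netty or the gateway code.
final class DispatchSketch {

    enum PacketType { CONNECT, PUBLISH, PINGREQ }

    interface PacketHandler {
        void handle(Connection conn) throws Exception;
    }

    static final class Connection {
        boolean closed;

        void close() {
            closed = true;
        }
    }

    static final class Dispatcher {
        private final Map<PacketType, PacketHandler> handlers = new EnumMap<>(PacketType.class);

        void register(PacketType type, PacketHandler handler) {
            handlers.put(type, handler);
        }

        void dispatch(Connection conn, PacketType type, boolean decodeFailed) {
            try {
                // Fail fast: do not dispatch a packet that could not be decoded.
                if (decodeFailed) {
                    throw new IllegalStateException("decode failure for " + type);
                }
                PacketHandler handler = handlers.get(type);
                if (handler == null) {
                    throw new UnsupportedOperationException("Unknown type: " + type);
                }
                handler.handle(conn);
            } catch (Exception e) {
                // Global catch: any failure ends the connection; the client reconnects.
                conn.close();
            }
        }
    }

    public static void main(String[] args) {
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.register(PacketType.PUBLISH, c -> {
            throw new Exception("backend storage unavailable");
        });
        Connection conn = new Connection();
        dispatcher.dispatch(conn, PacketType.PUBLISH, false);
        System.out.println("closed = " + conn.closed); // prints "closed = true"
    }
}
```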

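Maintaining the MqttSession

Maintaining the MQTT session is mainly about continuously tracking client session information and the resources the session occupies in the system. Since the session has to be maintained no matter which back end is used, we build an AbstractProcessor to maintain the MqttSession: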

package io.github.protocol.mqtt.broker.processor;

import io.github.protocol.mqtt.broker.MqttSessionKey;
import io.github.protocol.mqtt.broker.auth.MqttAuth;
import io.github.protocol.mqtt.broker.util.ChannelUtils;
import io.github.protocol.mqtt.broker.util.MqttMessageUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.stream.IntStream;

@Slf4j
public abstract class AbstractProcessor implements MqttProcessor {

    protected final MqttAuth mqttAuth;

    public AbstractProcessor(MqttAuth mqttAuth) {
        this.mqttAuth = mqttAuth;
    }

    @Override
    public void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception {
        String clientId = msg.payload().clientIdentifier();
        String username = msg.payload().userName();
        byte[] pwd = msg.payload().passwordInBytes();
        if (StringUtils.isBlank(clientId) || StringUtils.isBlank(username)) {
            MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.CONNACK,
                            false, MqttQoS.AT_MOST_ONCE, false, 0),
                    new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED,
                            false), null);
            ctx.writeAndFlush(connAckMessage);
            log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress());
            ctx.close();
            return;
        }
        if (!mqttAuth.connAuth(clientId, username, pwd)) {
            MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                    new MqttFixedHeader(MqttMessageType.CONNACK,
                            false, MqttQoS.AT_MOST_ONCE, false, 0),
                    new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD,
                            false), null);
            ctx.writeAndFlush(connAckMessage);
            // Log message corrected: this branch is an authentication failure, not an empty field.
            log.error("auth failed for clientId or username, client address[{}]", ctx.channel().remoteAddress());
            ctx.close();
            return;
        }

        MqttSessionKey mqttSessionKey = new MqttSessionKey();
        mqttSessionKey.setUsername(username);
        mqttSessionKey.setClientId(clientId);
        ChannelUtils.setMqttSession(ctx.channel(), mqttSessionKey);
        log.info("username {} clientId {} remote address {} connected",
                username, clientId, ctx.channel().remoteAddress());
        onConnect(mqttSessionKey);
        MqttConnAckMessage mqttConnectMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage(
                new MqttFixedHeader(MqttMessageType.CONNACK,
                        false, MqttQoS.AT_MOST_ONCE, false, 0),
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false),
                null);
        ctx.writeAndFlush(mqttConnectMessage);
    }

    protected void onConnect(MqttSessionKey mqttSessionKey) {
    }

    @Override
    public void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("conn ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("publish, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
            return;
        }
        if (msg.fixedHeader().qosLevel() == MqttQoS.FAILURE) {
            log.error("failure. clientId {}, username {} ", mqttSession.getClientId(), mqttSession.getUsername());
            return;
        }
        if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) {
            log.error("does not support QoS2 protocol. clientId {}, username {} ",
                    mqttSession.getClientId(), mqttSession.getUsername());
            return;
        }
        onPublish(ctx, mqttSession, msg);
    }

    protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                             MqttPublishMessage msg) throws Exception {
    }

    @Override
    public void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("pub ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("pub rec, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("pub rel, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("pub comp, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("sub, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
            // Return here so that onSubscribe is never called with a null session.
            return;
        }
        onSubscribe(ctx, mqttSession, msg.payload());
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK,
                false, MqttQoS.AT_MOST_ONCE, false, 0);
        IntStream intStream = msg.payload().topicSubscriptions().stream().mapToInt(s -> s.qualityOfService().value());
        MqttSubAckPayload payload = new MqttSubAckPayload(intStream.toArray());
        ctx.writeAndFlush(MqttMessageFactory.newMessage(
                fixedHeader,
                MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()),
                payload));
    }

    protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,
                               MqttSubscribePayload subscribePayload) throws Exception {
    }

    @Override
    public void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("sub ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("unsub, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("unsub ack, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        ctx.writeAndFlush(MqttMessageUtil.pingResp());
    }

    @Override
    public void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("ping resp, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }

    @Override
    public void processDisconnect(ChannelHandlerContext ctx) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("disconnect, client address {} not authed", ctx.channel().remoteAddress());
        }
        onDisconnect(mqttSession);
    }

    protected void onDisconnect(MqttSessionKey mqttSessionKey) {
    }

    @Override
    public void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel());
        if (mqttSession == null) {
            log.error("auth, client address {} not authed", ctx.channel().remoteAddress());
            ctx.close();
        }
    }
}

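As you can see, AbstractProcessor mainly maintains and validates the MqttSessionKey, and intercepts the unsupported QoS 2 and FAILURE levels in publish. It also responds to MQTT heartbeat (PINGREQ) requests. As before, we allow onPublish and onSubscribe to throw exceptions.

The basic idea behind an MQTT gateway built on a message queue is fairly simple: when a publish message arrives, produce a message to the message queue; when there is a subscription, pull messages from the message queue. It follows that we may need to maintain the mapping between each MQTT topic and its producer and consumer, because consumers in message middleware such as Kafka and Pulsar are bound to specific topics. A fragment of the common code is as follows: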

    protected final ReentrantReadWriteLock.ReadLock rLock;

    protected final ReentrantReadWriteLock.WriteLock wLock;

    protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;

    protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;

    protected final Map<MqttTopicKey, P> producerMap;

    protected final Map<MqttTopicKey, C> consumerMap;

    public AbstractMqProcessor(MqttAuth mqttAuth) {
        super(mqttAuth);
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        rLock = lock.readLock();
        wLock = lock.writeLock();
        this.sessionProducerMap = new HashMap<>();
        this.sessionConsumerMap = new HashMap<>();
        this.producerMap = new HashMap<>();
        this.consumerMap = new HashMap<>();
    }

    @Override
    protected void onConnect(MqttSessionKey mqttSessionKey) {
        wLock.lock();
        try {
            sessionProducerMap.put(mqttSessionKey, new ArrayList<>());
            sessionConsumerMap.put(mqttSessionKey, new ArrayList<>());
        } finally {
            wLock.unlock();
        }
    }

    @Override
    protected void onDisconnect(MqttSessionKey mqttSessionKey) {
        wLock.lock();
        try {
            // find producers
            List<MqttTopicKey> produceTopicKeys = sessionProducerMap.get(mqttSessionKey);
            if (produceTopicKeys != null) {
                for (MqttTopicKey mqttTopicKey : produceTopicKeys) {
                    P producer = producerMap.get(mqttTopicKey);
                    if (producer != null) {
                        ClosableUtils.close(producer);
                        producerMap.remove(mqttTopicKey);
                    }
                }
            }
            sessionProducerMap.remove(mqttSessionKey);
            List<MqttTopicKey> consumeTopicKeys = sessionConsumerMap.get(mqttSessionKey);
            if (consumeTopicKeys != null) {
                for (MqttTopicKey mqttTopicKey : consumeTopicKeys) {
                    C consumer = consumerMap.get(mqttTopicKey);
                    if (consumer != null) {
                        ClosableUtils.close(consumer);
                        consumerMap.remove(mqttTopicKey);
                    }
                }
            }
            sessionConsumerMap.remove(mqttSessionKey);
        } finally {
            wLock.unlock();
        }
    }
}
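To show the bookkeeping in isolation, here is a stripped-down sketch of the same idea using only the JDK. SessionRegistry and its String keys are hypothetical simplifications of MqttSessionKey and MqttTopicKey, and any AutoCloseable stands in for a producer: each session records the topics it opened, and disconnecting closes and removes every resource the session owns.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical simplification: String keys instead of MqttSessionKey / MqttTopicKey.
final class SessionRegistry<P extends AutoCloseable> {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, List<String>> sessionTopics = new HashMap<>();
    private final Map<String, P> producers = new HashMap<>();

    void onConnect(String session) {
        lock.writeLock().lock();
        try {
            sessionTopics.put(session, new ArrayList<>());
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Remember that this session owns a producer for the given topic.
    void register(String session, String topic, P producer) {
        lock.writeLock().lock();
        try {
            sessionTopics.computeIfAbsent(session, k -> new ArrayList<>()).add(topic);
            producers.put(session + "/" + topic, producer);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Close and forget everything the session opened.
    void onDisconnect(String session) {
        lock.writeLock().lock();
        try {
            List<String> topics = sessionTopics.remove(session);
            if (topics == null) {
                return;
            }
            for (String topic : topics) {
                P producer = producers.remove(session + "/" + topic);
                if (producer != null) {
                    try {
                        producer.close();
                    } catch (Exception e) {
                        // Best effort: keep closing the remaining producers.
                    }
                }
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    int producerCount() {
        lock.readLock().lock();
        try {
            return producers.size();
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        SessionRegistry<AutoCloseable> registry = new SessionRegistry<>();
        registry.onConnect("client-1");
        registry.register("client-1", "sensors", () -> System.out.println("producer closed"));
        registry.onDisconnect("client-1");
        System.out.println(registry.producerCount()); // prints 0
    }
}
```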

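Kafka processor implementation

Because a Kafka producer is not bound to a topic, a single producer can be reused inside the Kafka processor. If one Kafka producer later reaches its performance ceiling, it can be extended to a list of producers selected by round-robin. Consumers are different: the MQTT protocol may call for different behavior on each subscribed topic, so sharing one consumer instance is not appropriate. The KafkaProducer is created in the constructor: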

    private final KafkaProcessorConfig kafkaProcessorConfig;

    private final KafkaProducer<String, ByteBuffer> producer;

    public KafkaProcessor(MqttAuth mqttAuth, KafkaProcessorConfig kafkaProcessorConfig) {
        super(mqttAuth);
        this.kafkaProcessorConfig = kafkaProcessorConfig;
        this.producer = createProducer();
    }

    protected KafkaProducer<String, ByteBuffer> createProducer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProcessorConfig.getBootstrapServers());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class);
        return new KafkaProducer<>(properties);
    }

Handling MqttPublish messages. An MqttPublish message carries the following key parameters:

    MqttQoS mqttQoS = publishMessage.fixedHeader().qosLevel();
    String topic = publishMessage.variableHeader().topicName();
    ByteBuffer byteBuffer = publishMessage.payload().nioBuffer();

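Where:

  • qos is the quality-of-service level of the message: 0 gives no delivery guarantee, 1 means at least once, and 2 means exactly once. Only qos0 and qos1 are currently supported
  • topicName is the name of the topic
  • ByteBuffer is the message payload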
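
For background, the QoS level returned by fixedHeader().qosLevel() comes from two bits of the first byte of the MQTT fixed header. The following is a minimal standalone sketch of that decoding, following the MQTT 3.1.1 fixed header layout. The class and method names are hypothetical and not part of the gateway code; in the gateway itself, netty-codec-mqtt's MqttDecoder does this work.

```java
// Hypothetical helper, not part of the gateway: decodes the first byte
// of an MQTT fixed header (MQTT 3.1.1 layout).
final class FixedHeaderBits {

    private FixedHeaderBits() {
    }

    // Bits 7-4: control packet type (3 means PUBLISH).
    static int packetType(int firstByte) {
        return (firstByte >> 4) & 0x0F;
    }

    // Bit 3: DUP flag, set when a PUBLISH is being re-delivered.
    static boolean dup(int firstByte) {
        return (firstByte & 0x08) != 0;
    }

    // Bits 2-1: QoS level (0 = at most once, 1 = at least once, 2 = exactly once).
    static int qos(int firstByte) {
        return (firstByte >> 1) & 0x03;
    }

    // Bit 0: RETAIN flag.
    static boolean retain(int firstByte) {
        return (firstByte & 0x01) != 0;
    }

    public static void main(String[] args) {
        int firstByte = 0x32; // binary 0011 0010: PUBLISH, DUP=0, QoS=1, RETAIN=0
        System.out.println("type=" + packetType(firstByte) + ", qos=" + qos(firstByte));
    }
}
```

A value of 3 in the QoS bits is not a valid level, which is roughly why a decoder needs a failure case alongside the three real levels.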

Send the message according to its topic and qos. The code is as follows:

        String topic = msg.variableHeader().topicName();
        ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(topic, msg.payload().nioBuffer());
        switch (msg.fixedHeader().qosLevel()) {
            case AT_MOST_ONCE -> producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, exception);
                    return;
                }
                log.debug("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
                        mqttSessionKey, metadata.topic(), metadata.partition(), metadata.offset());
            });
            case AT_LEAST_ONCE -> {
                try {
                    RecordMetadata recordMetadata = producer.send(record).get();
                    log.info("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}",
                            mqttSessionKey, recordMetadata.topic(),
                            recordMetadata.partition(), recordMetadata.offset());
                    ctx.writeAndFlush(MqttMessageUtil.pubAckMessage(msg.variableHeader().packetId()));
                } catch (Exception e) {
                    log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, e);
                }
            }
            case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(
                    String.format("mqttSessionKey %s can not reach here", mqttSessionKey));
        }

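Handling subscriptions. For now, based on the subscribed topic, we simply create the topic if it does not exist and consume from it. The consumption pattern recommended for the native Kafka client looks like this: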

    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, byte[]> record : records) {
            // do logic
        }
    }

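So we need to move consumption onto a separate thread. We write a wrapper, KafkaConsumerListenerWrapper, that converts the poll loop into an asynchronous, listener-based consumption model: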

package io.github.protocol.mqtt.broker.processor;  import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;  import java.time.Duration; import java.util.Collections; import java.util.Properties; import java.util.concurrent.ExecutionException;  @Slf4j public class KafkaConsumerListenerWrapper implements AutoCloseable {      private final AdminClient adminClient;      private final KafkaConsumer<String, byte[]> consumer;      public KafkaConsumerListenerWrapper(KafkaProcessorConfig config, String username) {         Properties adminProperties = new Properties();         adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());         this.adminClient = KafkaAdminClient.create(adminProperties);         Properties properties = new Properties();         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());         properties.put(ConsumerConfig.GROUP_ID_CONFIG, username);         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);         this.consumer = new KafkaConsumer<>(properties);     }      public void start(String topic, 
KafkaMessageListener listener) throws Exception {         try {             TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topic))                     .values().get(topic).get();             log.info("topic info is {}", topicDescription);         } catch (ExecutionException ee) {             if (ee.getCause() instanceof UnknownTopicOrPartitionException) {                 log.info("topic {} not exist, create it", topic);                 adminClient.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1)));             } else {                 log.error("find topic info {} error", topic, ee);             }         } catch (Exception e) {             throw new IllegalStateException("find topic info error", e);         }         consumer.subscribe(Collections.singletonList(topic));         log.info("consumer topic {} start", topic);         new Thread(() -> {             try {                 while (true) {                     ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1));                     for (ConsumerRecord<String, byte[]> record : records) {                         listener.messageReceived(record);                     }                 }             } catch (WakeupException we) {                 consumer.close();             } catch (Exception e) {                 log.error("consumer topic {} consume error", topic, e);                 consumer.close();             }         }).start();         Thread.sleep(5_000);     }      @Override     public void close() throws Exception {         log.info("wake up {} consumer", consumer);         consumer.wakeup();     } } 
    @Override     protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,                                MqttSubscribePayload subscribePayload) throws Exception {         for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {             KafkaConsumerListenerWrapper consumer = createConsumer(mqttSessionKey, topicSubscription.topicName());             subscribe(ctx, consumer, topicSubscription.topicName());         }     }      private KafkaConsumerListenerWrapper createConsumer(MqttSessionKey mqttSessionKey, String topic) {         MqttTopicKey mqttTopicKey = new MqttTopicKey();         mqttTopicKey.setTopic(topic);         mqttTopicKey.setMqttSessionKey(mqttSessionKey);          wLock.lock();         try {             KafkaConsumerListenerWrapper consumer = consumerMap.get(mqttTopicKey);             if (consumer == null) {                 consumer = new KafkaConsumerListenerWrapper(kafkaProcessorConfig, mqttSessionKey.getUsername());                 sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {                     if (mqttTopicKeys == null) {                         mqttTopicKeys = new ArrayList<>();                     }                     mqttTopicKeys.add(mqttTopicKey);                     return mqttTopicKeys;                 });                 consumerMap.put(mqttTopicKey, consumer);             }             return consumer;         } finally {             wLock.unlock();         }     }      protected void subscribe(ChannelHandlerContext ctx,                              KafkaConsumerListenerWrapper consumer, String topic) throws Exception {         BoundInt boundInt = new BoundInt(65535);         consumer.start(topic, record -> {             log.info("receive message from kafka, topic {}, partition {}, offset {}",                     record.topic(), record.partition(), record.offset());             MqttPublishMessage mqttPublishMessage = 
MqttMessageUtil.publishMessage(                     MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), record.value());             ctx.writeAndFlush(mqttPublishMessage);         });     } 

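One point to keep in mind throughout the code above: log statements should carry the key identifying information, such as topic, MQTT username, and MQTT clientId. This seems unnecessary when writing a demo, but it becomes essential when you have to track down a problem under massive request volume.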

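The simple utility class BoundInt generates packet IDs in the range bounded by 65535, as the protocol requires. (The MQTT specification requires packet identifiers to be non-zero, so the value 0 should be skipped.)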
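
The article does not show the BoundInt source, so the following is only a minimal sketch of what such a bounded counter could look like, not the author's implementation. The class name PacketIdGenerator is hypothetical. It assumes the counter should be thread-safe and cycle through 1..max, never returning 0.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the article's BoundInt: a thread-safe counter
// that cycles through 1..max and never returns 0.
final class PacketIdGenerator {

    private final int max;
    private final AtomicInteger current = new AtomicInteger(0);

    PacketIdGenerator(int max) {
        if (max < 1) {
            throw new IllegalArgumentException("max must be >= 1");
        }
        this.max = max;
    }

    // Returns the next id, wrapping from max back to 1.
    int nextVal() {
        return current.updateAndGet(prev -> prev >= max ? 1 : prev + 1);
    }

    public static void main(String[] args) {
        PacketIdGenerator generator = new PacketIdGenerator(65535);
        System.out.println("first packet id: " + generator.nextVal());
    }
}
```

This sketch does not track which IDs are still awaiting a PUBACK. A production broker would also need to avoid reusing an ID that is still in flight on the same connection.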

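Pulsar processor implementation

Compared with Kafka, Pulsar is better suited as the backend for an MQTT protocol proxy, for the following reasons:

  • Pulsar supports millions of topics, and its topics are more lightweight
  • Pulsar natively supports listener-based consumption, so there is no need to start a thread per consumer
  • Pulsar supports the Shared subscription mode, which makes consumption more flexible
  • A Pulsar consumer's subscribe call guarantees that the subscription has been created successfully; the Kafka consumer offers no such semantic guarantee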
    protected final ReentrantReadWriteLock.ReadLock rLock;

    protected final ReentrantReadWriteLock.WriteLock wLock;

    protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap;

    protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap;

    protected final Map<MqttTopicKey, Producer<byte[]>> producerMap;

    protected final Map<MqttTopicKey, Consumer<byte[]>> consumerMap;

    private final PulsarProcessorConfig pulsarProcessorConfig;

    private final PulsarAdmin pulsarAdmin;

    private final PulsarClient pulsarClient;

    public PulsarProcessor(MqttAuth mqttAuth, PulsarProcessorConfig pulsarProcessorConfig) {
        super(mqttAuth);
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        rLock = lock.readLock();
        wLock = lock.writeLock();
        this.sessionProducerMap = new HashMap<>();
        this.sessionConsumerMap = new HashMap<>();
        this.producerMap = new HashMap<>();
        this.consumerMap = new HashMap<>();
        this.pulsarProcessorConfig = pulsarProcessorConfig;
        try {
            this.pulsarAdmin = PulsarAdmin.builder()
                    .serviceHttpUrl(pulsarProcessorConfig.getHttpUrl())
                    .build();
            this.pulsarClient = PulsarClient.builder()
                    .serviceUrl(pulsarProcessorConfig.getServiceUrl())
                    .build();
        } catch (Exception e) {
            throw new IllegalStateException("Failed to create pulsar client", e);
        }
    }

Handling publish messages

    @Override     protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,                              MqttPublishMessage msg) throws Exception {         String topic = msg.variableHeader().topicName();         Producer<byte[]> producer = getOrCreateProducer(mqttSessionKey, topic);         int len = msg.payload().readableBytes();         byte[] messageBytes = new byte[len];         msg.payload().getBytes(msg.payload().readerIndex(), messageBytes);         switch (msg.fixedHeader().qosLevel()) {             case AT_MOST_ONCE -> producer.sendAsync(messageBytes).                     thenAccept(messageId -> log.info("clientId [{}],"                                     + " username [{}]. send message to pulsar success messageId: {}",                             mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId))                     .exceptionally((e) -> {                         log.error("clientId [{}], username [{}]. send message to pulsar fail: ",                                 mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e);                         return null;                     });             case AT_LEAST_ONCE -> {                 try {                     MessageId messageId = producer.send(messageBytes);                     MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK,                             false, MqttQoS.AT_MOST_ONCE, false, 0);                     MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(fixedHeader,                             MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()), null);                     log.info("clientId [{}], username [{}]. send pulsar success. 
messageId: {}",                             mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId);                     ctx.writeAndFlush(pubAckMessage);                 } catch (PulsarClientException e) {                     log.error("clientId [{}], username [{}]. send pulsar error: {}",                             mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e.getMessage());                 }             }             case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException(                     String.format("mqttSessionKey %s can not reach here", mqttSessionKey));         }     }      private Producer<byte[]> getOrCreateProducer(MqttSessionKey mqttSessionKey, String topic) throws Exception {         MqttTopicKey mqttTopicKey = new MqttTopicKey();         mqttTopicKey.setTopic(topic);         mqttTopicKey.setMqttSessionKey(mqttSessionKey);          rLock.lock();         try {             Producer<byte[]> producer = producerMap.get(mqttTopicKey);             if (producer != null) {                 return producer;             }         } finally {             rLock.unlock();         }          wLock.lock();         try {             Producer<byte[]> producer = producerMap.get(mqttTopicKey);             if (producer == null) {                 producer = createProducer(topic);                 sessionProducerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {                     if (mqttTopicKeys == null) {                         mqttTopicKeys = new ArrayList<>();                     }                     mqttTopicKeys.add(mqttTopicKey);                     return mqttTopicKeys;                 });                 producerMap.put(mqttTopicKey, producer);             }             return producer;         } finally {             wLock.unlock();         }     }      protected Producer<byte[]> createProducer(String topic) throws Exception {         return pulsarClient.newProducer(Schema.BYTES).topic(topic).create();    
 } 

Handling subscribe messages

    @Override     protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,                                MqttSubscribePayload subscribePayload) throws Exception {         for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) {             subscribe(ctx, mqttSessionKey, topicSubscription.topicName());         }     }      protected void subscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey,                              String topic) throws Exception {         MqttTopicKey mqttTopicKey = new MqttTopicKey();         mqttTopicKey.setTopic(topic);         mqttTopicKey.setMqttSessionKey(mqttSessionKey);          wLock.lock();         try {             Consumer<byte[]> consumer = consumerMap.get(mqttTopicKey);             if (consumer == null) {                 consumer = createConsumer(ctx, mqttSessionKey.getUsername(), topic);                 sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> {                     if (mqttTopicKeys == null) {                         mqttTopicKeys = new ArrayList<>();                     }                     mqttTopicKeys.add(mqttTopicKey);                     return mqttTopicKeys;                 });                 consumerMap.put(mqttTopicKey, consumer);             }         } finally {             wLock.unlock();         }     }      protected Consumer<byte[]> createConsumer(ChannelHandlerContext ctx, String username,                                               String topic) throws Exception {         BoundInt boundInt = new BoundInt(65535);         try {             PartitionedTopicStats partitionedStats = pulsarAdmin.topics().getPartitionedStats(topic, false);             log.info("topic {} partitioned stats {}", topic, partitionedStats);         } catch (PulsarAdminException.NotFoundException nfe) {             log.info("topic {} not found", topic);             pulsarAdmin.topics().createPartitionedTopic(topic, 1);         }   
      return pulsarClient.newConsumer(Schema.BYTES).topic(topic)                 .messageListener((consumer, msg) -> {                     log.info("receive message from pulsar, topic {}, message {}", topic, msg.getMessageId());                     MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage(                             MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), msg.getData());                     ctx.writeAndFlush(mqttPublishMessage);                 })                 .subscriptionName(username).subscribe();     } 

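Test cases

Robust software should come with test cases. Here we write just two basic pub/sub cases; in a real production-ready project the tests would be more complex and cover all kinds of failure scenarios. There is a good saying, "unit tests are immediate encouragement for developers", and I very much agree with it.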

kafka

Starting the Kafka test broker

We can use the embedded-kafka-java project to start a Kafka broker for unit testing. Add the dependency with the following coordinates:

        <dependency>             <groupId>io.github.embedded-middleware</groupId>             <artifactId>embedded-kafka-core</artifactId>             <version>0.0.2</version>             <scope>test</scope>         </dependency> 

We can then start a Kafka-backed MQTT broker with the following code:

@Slf4j public class MqttKafkaTestUtil {      public static MqttServer setupMqttKafka() throws Exception {         EmbeddedKafkaServer embeddedKafkaServer = new EmbeddedKafkaServer();         new Thread(() -> {             try {                 embeddedKafkaServer.start();             } catch (Exception e) {                 log.error("kafka broker started exception ", e);             }         }).start();         Thread.sleep(5_000);         MqttServerConfig mqttServerConfig = new MqttServerConfig();         mqttServerConfig.setPort(0);         mqttServerConfig.setProcessorType(ProcessorType.KAFKA);         KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig();         kafkaProcessorConfig.setBootstrapServers(String.format("localhost:%d", embeddedKafkaServer.getKafkaPort()));         mqttServerConfig.setKafkaProcessorConfig(kafkaProcessorConfig);         MqttServer mqttServer = new MqttServer(mqttServerConfig);         new Thread(() -> {             try {                 mqttServer.start();             } catch (Exception e) {                 log.error("mqsar broker started exception ", e);             }         }).start();         Thread.sleep(5000L);         return mqttServer;     }  } 

The Kafka end-to-end test case is simple: publish a message through the MQTT client, then consume it back.

@Log4j2 public class MqttKafkaPubSubTest {      @Test     public void pubSubTest() throws Exception {         MqttServer mqttServer = MqttKafkaTestUtil.setupMqttKafka();         String topic = UUID.randomUUID().toString();         String content = "test-msg";         String broker = String.format("tcp://localhost:%d", mqttServer.getPort());         String clientId = UUID.randomUUID().toString();         MemoryPersistence persistence = new MemoryPersistence();         MqttClient sampleClient = new MqttClient(broker, clientId, persistence);         MqttConnectOptions connOpts = new MqttConnectOptions();         connOpts.setUserName(UUID.randomUUID().toString());         connOpts.setPassword(UUID.randomUUID().toString().toCharArray());         connOpts.setCleanSession(true);         log.info("Mqtt connecting to broker");         sampleClient.connect(connOpts);         CompletableFuture<String> future = new CompletableFuture<>();         log.info("Mqtt subscribing");         sampleClient.subscribe(topic, (s, mqttMessage) -> {             log.info("messageArrived");             future.complete(mqttMessage.toString());         });         log.info("Mqtt subscribed");         MqttMessage message = new MqttMessage(content.getBytes());         message.setQos(1);         log.info("Mqtt message publishing");         sampleClient.publish(topic, message);         log.info("Mqtt message published");         TimeUnit.SECONDS.sleep(3);         sampleClient.disconnect();         String msg = future.get(5, TimeUnit.SECONDS);         Assertions.assertEquals(content, msg);     }  } 
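
The test above hands the received message from the subscriber callback to the test thread through a CompletableFuture. As a sketch of that waiting pattern in isolation, using only the JDK: the hypothetical deliverLater method below stands in for the MQTT client callback, and await blocks with a bounded timeout, returning as soon as the message arrives. Neither name comes from the project.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo: simulates an asynchronous message callback and waits
// for it with a bounded timeout rather than a fixed sleep.
final class AwaitMessageDemo {

    private AwaitMessageDemo() {
    }

    // Stand-in for an MQTT subscription callback that fires after some delay.
    static CompletableFuture<String> deliverLater(String payload, long delayMillis) {
        CompletableFuture<String> future = new CompletableFuture<>();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.schedule(() -> future.complete(payload), delayMillis, TimeUnit.MILLISECONDS);
        scheduler.shutdown(); // by default, already scheduled delayed tasks still run
        return future;
    }

    // Returns as soon as the message arrives; fails if the timeout elapses first.
    static String await(CompletableFuture<String> future, long timeoutSeconds) {
        try {
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("message did not arrive", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(await(deliverLater("test-msg", 100), 5));
    }
}
```

Because the future's get call already bounds the wait, a test built this way may not need a separate fixed sleep before reading the result.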

pulsar

We can use the embedded-pulsar-java project to start a Pulsar broker for unit testing. Add the dependency with the following coordinates:

        <dependency>             <groupId>io.github.embedded-middleware</groupId>             <artifactId>embedded-pulsar-core</artifactId>             <version>0.0.2</version>             <scope>test</scope>         </dependency> 

We can then start a Pulsar-backed MQTT broker with the following code:

@Slf4j public class MqttPulsarTestUtil {      public static MqttServer setupMqttPulsar() throws Exception {         EmbeddedPulsarServer embeddedPulsarServer = new EmbeddedPulsarServer();         embeddedPulsarServer.start();         MqttServerConfig mqttServerConfig = new MqttServerConfig();         mqttServerConfig.setPort(0);         mqttServerConfig.setProcessorType(ProcessorType.PULSAR);         PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig();         pulsarProcessorConfig.setHttpUrl(String.format("http://localhost:%d", embeddedPulsarServer.getWebPort()));         pulsarProcessorConfig.setServiceUrl(String.format("pulsar://localhost:%d", embeddedPulsarServer.getTcpPort()));         mqttServerConfig.setPulsarProcessorConfig(pulsarProcessorConfig);         MqttServer mqttServer = new MqttServer(mqttServerConfig);         new Thread(() -> {             try {                 mqttServer.start();             } catch (Exception e) {                 log.error("mqsar broker started exception ", e);             }         }).start();         Thread.sleep(5000L);         return mqttServer;     } } 

The Pulsar end-to-end test case is simple: publish a message through the MQTT client, then consume it back.

@Log4j2 public class MqttPulsarPubSubTest {      @Test     public void pubSubTest() throws Exception {         MqttServer mqttServer = MqttPulsarTestUtil.setupMqttPulsar();         String topic = UUID.randomUUID().toString();         String content = "test-msg";         String broker = String.format("tcp://localhost:%d", mqttServer.getPort());         String clientId = UUID.randomUUID().toString();         MemoryPersistence persistence = new MemoryPersistence();         MqttClient sampleClient = new MqttClient(broker, clientId, persistence);         MqttConnectOptions connOpts = new MqttConnectOptions();         connOpts.setUserName(UUID.randomUUID().toString());         connOpts.setPassword(UUID.randomUUID().toString().toCharArray());         connOpts.setCleanSession(true);         log.info("Mqtt connecting to broker");         sampleClient.connect(connOpts);         CompletableFuture<String> future = new CompletableFuture<>();         log.info("Mqtt subscribing");         sampleClient.subscribe(topic, (s, mqttMessage) -> {             log.info("messageArrived");             future.complete(mqttMessage.toString());         });         log.info("Mqtt subscribed");         MqttMessage message = new MqttMessage(content.getBytes());         message.setQos(1);         log.info("Mqtt message publishing");         sampleClient.publish(topic, message);         log.info("Mqtt message published");         TimeUnit.SECONDS.sleep(3);         sampleClient.disconnect();         String msg = future.get(5, TimeUnit.SECONDS);         Assertions.assertEquals(content, msg);     } } 

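Performance optimization

Here we briefly describe a few performance optimizations. Parameter tuning such as thread counts and buffer sizes is not covered; those settings need to be decided by actual load testing.

Use the Epoll network model on Linux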

public class EventLoopUtil {

    /**
     * @return an EventLoopGroup suitable for the current platform
     */
    public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        if (Epoll.isAvailable()) {
            return new EpollEventLoopGroup(nThreads, threadFactory);
        } else {
            return new NioEventLoopGroup(nThreads, threadFactory);
        }
    }

    public static Class<? extends ServerSocketChannel> getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
        if (eventLoopGroup instanceof EpollEventLoopGroup) {
            return EpollServerSocketChannel.class;
        } else {
            return NioServerSocketChannel.class;
        }
    }

}

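We check Epoll.isAvailable when creating the event loop group and, when specifying the channel type, choose the matching channel class based on the type of the group: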

        EventLoopGroup acceptorGroup = EventLoopUtil.newEventLoopGroup(1,
                new DefaultThreadFactory("mqtt-acceptor"));
        EventLoopGroup workerGroup = EventLoopUtil.newEventLoopGroup(1,
                new DefaultThreadFactory("mqtt-worker"));

        b.group(acceptorGroup, workerGroup)
                // key point
                .channel(EventLoopUtil.getServerSocketChannelClass(workerGroup))
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        // decoder
                        p.addLast(new MqttDecoder());
                        p.addLast(MqttEncoder.INSTANCE);
                        p.addLast(new MqttHandler(processor(mqttServerConfig)));
                    }
                });

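Disable TCP keepalive

Because the MQTT protocol has its own heartbeat mechanism, TCP keepalive can be turned off and the MQTT-level heartbeat relied on instead, which saves resources under a massive number of connections. Set ChannelOption.SO_KEEPALIVE to false. Since this is a per-connection socket option, on a ServerBootstrap it belongs in childOption, which applies to the accepted client channels, rather than option, which applies to the listening channel:

                    .childOption(ChannelOption.SO_KEEPALIVE, false)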
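
With TCP keepalive off, detecting dead connections falls entirely to the MQTT layer. MQTT 3.1.1 has the server close a connection when no control packet arrives within one and a half times the keep-alive interval the client sent in CONNECT, and a keep-alive of 0 disables the check. The following is a minimal sketch of that bookkeeping in plain Java. KeepAliveTracker is a hypothetical name, and the sketch leaves out how the gateway would schedule the check.

```java
// Hypothetical helper: tracks the MQTT keep-alive deadline for one connection.
final class KeepAliveTracker {

    private final long timeoutMillis; // 0 means keep-alive is disabled
    private long lastPacketMillis;

    KeepAliveTracker(int keepAliveSeconds, long nowMillis) {
        // The server waits 1.5x the client's keep-alive before giving up.
        this.timeoutMillis = keepAliveSeconds * 1500L;
        this.lastPacketMillis = nowMillis;
    }

    // Call whenever any control packet (including PINGREQ) arrives.
    void onPacket(long nowMillis) {
        lastPacketMillis = nowMillis;
    }

    // True when the connection has been silent longer than the allowed window.
    boolean isExpired(long nowMillis) {
        return timeoutMillis > 0 && nowMillis - lastPacketMillis > timeoutMillis;
    }

    public static void main(String[] args) {
        KeepAliveTracker tracker = new KeepAliveTracker(60, System.currentTimeMillis());
        System.out.println("expired right after connect: " + tracker.isExpired(System.currentTimeMillis()));
    }
}
```

In a Netty pipeline, one possible way to get the same effect is to add an IdleStateHandler with a reader idle time of 1.5x the keep-alive once CONNECT has been processed, and to close the channel on the idle event.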

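Shorten timeouts

By default, the timeouts of the MQTT client used in the unit tests and the send timeouts of the Pulsar producer and Kafka producer are all fairly long (typically around 30s). When deployed on an internal network, they can be reduced to 5s to avoid pointless waiting.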
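
As a sketch of what this could look like on the Kafka side, the helper below builds producer properties with shortened timeouts. The class and method names are hypothetical, and the 5s value passed in is only the article's suggestion. The property keys are standard Kafka producer settings.

```java
import java.util.Properties;

// Hypothetical helper: builds Kafka producer properties with short timeouts
// for a low-latency intranet deployment.
final class ShortTimeoutProps {

    private ShortTimeoutProps() {
    }

    static Properties kafkaProducerProps(String bootstrapServers, int timeoutMillis) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Upper bound on waiting for the broker's response to a single request.
        props.put("request.timeout.ms", String.valueOf(timeoutMillis));
        // Upper bound on the whole send, retries included. Kafka requires it to be
        // >= linger.ms + request.timeout.ms, so linger.ms is pinned to 0 here.
        props.put("linger.ms", "0");
        props.put("delivery.timeout.ms", String.valueOf(timeoutMillis));
        // Upper bound on send() blocking for metadata or buffer space.
        props.put("max.block.ms", String.valueOf(timeoutMillis));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(kafkaProducerProps("localhost:9092", 5000));
    }
}
```

These entries would be merged into the properties that createProducer already builds. For Pulsar, the producer builder's sendTimeout plays a similar role, and for the Paho test client, MqttConnectOptions.setConnectionTimeout does.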

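Use multiple KafkaProducers to improve performance

A single KafkaProducer can hit the bandwidth limit of its TCP connection. Under massive request volume, when latency on the Kafka produce path stands out, consider starting multiple KafkaProducers. Given the traffic profile of MQTT (many connections, low QPS per connection), use the hash of mqttSessionKey to decide which KafkaProducer sends a message.

Add the following setting to KafkaProcessorConfig: the number of producers, defaulting to 1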

        private int producerNum = 1; 

At initialization, create an array of producers instead of a single producer

        this.producerArray = new KafkaProducer[kafkaProcessorConfig.getProducerNum()];
        for (int i = 0; i < kafkaProcessorConfig.getProducerNum(); i++) {
            producerArray[i] = createProducer();
        }

Wrap producer lookup in a method

    private Producer<String, ByteBuffer> getProducer(MqttSessionKey mqttSessionKey) {
        return producerArray[Math.abs(mqttSessionKey.hashCode() % kafkaProcessorConfig.getProducerNum())];
    }
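
A note on the index calculation: taking the remainder first and the absolute value second, as above, always stays in range. The reverse order, Math.abs(hash) % n, does not, because Math.abs(Integer.MIN_VALUE) is still negative. The sketch below shows an alternative using Math.floorMod, which avoids the question altogether. ProducerSlots and slotFor are hypothetical names, and any object with a stable hashCode stands in for MqttSessionKey.

```java
// Hypothetical helper: maps a session key's hash code to a producer slot.
final class ProducerSlots {

    private ProducerSlots() {
    }

    // Math.floorMod never returns a negative result for a positive divisor,
    // even when hashCode() is Integer.MIN_VALUE.
    static int slotFor(Object sessionKey, int producerNum) {
        if (producerNum < 1) {
            throw new IllegalArgumentException("producerNum must be >= 1");
        }
        return Math.floorMod(sessionKey.hashCode(), producerNum);
    }

    public static void main(String[] args) {
        // Integer.hashCode() returns the int value itself, handy for a demo.
        System.out.println(slotFor(Integer.valueOf(Integer.MIN_VALUE), 3));
    }
}
```

Either way, the same session key always maps to the same producer, so messages from one MQTT connection stay on one Kafka producer and keep their relative order.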

Conclusion

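The code in this article has been uploaded to GitHub. We have implemented only basic MQTT connect, publish, and subscribe; even pausing and unsubscribing are not supported. A mature commercial MQTT gateway also needs user isolation, broader protocol support, reliability, operability, flow control, security, and more. If you have commercial, production-grade MQTT requirements but cannot quickly build a mature MQTT gateway yourself, consider a managed service that provides stable and reliable MQTT, supports connecting massive numbers of devices to the cloud, and offers two-way messaging between devices and the cloud.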
