用netty轻松实现一个高效稳定的TCP服务器

avatar
作者
猴君
阅读量:2

          随着物联网的发展,很多项目都开始涉及到了tcp连接这块,在这里我们轻松用netty去实现,站在巨人的肩膀上。

关于netty包引用:

 <!-- TCP SERVER -->         <dependency>             <groupId>io.netty</groupId>             <artifactId>netty-all</artifactId>             <version>4.1.42.Final</version>             <scope>compile</scope>         </dependency>

实现TCP服务器代码

依赖netty只需几行代码tcp服务:

 import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory;  import java.nio.ByteOrder;  public class TcpServer {      private Logger log = LoggerFactory.getLogger(getClass());     //自定义tcp服务端口号     private int port=9000;      static TcpServer tcpServer;     //单例设计模式     private TcpServer(){      }      public static TcpServer getInstance(){         if(tcpServer==null){             tcpServer=new TcpServer();         }         return tcpServer;     };   public void run() throws InterruptedException {       // 创建主线程组(接受连接)     EventLoopGroup bossGroup = new NioEventLoopGroup();     // 创建工作线程组(处理连接)     EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 指定工作线程数量为20      // 创建ServerBootstrap实例,用于配置服务器     ServerBootstrap bootstrap = new ServerBootstrap();     // 配置主、工作线程组     bootstrap.group(bossGroup, workerGroup);     // 指定使用NIO进行网络传输     bootstrap.channel(NioServerSocketChannel.class);     // 设置子Channel的Socket选项,允许地址重用     bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);      // 配置子Channel的处理器,这里使用ChannelInitializer     bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {         @Override         public void initChannel(SocketChannel ch) throws Exception {             // 添加自定义的解码器,这里是处理协议             ch.pipeline().addLast(new YKCDecoderV1());             // 添加自定义的服务器处理器             ch.pipeline().addLast(new TCPServerHandler());         }     });      // 绑定端口并添加监听器,处理绑定操作的结果     bootstrap.bind(port).addListener((ChannelFutureListener) future -> {         // 在绑定成功后输出日志信息         log.info("bind success in port: " + port);     });      // 输出服务器启动成功信息     System.out.println("server started!"); }    }

 业务处理代码(参考)

以下是处理报文业务类可参考,注意代码未优化:

import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder;  import java.net.InetSocketAddress; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern;  /**  * 正则解析版  */ public class YKCDecoderV1 extends ByteToMessageDecoder {      final static String reg = "^68.{14,332}";//单指令解析 根据业务协议报文定最短和最长     final static Pattern pattern1 = Pattern.compile(reg);      @Override     protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf bufferIn, List<Object> list) throws Exception {          // 获取可读字节数         int leng = bufferIn.readableBytes();          // 如果可读字节数小于8,输出错误信息并跳过这部分数据         if (leng < 8) {             System.out.println("err! cmd len < 8 .");             String s = ByteBufUtil.hexDump(bufferIn);             System.out.println(s);             bufferIn.skipBytes(leng);             return;         } else {                          String s = ByteBufUtil.hexDump(bufferIn);             Matcher matcher1 = pattern1.matcher(s);             if (matcher1.find()) {                  String cmd = matcher1.group();                 //单指令                 System.out.println("sign cmd: " + cmd);                 String lenStr = cmd.substring(2, 4);                 int len = (Integer.parseInt(lenStr, 16) + 4) * 2;                  int cmdLen = cmd.length();                 if (cmdLen == len) {                     JFYChargeProtocol jfyChargeProtocol = new JFYChargeProtocol(cmd);                     list.add(jfyChargeProtocol);                     bufferIn.skipBytes(leng);                  } else if (cmdLen > len) {                      multiHand(cmd, list);                     bufferIn.skipBytes(leng);                  }               } else {                 logErr(channelHandlerContext, s);                 System.out.println("err! cmd format invalid: " + s);                 bufferIn.skipBytes(leng);              }          }       }      private void multiHand(String cmd, List<Object> list) {          if (cmd.length() < 8) {             return;         }         String lenStr = cmd.substring(2, 4);         int len = (Integer.parseInt(lenStr, 16) + 4) * 2;          if (len > cmd.length()) {             return;         }          String newCmd = cmd.substring(0, len);          if (newCmd.length() == len) {             System.out.println("multi cmd-> " + newCmd);              JFYChargeProtocol jfyChargeProtocol = new JFYChargeProtocol(newCmd);             list.add(jfyChargeProtocol);         }          if (cmd.length() > len) {             System.out.println("multi xxx-> " + cmd);             String two = cmd.substring(len);             if(two.startsWith("68")){                 multiHand(two, list);             }         }       }      private int checkSignCmd(String cmd) {         int cmd_len = getCmdLen(cmd);         return cmd.length() - cmd_len;     }      private int getCmdLen(String cmd) {         String leng = cmd.substring(28, 30) + cmd.substring(26, 28);         int dec_num = Integer.parseInt(leng, 16);         return (dec_num * 2) + 34;     }      private void logErr(ChannelHandlerContext ctx, String msg) {         InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();         String clientIP = insocket.getAddress().getHostAddress();         System.out.println(clientIP + " :: " + msg);     }
public class JFYChargeProtocol {       private int length;     private byte[] raw;     private String rawStr;      public JFYChargeProtocol(int length,byte[] raw){         this.length=length;         this.raw=raw;     }     public JFYChargeProtocol(String raw){         this.rawStr=raw;     }           public int getLength() {         return length;     }      public void setLength(int length) {         this.length = length;     }      public byte[] getRaw() {         return raw;     }      public void setRaw(byte[] raw) {         this.raw = raw;     }      public String getRawStr() {         return rawStr;     }      public void setRawStr(String rawStr) {         this.rawStr = rawStr;     } }

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.stereotype.Service; import java.util.Map; import java.util.concurrent.ConcurrentHashMap;  @Service public class TCPServerHandler extends ChannelInboundHandlerAdapter {     private static final Logger logger = LogManager.getLogger(TCPServerHandler.class);      static Map<String,ChannelHandlerContext> inList=new ConcurrentHashMap<String,ChannelHandlerContext>();       /**      * 新连接      * @param ctx      */     @Override     public void channelActive(ChannelHandlerContext ctx) {         String channelName=getChannelName(ctx);         inList.put(channelName,ctx);         logger.info("dev new conn > " +channelName);     }      private String getChannelName(ChannelHandlerContext ctx) {         return "ykc".concat(ctx.channel().remoteAddress().toString());      }      /**      * 连接下线      * @param ctx      * @throws Exception      */     @Override     public void channelInactive(ChannelHandlerContext ctx) throws Exception {         String channelName=getChannelName(ctx);         logger.info("dev close conn > " + channelName);         inList.remove(channelName);         ctx.fireChannelInactive();     }         @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {         JFYChargeProtocol in = (JFYChargeProtocol) msg;          String readMsg= in.getRawStr();         logger.info("read dev <= " + readMsg);         String channelName=getChannelName(ctx);         readMsg=channelName+"$$"+readMsg;         PackageHandlerImpl.getInstance().doHandle(readMsg);         //ctx.writeAndFlush(in);     }       @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {          cause.printStackTrace();          ctx.close();     }      /**      * 回复信息给设备      * @param hex      */     public static boolean RepDev(String hex){          String[] kv= hex.split("\\$\\$");         if(kv.length==2){             String key=kv[0];             ChannelHandlerContext context=inList.get(key);             if(context!=null){                 byte[] bytes= ByteUtil.hexString2Bytes(kv[1]);                 ByteBuf byteBuf= Unpooled.copiedBuffer(bytes);                 context.writeAndFlush(byteBuf);                 return true;             }else{                 logger.error("dev offline="+key);             }          }else{             logger.error("cmd format err");          }       return false;     }  }
import java.util.ArrayList; import java.util.List; public  class PackageHandlerImpl implements PackageHandler {      public static List<PackageHandler> packageHandlers= new ArrayList<PackageHandler>();     static PackageHandlerImpl packageHandler;     protected PackageHandlerImpl(){         super();         System.out.println("init PackageHandlerImpl");     }      public static PackageHandlerImpl getInstance(){         if(packageHandler==null){             packageHandler=new PackageHandlerImpl();         }         return packageHandler;     }      @Override     public void doHandle(String hex) {         for(PackageHandler f : packageHandlers){             f.doHandle(hex);         }      }     public PackageHandlerImpl addHandle(PackageHandler f){         packageHandlers.add(f);         return this;     }   }
/**  * 包处理  */ public interface PackageHandler {      void doHandle(String hex) ;  }
import org.apache.activemq.command.ActiveMQQueue; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Service;  import javax.jms.Destination;  @Service public class TranServiceImpl {      private static final Logger logger = LogManager.getLogger(DevServiceApplication.class);      /**      * 接受服务器 返回数据      */     private final String out_name="ykc_out";       /**      * 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装      */     @Autowired     private JmsMessagingTemplate jmsTemplate;      /**      * 发送消息 采用系统配置类型      *      * @param queueName 是发送到的队列名称      * @param message   是发送到的队列      */     public void sendMessage(String queueName, final String message) {         jmsTemplate.convertAndSend(queueName, message);     }      /**      * 发送消息 采用指定队列类型      *      * @param queueName 是发送到的队列      * @param message   是发送到的队列      */     public void sendMessageByQueue(String queueName, final String message) {         Destination destination = new ActiveMQQueue(queueName);         jmsTemplate.convertAndSend(destination, message);     }      @JmsListener(destination = out_name)     public void receiveQueue(String text) {         System.out.println("to dev => "+text);          if(!TCPServerHandler.RepDev(text)){             logger.error("write mq fail ==> "+text);         }     } }

 运行

当前是集成到 springboot2框架在这里即可运行,或实现实现 org.springframework.boot.ApplicationRunner 或 org.springframework.boot.CommandLineRunner 的接口,即启动后执行的任务,不用框架的在main方法也可以直接运行。

/**  * tcp服务在框架启动后 跟着启动即可  */ @SpringBootApplication public class DevServiceApplication {     private static final Logger logger = LogManager.getLogger(DevServiceApplication.class);       public static void main(String[] args) {           TcpServer tcpServer = TcpServer.getInstance();         try {             tcpServer.run();             PackageHandlerImpl packageHandler = PackageHandlerImpl.getInstance();             packageHandler.addHandle(new PackageHandlerByMQ());          } catch (InterruptedException e) {             e.printStackTrace();             logger.error("TCP服务错误", e);             throw new RuntimeException();         }       } 

总结

看看服务器上的tcp服务运行情况

运行天数:

流量状态图:

            站在netty巨人的肩膀上,这个tcp服务实现方式简单,运行更是稳定。服务器运行时就部署了一直到今天共运行1235天了,900多个设备同步在线,配了2g的jvm运行内存,cpu占用5.6(top截图等了很久才出来5.6是个峰值,平时不到1)确保某个市的充电桩设备。中间由于客户的充电桩设备协议问题更新过几次,刚时开始是使用netty官网的解码LengthFieldBasedFrameDecoder做处理,可以说非常高效,但随后发现有几个产商的设备报文头部有特殊字符,而且刚好和协议头有些重叠,再考虑到示来的产商协议的不确定性,为了兼容这些产家不得以并以正则的方法去处理。

扩展部分

Netty 官方提供的编解码器
  1. 字符串编解码器:

    • StringEncoder:将字符串编码为字节。
    • StringDecoder:将字节解码为字符串。
  2. 字节流编解码器:

    • ByteArrayEncoder:将字节数组编码为字节。
    • ByteArrayDecoder:将字节解码为字节数组。
  3. 对象序列化编解码器:

    • ObjectEncoder:将对象序列化为字节。
    • ObjectDecoder:将字节反序列化为对象。
  4. 长度字段编解码器:

    • LengthFieldPrepender:在消息头部添加表示消息长度的字段。
    • LengthFieldBasedFrameDecoder:根据长度字段解码消息,用于处理拆包和粘包问题。
  5. 行分隔符编解码器:

    • LineBasedFrameDecoder:按行切分消息,通常用于处理文本协议。
  6. DelimiterBasedFrameDecoder:

    • DelimiterBasedFrameDecoder:按照指定的分隔符切分消息,用于处理自定义分隔符的协议。
  7. Protobuf 编解码器:

    • ProtobufEncoder:将 Protobuf 对象编码为字节。
    • ProtobufDecoder:将字节解码为 Protobuf 对象。
  8. HTTP 编解码器:

    • HttpRequestEncoder:将 HTTP 请求编码为字节。
    • HttpResponseDecoder:将字节解码为 HTTP 响应。
    • HttpRequestDecoder:将字节解码为 HTTP 请求。
    • HttpResponseEncoder:将 HTTP 响应编码为字节。
  9. WebSocket 编解码器:

    • WebSocketServerProtocolHandler:处理 WebSocket 握手以及帧的编解码。

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!