Netty实现udp服务器

avatar
作者
猴君
阅读量:3

1、TCP与UDP通信协议

网络传输层协议有两种,一种是TCP,另外一种是UDP。

TCP是一种面向连接的协议,提供可靠的数据传输。TCP通过三次握手建立连接,并通过确认和重传机制,保证数据的完整性和可靠性。TCP适用于对数据准确性要求较高、对实时性要求较低的应用场景,如网页浏览、文件传输等。

UDP是一种无连接的协议,不保证数据的可靠性传输。UDP通过尽力交付数据包的方式进行传输,不对数据包的传输状态进行确认和重传,因此速度较快。UDP适用于对实时性要求较高、对数据准确性要求较低的应用场景,如视频传输、语音通信等。

那么,这两种有哪些区别呢?请看下面:

  • 连接方式:UDP是无连接的,TCP是面向连接的;
  • 可靠性:UDP不保证数据的可靠性传输,TCP保证数据的可靠传输;
  • 速度:UDP传输速度较快,TCP传输速度较慢;
  • 传输方式:UDP采用尽力交付的方式传输数据包,不进行确认和重传;TCP通过确认和重传机制保证数据的完整性和可靠性;
  • 开销:UDP的开销较低,TCP的开销较高。

总结:我们可以根据具体的应用场景和需求选择使用UDP或TCP进行数据传输。如果对数据的实时性要求较高,且对数据准确性要求较低,可以选择使用UDP。如果对数据的准确性要求较高,可以选择使用TCP。

2、游戏行业选择的通信协议

游戏由于对数据的准确性(不允许丢包,乱序)非常高,一般都是选择基于TCP的socket通信。但UDP的低时延,快速传输对实时性要求非常高的游戏类型也是非常大的吸引力。据说,魔兽世界以及Dota2使用UDP开发。当然,使用udp通信的游戏肯定在通信层做了适配,保证关键数据不丢包,不乱序。

近年来,基于UDP协议的KCP(Kuai Control Protocol,快速可靠传输协议),也得到了快速的发展。据说,原神是使用KCP通信的。

3、Netty使用UDP协议

netty使用udp协议,网上的例子都是非常简单的。都是两个类搞定。没有解决以下几个问题:

  • 如何与现有网络框架适配(支持私有协议栈,支持javabean,不单单是字符串)
  • 如何主动与客户端通信(不再是客户端发一个消息,服务器直接推送一个回包)

本文主要就这两个问题进行案例说明。

3.1、netty数据包载体

udp是无连接的,这意味着通信双方无需像TCP那般“三次握手四次释放”。只要知道对方的socket地址(Ip+Port),即可发送数据,发完即终止。在Netty里,udp的通信载体叫做DatagramPacket,负责将数据(ByteBuf)从源头发送到目的地。

public class DatagramPacket extends DefaultAddressedEnvelope<ByteBuf, InetSocketAddress> implements ByteBufHolder {     public DatagramPacket(ByteBuf data, InetSocketAddress recipient) {         super(data, recipient);     }      public DatagramPacket(ByteBuf data, InetSocketAddress recipient, InetSocketAddress sender) {         super(data, recipient, sender);     } }

3.2私有协议栈编解码

3.2.1、通信基类

而我们的网络底层通信是基于javabean的,因此定义我们的消息基类如下:

public class UdpMessage implements Message {      private String senderIp;      private int senderPort;      private String receiverIp;      private int receiverPort;  }

3.2.2、私有协议栈消息编码

我们将私有协议栈定义为 包头(消息类型id),包体(具体消息经编码后的字节数组)。将自定义消息UdpMessage转为DatagramPacket

public class UdpProtocolEncoder extends MessageToMessageEncoder<UdpMessage> {     private static final Logger logger = LoggerFactory.getLogger("socketserver");      private final MessageFactory messageFactory;      private final MessageCodec messageCodec;      public UdpProtocolEncoder(MessageFactory messageFactory, MessageCodec messageCodec) {         this.messageFactory = messageFactory;         this.messageCodec = messageCodec;     }      @Override     protected void encode(ChannelHandlerContext ctx, UdpMessage message, List<Object> out) throws Exception {         // ----------------protocol pattern-------------------------         // packetLength | cmd | body         // int int byte[]         int  cmd = messageFactory.getMessageId(message.getClass());         try {             byte[] body = messageCodec.encode(message);             //消息内容长度             ByteBuf buf = Unpooled.buffer(body.length+4);             // 写入cmd类型             buf.writeInt(cmd);             buf.writeBytes(body);             out.add(new DatagramPacket(buf, new InetSocketAddress(message.getReceiverIp(), message.getReceiverPort())));         } catch (Exception e) {             logger.error("wrote message {} failed", cmd, e);         }     } }

这里消息pojo编码,使用了jforgame的组件,根据javabean的字段元信息,自动编码为byte数组。

依赖申明如下:

<dependency>     <groupId>io.github.jforgame</groupId>     <artifactId>jforgame-codec-struct</artifactId>     <version>1.1.0</version> </dependency>

3.2.3、私有协议栈消息解码

私有协议栈解码负责将数据包DatagramPacket转为UdpMessage。将底层数据流转为ByteBuf之后,还需要将字节数据进行解码,才可以转换为应用程序认识的消息。

public class UdpProtocolDecoder extends MessageToMessageDecoder<DatagramPacket> {     private final MessageFactory messageFactory;      private final MessageCodec messageCodec;       public UdpProtocolDecoder(MessageFactory messageFactory, MessageCodec messageCodec) {         this.messageFactory = messageFactory;         this.messageCodec = messageCodec;     }      @Override     protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out) throws Exception {         ByteBuf in = msg.content();         int length = in.readableBytes();         int cmd = in.readInt();         byte[] body = new byte[length - 4];         in.readBytes(body);         Class<?> msgClazz = messageFactory.getMessage(cmd);         out.add(messageCodec.decode(msgClazz, body));     }  } 

4、服务端代码

4.1、会话管理

服务端会话管理(建立及摧毁),以及消息接受(下文的channelRead方法)。

@ChannelHandler.Sharable public class UdpChannelIoHandler extends ChannelInboundHandlerAdapter {      private final static Logger logger = LoggerFactory.getLogger("socketserver");      /** 消息分发器 */     private final SocketIoDispatcher messageDispatcher;      public UdpChannelIoHandler(SocketIoDispatcher messageDispatcher) {         super();         this.messageDispatcher = messageDispatcher;     }      @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {         Channel channel = ctx.channel();         ChannelUtils.duplicateBindingSession(ctx.channel(), new NSession(channel));         SessionManager.getInstance().buildSession(ChannelUtils.getSessionBy(channel));         System.out.println("socket register " + channel);     }      @Override     public void channelRead(ChannelHandlerContext context, Object packet) throws Exception {         logger.debug("receive pact, content is {}", packet.getClass().getSimpleName());          final Channel channel = context.channel();         IdSession session = ChannelUtils.getSessionBy(channel);         messageDispatcher.dispatch(session, packet);     }      @Override     public void channelInactive(ChannelHandlerContext ctx) throws Exception {         Channel channel = ctx.channel();         System.out.println("socket inactive " + channel);         IdSession userSession = ChannelUtils.getSessionBy(channel);         messageDispatcher.onSessionClosed(userSession);     }  }

需要注意的是,UDP Socket Server的链接建立,只在启动的时候触发一次。之后,无论有多少个客户端,上文的channelActive都不会再次触发。也就是说,客户端的链接不是一对一的,全局只有一个服务端链接。这点与tcp不同。那么,怎么区分不同的客户端呢?后文例子揭晓。

4.2、消息路由

具体的消息处理(通过消息路由及消息处理器注解)。网关接受到消息之后,自动把消息分发到对应的处理器。类似与springmvc的Controller以及RequestMapper功能。

public class MessageIoDispatcher extends ChainedMessageDispatcher {      private MessageHandlerRegister handlerRegister;      MessageFactory messageFactory = GameMessageFactory.getInstance();      private MessageParameterConverter msgParameterConverter= new DefaultMessageParameterConverter(messageFactory);      public MessageIoDispatcher() {         LoginRouter router = new LoginRouter();         this.handlerRegister = new CommonMessageHandlerRegister(Collections.singletonList(router), messageFactory);         MessageHandler messageHandler = (session, message) -> {             int cmd = GameMessageFactory.getInstance().getMessageId(message.getClass());             MessageExecutor cmdExecutor = handlerRegister.getMessageExecutor(cmd);             if (cmdExecutor == null) {                 logger.error("message executor missed,  cmd={}", cmd);                 return true;             }              Object[] params = msgParameterConverter.convertToMethodParams(session, cmdExecutor.getParams(), message);             Object controller = cmdExecutor.getHandler();              MessageTask task = MessageTask.valueOf(session, session.hashCode(), controller, cmdExecutor.getMethod(), params);             task.setRequest(message);             // 丢到任务消息队列,不在io线程进行业务处理             GameServer.getMonitorGameExecutor().accept(task);             return true;         };          addMessageHandler(messageHandler);     }      @Override     public void onSessionCreated(IdSession session) {     }      @Override     public void onSessionClosed(IdSession session) {     }  }

4.3、服务端启动代码

public class UdpSocketServer implements ServerNode {      private static final Logger logger = LoggerFactory.getLogger("socketserver");      private EventLoopGroup group = new NioEventLoopGroup();      protected HostAndPort nodesConfig = HostAndPort.valueOf(8088);      public SocketIoDispatcher socketIoDispatcher;      public MessageFactory messageFactory;      public MessageCodec messageCodec;      @Override     public void start() throws Exception {         try {             SessionManager.getInstance().schedule();                          Bootstrap bootstrap = new Bootstrap();             bootstrap.group(group)                     .channel(NioDatagramChannel.class)                     .handler(new LoggingHandler(LogLevel.DEBUG))                     .handler(new ChannelInitializer<DatagramChannel>() {                         @Override                         public void initChannel(DatagramChannel ch) throws Exception {                             ChannelPipeline pipeline = ch.pipeline();                             pipeline.addLast("protocolDecoder", new UdpProtocolDecoder(messageFactory, messageCodec));                             pipeline.addLast("protocolEncoder", new UdpProtocolEncoder(messageFactory, messageCodec));                             pipeline.addLast(new UdpChannelIoHandler(socketIoDispatcher));                          }                     });              logger.info("socket server is listening at " + nodesConfig.getPort() + "......");             bootstrap.bind(nodesConfig.getPort()).sync().channel().closeFuture().sync();          } catch (Exception e) {             logger.error("", e);             group.shutdownGracefully();         }     }      @Override     public void shutdown() throws Exception {         group.shutdownGracefully();     }       public static void main(String[] args) throws Exception {         UdpSocketServer udpSocketServer = new UdpSocketServer();         udpSocketServer.messageFactory = GameMessageFactory.getInstance();         udpSocketServer.messageCodec = new StructMessageCodec();         udpSocketServer.socketIoDispatcher = new MessageIoDispatcher();          udpSocketServer.start();     } }

5、客户端代码

5.1、客户端启动代码

public class UdpSocketClient extends AbstractSocketClient {      private final EventLoopGroup group = new NioEventLoopGroup(1);      private HostAndPort nativeHostPort;       public UdpSocketClient(SocketIoDispatcher messageDispatcher, MessageFactory messageFactory, MessageCodec messageCodec, HostAndPort hostPort) {         this.ioDispatcher = messageDispatcher;         this.messageFactory = messageFactory;         this.messageCodec = messageCodec;         this.targetAddress = hostPort;     }      @Override     public IdSession openSession() throws IOException {         try {             final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();             Bootstrap bootstrap = new Bootstrap();             bootstrap.channel(NioDatagramChannel.class);             bootstrap.group(nioEventLoopGroup);             bootstrap.handler(new LoggingHandler(LogLevel.INFO));             bootstrap.handler(new UdpProtoBufClientChannelInitializer());             ChannelFuture f = bootstrap.connect(new InetSocketAddress(targetAddress.getHost(), targetAddress.getPort()),                     new InetSocketAddress(nativeHostPort.getHost(), nativeHostPort.getPort())).sync();             IdSession session = new NSession(f.channel());             this.session = session;             return session;         } catch (Exception e) {             group.shutdownGracefully();             throw new IOException(e);         }     }      @Override     public void close() throws IOException {         this.session.close();     }       public void send(UdpMessage message) {         message.setSenderIp(nativeHostPort.getHost());         message.setSenderPort(nativeHostPort.getPort());          message.setReceiverIp(targetAddress.getHost());         message.setReceiverPort(targetAddress.getPort());         session.send(message);     }       class UdpProtoBufClientChannelInitializer extends ChannelInitializer<NioDatagramChannel> {         @Override         protected void initChannel(NioDatagramChannel ch) throws Exception {             ChannelPipeline pipeline = ch.pipeline();             pipeline.addLast("protocolDecoder", new UdpProtocolDecoder(messageFactory, messageCodec));             pipeline.addLast("protocolEncoder", new UdpProtocolEncoder(messageFactory, messageCodec));             pipeline.addLast(new UdpChannelIoHandler(ioDispatcher));         }     } }

这里需要注意下:

客户端需要指定端口与服务器通信,这样才能方便消息包携带自己的地址信息。

5.2、测试代码

客户端测试代码如下:

模拟10个玩家登录。读者可自行修改程序。udp是不可靠链接,不保证交付。如果在外网跑,是会出现消息丢包,或者乱序。在本地跑,是不太可能出现这种情况。读者可自行测试,同一个角色发送一大堆消息(附加上次序字段),看服务器收到的消息序号是否完整有序。

 private static AtomicLong idFactory = new AtomicLong(1000);      public static void main(String[] args) throws Exception {         MessageCodec messageCodec = new StructMessageCodec();         GameMessageFactory.getInstance().registeredClassTypes().forEach(Codec::getSerializer);         for (int i = 0; i < 10; i++) {             System.out.println("----------i=" + i);             UdpSocketClient socketClient = new UdpSocketClient(new SocketIoDispatcherAdapter() {                 @Override                 public void dispatch(IdSession session, Object message) {                     System.out.println("receive package ---------" + JsonUtil.object2String(message));                 }              }, GameMessageFactory.getInstance(), messageCodec, HostAndPort.valueOf(8088));              socketClient.nativeHostPort = HostAndPort.valueOf(8099 + i);             socketClient.openSession();             for (int j = 0; j < 1; j++) {                 ReqLogin req = new ReqLogin();                 req.setPlayerId(idFactory.getAndIncrement());                 socketClient.send(req);             }         }      }

6、游戏服务器示例功能

6.1、demo逻辑

我们以上面的代码,实现一个简单的游戏逻辑。

  1. 玩家根据服务端的地址进行udp通信。链接建立之后,模拟账号登录逻辑。
  2. 服务器接收到登录消息之后,立马推送登录成功的协议。
  3. 服务器每隔一段时间,向所有注册的客户端推送一个消息包。

6.2、登录请求/响应包

@MessageMeta(cmd = 55555) public class ReqLogin extends UdpMessage {      private long playerId; }
@MessageMeta(cmd = 55556) public class ResPlayerLogin extends UdpMessage {      private long playerId; }

6.3、登录路由

@MessageRoute public class LoginRouter {      @RequestHandler     public void reqTime(IdSession session, ReqLogin req) {         long playerId = req.getPlayerId();         System.out.println("player login" + playerId);         Player player = new Player();         player.setId(playerId);         player.setRemoteAddr(HostAndPort.valueOf(req.getSenderIp(), req.getSenderPort()));         SessionManager.getInstance().register(playerId, player);         ResPlayerLogin resp = new ResPlayerLogin();         resp.setPlayerId(playerId);         player.receive(session, resp);     } } 

其中SessionManager 类缓存服务器的全局session,以及各个客户端环境的通信地址。以及,每隔一段时间主动向客户端推送消息。

public class SessionManager {      private static SessionManager inst = new SessionManager();      private ConcurrentMap<Long, Player> id2Players = new ConcurrentHashMap<>();      private IdSession serverSession;      public static SessionManager getInstance() {         return inst;     }      public void register(long playerId, Player player) {         id2Players.put(playerId, player);     }      public void buildSession(IdSession session) {         serverSession = session;     }      public void schedule() {         SchedulerManager.getInstance().scheduleAtFixedRate(()->{             id2Players.forEach((key, value) -> {                 ResWelcome push = new ResWelcome();                 push.setTime(System.currentTimeMillis());                 value.receive(serverSession, push);             });         }, TimeUtil.MILLIS_PER_MINUTE, 10*TimeUtil.MILLIS_PER_SECOND);     }  }

需要注意的是,客户端只有在登录成功之后,服务器才能绑定玩家与对应的客户端地址,才能主动推送消息。也就是说,在登录之前,服务器也是无法主动推送消息的,在业务上来说,也是没有意义的。

6.4、客户端测试代码输出

7、总结

udp是一种无连接的协议,t提供不可靠性传输。UDP通过尽力交付数据包的方式进行传输,不对数据包的传输状态进行确认和重传,因此速度较快。UDP适用于对实时性要求较高、对数据准确性要求较低的应用场景。例如,如果语音视频服务。如果游戏服务器确实需要使用udp协议的话,需要在应用层解决丢包乱序问题。

对于乱序,接受方可以先把数据都缓存起来,等一段窗口期的数据接受完毕后再分发给业务层。对于丢包,无论是发送方还是接受方,都需要缓存最近发送的消息。以便用于丢包重传。当然,这只是基本的思路,实际处理起来可能会非常复杂。如果考虑到UDP协议的话,可能KCP会是更好的选择。

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!