阅读量:0
在这篇文章,我实现一个最简单的协议-DISCARD协议。它把收到的请求数据立即抛弃,而且不做任何回复。
我们知道,Netty是一个事件驱动的网络程序框架,因此我们先从编写我们自己的事件处理器开始。
编写事件处理器
package netty.example.discard; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1) @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // (2) // Discard the received data silently. ((ByteBuf)msg).release(); // (3) } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // (4) // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
DiscardServerHandler
继承ChannelInboundHandlerAdapter
,ChannelInboundHandlerAdapter
简单实现了接口ChannelInboundHandler
中的所有方法。ChannelInboundHandler
提供了很多种可被重写的事件处理方法。由于我们的程序简单,我们只需继承ChannelInboundHandlerAdapter
,然后重学我们需要自定义的方法即可。- 这里我们重写了
channelRead()
方法。当有客户端发来请求的时候,这个方法会被调用,请求携带的数据会作为参数传进来。在这里,收到的数据的类型是ByteBuf
。 - 鉴于我们实现的是DISCARD协议,这里忽略收到的数据。另外需要注意的是,
ByteBuf
是携带引用计数的,需要显示的调用release()
释放引用计数。我们在编写处理器是,要时刻想着返回前要释放传进来的任何带引用计数的对象。通常情况下,channelRead()
方法会遵循如下方式编写:
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { // Do something with msg } finally { ReferenceCountUtil.release(msg); } }
- 当I/O发生错误,或者事件处理器执行过程中发生错误,会触发
exceptionCaught()
方法,该方法携带一个Throwable
的参数。通常情况下,异常信息写到日志,关闭关联的channel。当然,根据具体情况会有不同的实现。比如,返回一个带有错误码的响应信息。
当目前为止,一切都很好,我们已经实现了一半的需求。接下来,我们编写main()
方法启动服务器。
编写main()
方法启动服务器
package netty.example.discard; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // 绑定端口并等待连接 ChannelFuture f = b.bind(port).sync(); // (7) // 阻塞直到 server socket 关闭。 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args.length > 0) { port = Integer.parseInt(args[0]); } new DiscardServer(port).run(); } }
NioEventLoopGroup
是一个多线程的,处理I/O的事件循环。Netty为不同的传输类型提供了很多种EventLoopGroup
的实现。我们这里因为实现的是服务器端,因此需要两个NioEventLoopGroup
,第一个称之为’boss’,负责接受连接。第二个称之为’worker’,负责已创建的并注册到worker的连接的数据交换。不同的EventLoopGroup
的实现决定使用多少线程以及哪个线程如何分配到创建的Channel
。ServerBootstrap
是一个helper class,负责配置启动服务器。- 这里,我们指定
NioServerSocketChannel
用于实例化Channel
以接受新的连接。 - 这里指定的处理器被已已创建的连接使用。
ChannelInitializer
是一个特殊的处理器,用于配置ChannelPipeline
,增加新的处理器,如DiscardServerHandler
。随着程序的复杂度的增加,在ChannelPipeline
加入更多的处理器,你可能会把这个匿名类抽出来成为top-level
的类。 - 你可以为不同的Channel实现配置响应的选项。我们正在写基于TCP/IP的服务器,因此我们可以设置socket选项:tcpNoDelay和keepAlive。参考apidocs的
ChannelOption
和特定的ChannelConfig
实现。 option()
用于接受连接的NioServerSocketChannel
,childOption()
用于已经被ServerChannel
创建连接的Channel
,即NioSocketChannel
。- 最后,绑定端口并启动服务。这里我们绑定机器所有网卡的8080端口,你可以多次调用
bind()
,绑定不同的地址。
到这,恭喜你,你已经完成了第一个基于Netty的网络程序。