文章目录
EventLoop和EventLoopGroup
二者大概是什么这里不再赘述,前一篇已简述过。
不理解也没关系。
下面会简单使用,看了就能明白是什么
这篇文章只说NioEventLoopGroup
后续文章会有服务器创建类BootStrap的方法总结。
服务器与客户端基本使用
EventLoopGroup
的常用实现类是NioEventLoopGroup
就以此为例,我们来看这两者,在使用netty
创建一个服务器的过程中处于什么位置。
PS:我觉得既然是专注于这个东西怎么用,就不要太纠结于底层。
我当时看到childHandler
,就想弄明白这个到底是怎么把Handler
加到新Channel
的Pipeline
中的。这就要涉及源码。下面这个链式调用其他部分也是一样。想搞明白每个调用的底层还是需要看源码。会给初学者带来没必要的精力耗费。我觉得需要收住好奇心,先会简单使用,再说底层实现。
//服务端 //ServerBootstrap相当于提供了创建服务器的辅助类。允许通过链式调用更优雅的启动一个服务器。 //我们通过一系列链式调用完成了服务器的创建。 new ServerBootstrap() //group相当于我们配置EventLoopGroup的地方,它将用于后续的事件处理 .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)) //channel指定通道类型,NioServerSocketChannel是netty的封装类,即NIO中的升级版 .channel(NioServerSocketChannel.class) //这个名字child代表子通道,什么意思?即我们有一个NioServerSocketChannel来处理Accept请求。接受的每个连接都会创建自己的SocketChannel用于通信,可当作child,即子通道。 //所以顾名思义子通道处理器。即为新创建的每一个子通道都会绑定这个ChannelInitializer,那么它又是做什么的? //ChannelInitializer会在每个新Channel被注册到EventLoopGroup时执行内部我们重写的initChannel方法,将我们在initChannel方法中自定义的处理器添加到我们这个channel的channelpipeline中。即为我们新连接的Channel添加一个handler。关于handler和pipeline,在上篇中简单讲述了是什么。 //简单来说,它是对每个新建立的连接Channel,指定我们写好的处理逻辑,之后Channel的读写操作都会经过这些逻辑。 .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) {//这里的参数我猜测就是我们新连接的channel //这里对新channel添加我们自定义的handler ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null; if (byteBuf != null) { byte[] buf = new byte[16]; ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes()); log.debug(new String(buf)); } } }); } //绑定端口,并sync阻塞住 }).bind(8080).sync();
这里还有一些理解channel()
方法,将我们指定的NioServerSocketChannel
注册到group中的第一个group
内的某个EventLoop
,由该EventLoop
监听。第一个group
中的EventLoop
被称为Boss
,专门处理连接请求。
所有连接由NioServerSocketChannel
处理,同时新连接产生的SocketChannel
会注册到第二个group
内的某个EventLoop
,由该EventLoop
监听。这些EventLoop
被称为Worker
。处理读写请求。
上面程序中相当于有两个Worker
,因为创建EventLoopGroup
时参数为2。
所以小结一下。每个Channel都会由一个EventLoop
监听。并且在事件发生时调用对应的处理器进行处理。即完成对事件的监听和处理。
//客户端 //通过链式调用,拿到连接完毕的Channel Channel channel = new Bootstrap() .group(new NioEventLoopGroup(1)) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { System.out.println("init..."); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); } }) .channel(NioSocketChannel.class).connect("localhost", 8080) .sync() .channel(); // 发送消息 channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes())); Thread.sleep(2000); channel.writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes("wangwu".getBytes()));
这里多创建几个客户端发消息,会发现服务器中的两个Worker会分别处理属于自己管理Channel的事件。
增加非NIO工人
下面程序有两个handler
LoggingHandler
和"myhandler"
引入了一个我们自己创建的DefaultEventLoopGroup
去处理"myhandler"
的任务,即非NIO工人
//额外的工程组 //非NIO工人2个 DefaultEventLoopGroup normalWorkers = new DefaultEventLoopGroup(2); new ServerBootstrap() .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(normalWorkers,"myhandler", new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf byteBuf = msg instanceof ByteBuf ? ((ByteBuf) msg) : null; if (byteBuf != null) { byte[] buf = new byte[16]; ByteBuf len = byteBuf.readBytes(buf, 0, byteBuf.readableBytes()); log.debug(new String(buf)); } } }); } }).bind(8080).sync();
看完这个代码,我们看添加handler
时的参数,我们发现"myhandler"
被交给了我们自己一开始创建的DefaultEventLoopGroup normalWorkers
来执行。
关系见下图head
和tail
看做pipeline
中handler
链的头尾,可视为无意义。h1
代表LoggingHandler
h2
代表"myhandler"
那么这里老师巧妙的引出了pipeline
处理过程中的handler
的切换,因为两个handler
分别是两个不同的EventLoop
去执行,就需要一个执行完后交给另一个EventLoop
去执行。然后引出了源码,比较容易让人接受,理解。
//handler换人源码 static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程 EventExecutor executor = next.executor(); // 是,直接调用 if (executor.inEventLoop()) { next.invokeChannelRead(m); } // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人) else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } }
- 如果两个 handler 绑定的是同一个线程(EventLoop),那么就直接调用
- 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用
NioEventLoop 处理普通任务与定时任务
NioEventLoop还可以执行普通任务和定时任务
//普通任务 NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2); log.debug("server start..."); Thread.sleep(2000); nioWorkers.execute(()->{ log.debug("normal task..."); }); //定时任务 NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2); log.debug("server start..."); Thread.sleep(2000); nioWorkers.scheduleAtFixedRate(() -> { log.debug("running..."); }, 0, 1, TimeUnit.SECONDS);
结语
仅仅说使用的话内容好像不多。
api使用的话还要之后自己进行网络程序的编写,学习过程中估计很难记住。还是要学完去实践才能熟练使用。
下篇应该是Netty的Future与Promise,或者Netty Channel相比于NIO的有什么不同(这块估计要看下书,课上没咋提)。
后续会有文章单独说明ServerBootstrap类的一些方法。
感谢阅读,欢迎批评指正。