Netty实现RPC服务器之出、入站处理器
前言
本篇是用Netty实现一个RPC中间件的第三篇,未来还会持续更新,代码在最后一片更新完毕之后将会上传到github、gitee代码托管平台,供大家拉取学习,并进行讨论分享,让该RPC框架持续精进。
在Netty中,数据的处理是通过处理器链(pipeline)完成的,这些处理器可以被分类为出站处理器(outbound handlers)和入站处理器(inbound handlers)。出站处理器负责处理从应用程序向网络发送的数据,而入站处理器则处理从网络接收到的应用程序数据。
本篇主要接扫通过构建自定义的处理器,让我们的消息在发送、接收阶段进行一定的逻辑处理,包括心跳处理,编解码,业务处理器等等。
愿我们在技术的浩瀚中游刃有余。
软件中目录
rpc_core
模块
rpc_client
模块
rpc_server
模块
出/入站处理器
概述
netty使用了一种事件驱动的应用范式,因此数据处理的管道是一个经过处理器的事件链。事件和处理器可以关联到 xxInbound
入站 与 xxOutbound
出站 数据流。
处理器链
入站和出栈事件都会经过预设的处理器链(多个处理器);
- 即入站事件经过 入站处理器;出站事件经过出站处理器;多个处理器形成一个链或管道;
处理器举例:
网络传输全是字节形式,而业务逻辑处理是对象形式,所以需要编码器把对象转字节,需要解码器把字节转对象;
ByteToMessageDecoder
字节转消息(对象)解码器;MessageToByteEncoder
消息(对象)转字节编码器;- 业务逻辑处理器(如加工,统计,入库,消息转发等);
以客户端服务器模式介绍入站与出站处理器的事件处理过程
【图解】
客户端的处理器有:
- 解码处理器;
- 编码处理器;
- 客户端业务处理器;
服务端的处理器有:
- 解码处理器;
- 编码处理器;
- 服务器业务处理器;
补充:多个处理器封装到通道管道 ChannelPipeline;
处理器链调用机制代码实现
1)需求描述:
自定义编码器和解码器实现客户端与服务器间的数据传输; 2)通道管道``ChannelPipeline` 可以封装多个处理器;其处理器执行顺序特别重要(前后关系特别重要,如入栈解码处理器要第1个执行,又如出站编码器要最后一个执行),否则客户端与服务器将无法通信(因为事件或数据要经过所有的处理器);类似于如下:
java
代码解读
复制代码
for (event event : events) { handler1(event); handler2(event); handler3(event); }
3)入站与出站处理器执行顺序:
3.1)服务器初始化器,添加处理器到管道;
入站事件
Netty中的入站事件是指那些从网络流向应用程序的数据或状态变化。这些事件通常是由网络I/O操作触发的,然后由Netty框架传递给用户定义的处理器进行处理。以下是Netty中的一些典型入站事件:
- ChannelRegistered:
- 当Channel被注册到EventLoop之后触发。
- 这个时候Channel还不能进行I/O操作,因为还没有完成绑定。
- ChannelUnregistered:
- 当Channel从EventLoop注销时触发。
- 这通常发生在Channel关闭之前。
- ChannelActive:
- 当Channel准备好进行I/O操作时触发。
- 对于客户端来说,这意味着已经成功连接到了服务器;对于服务器来说,这意味着一个新的连接已经被接受。
- ChannelInactive:
- 当Channel不再能够进行I/O操作时触发。
- 这意味着Channel已经关闭或者连接已经断开。
- ChannelRead:
- 当从Channel中读取到数据时触发。
- 这个事件将读取的数据传递给下一个Inbound Handler。
- ChannelReadComplete:
- 在所有ChannelRead事件都已被触发且所有数据都被读取完毕后触发。
- 这是一个提示信号,表明当前Buffer中的所有数据都已经读取完成。
- UserEventTriggered:
- 用户可以自定义事件,这些事件也会被传递给Inbound Handler。
- 常见的用途包括心跳检测、定时任务等。
- ExceptionCaught:
- 当Channel或Handler抛出一个未处理的异常时触发。
- 这个事件允许你捕获异常并进行适当的错误处理。
在Netty中,这些事件会被传递给ChannelPipeline中的Inbound Handlers,这些Handlers按照它们在Pipeline中的顺序被调用。每个Handler都有机会处理事件,并可以选择继续传递给下一个Handler或停止传递。这允许开发者构建复杂的事件处理逻辑,同时保持代码的模块化和易于维护。
示例:处理ChannelRead事件
假设你需要实现一个入站处理器,该处理器用于解析接收到的消息:
整理了一份好像面试笔记包括了:Java面试、Spring、JVM、MyBatis、Redis、MySQL、并发编程、微服务、Linux、Springboot、SpringCloud、MQ、Kafka 面试专题
需要全套面试笔记【点击此处即可】免费获取
java
代码解读
复制代码
public class MessageDecoderHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // 检查msg是否是我们需要处理的对象类型 if (msg instanceof ByteBuf) { ByteBuf in = (ByteBuf) msg; try { // 解析ByteBuf中的数据 String receivedMessage = in.toString(CharsetUtil.UTF_8); System.out.println("Received: " + receivedMessage); // 处理数据,例如转发给其他组件 } finally { // 释放ByteBuf资源 in.release(); } } else { // 如果不是我们处理的对象类型,则直接传递给下一个处理器 ctx.fireChannelRead(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 打印异常信息 cause.printStackTrace(); // 关闭连接 ctx.close(); } }
在这个例子中,MessageDecoderHandler
会在接收到数据时解析ByteBuf中的内容,并打印出来。如果解析完成后,会释放ByteBuf资源以避免内存泄漏。
将入站处理器添加到Pipeline
要将入站处理器添加到ChannelPipeline中,可以使用如下方法:
java
代码解读
复制代码
channel.pipeline().addLast(new MessageDecoderHandler());
通过这种方式,你可以根据需要添加多个入站处理器,并且这些处理器会按照它们在Pipeline中的顺序被调用。这种设计使得网络编程更加模块化和灵活。
出站事件
在Netty中,出站处理器(Outbound Handlers)主要负责处理应用程序发起的网络请求或数据发送。出站处理器通常是按照相反的顺序执行的,即它们按照在ChannelPipeline中的逆序被调用。以下是一些常见的出站事件及其处理方式:
- ChannelRegistered:
- 当Channel被注册到EventLoop之后触发。
- 这个事件对出站处理器来说可能不是特别重要,但在某些场景下可能需要做一些初始化工作。
- ChannelActive:
- 当Channel准备好进行I/O操作时触发。
- 对于出站处理器而言,这意味着可以开始发送数据了。
- ChannelWritabilityChanged:
- 当Channel的写入能力发生变化时触发。
- 例如,当Channel由于背压(backpressure)而变得不可写入时,这个事件会被触发。
- 出站处理器可以在这个事件中决定是否继续写入数据。
- ChannelWrite:
- 当应用程序尝试通过ChannelHandlerContext.write()方法写入数据时触发。
- 这个事件会将要发送的数据传递给出站处理器。
- ChannelFlushed:
- 当ChannelHandlerContext.flush()被调用时触发。
- 这表示所有的待写入的数据应该被尽快写出到网络中。
处理出站事件
出站处理器通常会实现ChannelOutboundHandler
接口或其子接口。以下是如何处理这些事件的一些示例:
示例:处理ChannelWrite事件
假设你需要实现一个出站处理器,该处理器在数据被发送前对其进行加密:
java
代码解读
复制代码
public class EncryptionOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // 检查msg是否是我们需要加密的对象类型 if (msg instanceof String) { String message = (String) msg; byte[] encryptedMessage = encrypt(message); // 自定义的加密逻辑 ByteBuf encoded = ctx.alloc().buffer(); encoded.writeBytes(encryptedMessage); ctx.write(encoded, promise); } else { // 如果不是我们处理的对象类型,则直接传递给下一个处理器 ctx.write(msg, promise); } } private byte[] encrypt(String message) { // 加密逻辑 return message.getBytes(); // 示例中简单地转换为字节数组 } @Override public void flush(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
在这个例子中,EncryptionOutboundHandler会在数据被发送之前进行加密处理,并使用ctx.write()
方法将加密后的数据传递给下一个出站处理器。如果没有需要加密的对象,则直接传递给下一个处理器。
将出站处理器添加到Pipeline
要将出站处理器添加到ChannelPipeline中,可以使用如下方法:
java
代码解读
复制代码
channel.pipeline().addLast(new EncryptionOutboundHandler());
RPC出/入站处理器
服务器端的RpcRequestMessageHandler
处理器
java
代码解读
复制代码
/** * RPC请求处理器 * @author XiaoSheng * @date 2024/8/21 上午11:26 */ @Slf4j @ChannelHandler.Sharable public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) throws Exception { RpcResponseMessage responseMessage = new RpcResponseMessage(); // 设置请求的序号 responseMessage.setSequenceId(msg.getSequenceId()); Object result; try { //通过名称从工厂获取本地注解了@RpcServer的实例, 通过服务类名称获取对应的类实例 Object service = ServiceFactory.serviceFactory.get(msg.getServiceName()); // 获取方法,参数, Method method = service.getClass().getMethod(msg.getMethodName(), msg.getParameterTypes()); // 调用 result = method.invoke(service, msg.getParameterValue()); System.out.println("result: " + result); // 此处对该对象进行toString编码,防止序列化等原因导致复杂对象消息无法传递至客户端 responseMessage.setReturnValue(result.toString()); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { responseMessage.setExceptionValue(new Exception("远程调用出错: "+ e.getMessage())); } finally { ctx.writeAndFlush(responseMessage); /** * ReferenceCountUtil.release 方法是用来减少引用计数的。 * 当一个引用计数对象不再被需要时,你应该调用这个方法来减少它的引用计数。 * 如果引用计数减为零,Netty 将会自动释放该对象的内存。 */ ReferenceCountUtil.release(msg); } } /** * 读空闲 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.READER_IDLE) { log.info("长时间未收到心跳包,断开连接..."); ctx.close(); } } else { super.userEventTriggered(ctx, evt); } } }
客户端的RpcResponseMessageHandler
处理器
java
代码解读
复制代码
/** * @author XiaoSheng * @date 2024/8/22 上午11:48 */ @Slf4j @ChannelHandler.Sharable public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception { try { log.debug("{}", msg); // 每次使用完都要移除 Promise<Object> promise = RpcClientManager.PROMISES.remove(msg.getSequenceId()); if (promise != null) { Object returnValue = msg.getReturnValue(); System.out.println("returnValue: " + returnValue); Exception exceptionValue = msg.getExceptionValue(); if (exceptionValue != null) { promise.setFailure(exceptionValue); } else { promise.setSuccess(returnValue); } } else { promise.setFailure(new Exception("promise不存在")); } } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.debug("出现异常"+cause); ctx.close(); } }
通过上述代码可以发现,这里我们都是将处理器类直接继承SimpleChannelInboundHandler
类(这里也可以继承ChannelInboundHandlerAdapter
来实现自定义的类),并且通过重写channelRead0()
方法来编写业务逻辑代码。
这里也对SimpleChannelInboundHandler
类进行介绍
SimpleChannelInboundHandler
SimpleChannelInboundHandler
是 Netty 中的一个抽象类,用于处理入站数据(即从客户端到服务器的数据)。它是 ChannelInboundHandlerAdapter
的一个简化版本,专门用于处理某种类型的消息。它自动释放消息,以避免内存泄漏。
io.netty.channel.SimpleChannelInboundHandler
是一个``ChannelInboundHandler`,它允许只处理一种明确的消息,这个消息的类型由泛型参数指定,例如String。- 这个类还有一个默认的特性,就是读过消息以后,可以将消息自动释放。如果读过消息以后不希望将消息继续传递给
ChannelPipeline
中后续的``ChannelHandler`,这个特性是有帮助的。 - 当然,如果读过消息以后不希望自动释放,那么可以在创建
SimpleChannelInboundHandler
子类的实例的时候,调用SimpleChannelInboundHandler
的构造函数SimpleChannelInboundHandler(boolean autoRelease)
,将构造函数中的参数autoRelease
的值设置为false
。如果不设置,默认为true。 SimpleChannelInboundHandler
这个是一个抽象类,一个必须子类实现的函数是channelRead0(ChannelHandlerContext ctx, I msg)
。但这个函数不是ChannelInboundHandler
中的方法,而是SimpleChannelInboundHandler
自己增加的方法。channelRead0(ChannelHandlerContext ctx, I msg)
这个方法会被channelRead(ChannelHandlerContext ctx, Object msg)
方法调用。- 当
Channel
从对端读取到消息后,会调用channelRead(ChannelHandlerContext ctx, Object msg)
方法,而channelRead(ChannelHandlerContext ctx, Object msg)
方法会调用channelRead0(ChannelHandlerContext ctx, I msg)
方法,所以SimpleChannelInboundHandler
的子类实现channelRead0(ChannelHandlerContext ctx, I msg)
方法即可。
我们看看SimpleChannelInboundHandler
中channelRead(ChannelHandlerContext ctx, Object msg)
方法的实现:
java
代码解读
复制代码
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } }
上面源代码中,channelRead0(ctx, imsg)这条语句就是调用了被子类实现的channelRead0方法。
这个if判断语句中,autoRelease就是构造函数传入的值,如果不传的话默认为true。当传入channelRead函数的消息类型跟泛型参数中的类型一致时,这个消息可以被处理,此时release的值为true。如果传入的消息跟泛型参数中指定的类型不同时,release的值被设置为false,这个消息不会被处理,只会简单地传递给ChannelPipeline
中后续的ChannelHandler
处理,这就是所谓的只明确处理一种消息的含义。
下面再分析下if条件判断的逻辑:
java
代码解读
复制代码
if (autoRelease && release) { ReferenceCountUtil.release(msg); }
ReferenceCountUtil.release(msg)
这个函数的作用就是将msg的引用计数减少1。如果引用计数减少到0,那么就将msg释放。当然,前提是msg实现了io.netty.util.ReferenceCounted
接口。如果没有实现该接口,那么ReferenceCountUtil.release(msg)
这条语句等于什么也没有做。
核心API
SimpleChannelInboundHandler
是一个泛型类,需要指定处理的消息类型。例如,如果你想处理字符串消息,你可以指定 String
作为泛型类型。
主要方法
channelRead0(ChannelHandlerContext ctx, I msg)
- 这是你需要重写的方法,用于处理特定类型的消息。
ctx
是ChannelHandlerContext
对象,用于与ChannelPipeline
交互。msg
是接收到的消息。
关键点总结
- 消息类型:
SimpleChannelInboundHandler
处理特定类型的消息,使用泛型指定。 - 自动释放:Netty 会在
channelRead0
方法完成后自动释放消息,避免内存泄漏。 - 线程安全:
SimpleChannelInboundHandler
中的代码通常在 I/O 线程中执行,确保线程安全。 - 错误处理:在
exceptionCaught
方法中处理异常,避免未捕获的异常导致应用程序崩溃。
通过使用 SimpleChannelInboundHandler
,你可以简化处理入站消息的代码,提高代码的可读性和可维护性。
编/解码器
rpc_core
模块的HeartBeatClientHandler
客户端心跳消息处理器
java
代码解读
复制代码
/** * 客户端的心跳 * @author XiaoSheng * @date 2024/8/21 上午10:40 */ @Slf4j @ChannelHandler.Sharable public class HeartBeatClientHandler extends ChannelDuplexHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; IdleState idleState = event.state(); // 长时间没有写入数据,发送心跳包 if (idleState == IdleState.WRITER_IDLE) { //获取ip log.debug("发送心跳包 {}", ctx.channel().remoteAddress()); PingMessage pingMessage = new PingMessage(); pingMessage.setMessageType(PingMessage.PingMessage); pingMessage.setSequenceId(0); ctx.writeAndFlush(pingMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } return; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.debug("远程调用出错"); cause.printStackTrace(); ctx.close(); super.exceptionCaught(ctx, cause); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelUnregistered"); ctx.close(); super.channelUnregistered(ctx); } }
rpc_core
模块的HeartBeatClientHandler
服务端心跳消息处理器
java
代码解读
复制代码
/** * @author XiaoSheng * @date 2024/8/21 上午11:21 */ @Slf4j @ChannelHandler.Sharable public class HeartBeatServerHandler extends ChannelDuplexHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.debug("长时间没有收到消息了,断开连接"); ctx.close(); } } super.userEventTriggered(ctx, evt); } }
上述的心跳消息,使用了继承ChannelDuplexHandler
,并重写userEventTriggered(ChannelHandlerContext ctx, Object evt)
下面对ChannelDuplexHandler
进行讲解:
ChannelDuplexHandler
ChannelDuplexHandler
是 Netty 中一个非常重要的类,它继承自 ChannelInboundHandlerAdapter
和 ChannelOutboundHandlerAdapter
,用于同时处理入站和出站的事件。通常情况下,处理入站和出站事件需要分别实现 ChannelInboundHandler
和 ChannelOutboundHandler
接口,而 ChannelDuplexHandler
允许你在一个类中同时处理这两种类型的事件。
继承结构
ChannelDuplexHandler
继承自 ChannelInboundHandlerAdapter
,而 ChannelInboundHandlerAdapter
又实现了 ChannelInboundHandler
接口。因此,ChannelDuplexHandler
主要用于:
- 处理入站事件(如数据读取、连接建立等)。
- 处理出站事件(如数据写入、连接关闭等)。
常用方法
以下是 ChannelDuplexHandler
中常用的方法,这些方法可以被重写来处理不同的事件:
入站事件方法
channelRead(ChannelHandlerContext ctx, Object msg)
: 读取数据。channelActive(ChannelHandlerContext ctx)
: 处理通道激活事件(连接建立)。channelInactive(ChannelHandlerContext ctx)
: 处理通道非激活事件(连接关闭)。exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
: 处理异常。
出站事件方法
write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
: 写入数据。flush(ChannelHandlerContext ctx)
: 刷新数据。close(ChannelHandlerContext ctx, ChannelPromise promise)
: 关闭通道。
自定义事件方法
userEventTriggered
方法是该类中的一个方法,用于处理用户自定义事件(user events)。这是 Netty 提供的一种机制,允许用户在管道(pipeline)中触发和处理自定义事件。
用户事件是在 ChannelHandlerContext.fireUserEventTriggered(Object event)
被调用时触发的。这些事件可以是任何你想要传递给处理器链下游的对象。通常情况下,这些事件被用来传递非标准的、应用程序特定的信息或通知。
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception
参数
ctx
:ChannelHandlerContext
对象,提供了与 Channel 及其管道进行交互的各种操作。evt
:用户自定义事件,通常是一个对象,可以是任何类型。
抛出
Exception
:方法可以抛出异常,异常将由 Netty 的异常处理机制捕获和处理。
示例代码
示例二
以下是一个简单的示例,展示了如何使用 ChannelDuplexHandler
来同时处理入站和出站事件:
java
代码解读
复制代码
public class MyDuplexHandler extends ChannelDuplexHandler { // 处理入站事件:读取数据 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Inbound: Received message: " + msg); // 将消息传递给下一个处理器 ctx.fireChannelRead(msg); } // 处理入站事件:通道激活 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Inbound: Channel is active"); // 调用父类方法继续处理 super.channelActive(ctx); } // 处理入站事件:异常 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.err.println("Inbound: Exception caught: " + cause.getMessage()); // 关闭通道 ctx.close(); } // 处理出站事件:写入数据 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("Outbound: Writing message: " + msg); // 将消息传递给下一个处理器 ctx.write(msg, promise); } // 处理出站事件:刷新数据 @Override public void flush(ChannelHandlerContext ctx) throws Exception { System.out.println("Outbound: Flushing data"); // 调用父类方法继续处理 super.flush(ctx); } // 处理出站事件:关闭通道 @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { System.out.println("Outbound: Closing channel"); // 调用父类方法继续处理 super.close(ctx, promise); } }
示例二
java
代码解读
复制代码
/** * 客户端的心跳handler * * @author chenlei */ @Slf4j public class HeartBeatClientHandler extends ChannelDuplexHandler { /** * idlStatus写事件 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; IdleState state = event.state(); //长时间没有写入数据 发送心跳包 if (state == IdleState.WRITER_IDLE) { //获取ip log.debug("发送心跳包 {}", ctx.channel().remoteAddress()); log.error("发送心跳包 {}", ctx.channel().remoteAddress()); PingMessage message = new PingMessage(); message.setSequenceId(0); message.setMessageType(Message.PingMessage); ctx.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } return; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.debug("远程调用出错"); cause.printStackTrace(); ctx.close(); super.exceptionCaught(ctx, cause); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelUnregistered"); ctx.close(); super.channelUnregistered(ctx); } }
示例三
java
代码解读
复制代码
public class MyHandler extends ChannelDuplexHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof HeartbeatEvent) { // 假设 HeartbeatEvent 是自定义的心跳事件 HeartbeatEvent heartbeat = (HeartbeatEvent) evt; System.out.println("Received heartbeat: " + heartbeat); } else { super.userEventTriggered(ctx, evt); } } }
在这个例子中,我们创建了一个名为 MyHandler
的自定义处理器,它覆盖了 userEventTriggered
方法,并检查传入的事件是否为 HeartbeatEvent
类型。
注意事项
- 如果你不处理某个事件,或者不确定如何处理,可以调用
super.userEventTriggered(ctx, evt);
将事件传递给下一个处理器。 - 如果你在处理过程中抛出了异常,Netty 会捕获并记录这些异常,但不会终止事件的传播。
- 你可以通过
ChannelHandlerContext.fireUserEventTriggered(Object event)
来向处理器链中发送用户事件。
继续回到我们自定义的心跳处理类中的userEventTriggered
方法;
java
代码解读
复制代码
@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; IdleState idleState = event.state(); // 长时间没有写入数据,发送心跳包 if (idleState == IdleState.WRITER_IDLE) { //获取ip log.debug("发送心跳包 {}", ctx.channel().remoteAddress()); PingMessage pingMessage = new PingMessage(); pingMessage.setMessageType(PingMessage.PingMessage); pingMessage.setSequenceId(0); ctx.writeAndFlush(pingMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } return; }
这里通过结合IdleStateHandler
处理器,将在写空闲15s后触发
IdleStateHandler
调用源码如下:
最后通过定时任务对任务进行执行。
总结
上面就是关于Netty实现RPC中出入、站处理器的内容讲解,中间也介绍了SimpleChannelInbound
、ChannelDuplexHandler
的核心内容