try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 60));
pipeline.addLast(new JsonDecoder());
pipeline.addLast(new JsonEncoder());
pipeline.addLast(new RpcInboundHandler(rpcServices));
}
})
.channel(NioServerSocketChannel.class);
ChannelFuture future = bootstrap.bind(port).sync();
log.info(“RPC 服务器启动, 监听端口:” + port);
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}).start();
}
}
RpcServerInboundHandler 负责处理RPC请求
@Slf4j
public class RpcServerInboundHandler extends ChannelInboundHandlerAdapter {
private Map<String, Object> rpcServices;
public RpcServerInboundHandler(Map<String, Object> rpcServices){
this.rpcServices = rpcServices;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info(“客户端连接成功,{}”, ctx.channel().remoteAddress());
}
public void channelInactive(ChannelHandlerContext ctx) {
log.info(“客户端断开连接,{}”, ctx.channel().remoteAddress());
ctx.channel().close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg){
RpcRequest rpcRequest = (RpcRequest) msg;
log.info(“接收到客户端请求, 请求接口:{}, 请求方法:{}”, rpcRequest.getClassName(), rpcRequest.getMethodName());
RpcResponse response = new RpcResponse();
response.setRequestId(rpcRequest.getRequestId());
Object result = null;
try {
result = this.handleRequest(rpcRequest);
response.setResult(result);
} catch (Exception e) {
e.printStackTrace();
response.setSuccess(false);
response.setErrorMessage(e.getMessage());
}
log.info(“服务器响应:{}”, response);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info(“连接异常”);
ctx.channel().close();
super.exceptionCaught(ctx, cause);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.ALL_IDLE){
log.info(“客户端已超过60秒未读写数据, 关闭连接.{}”,ctx.channel().remoteAddress());
ctx.channel().close();
}
}else{
super.userEventTriggered(ctx,evt);
}
}
private Object handleRequest(RpcRequest rpcRequest) throws Exception{
Object bean = rpcServices.get(rpcRequest.getClassName());
if(bean == null){
throw new RuntimeException("未找到对应的服务: " + rpcRequest.getClassName());
}
Method method = bean.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
method.setAccessible(true);
return method.invoke(bean, rpcRequest.getParameters());
}
}
四、RPC客户端
========
/**
- RPC远程调用的客户端
*/
@Slf4j
@Component
public class RpcClient {
@Value(“${rpc.remote.ip}”)
private String remoteIp;
@Value(“${rpc.remote.port}”)
private int port;
private Bootstrap bootstrap;
// 储存调用结果
private final Map<String, SynchronousQueue> results = new ConcurrentHashMap<>();
public RpcClient(){
}
@PostConstruct
public void init(){
bootstrap = new Bootstrap().remoteAddress(remoteIp, port);
NioEventLoopGroup worker = new NioEventLoopGroup(1);
bootstrap.group(worker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 10));
pipeline.addLast(new JsonEncoder());
pipeline.addLast(new JsonDecoder());
pipeline.addLast(new RpcClientInboundHandler(results));
}
});
}
public RpcResponse send(RpcRequest rpcRequest) {
RpcResponse rpcResponse = null;
rpcRequest.setRequestId(UUID.randomUUID().toString());
Channel channel = null;
try {
channel = bootstrap.connect().sync().channel();
log.info(“连接建立, 发送请求:{}”, rpcRequest);
channel.writeAndFlush(rpcRequest);
SynchronousQueue queue = new SynchronousQueue<>();
results.put(rpcRequest.getRequestId(), queue);
// 阻塞等待获取响应
rpcResponse = queue.take();
results.remove(rpcRequest.getRequestId());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(channel != null && channel.isActive()){
channel.close();
}
}
return rpcResponse;
}
}
RpcClientInboundHandler负责处理服务端的响应
@Slf4j
public class RpcClientInboundHandler extends ChannelInboundHandlerAdapter {
private Map<String, SynchronousQueue> results;
public RpcClientInboundHandler(Map<String, SynchronousQueue> results){
this.results = results;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RpcResponse rpcResponse = (RpcResponse) msg;
log.info(“收到服务器响应:{}”, rpcResponse);
if(!rpcResponse.isSuccess()){
throw new RuntimeException(“调用结果异常,异常信息:” + rpcResponse.getErrorMessage());
}
// 取出结果容器,将response放进queue中
SynchronousQueue queue = results.get(rpcResponse.getRequestId());
queue.put(rpcResponse);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state() == IdleState.ALL_IDLE){
log.info(“发送心跳包”);
RpcRequest request = new RpcRequest();
request.setMethodName(“heartBeat”);
ctx.channel().writeAndFlush(request);
}
}else{
super.userEventTriggered(ctx, evt);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
log.info(“异常:{}”, cause.getMessage());
ctx.channel().close();
}
}
接口代理
为了使客户端像调用本地方法一样调用远程服务,我们需要对接口进行动态代理。
代理类实现
@Component
public class RpcProxy implements InvocationHandler {
@Autowired
private RpcClient rpcClient;
@Override
public Object invoke(Object proxy, Method method, Object[] args){
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameters(args);
rpcRequest.setParameterTypes(method.getParameterTypes());
RpcResponse rpcResponse = rpcClient.send(rpcRequest);
return rpcResponse.getResult();
}
}
实现FactoryBean接口,将生产动态代理类纳入 Spring 容器管理。
public class RpcFactoryBean implements FactoryBean {
private Class interfaceClass;
@Autowired
private RpcProxy rpcProxy;
public RpcFactoryBean(Class interfaceClass){
this.interfaceClass = interfaceClass;
}
@Override
public T getObject(){
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, rpcProxy);
}
@Override
public Class<?> getObjectType() {
return interfaceClass;
}
}
自定义类路径扫描器,扫描包下的RPC接口,动态生产代理类,纳入 Spring 容器管理
public class RpcScanner extends ClassPathBeanDefinitionScanner {
public RpcScanner(BeanDefinitionRegistry registry) {
super(registry);
}
@Override
protected Set doScan(String… basePackages) {
自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!
由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!
如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)
完结
Redis基于内存,常用作于缓存的一种技术,并且Redis存储的方式是以key-value的形式。Redis是如今互联网技术架构中,使用最广泛的缓存,在工作中常常会使用到。Redis也是中高级后端工程师技术面试中,面试官最喜欢问的问题之一,因此作为Java开发者,Redis是我们必须要掌握的。
Redis 是 NoSQL 数据库领域的佼佼者,如果你需要了解 Redis 是如何实现高并发、海量数据存储的,那么这份腾讯专家手敲《Redis源码日志笔记》将会是你的最佳选择。
《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!
你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)**
完结
Redis基于内存,常用作于缓存的一种技术,并且Redis存储的方式是以key-value的形式。Redis是如今互联网技术架构中,使用最广泛的缓存,在工作中常常会使用到。Redis也是中高级后端工程师技术面试中,面试官最喜欢问的问题之一,因此作为Java开发者,Redis是我们必须要掌握的。
Redis 是 NoSQL 数据库领域的佼佼者,如果你需要了解 Redis 是如何实现高并发、海量数据存储的,那么这份腾讯专家手敲《Redis源码日志笔记》将会是你的最佳选择。
[外链图片转存中…(img-39exHIx2-1712683894885)]
《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!