flink1.19源码学习-RPC通信
RPC概念
RPC,即远程过程调用(Remote Procedure Call),是一种通过网络从远程计算机程序上请求服务的技术,而无需了解底层网络技术的协议。在RPC中,客户机和服务器位于不同的机器上,客户端通过网络调用在服务器端运行的过程,并将结果发送回客户机。这种技术允许程序像调用本地过程一样调用远程过程,使得跨平台、跨机器的服务调用成为可能。
1.两个进程间的相互调用
2.集群中不同节点服务的通信
在flink中RPC通信主要用的是Apache pekko框架。pekko是akka的一个分支。感兴趣的小伙伴自行了解
案例
通过一个小的案例,帮助我们迅速了解pekko该如何使用
pom.xml
<dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-actor_2.12</artifactId> <version>1.0.1</version> </dependency> <dependency> <groupId>org.apache.pekko</groupId> <artifactId>pekko-remote_2.12</artifactId> <version>1.0.1</version> </dependency>
PekkoData
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @AllArgsConstructor @NoArgsConstructor public class PekkoData { private String info; }
PekkoRpcReceiverActor
import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.ActorRef; import org.apache.pekko.japi.pf.ReceiveBuilder; public class PekkoRpcReceiverActor extends AbstractActor { @Override public Receive createReceive() { return ReceiveBuilder.create() /**接收到PekkoData消息交给handleMessage处理 */ .match(PekkoData.class, this::handleMessage) .build(); } private void handleMessage(PekkoData message) { /** 获取发送者,发送者对应的就是actorRef */ ActorRef sender = getSender(); ActorRef self = getSelf(); /** 打印 */ System.out.println("PekkoRpcReceiverActor类收到:" +sender + ":发送的=>" + message.getInfo()); /** 回复消息 向发送者sender 回复word 的消息 回复者是当前actorRef*/ /** 4、Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor */ sender.tell(new PekkoData("word"),self); } }
PekkoRpcSenderActor
import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.ActorRef; import org.apache.pekko.japi.pf.ReceiveBuilder; /** * 继承AbstractActor定义自己的actor * Actor可以发送和接收消息 */ public class PekkoRpcSenderActor extends AbstractActor { /** * 实现接收消息 * @return */ @Override public Receive createReceive() { return ReceiveBuilder.create() /**接收到PekkoData消息交给handleMessage处理 * flink PekkoRpcActor 155行也是这样处理的 */ .match(PekkoData.class, this::handleMessage) .build(); } private void handleMessage(final PekkoData message) { /** 获取发送者,发送者对应的就是actorRef */ ActorRef sender = getSender(); /** 打印 */ System.out.println("PekkoRpcSenderActor类收到:" +sender + ":发送的=>" + message.getInfo()); } }
Demo
import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.actor.Props; public class Demo { public static void main(String[] args) { /**创建actorSystem*/ ActorSystem actorSystem = ActorSystem.create("flink"); /**构建PekkoRpcActor的ActorRef*/ ActorRef pekkoRpcRef = actorSystem.actorOf(Props.create(PekkoRpcReceiverActor.class), "PekkoRpcReceiverActor"); /**构建PekkoRpcSenderActor的ActorRef*/ ActorRef pekkoRpcSenderRef = actorSystem.actorOf(Props.create(PekkoRpcSenderActor.class), "PekkoRpcSenderActor"); /** pekkoRpcSenderActor作为发送者 向PekkoRpcActor发送 hello*/ pekkoRpcRef.tell(new PekkoData("hello"),pekkoRpcSenderRef); } }
结果
这里面有个注意的地方就是所有actor都是通过createReceive方法接收消息的,根据消息类型匹配对应的处理方法。
pekko特性如下:
1、ActorSystem 是管理 Actor 生命周期的组件,Actor 是负责进行通信的组件
2、每个 Actor 都有一个 MailBox,别的 Actor 发送给它的消息都首先储存在 MailBox 中,通过这种方式可以实现异步通信。
3、每个 Actor 是单线程的处理方式,不断的从 MailBox 拉取消息执行处理,所以对于 Actor 的消息处理,不适合调用会阻塞的处理方法。
4、Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor
5、每一个 ActorSystem 和 Actor都在启动的时候会给定一个 name,如果要从 ActorSystem 中获取一个 Actor,则通过以下的方式来进行 Actor 的
获取:pekko.tcp://flink@localhost:6123/user/rpc/resourcemanager_* 来进行定位
6、如果一个 Actor 要和另外一个 Actor 进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然后通过该对象发送消息即可。
7、通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步回到返回处理结果。
8.如果构建actor进行通信,Pekko版本中必须继承AbstractActor 实现createReceive()方法