flink1.19源码学习-RPC通信(2)

avatar
作者
筋斗云
阅读量:0

flink1.19源码学习-RPC通信

RPC概念

RPC,即远程过程调用(Remote Procedure Call),是一种通过网络从远程计算机程序上请求服务的技术,而无需了解底层网络技术的协议。在RPC中,客户机和服务器位于不同的机器上,客户端通过网络调用在服务器端运行的过程,并将结果发送回客户机。这种技术允许程序像调用本地过程一样调用远程过程,使得跨平台、跨机器的服务调用成为可能。
1.两个进程间的相互调用
2.集群中不同节点服务的通信

在flink中RPC通信主要用的是Apache pekko框架。pekko是akka的一个分支。感兴趣的小伙伴自行了解

案例

通过一个小的案例,帮助我们迅速了解pekko该如何使用

pom.xml

        <dependency>             <groupId>org.apache.pekko</groupId>             <artifactId>pekko-actor_2.12</artifactId>             <version>1.0.1</version>         </dependency>          <dependency>             <groupId>org.apache.pekko</groupId>             <artifactId>pekko-remote_2.12</artifactId>             <version>1.0.1</version>         </dependency> 

PekkoData

import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;  @Data @AllArgsConstructor @NoArgsConstructor public class PekkoData {     private String info; }  

PekkoRpcReceiverActor

import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.ActorRef; import org.apache.pekko.japi.pf.ReceiveBuilder;  public class PekkoRpcReceiverActor extends AbstractActor {      @Override     public Receive createReceive() {         return ReceiveBuilder.create()                 /**接收到PekkoData消息交给handleMessage处理                  */                 .match(PekkoData.class, this::handleMessage)                 .build();     }      private void handleMessage(PekkoData message) {         /** 获取发送者,发送者对应的就是actorRef */         ActorRef sender = getSender();         ActorRef self = getSelf();         /** 打印 */         System.out.println("PekkoRpcReceiverActor类收到:" +sender + ":发送的=>" + message.getInfo());         /** 回复消息 向发送者sender 回复word 的消息 回复者是当前actorRef*/         /** 4、Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor  */         sender.tell(new PekkoData("word"),self);     }   }   

PekkoRpcSenderActor

import org.apache.pekko.actor.AbstractActor; import org.apache.pekko.actor.ActorRef; import org.apache.pekko.japi.pf.ReceiveBuilder;  /**  * 继承AbstractActor定义自己的actor  * Actor可以发送和接收消息  */ public class PekkoRpcSenderActor extends AbstractActor {     /**      * 实现接收消息      * @return      */     @Override     public Receive createReceive() {         return ReceiveBuilder.create()                 /**接收到PekkoData消息交给handleMessage处理                  * flink PekkoRpcActor 155行也是这样处理的                  */                 .match(PekkoData.class, this::handleMessage)                 .build();     }       private void handleMessage(final PekkoData message) {         /** 获取发送者,发送者对应的就是actorRef */         ActorRef sender = getSender();         /** 打印 */         System.out.println("PekkoRpcSenderActor类收到:" +sender + ":发送的=>" + message.getInfo());      } }   

Demo

import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.actor.Props;  public class Demo {     public static void main(String[] args) {         /**创建actorSystem*/         ActorSystem actorSystem = ActorSystem.create("flink");         /**构建PekkoRpcActor的ActorRef*/         ActorRef pekkoRpcRef = actorSystem.actorOf(Props.create(PekkoRpcReceiverActor.class), "PekkoRpcReceiverActor");         /**构建PekkoRpcSenderActor的ActorRef*/         ActorRef pekkoRpcSenderRef = actorSystem.actorOf(Props.create(PekkoRpcSenderActor.class), "PekkoRpcSenderActor");         /** pekkoRpcSenderActor作为发送者 向PekkoRpcActor发送 hello*/         pekkoRpcRef.tell(new PekkoData("hello"),pekkoRpcSenderRef);     } }  

结果

在这里插入图片描述

这里面有个注意的地方就是所有actor都是通过createReceive方法接收消息的,根据消息类型匹配对应的处理方法。

pekko特性如下:
1、ActorSystem 是管理 Actor 生命周期的组件,Actor 是负责进行通信的组件

2、每个 Actor 都有一个 MailBox,别的 Actor 发送给它的消息都首先储存在 MailBox 中,通过这种方式可以实现异步通信。

3、每个 Actor 是单线程的处理方式,不断的从 MailBox 拉取消息执行处理,所以对于 Actor 的消息处理,不适合调用会阻塞的处理方法。

4、Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor

5、每一个 ActorSystem 和 Actor都在启动的时候会给定一个 name,如果要从 ActorSystem 中获取一个 Actor,则通过以下的方式来进行 Actor 的
获取:pekko.tcp://flink@localhost:6123/user/rpc/resourcemanager_* 来进行定位

6、如果一个 Actor 要和另外一个 Actor 进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然后通过该对象发送消息即可。

7、通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步回到返回处理结果。

8.如果构建actor进行通信,Pekko版本中必须继承AbstractActor 实现createReceive()方法

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!