【flink番外篇】11、Flink 并行度设置

avatar
作者
猴君
阅读量:0

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录


本文介绍了Flink的并行度设置的几种方式以及并行度最大的值。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

一、并行执行

一个 Flink 程序由多个任务 task 组成(转换/算子、数据源和数据接收器)。一个 task 包括多个并行执行的实例,且每一个实例都处理 task 输入数据的一个子集。一个 task 的并行实例数被称为该 task 的 并行度 (parallelism)。

使用 savepoints 时,应该考虑设置最大并行度。当作业从一个 savepoint 恢复时,你可以改变特定算子或着整个程序的并行度,并且此设置会限定整个程序的并行度的上限。由于在 Flink 内部将状态划分为了 key-groups,且性能所限不能无限制地增加 key-groups,因此设定最大并行度是有必要的。

1、设置并行度

一个 task 的并行度可以从多个层次指定:

1)、算子层次

单个算子、数据源和数据接收器的并行度可以通过调用 setParallelism()方法来指定。如下所示:

 // 设置 算子 并行度  static void test1() throws Exception {      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();       DataStream<String> source = env.socketTextStream("server2", 8888)              .map(o -> {                  String[] lines = o.split(",");                  return "name:" + lines[0] + " age: " + lines[1];              }).setParallelism(8);// 设置map的并行度       source.print();      env.execute();  } 

2)、执行环境层次

Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。

可以通过调用 setParallelism() 方法指定执行环境的默认并行度。如果想以并行度3来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:

// 设置 执行环境层次 并行度 static void test2() throws Exception {     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();     env.setParallelism(8);     DataStream<String> source = env.socketTextStream("server2", 8888)             .map(o -> {                 String[] lines = o.split(",");                 return "name:" + lines[0] + " age: " + lines[1];             });      source.print();     env.execute(); } 

3)、客户端层次

将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。

在 CLI 客户端中,可以通过 -p 参数指定并行度,例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar 

或者在 Java 程序中,可以通过如下方式指定并行度:
说明:
1、该种方法比较复杂,是不是相当于把Flink自身的客户端实现重新实现了一遍呢?大致逻辑如下,代码示例
2、具体实现可以参考其客户端的实现以及测试用例中的实现。
3、客户端的入口类为org.apache.flink.client.cli.CliFrontend;其测试用例类为org.apache.flink.client.program.ClientTest

import static org.apache.flink.util.Preconditions.checkNotNull;  import java.io.File; import java.net.URL; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.stream.Stream;  import javax.annotation.Nonnull;  import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.client.ClientUtils; import org.apache.flink.client.FlinkPipelineTranslationUtil; import org.apache.flink.client.cli.ExecutionConfigAccessor; import org.apache.flink.client.deployment.ClusterClientJobClientAdapter; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.MiniClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.execution.DefaultExecutorServiceLoader; import org.apache.flink.core.execution.JobClient; import org.apache.flink.core.execution.PipelineExecutor; import org.apache.flink.core.execution.PipelineExecutorFactory; import org.apache.flink.core.execution.PipelineExecutorServiceLoader; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.testutils.InternalMiniClusterExtension; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;  /*  * @Author: alanchan  * @LastEditors: alanchan  * @Description:   */ public class TestParallelismByClientDemo {     private static final String TEST_EXECUTOR_NAME = "test_executor";     private static Plan plan; 	private static Configuration config;     private static final InternalMiniClusterExtension MINI_CLUSTER_RESOURCE = new InternalMiniClusterExtension(new MiniClusterResourceConfiguration.Builder().build());     public static void main(String[] args) throws Exception { 		ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); 		env.generateSequence(1, 1000).output(new DiscardingOutputFormat<>()); 		plan = env.createProgramPlan();  		config = new Configuration(); 		config.setString(JobManagerOptions.ADDRESS, "localhost");  		config.set(AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());          // 1、构造PackagedProgram         Configuration configuration = new Configuration(); 		configuration.setString(DeploymentOptions.TARGET, TEST_EXECUTOR_NAME); 		configuration.set(CoreOptions.DEFAULT_PARALLELISM, 2);  		// <dependency> 		// <groupId>org.apache.flink</groupId> 		// <artifactId>flink-runtime</artifactId> 		// <version>${flink.version}</version> 		// </dependency>          String entryPointClass = TestExecute.class.getName();         String jarFilePath = "../examples/flinktest.jar";//打包jar文件的路径         File jarFile = new File(jarFilePath);         List<URL> classpaths = PackagedProgram.getJobJarAndDependencies(jarFile,entryPointClass);      //    Creates an instance that wraps the plan defined in the jar file using the given arguments     //    For generating the plan the class defined in the className parameter is used.     //    private PackagedProgram(     //            @Nullable File jarFile, //jarFile The jar file which contains the plan.     //            List<URL> classpaths, //classpaths Additional classpath URLs needed by the Program.     //            @Nullable String entryPointClassName, //entryPointClassName Name of the class which generates the plan. Overrides the class defined in the jar file manifest.     //            Configuration configuration, //configuration Flink configuration which affects the classloading policy of the Program execution.     //            SavepointRestoreSettings savepointRestoreSettings,     //            String... args) //args Optional. The arguments used to create the pact plan, depend on implementation of the pact plan. See getDescription().         PackagedProgram program = PackagedProgram.newBuilder()                     .setJarFile(jarFile)                     .setUserClassPaths(classpaths)                     .setEntryPointClassName(entryPointClass)                     .setConfiguration(configuration)                     .setSavepointRestoreSettings(SavepointRestoreSettings.fromConfiguration(configuration))                     .setArguments(args)                     .build();          // 2、构造客户端执行环境         // public static void executeProgram(         //     PipelineExecutorServiceLoader executorServiceLoader,         //     Configuration configuration,         //     PackagedProgram program,         //     boolean enforceSingleJobExecution,         //     boolean suppressSysout)         // ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program, false, false);           ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());         ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, program, false, false);     }      public static final class TestExecute {  		public static void main(String[] args) throws Exception { 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();  			for (int i = 0; i < 2; i++) { 				env.fromElements(1, 2).output(new DiscardingOutputFormat<>()); 				JobClient jc = env.executeAsync();  				jc.getJobExecutionResult(); 			} 		} 	}      private static final class TestExecutorServiceLoader implements PipelineExecutorServiceLoader {  		private final ClusterClient<?> clusterClient;  		private final Plan plan;  		TestExecutorServiceLoader(final ClusterClient<?> clusterClient, final Plan plan) { 			this.clusterClient = checkNotNull(clusterClient); 			this.plan = checkNotNull(plan); 		}  		@Override 		public PipelineExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) { 			return new PipelineExecutorFactory() {  				@Override 				public String getName() { 					return "my-name"; 				}  				@Override 				public boolean isCompatibleWith(@Nonnull Configuration configuration) { 					return TEST_EXECUTOR_NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET)); 				}  				@Override 				public PipelineExecutor getExecutor(@Nonnull Configuration configuration) { 					return (pipeline, config, classLoader) -> { 						final int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM); 						final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(classLoader, plan, config, parallelism);  						final ExecutionConfigAccessor accessor = ExecutionConfigAccessor.fromConfiguration(config); 						jobGraph.addJars(accessor.getJars()); 						jobGraph.setClasspaths(accessor.getClasspaths());  						final JobID jobID = clusterClient.submitJob(jobGraph).get(); 						return CompletableFuture.completedFuture(new ClusterClientJobClientAdapter<>(() -> clusterClient, jobID, classLoader)); 					}; 				} 			}; 		}  		@Override 		public Stream<String> getExecutorNames() { 			throw new UnsupportedOperationException("not implemented"); 		} 	}  } 

4)、系统层次

可以通过设置 ./conf/flink-conf.yaml 文件中的 parallelism.default 参数,在系统层次来指定所有执行环境的默认并行度。

更多的信息参考下文链接:
11、Flink配置flink-conf.yaml详细说明(HA配置、checkpoint、web、安全、zookeeper、historyserver、workers、zoo.cfg)

2、设置最大并行度

最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用 setParallelism() 方法修改并行度相似,你可以通过调用 setMaxParallelism() 方法来设定最大并行度。

默认的最大并行度等于将 operatorParallelism + (operatorParallelism / 2) 值四舍五入到大于等于该值的一个整型值,并且这个整型值是 2 的幂次方,注意默认最大并行度下限为 128,上限为 32768。

为最大并行度设置一个非常大的值将会降低性能,因为一些 state backends 需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。

从之前的作业恢复时,改变该作业的最大并发度将会导致状态不兼容。

以上,本文介绍了Flink的并行度设置的几种方式以及并行度最大的值。

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!