YARN Source Code Analysis (1): Client-Side Job Submission Preparation

Author
猴君

Workflow

Step 1: Upload the jar, configuration files, and split files

The job's files are uploaded to an HDFS path derived from the local configuration property yarn.app.mapreduce.am.staging-dir (default: /tmp/hadoop-yarn/staging):
/tmp/hadoop-yarn/staging/$user/.staging

/tmp/hadoop-yarn/staging/hdfs/.staging/job_1722390700850_0001/job.split
/tmp/hadoop-yarn/staging/hdfs/.staging/job_1722390700850_0001/job.splitmetainfo
/tmp/hadoop-yarn/staging/hdfs/.staging/job_1722390700850_0001/job.xml
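The path layout above can be reproduced with a small sketch. This is plain Java rather than Hadoop code: the class StagingPathSketch and its methods are hypothetical names chosen for illustration, and a Map stands in for Hadoop's Configuration. Only the property name, its default, and the file names come from the text.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not Hadoop code: mirrors how the staging path is assembled.
class StagingPathSketch {
    static final String STAGING_DIR_KEY = "yarn.app.mapreduce.am.staging-dir";
    static final String DEFAULT_STAGING_DIR = "/tmp/hadoop-yarn/staging";

    // <staging-dir>/<user>/.staging
    static String stagingArea(Map<String, String> conf, String user) {
        String root = conf.getOrDefault(STAGING_DIR_KEY, DEFAULT_STAGING_DIR);
        return root + "/" + user + "/.staging";
    }

    // Per-job directory plus the three files named in the text.
    static List<String> jobFiles(Map<String, String> conf, String user, String jobId) {
        String jobDir = stagingArea(conf, user) + "/" + jobId;
        return List.of(
            jobDir + "/job.split",
            jobDir + "/job.splitmetainfo",
            jobDir + "/job.xml");
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>(); // empty, so the default is used
        jobFiles(conf, "hdfs", "job_1722390700850_0001").forEach(System.out::println);
    }
}
```

Overriding the property in the map (for example to a per-cluster directory) changes only the root; the $user/.staging/<job id> suffix stays the same.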

Step 2: Build the ApplicationSubmissionContext

This step mainly prepares what the AM needs in order to start:

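  • Resources to be downloaded to the local node: the files uploaded in step 1 (split files, job.xml) and the job's jar
  • Token information
  • The classpath environment variable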
CLASSPATH=%PWD% ... %HADOOP_CONF_DIR% %HADOOP_COMMON_HOME%/share/hadoop/common/* %HADOOP_COMMON_HOME%/share/hadoop/common/lib/* %HADOOP_HDFS_HOME%/share/hadoop/hdfs/* %HADOOP_HDFS_HOME%/share/hadoop/hdfs/lib/* %HADOOP_YARN_HOME%/share/hadoop/yarn/* %HADOOP_YARN_HOME%/share/hadoop/yarn/lib/* %HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\* %HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\lib\* job.jar/* job.jar/classes/ job.jar/lib/* %PWD%/* 

Setting mapreduce.job.user.classpath.first = true changes the generated classpath so that the framework jars listed below are appended at the end, after the user's jars:
%HADOOP_CONF_DIR%
%HADOOP_COMMON_HOME%/share/hadoop/common/*
%HADOOP_COMMON_HOME%/share/hadoop/common/lib/*
%HADOOP_HDFS_HOME%/share/hadoop/hdfs/*
%HADOOP_HDFS_HOME%/share/hadoop/hdfs/lib/*
%HADOOP_YARN_HOME%/share/hadoop/yarn/*
%HADOOP_YARN_HOME%/share/hadoop/yarn/lib/*
%HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\*
%HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\lib\*

MRApps.java

  public static void setClasspath(Map<String, String> environment,
      Configuration conf) throws IOException {
    boolean userClassesTakesPrecedence =
      conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);

    String classpathEnvVar =
      conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
        ? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();

    MRApps.addToEnvironment(environment,
      classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf);
    if (!userClassesTakesPrecedence) {
      // Append the framework jars to the classpath environment variable
      MRApps.setMRFrameworkClasspath(environment, conf);
    }
    /*
     * We use "*" for the name of the JOB_JAR instead of MRJobConfig.JOB_JAR for
     * the case where the job jar is not necessarily named "job.jar". This can
     * happen, for example, when the job is leveraging a resource from the YARN
     * shared cache.
     */
    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        MRJobConfig.JOB_JAR + Path.SEPARATOR + "*", conf);
    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, conf);
    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", conf);
    MRApps.addToEnvironment(
        environment,
        classpathEnvVar,
        crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf);
    // a * in the classpath will only find a .jar, so we need to filter out
    // all .jars and add everything else
    addToClasspathIfNotJar(JobContextImpl.getFileClassPaths(conf),
        JobContextImpl.getCacheFiles(conf),
        conf,
        environment, classpathEnvVar);
    addToClasspathIfNotJar(JobContextImpl.getArchiveClassPaths(conf),
        JobContextImpl.getCacheArchives(conf),
        conf,
        environment, classpathEnvVar);
    if (userClassesTakesPrecedence) {
      MRApps.setMRFrameworkClasspath(environment, conf);
    }
  }
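To make the effect of the flag easier to see, here is a simplified, dependency-free model of the ordering logic in setClasspath. It is a sketch, not the Hadoop implementation: ClasspathOrderSketch and build are hypothetical names, the distributed-cache entries are left out, and the framework entries are passed in as a plain list.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the entry ordering in MRApps.setClasspath; not Hadoop code.
class ClasspathOrderSketch {
    static List<String> build(boolean userClassesFirst, List<String> frameworkEntries) {
        List<String> cp = new ArrayList<>();
        cp.add("%PWD%");                     // the working directory always comes first
        if (!userClassesFirst) {
            cp.addAll(frameworkEntries);     // default: framework jars before the job's jars
        }
        cp.add("job.jar/*");
        cp.add("job.jar/classes/");
        cp.add("job.jar/lib/*");
        cp.add("%PWD%/*");
        if (userClassesFirst) {
            cp.addAll(frameworkEntries);     // user-first: framework jars go last
        }
        return cp;
    }

    public static void main(String[] args) {
        List<String> framework = List.of(
            "%HADOOP_CONF_DIR%",
            "%HADOOP_COMMON_HOME%/share/hadoop/common/*");
        System.out.println("default:    " + build(false, framework));
        System.out.println("user first: " + build(true, framework));
    }
}
```

Because a JVM resolves a class from the first classpath entry that contains it, moving the framework entries to the end lets a class packaged in the job jar shadow a framework class of the same name.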
  • AM launch command
%JAVA_HOME%/bin/java -Djava.io.tmpdir=%PWD%/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=<LOG_DIR> -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=info,CLA -Dhadoop.root.logfile=syslog   -Dfile.encoding=UTF-8  org.apache.hadoop.mapreduce.v2.app.MRAppMaster 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr  
  • Resources requested for the AM container
{AllocationRequestId: -1, Priority: 0, Capability: <memory:1536, vCores:1>, # Containers: 1, Location: *, Relax Locality: true, Execution Type Request: null, Node Label Expression: null} 
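The AM launch command shown above is a list of arguments joined with spaces. The following sketch shows one way to assemble such a string. AmCommandSketch and build are hypothetical names, and the sketch does not claim to match how the Hadoop client builds the command internally. The literal values are copied from the command shown in this section.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of assembling the AM launch command; not Hadoop code.
class AmCommandSketch {
    // Placeholder copied from the command above; it is assumed to be
    // replaced with the real container log directory at launch time.
    static final String LOG_DIR = "<LOG_DIR>";

    static String build(List<String> jvmOpts) {
        List<String> vargs = new ArrayList<>();
        vargs.add("%JAVA_HOME%/bin/java");
        vargs.add("-Djava.io.tmpdir=%PWD%/tmp");
        vargs.addAll(jvmOpts);                                         // logging and encoding options
        vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");   // AM main class
        vargs.add("1>" + LOG_DIR + "/stdout");                         // redirect stdout
        vargs.add("2>" + LOG_DIR + "/stderr");                         // redirect stderr
        return String.join(" ", vargs);
    }

    public static void main(String[] args) {
        List<String> opts = List.of(
            "-Dlog4j.configuration=container-log4j.properties",
            "-Dhadoop.root.logger=info,CLA",
            "-Dfile.encoding=UTF-8");
        System.out.println(build(opts));
    }
}
```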

Step 3: Submit to the RM

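The client makes a remote call to ClientRMService.submitApplication. ClientRMService is an RPC server that listens on port 8032 by default; it is the service interface facing clients and implements the ApplicationClientProtocol protocol.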

ApplicationClientProtocol methods:

  • getNewApplication
  • submitApplication
  • failApplicationAttempt
  • forceKillApplication
  • getClusterMetrics
  • getClusterNodes
  • getQueueInfo
  • getQueueUserAcls
  • moveApplicationAcrossQueues
  • getNewReservation
  • submitReservation
  • updateReservation
  • deleteReservation
  • listReservations
  • getNodeToLabels
  • getLabelsToNodes
  • getClusterNodeLabels
  • updateApplicationPriority
  • signalToContainer
  • updateApplicationTimeouts
  • getResourceProfiles
  • getResourceProfile
  • getResourceTypeInfo
  • getAttributesToNodes
  • getClusterNodeAttributes
  • getNodesToAttributes
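Of these methods, job submission uses two in sequence: getNewApplication to obtain an application id, then submitApplication to hand over the prepared context. The sketch below models that handshake with an in-memory stand-in. Everything in it is hypothetical (SubmitFlowSketch, ClientProtocol, FakeRm, and the simplified String signatures); the real protocol exchanges request and response records over RPC.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins; the real types live in the Hadoop YARN API.
class SubmitFlowSketch {
    // Two of the protocol methods, with deliberately simplified signatures.
    interface ClientProtocol {
        String getNewApplication();
        void submitApplication(String appId, String amCommand);
    }

    // In-memory fake playing the role of ClientRMService.
    static class FakeRm implements ClientProtocol {
        private final long clusterTimestamp;
        private int nextId = 1;
        final Map<String, String> submitted = new LinkedHashMap<>();

        FakeRm(long clusterTimestamp) {
            this.clusterTimestamp = clusterTimestamp;
        }

        @Override
        public String getNewApplication() {
            // Ids combine the cluster start timestamp with a sequence number.
            return String.format("application_%d_%04d", clusterTimestamp, nextId++);
        }

        @Override
        public void submitApplication(String appId, String amCommand) {
            submitted.put(appId, amCommand);
        }
    }

    // Client side: obtain an id first, then submit the prepared context under it.
    static String submit(ClientProtocol rm, String amCommand) {
        String appId = rm.getNewApplication();
        rm.submitApplication(appId, amCommand);
        return appId;
    }

    public static void main(String[] args) {
        FakeRm rm = new FakeRm(1722390700850L);
        System.out.println(submit(rm, "org.apache.hadoop.mapreduce.v2.app.MRAppMaster"));
    }
}
```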
