Flow
Step 1: upload the jar, the configuration file and the split files
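The job's files are uploaded to an HDFS staging path. Its root comes from the local configuration property yarn.app.mapreduce.am.staging-dir (default: /tmp/hadoop-yarn/staging), and each user gets a directory below it:
/tmp/hadoop-yarn/staging/$user/.staging
For example, for user hdfs and job job_1722390700850_0001 the uploaded files are:
/tmp/hadoop-yarn/staging/hdfs/.staging/job_1722390700850_0001/job.split
/tmp/hadoop-yarn/staging/hdfs/.staging/job_1722390700850_0001/job.splitmetainfo
/tmp/hadoop-yarn/staging/hdfs/.staging/job_1722390700850_0001/job.xml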
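The path layout above can be sketched in plain Java. StagingPaths and its methods are hypothetical helpers, not Hadoop classes. The sketch only assumes the `<staging-dir>/<user>/.staging/<jobId>` layout visible in the example paths.

```java
import java.util.List;

/**
 * Hypothetical helper (not part of Hadoop): rebuilds the staging paths shown above
 * from yarn.app.mapreduce.am.staging-dir, the submitting user and the job ID.
 */
public class StagingPaths {
    /** Default value of yarn.app.mapreduce.am.staging-dir. */
    public static final String DEFAULT_STAGING_ROOT = "/tmp/hadoop-yarn/staging";

    /** Per-job directory: <staging-root>/<user>/.staging/<jobId>. */
    public static String jobStagingDir(String stagingRoot, String user, String jobId) {
        // Drop a trailing slash so the result never contains "//".
        String root = stagingRoot.endsWith("/")
                ? stagingRoot.substring(0, stagingRoot.length() - 1)
                : stagingRoot;
        return root + "/" + user + "/.staging/" + jobId;
    }

    /** The split, split meta-info and job configuration files uploaded in step 1. */
    public static List<String> uploadedFiles(String jobDir) {
        return List.of(
                jobDir + "/job.split",
                jobDir + "/job.splitmetainfo",
                jobDir + "/job.xml");
    }

    public static void main(String[] args) {
        String dir = jobStagingDir(DEFAULT_STAGING_ROOT, "hdfs", "job_1722390700850_0001");
        // Prints the three example paths, one per line.
        uploadedFiles(dir).forEach(System.out::println);
    }
}
```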
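Step 2: build the ApplicationSubmissionContext
This step mainly prepares what the AM (ApplicationMaster) needs in order to start:
- Local resources to be downloaded to the node: the files uploaded in step 1 (the split files and job.xml) and the jar the job runs
- Token information
- The CLASSPATH environment variable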
The resulting CLASSPATH looks like this:
CLASSPATH=%PWD% ... %HADOOP_CONF_DIR% %HADOOP_COMMON_HOME%/share/hadoop/common/* %HADOOP_COMMON_HOME%/share/hadoop/common/lib/* %HADOOP_HDFS_HOME%/share/hadoop/hdfs/* %HADOOP_HDFS_HOME%/share/hadoop/hdfs/lib/* %HADOOP_YARN_HOME%/share/hadoop/yarn/* %HADOOP_YARN_HOME%/share/hadoop/yarn/lib/* %HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\* %HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\lib\* job.jar/* job.jar/classes/ job.jar/lib/* %PWD%/*
If you set mapreduce.job.user.classpath.first = true, the framework entries below move when the classpath is generated. Instead of following %PWD% directly, they are appended at the end, after the job's own entries:
%HADOOP_CONF_DIR%
%HADOOP_COMMON_HOME%/share/hadoop/common/*
%HADOOP_COMMON_HOME%/share/hadoop/common/lib/*
%HADOOP_HDFS_HOME%/share/hadoop/hdfs/*
%HADOOP_HDFS_HOME%/share/hadoop/hdfs/lib/*
%HADOOP_YARN_HOME%/share/hadoop/yarn/*
%HADOOP_YARN_HOME%/share/hadoop/yarn/lib/*
%HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\*
%HADOOP_MAPRED_HOME%\share\hadoop\mapreduce\lib\*
From MRApps.java. When mapreduce.job.classloader is true, the entries go into APP_CLASSPATH instead of CLASSPATH:

    public static void setClasspath(Map<String, String> environment,
        Configuration conf) throws IOException {
      boolean userClassesTakesPrecedence =
          conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);

      String classpathEnvVar =
          conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)
              ? Environment.APP_CLASSPATH.name() : Environment.CLASSPATH.name();

      MRApps.addToEnvironment(environment,
          classpathEnvVar, crossPlatformifyMREnv(conf, Environment.PWD), conf);
      if (!userClassesTakesPrecedence) {
        MRApps.setMRFrameworkClasspath(environment, conf); // append the framework jars
      }
      /*
       * We use "*" for the name of the JOB_JAR instead of MRJobConfig.JOB_JAR for
       * the case where the job jar is not necessarily named "job.jar". This can
       * happen, for example, when the job is leveraging a resource from the YARN
       * shared cache.
       */
      MRApps.addToEnvironment(
          environment,
          classpathEnvVar,
          MRJobConfig.JOB_JAR + Path.SEPARATOR + "*", conf);
      MRApps.addToEnvironment(
          environment,
          classpathEnvVar,
          MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, conf);
      MRApps.addToEnvironment(
          environment,
          classpathEnvVar,
          MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", conf);
      MRApps.addToEnvironment(
          environment,
          classpathEnvVar,
          crossPlatformifyMREnv(conf, Environment.PWD) + Path.SEPARATOR + "*", conf);
      // a * in the classpath will only find a .jar, so we need to filter out
      // all .jars and add everything else
      addToClasspathIfNotJar(JobContextImpl.getFileClassPaths(conf),
          JobContextImpl.getCacheFiles(conf),
          conf,
          environment, classpathEnvVar);
      addToClasspathIfNotJar(JobContextImpl.getArchiveClassPaths(conf),
          JobContextImpl.getCacheArchives(conf),
          conf,
          environment, classpathEnvVar);
      if (userClassesTakesPrecedence) {
        MRApps.setMRFrameworkClasspath(environment, conf);
      }
    }
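The ordering logic in setClasspath can be reduced to a small model in which only the position of the framework entries depends on mapreduce.job.user.classpath.first. This is a simplified sketch under assumptions: ClasspathOrder is a hypothetical name, it ignores the APP_CLASSPATH switch and the distributed-cache entries, and it uses the literal entries from the CLASSPATH shown above.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (not Hadoop code) of the entry order produced by
 * MRApps.setClasspath.
 */
public class ClasspathOrder {
    public static List<String> build(boolean userClassesTakePrecedence,
                                     List<String> frameworkEntries) {
        List<String> cp = new ArrayList<>();
        cp.add("%PWD%");
        if (!userClassesTakePrecedence) {
            cp.addAll(frameworkEntries); // default: framework entries right after %PWD%
        }
        // Job jar entries, as they appear in the CLASSPATH above.
        cp.add("job.jar/*");
        cp.add("job.jar/classes/");
        cp.add("job.jar/lib/*");
        cp.add("%PWD%/*");
        // (the real code also adds non-jar distributed-cache files/archives here)
        if (userClassesTakePrecedence) {
            cp.addAll(frameworkEntries); // user classpath first: framework entries go last
        }
        return cp;
    }

    public static void main(String[] args) {
        List<String> framework = List.of(
                "%HADOOP_CONF_DIR%", "%HADOOP_COMMON_HOME%/share/hadoop/common/*");
        System.out.println(build(false, framework));
        System.out.println(build(true, framework));
    }
}
```

Both orders contain the same entries. With user-first ordering, classes in the job jar shadow the framework's copies of the same classes, because the JVM takes the first match on the classpath.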
- The command that launches the AM:
%JAVA_HOME%/bin/java -Djava.io.tmpdir=%PWD%/tmp -Dlog4j.configuration=container-log4j.properties -Dyarn.app.container.log.dir=<LOG_DIR> -Dyarn.app.container.log.filesize=0 -Dhadoop.root.logger=info,CLA -Dhadoop.root.logfile=syslog -Dfile.encoding=UTF-8 org.apache.hadoop.mapreduce.v2.app.MRAppMaster 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr
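- The resources requested for the AM container: 1536 MB of memory and 1 vCore here, the defaults of yarn.app.mapreduce.am.resource.mb and yarn.app.mapreduce.am.resource.cpu-vcores: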
{AllocationRequestId: -1, Priority: 0, Capability: <memory:1536, vCores:1>, # Containers: 1, Location: *, Relax Locality: true, Execution Type Request: null, Node Label Expression: null}
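The AM launch command is a list of arguments joined with spaces. The sketch below rebuilds the command line shown above from its parts. AmCommand is hypothetical and is not the code YARNRunner actually uses. `<LOG_DIR>`, `%JAVA_HOME%` and `%PWD%` are kept as placeholders because they are only expanded when the container is launched.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: assembles the AM launch command shown above.
 * Placeholders are passed through unchanged.
 */
public class AmCommand {
    public static final String AM_CLASS = "org.apache.hadoop.mapreduce.v2.app.MRAppMaster";

    public static String build(String logDir) {
        List<String> args = new ArrayList<>();
        args.add("%JAVA_HOME%/bin/java");
        args.add("-Djava.io.tmpdir=%PWD%/tmp");
        // Container logging settings.
        args.add("-Dlog4j.configuration=container-log4j.properties");
        args.add("-Dyarn.app.container.log.dir=" + logDir);
        args.add("-Dyarn.app.container.log.filesize=0");
        args.add("-Dhadoop.root.logger=info,CLA");
        args.add("-Dhadoop.root.logfile=syslog");
        args.add("-Dfile.encoding=UTF-8");
        args.add(AM_CLASS);
        // Redirect the JVM's stdout and stderr into the container log directory.
        args.add("1>" + logDir + "/stdout");
        args.add("2>" + logDir + "/stderr");
        return String.join(" ", args);
    }

    public static void main(String[] args) {
        System.out.println(build("<LOG_DIR>"));
    }
}
```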
Step 3: submit to the RM
The client calls ClientRMService.submitApplication over RPC. ClientRMService is the RM's client-facing RPC server, listening on port 8032 by default (yarn.resourcemanager.address). It implements the ApplicationClientProtocol protocol.
ApplicationClientProtocol declares the following methods, and it also inherits report and delegation-token methods such as getApplicationReport from ApplicationBaseProtocol: getNewApplication, submitApplication, failApplicationAttempt, forceKillApplication, getClusterMetrics, getClusterNodes, getQueueInfo, getQueueUserAcls, moveApplicationAcrossQueues, getNewReservation, submitReservation, updateReservation, deleteReservation, listReservations, getNodeToLabels, getLabelsToNodes, getClusterNodeLabels, updateApplicationPriority, signalToContainer, updateApplicationTimeouts, getResourceProfiles, getResourceProfile, getResourceTypeInfo, getAttributesToNodes, getClusterNodeAttributes, getNodesToAttributes
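Before submitApplication, the client calls getNewApplication. As I read the MapReduce client, that call is also the source of the job ID in the step 1 staging path: the job ID reuses the application ID's cluster timestamp and sequence number. The toy model below illustrates that sequence in plain Java. MiniRm, toJobId and the queue argument are hypothetical; the real calls are RPCs to ClientRMService carrying protobuf request and response records.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy in-memory model of the two ApplicationClientProtocol calls used during
 * submission. Not Hadoop code.
 */
public class SubmitFlow {
    /** Hypothetical stand-in for the RM's client service. */
    public static class MiniRm {
        private final long clusterTimestamp;
        private int nextSequence = 1;
        private final Map<String, String> submitted = new HashMap<>();

        public MiniRm(long clusterTimestamp) {
            this.clusterTimestamp = clusterTimestamp;
        }

        /** Mirrors getNewApplication: allocate a fresh application ID. */
        public synchronized String getNewApplication() {
            return String.format("application_%d_%04d", clusterTimestamp, nextSequence++);
        }

        /** Mirrors submitApplication: accept a previously allocated ID exactly once. */
        public synchronized void submitApplication(String appId, String queue) {
            if (submitted.putIfAbsent(appId, queue) != null) {
                throw new IllegalStateException("already submitted: " + appId);
            }
        }

        public synchronized boolean isSubmitted(String appId) {
            return submitted.containsKey(appId);
        }
    }

    /** Job ID = "job_" + the application ID's timestamp and sequence parts. */
    public static String toJobId(String appId) {
        return "job_" + appId.substring("application_".length());
    }

    public static void main(String[] args) {
        MiniRm rm = new MiniRm(1722390700850L);
        String appId = rm.getNewApplication();  // 1. ask the RM for an ID
        String jobId = toJobId(appId);          // 2. staging dir (step 1) is named after it
        rm.submitApplication(appId, "default"); // 3. submit the ApplicationSubmissionContext
        System.out.println(appId + " -> " + jobId + ", submitted=" + rm.isSubmitted(appId));
    }
}
```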