Hadoop学习(三)

avatar
作者
猴君
阅读量:0

一、MapReduce框架原理

1.1InputFormat数据输入

MapTask并行度决定机制

1)数据块(HDFS存储数据单位),物理上把数据分成一块一块

2)数据切片(MapReduce程序计算输入数据的单位):只是在逻辑上对输入进行分片,不在磁盘上将其切成分片进行存储。

1.2FileInputFormat

切片机制:

1)按照文件的内容长度进行切片

2)切片大小,默认等于Block大小

3)切片不考虑数据集整体,而是逐个针对每一个

//获取切片的文件名称 String name = inputSplit.getPath () .getName () ; //根据文件类型获取切片信息 FileSplit inputSplit = (FileSplit) context.getInputSplit ();

文件进行单独切片

案例分析:

1)源码中计算切片大小的公式

Math.max(minSize, Math.min(maxSize, blockSize)); //设置切片的最大值和最小值 mapreduce input.fileinputformat.split.minsize=1 默认值为1 mapreduce input.fileinputformat.split maxsize= Long.MAXValue 

2)获取切片信息API

//获取切片的文件名称 String name = inputSplit.getPath () .getName () ; //根据文件类型获取切片信息 FileSplit inputSplit = (FileSplit) context.getInputSplit ();

1.3 TextInputFormat

介绍:是FileInputFormat的实现类,按行读取每条记录(key值存储该行在整个文件中的起始字节偏移量,为LongWritable类型;value值存储该行的内容,不包括任何行终止符,为Text类型

注:FileInputFormat常见的接口实现类包括:TextInputFormatKeyValueTextInputFormatNLineInputFormatCombineTextInputFormat和自定义InputFormat等。

//示例 (0,Rich learning form) (20,Intelligent learning engine) (49,Learning more convenient) (74,From the real demand for more close to the enterprise)

1.4CombineTextInputFormat切片机制

应用场景:适用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片(数据切片)中,多个小文件就可以交给一个MapTask处理。

虚拟存储切片最大值设置(最好根据实际的小文件大小情况来设置具体的值

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);//4m

切片机制:

1)虚拟存储过程:将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现小切片)

2)切片过程:判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片,若不大于则和下一个虚拟存储文件进行合并,共同形成一个切片。

二、MapReduce工作流程

三、Shuffle机制

1.Shuffle机制图解

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

2.Partition分区

//默认的Partitioner分区 public class HashPartitioner<K, V> extends Partitioner<K, V> {   public int getPartition (K key, V value, int numReduceTasks) {     return (key. hashCode () & Integer .MAX VALUE) & numReduceTasks;   } }

3.自定义Partitioner

1)自定义类继承Partitioner,重写getPartition()方法

public class CustomPartitioner extends Partitioner<Text, FlowBean> {           @Override           public int getPartition(Text key, FlowBean value, int numPartitions) {          // 控制分区代码逻辑                    return partition;          } }

2)在Job驱动中,设置自定义Partitioner

job.setPartitionerClass(CustomPartitioner.class);

3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

Tasks(5); educejob.setNumR

案例分析:

3.WritableComparable排序

排序分类:

1)部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。

2)全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在
处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

3)辅助排序:(GroupingComparator分组)

在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部
字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

4)二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

自定义排序WritableComparable原理分析

bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序

@Override public int compareTo(FlowBean bean) {  	int result; 		 	// 按照总流量大小,倒序排列 	if (this.sumFlow > bean.getSumFlow()) { 		result = -1; 	}else if (this.sumFlow < bean.getSumFlow()) { 		result = 1; 	}else { 		result = 0; 	}  	return result; }

4.Combiner合并

4.1Combiner详细介绍

1)Combiner是MR程序中Mapper和Reducer之外的一种组件。
2)Combiner组件的父类就是Reducer。
3)Combiner和Reducer的区别在于运行的位置:
  Combiner是在每一个MapTask所在的节点运行;
  Reducer是接收全局所有Mapper的输出结果;
4) Combiner的意义就是对每一个MapTask的输出进行局部汇总,以咸小网络传输量。
5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv
应该跟Reducer的输入kv类型要对应起来。

4.2自定义Combiner实现

1)自定义一个Combiner继承Reducer,重写Reduce方法

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {     private IntWritable outV = new IntWritable();     @Override     protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {         int sum = 0;         for (IntWritable value : values) {             sum += value.get();         }         outV.set(sum);         context.write(key,outV);     } }

2)在Job驱动类设置

job.setCombinerClass(WordCountCombiner.class);

四、OutputFormat数据输出

1.OutputFormat接口实现类(默认输出格式是TextOutputFormat)

2.自定义OutputFormat步骤

1)自定义一个类继承FileOutputFormat

2)改写RecordWriter,具体改写write()方法。

五、Join应用

1.Reduce Join

Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

 Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。

2.Map Join

2.1应用场景:适用于一张表特别小或特别大的情况

2.2优点:Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

2.3具体方法: 使用DistributedCache

1)Mapper的setup阶段,将文件读取到缓存集合中

2)在Driver驱动中加载缓存。

//缓存普通文件到Task运行节点。 job.addCacheFile(new URI("file:///e:/cache/pd.txt")); //如果是集群运行,需要设置HDFS路径 job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));

3.数据清洗(ETL)

E代表Extracte(抽取),T代表Transform(转换),L代表Load(加载)

运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。(清理过程一般只需要运行Mapper程序,不需要运行Reduce程序.

    广告一刻

    为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!