一、MapReduce框架原理
1.1InputFormat数据输入
MapTask并行度决定机制
1)数据块(HDFS存储数据单位),物理上把数据分成一块一块
2)数据切片(MapReduce程序计算输入数据的单位):只是在逻辑上对输入进行分片,不在磁盘上将其切成分片进行存储。
1.2FileInputFormat
切片机制:
1)按照文件的内容长度进行切片
2)切片大小,默认等于Block大小
3)切片不考虑数据集整体,而是逐个针对每一个
//获取切片的文件名称 String name = inputSplit.getPath () .getName () ; //根据文件类型获取切片信息 FileSplit inputSplit = (FileSplit) context.getInputSplit ();
文件进行单独切片
案例分析:
1)源码中计算切片大小的公式
Math.max(minSize, Math.min(maxSize, blockSize)); //设置切片的最大值和最小值 mapreduce input.fileinputformat.split.minsize=1 默认值为1 mapreduce input.fileinputformat.split maxsize= Long.MAXValue
2)获取切片信息API
//获取切片的文件名称 String name = inputSplit.getPath () .getName () ; //根据文件类型获取切片信息 FileSplit inputSplit = (FileSplit) context.getInputSplit ();
1.3 TextInputFormat
介绍:是FileInputFormat的实现类,按行读取每条记录(key值存储该行在整个文件中的起始字节偏移量,为LongWritable类型;value值存储该行的内容,不包括任何行终止符,为Text类型
注:FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。
//示例 (0,Rich learning form) (20,Intelligent learning engine) (49,Learning more convenient) (74,From the real demand for more close to the enterprise)
1.4CombineTextInputFormat切片机制
应用场景:适用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片(数据切片)中,多个小文件就可以交给一个MapTask处理。
虚拟存储切片最大值设置(最好根据实际的小文件大小情况来设置具体的值)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);//4m
切片机制:
1)虚拟存储过程:将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
2)切片过程:判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片,若不大于则和下一个虚拟存储文件进行合并,共同形成一个切片。
二、MapReduce工作流程
三、Shuffle机制
1.Shuffle机制图解
Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。
2.Partition分区
//默认的Partitioner分区 public class HashPartitioner<K, V> extends Partitioner<K, V> { public int getPartition (K key, V value, int numReduceTasks) { return (key. hashCode () & Integer .MAX VALUE) & numReduceTasks; } }
3.自定义Partitioner
1)自定义类继承Partitioner,重写getPartition()方法
public class CustomPartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { // 控制分区代码逻辑 return partition; } }
2)在Job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
Tasks(5); educejob.setNumR
案例分析:
3.WritableComparable排序
排序分类:
1)部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
2)全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在
处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
3)辅助排序:(GroupingComparator分组)
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部
字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
4)二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
自定义排序WritableComparable原理分析
bean对象做为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序
@Override public int compareTo(FlowBean bean) { int result; // 按照总流量大小,倒序排列 if (this.sumFlow > bean.getSumFlow()) { result = -1; }else if (this.sumFlow < bean.getSumFlow()) { result = 1; }else { result = 0; } return result; }
4.Combiner合并
4.1Combiner详细介绍
1)Combiner是MR程序中Mapper和Reducer之外的一种组件。
2)Combiner组件的父类就是Reducer。
3)Combiner和Reducer的区别在于运行的位置:
Combiner是在每一个MapTask所在的节点运行;
Reducer是接收全局所有Mapper的输出结果;
4) Combiner的意义就是对每一个MapTask的输出进行局部汇总,以咸小网络传输量。
5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv
应该跟Reducer的输入kv类型要对应起来。
4.2自定义Combiner实现
1)自定义一个Combiner继承Reducer,重写Reduce方法
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } outV.set(sum); context.write(key,outV); } }
2)在Job驱动类设置
job.setCombinerClass(WordCountCombiner.class);
四、OutputFormat数据输出
1.OutputFormat接口实现类(默认输出格式是TextOutputFormat)
2.自定义OutputFormat步骤
1)自定义一个类继承FileOutputFormat
2)改写RecordWriter,具体改写write()方法。
五、Join应用
1.Reduce Join
Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
2.Map Join
2.1应用场景:适用于一张表特别小或特别大的情况
2.2优点:在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
2.3具体方法: 使用DistributedCache
1)在Mapper的setup阶段,将文件读取到缓存集合中。
2)在Driver驱动类中加载缓存。
//缓存普通文件到Task运行节点。 job.addCacheFile(new URI("file:///e:/cache/pd.txt")); //如果是集群运行,需要设置HDFS路径 job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
3.数据清洗(ETL)
E代表Extracte(抽取),T代表Transform(转换),L代表Load(加载)
运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。(清理过程一般只需要运行Mapper程序,不需要运行Reduce程序.