使用MapReduce处理Jsonline数据
Map阶段
在Map阶段,主要任务是将输入的Jsonline文件解析成键值对,具体步骤如下:
1、读取每行数据:从输入的Jsonline文件中逐行读取数据。
2、解析为Json对象:使用工具类(如FastJson)将每一行数据解析成Json对象。
3、封装到Bean类中:将Json对象的属性值提取出来,并封装到自定义的Bean类中,一个学生信息的Bean类可能包含id、name、age、sex和className等属性。
4、输出键值对:根据业务需求,将Bean类实例作为键值对的Value部分,并选择合适的Key进行输出。
示例代码片段:
public class StudentInfoMapper extends Mapper<LongWritable, Text, Text, BeanTest> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 解析JSON字符串为JSONObject对象 JSONObject jsonObject = JSON.parseObject(value.toString()); // 创建BeanTest对象并设置属性 BeanTest bean = new BeanTest(); bean.setId(jsonObject.getString("id")); bean.setName(jsonObject.getString("name")); // ... 其他属性赋值 // 输出键值对 context.write(new Text(bean.getSex()), bean); } }
Partition阶段
Partition阶段用于将Map阶段的输出按键进行分区,以便在Reduce阶段能够更高效地处理数据,这一阶段是自动进行的,但可以通过自定义Partitioner来控制数据的分区逻辑。
示例代码片段:
public class CustomPartitioner extends Partitioner<Text, BeanTest> { @Override public int getPartition(Text key, BeanTest value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
Reduce阶段
Reduce阶段的主要任务是对相同Key的数据进行合并处理,生成最终结果,具体步骤如下:
1、接收数据:Reducer会接收到所有具有相同Key的中间数据。
2、合并处理:根据业务需求,对这些数据进行合并处理,可以计算每个性别的学生人数。
3、输出结果:将处理后的结果写入到HDFS或其他存储系统中。
示例代码片段:
public class StudentInfoReducer extends Reducer<Text, BeanTest, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<BeanTest> values, Context context) throws IOException, InterruptedException { int count = 0; for (BeanTest bean : values) { count++; } context.write(key, new IntWritable(count)); } }
Driver阶段
Driver阶段用于配置和启动MapReduce作业,包括设置输入输出路径、指定Mapper和Reducer类等,还可以进行一些优化操作,如自定义FileOutputFormat以实现特定的文件输出格式。
示例代码片段:
public class StudentInfoDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "student info process"); job.setJarByClass(StudentInfoDriver.class); job.setMapperClass(StudentInfoMapper.class); job.setReducerClass(StudentInfoReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
优化:自定义FileOutputFormat
为了实现特定的文件输出格式,可以自定义FileOutputFormat类,以下是一个简化的示例:
public class CustomFileOutputFormat extends FileOutputFormat<Text, BeanTest> { @Override public RecordWriter<Text, BeanTest> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { return new CustomRecordWriter(job); } } public class CustomRecordWriter extends RecordWriter<Text, BeanTest> { private BufferedWriter writer; private String delimiter; public CustomRecordWriter(TaskAttemptContext context) throws IOException { // 初始化writer和分隔符等资源 } @Override public void write(Text key, BeanTest value) throws IOException { // 将键值对写入文件,格式可自定义 } @Override public void close(TaskAttemptContext context) throws IOException { // 关闭writer等资源 } }
修改DriverTest类以使用自定义的FileOutputFormat:
FileOutputFormat.setOutputFormatClass(job, CustomFileOutputFormat.class);