【MapReduce】MR 框架原理 之 OutputFormat 数据输出

网友投稿 365 2022-11-06


【MapReduce】MR 框架原理 之 OutputFormat 数据输出

文章目录

​​常见的OutputFormat实现类​​

​​  ☠自定义OutputFormat​​

​​案例​​

​​▪ 需求分析​​​​▪ 代码实现​​

​​自定义FilterOutputFormat​​​​Mapper阶段​​​​Reducer阶段​​​​Driver阶段​​

OutputFormat是MR输出的基类,所有实现MR输出都实现了OutputFormat接口。

常见的OutputFormat实现类

1.文本输出TextOutputFormat

默认的输出格式是TestOutputFormat,它​​把每条记录写为文本行​​。它的键和值可以是任意类型,因为TestOutputFormat调用 toString()方法把它们转换为字符串。

2.SequenceFileOutputFormat

将SequenceFileOutputFormat输出作为后续MR任务的输入,这便是一种好的输出格式,因为它​​格式紧凑,很容易被压缩​​

3.自定义OutputFormat

根据用户需求,​​自定义实现输出​​

案例

▪ 需求分析

​​返回顶部​​

▪ 代码实现

自定义FilterOutputFormat

继承自FileOutputFormat,同时​​泛型应当与Reduce一致​​。同时重写RecordWriter,自定义输出流。

package 第三章_MR框架原理.OutputFormat数据输出;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class FilterOutputFormat extends FileOutputFormat { @Override public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { return new FliterRecordWriter(job); }}

package 第三章_MR框架原理.OutputFormat数据输出;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import java.io.IOException;public class FliterRecordWriter extends RecordWriter { // 创建流对象 FSDataOutputStream fosatguigu; FSDataOutputStream fosother; // 构造输出流 public FliterRecordWriter(TaskAttemptContext job) { try { // 1.获取文件系统 FileSystem fs = FileSystem.get(job.getConfiguration()); // 2.创建输出到atguigu.log fosatguigu = fs.create(new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\OutputFormat数据输出\\atguigu.log")); // 3.创建输出到other.log fosother = fs.create(new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\OutputFormat数据输出\\other.log")); } catch (Exception e) { e.printStackTrace(); } } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { // 判断key的内容是否包含atguigu if (key.toString().contains("atguigu")) { fosatguigu.write(key.toString().getBytes()); } else { fosother.write(key.toString().getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { // 关闭IO流 IOUtils.closeStream(fosatguigu); IOUtils.closeStream(fosother); }}

​​返回顶部​​

Mapper阶段

读取数据,由于不需要进行内部操作,直接写出

package 第三章_MR框架原理.OutputFormat数据输出;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class FilterMapper extends Mapper { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// // 写出 context.write(value,NullWritable.get()); }}

​​返回顶部​​

Reducer阶段

reducer阶段只是将读取的数据输出,这里​​注意将读取的内容进行格式化,增加换行​​,否则最终文件以字符串追加形式,一整行展现。

package 第三章_MR框架原理.OutputFormat数据输出;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class FilterReducer extends Reducer { Text k = new Text(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // // 写出,添加换行符 String line = key.toString(); line = line + "\r\n"; k.set(line); // 循环防止有重复 for (NullWritable value:values){ context.write(k,NullWritable.get()); } }}

​​返回顶部​​

Driver阶段

package 第三章_MR框架原理.OutputFormat数据输出;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FilterDriver { public static void main(String[] args) { Job job = null; Configuration conf = new Configuration(); try { // 获取job job = Job.getInstance(conf); // 配置 job.setMapperClass(FilterMapper.class); job.setReducerClass(FilterReducer.class); job.setJarByClass(FilterDriver.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // 将自定义的输出格式组件设置到job中 job.setOutputFormatClass(FilterOutputFormat.class); // 设置输入输出路径 // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat // 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录 FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\OutputFormat数据输出\\log.txt")); FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Filteroutput")); // 提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } catch (Exception e){ e.printStackTrace(); } }}

​​返回顶部​​


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:java的jdk基础知识点总结
下一篇:【skLearn 回归模型】线性回归 ---- Linear Regression
相关文章

 发表评论

暂时没有评论,来抢沙发吧~