Hadoop MultipleOutputs输出到多个文件中的实现方法

网友投稿 356 2023-03-23


Hadoop MultipleOutputs输出到多个文件中的实现方法

Hadoop MultipleOutputs输出到多个文件中的实现方法

1.输出到多个文件或多个文件夹:

驱动中不需要额外改变,只需要在MapClass或Reduce类中加入如下代码

private MultipleOutputs mos;

public void setup(Context context) throws IOException,InterruptedException {

  mos = new MultipleOutputs(context);

}

public void cleanup(Context context) throws IOException,InterruptedException {

  mos.close();

}

然后就可以用mos.write(Key key,Value value,String baseOutputPath)代替context.write(key, value);

在MapClass或Reduce中使用,输出时也会有默认的文件part-m-00*或part-r-00*,不过这些文件是无内容的,大小为0. 而且只有part-m-00*会传给Reduce。

注意:multipleOutputs.write(key, value, baseOutputPath)方法的第三个函数表明了该输出所在的目录(相对于用户指定的输出目录)。

如果baseOutputPath不包含文件分隔符“/”,那么输出的文件格式为baseOutputPath-r-nnnnn(name-r-nnnnn);

如果包含文件分隔符“/”,例如baseOutputPath=“029070-99999/1901/part”,那么输出文件则为029070-99999/1901/part-r-nnnnn

2.案例-需求

需求,下面是有些测试数据,要对这些数据按类目输出到output中:

1512,iphone5s,4英寸,指纹识别,A7处理器,64位,M7协处理器,低功耗

1512,iphone5,4英寸,A6处理器,IOS7

1512,iphone4s,3.5英寸,A5处理器,双核,经典

50019780,ipad,9.7英寸,retina屏幕,丰富的应用

50019780,yoga,联想,待机18小时,外形独特

50019780,nexus 7,华硕&google,7英寸

50019780,ipad mini 2,retina显示屏,苹果,7.9英寸

1101,macbook air,苹果超薄,OS X mavericks

1101,macbook pro,苹果,OS X lion

1101,thinkpad yoga,联想,windows 8,超级本

3.Mapper程序:

package cn.edu.bjut.multioutput;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWhttp://ritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

public class MultiOutPutMapper extends Mapper {

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line = value.toString().trim();

if(null != line && 0 != line.length()) {

String[] arr = line.split(",");

context.write(new IntWritable(Integer.parseInt(arr[0])), value);

}

}

}

4.Reducer程序:

package cn.edu.bjut.multioutput;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultiOutPutReducer extends

Reducer {

private MultipleOutputs multipleOutputs = null;

@Override

protected void reduce(IntWritable key, Iterable values, Context context)

throws IOException, InterruptedException {

for(Text text : values) {

multipleOutputs.write("KeySpilt", NullWritable.get(), text, key.toString()+"/");

multipleOutputs.write("AllPart", NullWritable.get(), text);

}

}

@Override

protected void setup(Context context)

throws IOException, InterruptedException {

multipleOutputs = new MultipleOutputs(context);

}

@Override

protected void cleanup(Context context)

throws IOException, InterruptedException {

if(null != multipleOutputs) {

multipleOutputs.close();

multipleOutputs = null;

}

}

}

5.主程序:

package cn.edu.bjut.multioutput;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MainJob {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = new Job(conf, "aaa");

job.setJarByClass(MainJob.class);

job.setMapperClass(MultiOutPutMapper.class);

job.setMapOutputKeyClass(IntWritable.class);

job.setMapOutputValueClass(Text.class);

job.setReducerClass(MultiOutPutReducer.class);

job.setOutputKeyClass(NullWritable.class);

job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(args[0]));

MultipleOutputs.addNamedOutput(job, "KeySpilt", TextOutputFormat.class, NullWritable.class, Text.class);

MultipleOutputs.addNamedOutput(job, "AllPart", TextOutputFormat.class, NullWritable.class, Text.class);

Path outPath = new Path(args[1]);

FileSystem fs = FileSystem.get(conf);

if(fs.exists(outPath)) {

fs.delete(outPath, true);

}

FileOutputFormat.setOutputPath(job, outPath);

job.waitForCompletion(true);

}

}

如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:BootStrap 标题设置跨行无效的解决方法
下一篇:Spring的连接数据库以及JDBC模板(实例讲解)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~