java 中自定义OutputFormat的实例详解

网友投稿 249 2023-04-17


java 中自定义OutputFormat的实例详解

java 中 自定义OutputFormat的实例详解

实例代码:

package com.ccse.hadoop.outputformat;

import java.io.IOException;

import java.net.URI;

import java.net.URISyntaxException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.JobContext;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.OutputCommitter;

import org.apache.hadoop.mapreduce.OutputFormat;

import org.apache.hadoop.mapreduce.RecordWriter;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

public class MySelfOutputFormatApp {

public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput";

public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput";

public final static String OUTPUT_FILENAME = "/abc";

public static void main(String[] args) throws IOException, URISyntaxException,

ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();

FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);

fileSystem.delete(new Path(OUTPUT_PATH), true);

Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName());

job.setJarByClass(MySelfOutputFormatApp.class);

FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));

job.setMapperClass(MyMapper.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(LongWritable.class);

job.setReducerClass(MyReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(LongWritable.class);

job.setOuhttp://tputFormatClass(MyselfOutputFormat.class);

job.waitForCompletion(true);

}

public static class MyMapper extends Mapper {

private Text word = new Text();

private LongWritable writable = new LongWritable(1);

@Override

protected void map(LongWritable key, Text value,

Mapper.Context context)

throws IOException, InterruptedException {

if (value != null) {

String line = value.toString();

StringTokenizer tokenizer = new StringTokenizer(line);

while (tokenizer.hasMoreTokens()) {

word.set(tokenizer.nextToken());

context.write(word, writable);

}

}

}

}

public static class MyReducer extends Reducer {

@Override

protected void reduce(Text key, Iterable values,

Reducer.Context context)

throws IOException, InterruptedException {

long sum = 0;

for (LongWritable value : values) {

sum += value.get();

}

context.write(key, new LongWritable(sum));

}

}

public static class MyselfOutputFormat extends OutputFormat {

private FSDataOutputStream outputStream = null;

@Override

public RecordWriter getRecordWriter(

TaskAttemptContext context) throws IOException,

InterruptedException {

try {

FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration());

//指定文件的输出路径

final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH

+ MySelfOutputFormatApp.OUTPUT_FILENAME);

this.outputStream = fileSystem.create(path, false);

} catch (URISyntaxException e) {

e.printStackTrace();

}

return new MySelfRecordWriter(outputStream);

}

@Override

public void checkOutphQKfOutSpecs(JobContext context) throws IOException,

InterruptedException {

}

@Override

public OutputCommitter getOutputCommitter(TaskAttemptContext context)

throws IOException, InterruptedException {

return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context);

}

}

public static class MySelfRecordWriter extends RecordWriter {

private FSDataOutputStream outputStream = null;

public MySelfRecordWriter(FSDataOutputStream outputStream) {

this.outputStream = outputStream;

}

@Override

public void write(Text key, LongWritable value) throws IOException,

InterruptedException {

this.outputStream.writeBytes(key.toString());

this.outputStream.writeBytes("\t");

this.outputStream.writeLong(value.get());

}

@Override

public void close(TaskAttemptContext context) throws IOException,

InterruptedException {

this.outputStream.close();

}

}

}

2.OutputFormat是用于处理各种输出目的地的。

2.1 OutputFormat需要写出去的键值对来自于Reducer类,并通过RecordWriter写出。

2.2 RecordWriter中的write(...)方法只有k和v两个参数,那么要写到哪里去呢?这需要通过单独传入OutputStream来处理,write方法就是把k和v写入到这个OutputStream中。

2.3 RecordWriter是由OutputFormat的getRecordWriter(...)方法创建并返回的。因此,我们自定义的OutputFormat必须继承OutputFormat类,而输出流对象必须在getRecordWriter(...)方法中获得。

以上就是java 中自定义OutputFormat的实例,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:JAVA如何获取客户端IP地址和MAC地址
下一篇:模块接口测试(模块接口测试属于黑盒测试嘛)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~