教你怎么使用hadoop来提取文件中的指定内容

网友投稿 226 2022-10-23


教你怎么使用hadoop来提取文件中的指定内容

一、需求

将输入的 txt 文件中包含“baidu”字符串的链接输出到一个文件,其余链接输出到另一个文件。

二、步骤

1.LogMapper.java

package com.whj.mapreduce.outputformat;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Pass-through mapper: forwards every input line unchanged as the output key,
 * paired with a {@link NullWritable} value. Splitting lines into the two
 * output files happens later, in the record writer.
 *
 * <p>The type arguments are required: against the raw {@code Mapper} type the
 * {@code map(LongWritable, Text, Context)} method does not override anything
 * and the {@code @Override} annotation fails to compile.
 */
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    /**
     * Emits the input line as the key with a null value.
     *
     * @param key     input key supplied by the input format (unused here)
     * @param value   one line of input text
     * @param context task context used to emit the output pair
     * @throws IOException          if the framework fails to write the pair
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // No processing is done here; the line itself becomes the key.
        context.write(value, NullWritable.get());
    }
}

2.LogReducer.java

package com.whj.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Pass-through reducer: writes each key once for every value grouped under it,
 * so a line that occurs several times in the input is emitted the same number
 * of times.
 *
 * <p>The type arguments are required: with the raw {@code Reducer} type the
 * {@code reduce} override does not match, and a raw {@code Iterable} cannot be
 * iterated as {@code NullWritable}.
 */
public class LogReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    /**
     * Emits {@code key} once per element of {@code values}.
     *
     * @param key     the log line
     * @param values  one {@link NullWritable} per occurrence of the line
     * @param context task context used to emit the output pair
     * @throws IOException          if the framework fails to write the pair
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // The loop variable is only a counter: one write per duplicate line.
        for (NullWritable value : values) {
            context.write(key, NullWritable.get());
        }
    }
}

3.LogOutputFormat.java

package com.whj.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.RecordWriter;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Custom output format that delegates all writing to {@link LogRecordWriter}.
 *
 * <p>Parameterised with {@code <Text, NullWritable>} to match the key/value
 * types the job emits; the raw {@code FileOutputFormat} type would discard
 * that type information.
 */
public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {

    /**
     * Creates the record writer for a task attempt.
     *
     * @param job the task attempt context, passed through to the writer
     * @return a new {@link LogRecordWriter}
     * @throws IOException          if the writer cannot open its output streams
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        return new LogRecordWriter(job);
    }
}

4.LogRecordWriter.java

package com.whj.mapreduce.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.RecordWriter;

import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

/**
 * Record writer that splits log lines across two files: lines containing
 * {@code "baidu"} go to one file, all other lines go to another.
 *
 * <p>Both target paths are hard-coded, so every task attempt that
 * instantiates this writer opens the same two files.
 */
public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

    /** Stream receiving lines that contain "baidu". */
    private final FSDataOutputStream baiduOut;

    /** Stream receiving every other line. */
    private final FSDataOutputStream otherOut;

    /**
     * Opens the two output streams on the file system configured for the job.
     *
     * @param job task attempt context supplying the configuration
     * @throws IOException if either file cannot be created
     */
    public LogRecordWriter(TaskAttemptContext job) throws IOException {
        FileSystem fs = FileSystem.get(job.getConfiguration());

        FSDataOutputStream baidu = fs.create(new Path("D:\\temp\\outputformat.log"));
        FSDataOutputStream other;
        try {
            other = fs.create(new Path("D:\\temp\\other.log"));
        } catch (IOException e) {
            // Do not leak the first stream when the second one cannot be opened.
            IOUtils.closeStream(baidu);
            throw e;
        }

        this.baiduOut = baidu;
        this.otherOut = other;
    }

    /**
     * Writes one log line, followed by a newline, to the matching stream.
     *
     * @param key          the log line
     * @param nullWritable unused placeholder value
     * @throws IOException          if the write fails
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public void write(Text key, NullWritable nullWritable)
            throws IOException, InterruptedException {
        String log = key.toString();
        FSDataOutputStream target = log.contains("baidu") ? baiduOut : otherOut;

        // Write the Text's own UTF-8 bytes. DataOutputStream.writeBytes(String)
        // keeps only the low 8 bits of each char and would corrupt any
        // non-ASCII characters in the line.
        // The backing array may be longer than the content, hence getLength().
        target.write(key.getBytes(), 0, key.getLength());
        target.write('\n');
    }

    /**
     * Closes both output streams.
     *
     * @param taskAttemptContext unused
     * @throws IOException          declared by the overridden method
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public void close(TaskAttemptContext taskAttemptContext)
            throws IOException, InterruptedException {
        IOUtils.closeStream(baiduOut);
        IOUtils.closeStream(otherOut);
    }
}

5.LogDriver.java

package com.whj.mapreduce.outputformat;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogDriver {

public static void main(String[] args)NwnQGxA throws IOException, ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

job.setJarByClass(LogDriver.class);

job.NwnQGxAsetMapperClass(LogMapper.class);

job.setReducerClass(LogReducer.class);

job.setMapOutputKeyClass(Text.class);

job.setMapOutputValueClass(NullWritable.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(NullWritable.class);

//设置自定义的 outputformat

job.setOutputFormatClass(LogOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path("D:\\input"));

// 虽 然 我 们 自 定 义 了 outputformat , 但 是 因 为 我 们 的 outputformat 继承自fileoutputformat

//而 fileoutputformat 要输出一个_SUCCESS 文件,所以在这还得指定一个输出目录

FileOutputFormat.setOutputPath(job, new Path("D:\\temp\\logoutput"));

boolean b = job.waitForCompletion(true);

System.exit(b ? 0 : 1);

} }

三、结果


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:2021Kali系列 -- 目录扫描(nikto)
下一篇:SSH远程登录软件的使用教程
相关文章

 发表评论

暂时没有评论,来抢沙发吧~