java中的接口是类吗
199
2022-11-06
【MapReduce】MR 框架原理 之 Partitioner分区
文章目录
Partition分区
☠ 默认分区 --- HashPartitioner
▪ 案例 --- WordCount
Mapper阶段Reducer阶段Driver阶段
☠ 自定义Partitioner分区
▪ 自定义分区基本步骤▪ 案例
需求分析代码实现
PhoneBean封装类ProvincePartitioner分区类Mapper阶段Reducer阶段Driver阶段
★ 分区总结
Partition分区
在进行数据处理的时候要求将统计结果按照条件输出到不同文件中,这是就会将数据按照不同的条件进行区域划分,统计计算后输出。这就涉及到了分区的概念。比如:将统计结果按照手机归属地不同省份输出到不同文件中,这时就需要按照省份进行分区
☠ 默认分区 — HashPartitioner
public class HashPartitioner
默认分区是根据key的hashCode对ReduceTasks个数取模得到的,用户没法空值哪个key存储到那个分区。
▪ 案例 — WordCount
Mapper阶段
/** * Mapper 阶段 * KEYIN 输入数据的key类型 * VALUEIN 输入数据的value类型 * KEYOUT 输出数据的key类型 * VALUEOUT 输出数据的value类型 */public class wordCountMapper extends Mapper
Reducer阶段
/** * Reducer 阶段 * KEYIN ,VALUEIN Reducer阶段输入(Mapper阶段输出)数据的类型 * KEYOUT 最终输出数据的key类型 * VALUEOUT 最终输出数据的value类型 */public class wordCountReducer extends Reducer
Driver阶段
public class wordCountDriver { public static void main(String[] args) { Configuration conf = new Configuration(); Job job = null; try { // 1.获取job对象 job = Job.getInstance(conf); // 2.设置jar存储位置 job.setJarByClass(wordCountDriver.class); // 3.关联map、reduce类 job.setMapperClass(wordCountMapper.class); job.setReducerClass(wordCountReducer.class); // 4.设置Mapper阶段输出数据的key、value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5.设置Reducer阶段输出数据的key、value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置 ReduceTask 数量,默认为1 job.setNumReduceTasks(2); // 6.设置输入、出路径 FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Partition分区\\dataset\\")); FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Partition分区\\output\\")); // 打jar包// FileInputFormat.setInputPaths(job,new Path(args[0]));// FileOutputFormat.setOutputPath(job,new Path(args[1])); // 7.提交job job.waitForCompletion(true); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } }}
默认分区是根据key的hashCode对ReduceTasks个数取模得到的,在这里我们设置ReduceTask的数量为2。
// 设置 ReduceTask 数量,默认为1job.setNumReduceTasks(2);
返回顶部
☠ 自定义Partitioner分区
▪ 自定义分区基本步骤
返回顶部
▪ 案例
需求分析
将统计结果按照手机归属地不同省份输出到不同文件中(分区)
(1)输入数据
(2)期望输出数据
手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。
返回顶部
代码实现
PhoneBean封装类
import org.apache.hadoop.io.Writable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;public class PhoneBean implements Writable { private String ip; // ip private long upFlow; // 上行流量 private long downFlow; // 下行流量 private long sumFlow; // 总流量 public PhoneBean() { } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(ip); dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } @Override public void readFields(DataInput dataInput) throws IOException { ip = dataInput.readUTF(); upFlow = dataInput.readLong(); downFlow = dataInput.readLong(); sumFlow = dataInput.readLong(); } @Override public String toString() { // 方便后续切割 return ip + "\t" +upFlow + "\t" + downFlow + "\t" + sumFlow; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public void set(String ip1,long upFlow1,long downFlow1){ ip = ip1; upFlow = upFlow1 ; downFlow = downFlow1; sumFlow = upFlow1 + downFlow1; }}
返回顶部
ProvincePartitioner分区类
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;public class ProvincePartitioner extends Partitioner
返回顶部
Mapper阶段
import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class PhoneMapper extends Mapper
返回顶部
Reducer阶段
import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class PhoneReducer extends Reducer
返回顶部
Driver阶段
import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class PhoneDriver { public static void main(String[] args) { Job job = null; Configuration conf = new Configuration(); try{ // 获取job对象 job = Job.getInstance(conf); // 配置 job.setMapperClass(PhoneMapper.class); job.setReducerClass(PhoneReducer.class); job.setJarByClass(PhoneDriver.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(PhoneBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(PhoneBean.class); // 指定自定义数据分区 job.setPartitionerClass(ProvincePartitioner.class); // 同时指定相应数量的reduce task job.setNumReduceTasks(5); // 设置输入输出路径 FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Partition分区\\dataset\\phone_data .txt")); FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Partition分区\\output1\\")); // 提交job boolean result = job.waitForCompletion(true); System.exit(result?0:1); } catch (Exception e){ e.printStackTrace(); } }}
返回顶部
★ 分区总结
返回顶部
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~