【MapReduce】MR Framework Principles: The Partitioner

User submission · 2022-11-06



Contents

Partition

☠ Default Partition — HashPartitioner

▪ Case Study — WordCount

Mapper Phase · Reducer Phase · Driver Phase

☠ Custom Partitioner

▪ Basic Steps for a Custom Partitioner · ▪ Case Study

Requirement Analysis · Code Implementation

PhoneBean Wrapper Class · ProvincePartitioner Class · Mapper Phase · Reducer Phase · Driver Phase

★ Partition Summary

Partition

When processing data, we are sometimes required to output the statistical results to different files according to some condition. The data is then divided into regions by that condition, computed per group, and written out, which is where the concept of partitioning comes in. For example: to output the statistics to different files by the province each phone number belongs to, we need to partition by province.

☠ Default Partition — HashPartitioner

public class HashPartitioner<K, V> extends Partitioner<K, V> {

    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

The default partition is computed from the key's hashCode modulo the number of ReduceTasks, so the user has no control over which partition a given key is stored in.
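To make the modulo arithmetic concrete, here is a minimal standalone sketch (plain Java, outside of any Hadoop job; the class name and the sample words are made up for illustration) showing where a few keys would land with two ReduceTasks:

public class HashPartitionDemo {
    public static void main(String[] args) {
        int numReduceTasks = 2;
        String[] keys = {"hadoop", "hive", "spark", "atguigu"};
        for (String key : keys) {
            // Same arithmetic as HashPartitioner: clear the sign bit, then take the modulo
            int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            System.out.println(key + " -> partition " + partition);
        }
    }
}

The & Integer.MAX_VALUE mask guards against negative hashCode values, which would otherwise make the modulo result negative and thus an invalid partition number.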

▪ Case Study — WordCount

Mapper Phase

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper phase
 * KEYIN    key type of the input data
 * VALUEIN  value type of the input data
 * KEYOUT   key type of the output data
 * VALUEOUT value type of the output data
 */
public class wordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Reusable output objects
    Text k = new Text();
    IntWritable v = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Debug: print the byte offset of this line
        System.out.println(key.toString());
        // 1. Read one line of data, e.g. "atguigu atguigu"
        String line = value.toString();
        // 2. Split into words
        String[] words = line.split(" ");
        // 3. Write out each word
        for (String word : words) {
            // Set the key, e.g. atguigu
            k.set(word);
            // Set the count to 1 (could also default to 1 when the object is created above)
            v.set(1);
            // Emit the key-value pair (atguigu, 1)
            context.write(k, v);
        }
    }
}

Reducer Phase

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer phase
 * KEYIN, VALUEIN   input types of the Reducer phase (the Mapper phase's output types)
 * KEYOUT           key type of the final output
 * VALUEOUT         value type of the final output
 */
public class wordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    // Iterate over the values of one key to compute its word count
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // atguigu,1
        // atguigu,1
        // 1. Sum up
        int sum = 0;
        for (IntWritable value : values) {
            // value is an IntWritable; convert to int via get() before adding
            sum += value.get();
        }
        // 2. Write out the result
        v.set(sum);
        context.write(key, v);
    }
}

Driver Phase

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class wordCountDriver {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        Job job = null;
        try {
            // 1. Get the job object
            job = Job.getInstance(conf);
            // 2. Set the jar location
            job.setJarByClass(wordCountDriver.class);
            // 3. Attach the map and reduce classes
            job.setMapperClass(wordCountMapper.class);
            job.setReducerClass(wordCountReducer.class);
            // 4. Set the key/value types of the Mapper phase output
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // 5. Set the key/value types of the Reducer phase output
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // Set the number of ReduceTasks (default is 1)
            job.setNumReduceTasks(2);
            // 6. Set the input and output paths
            FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Partition分区\\dataset\\"));
            FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Partition分区\\output\\"));
            // When packaging as a jar:
//            FileInputFormat.setInputPaths(job, new Path(args[0]));
//            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 7. Submit the job
            job.waitForCompletion(true);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}

The default partitioner derives the partition from the key's hashCode modulo the number of ReduceTasks; here we set the number of ReduceTasks to 2.

// Set the number of ReduceTasks (default is 1)
job.setNumReduceTasks(2);
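With two ReduceTasks, the job writes two result files, part-r-00000 and part-r-00001, and each word lands in the file whose index equals (word.hashCode() & Integer.MAX_VALUE) % 2.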


☠ Custom Partitioner

▪ Basic Steps for a Custom Partitioner

(1) Create a custom class that extends Partitioner<KEY, VALUE>, where KEY and VALUE are the Mapper's output key/value types, and override the getPartition() method.
(2) In the job Driver, register the custom partitioner with job.setPartitionerClass().
(3) Set a number of ReduceTasks that matches the partition logic of getPartition(), via job.setNumReduceTasks(). A minimal generic sketch is shown below; the full case study follows.
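Here is that sketch (MyPartitioner and its <Text, IntWritable> types are placeholders for illustration, not part of the case study below):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Step 1: extend Partitioner with the Mapper's output key/value types and override getPartition()
public class MyPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Return a partition number in the range [0, numPartitions)
        return key.toString().startsWith("a") ? 0 : 1;
    }
}

// Steps 2 and 3, in the Driver:
// job.setPartitionerClass(MyPartitioner.class);
// job.setNumReduceTasks(2);  // must match the number of partitions returned above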


▪ Case Study

Requirement Analysis

Output the statistics to different files (partitions) according to the province each phone number belongs to.

(1) Input data

(2) Expected output data

Phone numbers starting with 136, 137, 138, and 139 each go into their own file (four separate files), and numbers with any other prefix go into one additional file.
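With the partitioner implemented below, this means five partitions in total: prefixes 136–139 map to partitions 0–3 (output files part-r-00000 through part-r-00003), and every other prefix maps to partition 4 (part-r-00004).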


Code Implementation

PhoneBean Wrapper Class

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PhoneBean implements Writable {

    private String ip;     // IP address
    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    public PhoneBean() {
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(ip);
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        ip = dataInput.readUTF();
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        // Tab-separated so the output is easy to split later
        return ip + "\t" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public String getIp() { return ip; }
    public void setIp(String ip) { this.ip = ip; }
    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getDownFlow() { return downFlow; }
    public void setDownFlow(long downFlow) { this.downFlow = downFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }

    public void set(String ip, long upFlow, long downFlow) {
        this.ip = ip;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
}
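One detail worth calling out: readFields() must read the fields in exactly the same order that write() writes them, because Hadoop's serialization stream carries no field names, only raw bytes.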


ProvincePartitioner Class

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, PhoneBean> {

    @Override
    public int getPartition(Text text, PhoneBean phoneBean, int numPartitions) {
        // The key is the phone number; the value is the subscriber record
        // 1. Take the first three digits of the phone number
        String phoneNum = text.toString().substring(0, 3);
        // 2. Decide the partition number; note: partition numbers must start from 0
        int partition;
        if ("136".equals(phoneNum)) {
            partition = 0;
        } else if ("137".equals(phoneNum)) {
            partition = 1;
        } else if ("138".equals(phoneNum)) {
            partition = 2;
        } else if ("139".equals(phoneNum)) {
            partition = 3;
        } else {
            partition = 4;
        }
        return partition;
    }
}
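Note that getPartition() must return a value in [0, numPartitions): prefixes other than 136–139 all fall through to partition 4, so this partitioner defines exactly five partitions, and the Driver below has to request five ReduceTasks to match.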


Mapper Phase

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PhoneMapper extends Mapper<LongWritable, Text, Text, PhoneBean> {

    Text k = new Text();
    PhoneBean v = new PhoneBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read one line of data
        String line = value.toString();
        // 2. Split on tabs
        String[] words = line.split("\t");
        // 3. Wrap the fields into the output objects
        k.set(words[1]);
        String ip = words[2];
        long upFlow = Long.parseLong(words[words.length - 3]);
        long downFlow = Long.parseLong(words[words.length - 2]);
        v.setIp(ip);
        v.setUpFlow(upFlow);
        v.setDownFlow(downFlow);
        // 4. Write out
        context.write(k, v);
    }
}
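The indexing assumes the layout of the phone_data input file: the phone number is the second tab-separated field and the IP the third, while the upstream and downstream traffic are read as the third- and second-to-last fields, so the parsing still works if the number of middle fields varies between lines.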


Reducer Phase

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PhoneReducer extends Reducer<Text, PhoneBean, Text, PhoneBean> {

    String ip = "";
    long sum_upFlow = 0;
    long sum_downFlow = 0;
    PhoneBean v = new PhoneBean();

    @Override
    protected void reduce(Text key, Iterable<PhoneBean> values, Context context) throws IOException, InterruptedException {
        // 1. Sum up the traffic for this phone number
        for (PhoneBean phoneBean : values) {
            ip = phoneBean.getIp();
            sum_upFlow += phoneBean.getUpFlow();
            sum_downFlow += phoneBean.getDownFlow();
        }
        v.set(ip, sum_upFlow, sum_downFlow);
        // 2. Write out
        context.write(key, v);
        // Reset the accumulators for the next key
        ip = "";
        sum_upFlow = 0;
        sum_downFlow = 0;
    }
}
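A caveat behind this loop: Hadoop reuses a single PhoneBean instance across the iterations of values, which is why the code copies the primitive fields out immediately rather than keeping references to the bean itself.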


Driver Phase

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PhoneDriver {
    public static void main(String[] args) {
        Job job = null;
        Configuration conf = new Configuration();
        try {
            // Get the job object
            job = Job.getInstance(conf);
            // Configure the job
            job.setMapperClass(PhoneMapper.class);
            job.setReducerClass(PhoneReducer.class);
            job.setJarByClass(PhoneDriver.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(PhoneBean.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(PhoneBean.class);
            // Register the custom partitioner
            job.setPartitionerClass(ProvincePartitioner.class);
            // And set a matching number of ReduceTasks
            job.setNumReduceTasks(5);
            // Set the input and output paths
            FileInputFormat.setInputPaths(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Partition分区\\dataset\\phone_data .txt"));
            FileOutputFormat.setOutputPath(job, new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Partition分区\\output1\\"));
            // Submit the job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


★ Partition Summary

(1) If the number of ReduceTasks is greater than the number of partitions produced by getPartition(), the job still runs, but it produces that many extra, empty output files part-r-000xx.
(2) If the number of ReduceTasks is greater than 1 but less than the number of partitions, some of the data has no partition to go to, and the job throws an Exception.
(3) If the number of ReduceTasks is 1, the custom Partitioner is bypassed: however many partitions getPartition() would compute, the results all end up in a single file, part-r-00000.
(4) Partition numbers must start at 0 and increase one by one. For example, with five partitions the valid numbers are exactly 0 through 4, which is why the Driver above pairs ProvincePartitioner with job.setNumReduceTasks(5).

