详解HDFS多文件Join操作的实例

网友投稿 239 2023-03-25


详解HDFS多文件Join操作的实例

详解HDFS多文件Join操作的实例

最近在做HDFS文件处理之时,遇到了多文件Join操作,其中包括:All Join以及常用的Left Join操作,

下面是个简单的例子;采用两个表来做left join其中数据结构如下:

A 文件:

a|1b|2|c

B文件:

a|b|1|2|c

即:A文件中的第一、二列与B文件中的第一、三列对应;类似数据库中Table的主键/外键

代码如下:

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;

import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;

import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.TextInputFormat;

import org.apache.hadoop.mapred.TextOutputFormat;

import orhttp://g.apache.hadoop.util.ReflectionUtils;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import cn.eshore.traffic.hadoop.util.CommUtil;

import cn.eshore.traffic.hadoop.util.StringUtil;

/**

* @ClassName: DataJoin

* @Description: HDFS JOIN操作

* @author hadoop

* @date 2012-12-18 下午5:51:32

*/

public class InstallJoin extends Configured implements Tool {

private String static enSplitCode = "\\|";

private String static splitCode = "|";

//http:// 自定义Reducer

public static class ReduceClass extends DataJoinReducerBase {

@Override

protected TaggedMapOutput combine(Object[] tags, Object[] values) {

String joinedStr = "";

//该段判断用户生成Left join限制【其中tags表示文件的路径,install表示文件名称前缀】

//去掉则为All Join

if (tags.length == 1 && tags[0].toString().contains("install")) {

return null;

}

Map map = new HashMap();

for (int i = 0; i < values.length; i++) {

TaggedWritable tw = (TaggedWritable) values[i];

String line = ((Text) tw.getData()).toString();

String[] tokens = line.split(enSplitCode, 8);

String groupValue = tokens[6];

String type = tokens[7];

map.put(type, groupValue);

}

joinedStr += StringUtil.getCount(map.get("7"))+"|"+StringUtil.getCount(map.get("30"));

TaggedWritable retv = new TaggedWritable(new Text(joinedStr));

retv.setTag((Text) tags[0]);

return retv;

}

}

// 自定义Mapper

public static class MapClass extends DataJoinMapperBase {

//自定义Key【类似数据库中的主键/外键】

@Override

protected Text generateGroupKey(TaggedMapOutput aRecord) {

String line = ((Text) aRecord.getData()).toString();

String[] tokens = line.split(CommUtil.enSplitCode);

String key = "";

String type = tokens[7];

//由于不同文件中的Key所在列有可能不同,所以需要动态生成Key,其中type为不同文件中的数据标识;如:A文件最后一列为a用于表示此数据为A文件数据

if ("7".equals(type)) {

key = tokens[0]+"|"+tokens[1];

}else if ("30".equals(type)) {

key = tokens[0]+"|"+tokens[2];

}

return new Text(key);

}

@Override

protected Text generateInputTag(String inputFile) {

return new Text(inputFile);

}

@Override

protected TaggedMapOutput generateTaggedMapOutput(Object value) {

TaggedWritable retv = new TaggedWritable((Text) value);

retv.setTag(this.inputTag);

return retv;

}

}

public static class TaggedWritable extends TaggedMapOutput {

private Writable data;

// 自定义

public TaggedWritable() {

this.tag = new Text("");

}

public TaggedWritable(Writable data) {

this.tag = new Text("");

this.data = data;

}

@Override

public Writable getData() {

return data;

}

@Override

public void write(DataOutput out) throws IOException {

this.tag.write(out);

out.writeUTF(this.data.getClass().getName());

this.data.write(out);

}

@Override

public void readFields(DataInput in) throws IOException {

this.tag.readFields(in);

String dataClz = in.readUTF();

if (this.data == null

|| !this.data.getClass().getName().equals(dataClz)) {

try {

this.data = (Writable) ReflectionUtils.newInstance(

Class.forName(dataClz), null);

} catch (ClassNotFoundException e) {

e.printStackTrace();

}

}

this.data.readFields(in);

}

}

/**

* job运行

*/

@Override

public int run(String[] paths) throws Exception {

int no = 0;

try {

Configuration conf = getConf();

JobConf job = new JobConf(conf, InstallJoin.class);

FileInputFormat.setInputPaths(job, new Path(paths[0]));

FileOutputFormat.setOutputPath(job, new Path(paths[1]));

job.setJobName("join_data_test");

job.setMapperClass(MapClass.class);

job.setReducerClass(ReduceClass.class);

job.setInputFormat(TextInputFormat.class);

job.setOutputFormat(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(TaggedWritable.class);

job.set("mapred.textoutputformat.separator", CommUtil.splitCode);

JobClient.runJob(job);

no = 1;

} catch (Exception e) {

throw new Exception();

}

return no;

}

//测试

public static void main(String[] args) {

String[] paths = {

"hdfs://master...:9000/home/hadoop/traffic/join/newtype",

"hdfs://master...:9000/home/hadoop/traffic/join/newtype/output" }

int res = 0;

try {

res = ToolRunner.run(new Configuration(), new InstallJoin(), paths);

} catch (Exception e) {

e.printStackTrace();

}

System.exit(res);

}

}

如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:5个JAVA入门必看的经典实例
下一篇:Hadoop多Job并行处理的实例详解
相关文章

 发表评论

暂时没有评论,来抢沙发吧~