hadoop的wordcount实例代码

网友投稿 232 2023-02-19


hadoop的wordcount实例代码

可以通过一个简单的例子来说明MapReduce到底是什么:

我们要统计一个大文件中的各个单词出现的次数。由于文件太大。我们把这个文件切分成如果小文件,然后安排多个人去统计。这个过程就是”Map”。然后把每个人统计的数字合并起来,这个就是“Reduce"。

上面的例子如果在MapReduce去做呢,就需要创建一个任务job,由job把文件切分成若干独立的数据块,并分布在不同的机器节点中。然后通过分散在不同节点中的Map任务以完全并行的方式进行处理。MapReduce会对Map的输出地行收集,再将结果输出送给Reduce进行下一步的处理。

对于一个任务的具体执行过程,会有一个名为"JobTracker"的进程负责协调MapReduce执行过程中的所有任务。若干条TaskTracker进程用来运行单独的Map任务,并随时将任务的执行情况汇报给JobTracker。如果一个TaskTracker汇报任务失败或者长时间未对本身任务进行汇报,JobTracker会启动另外一个TaskTracker重新执行单独的Map任务。

下面的具体的代码实现:

1. 编写wordcount的相关job

(1)eclipse下创建相关maven项目,依赖jar包如下(也可参照hadoop源码包下的hadoop-mapreduce-examples项目的pom配置)

注意:要配置一个maven插件maven-jar-plugin,并指定mainClass

junit

junit

4.11

org.apache.hadoop

hadoop-mapreduce-client-core

2.5.2

org.apache.hadoop

hadoop-common

2.5.2

org.apache.maven.plugins

maven-jar-plugin

com.xxx.demo.hadoop.wordcount.WordCount

(2)根据MapReduce的运行机制,一个job至少要编写三个类分别用来完成Map逻辑、Reduce逻辑、作业调度这三件事。

Map的代码可继承org.apache.hadoop.mapreduce.Mapper类

public static class TokenizerMapper

extends Mapper{

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

//由于该例子未用到key的参数,所以该处key的类型就简单指定为Object

public void map(Object key, Text value, Context context

) throws IOException, InterruptedException {

StringTokenizer itr = new StringTokenizer(value.toString());

while (itr.hasMoreTokens()) {

word.set(itr.nextToken());

context.write(word, one);

}

}

}

Reduce的代码可继承org.apache.hadoop.mapreduce.Reducer类

public class IntSumReducer

extends Reducer {

private IntWritable result = new IntWritable();

public void reduce(Text key, IterwqupMluJjRable values,

Context context

) throws IOException, InterruptedException {

int sum = 0;

for (IntWritable val : values) {

sum += val.get();

}

result.set(sum);

context.write(key, result);

}

}

编写main方法进行作业调度

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf, "word count");

job.setJarByClass(WordCount.class);

job.setMapperClass(TokenizerMapper.class);

job.setCombinerClass(IntSumReducer.class);

job.setReducerClass(IntSumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path(args[0]));

FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true) ;

//System.exit(job.waitForCompletion(true) ? 0 : 1);

}

2. 上传数据文件到hadoop集群环境

执行mvn install把项目打成jar文件然后上传到linux集群环境,使用hdfs dfs -mkdir命令在hdfs文件系统中创建相应的命令,使用hdfs dfs -put 把需要处理的数据文件上传到hdfs系统中,示例:hdfs dfs -put ${linux_path/数据文件} ${hdfs_path}

3. 执行job

在集群环境中执行命令: hadoop jar ${linux_path}/wordcount.jar ${hdfs_input_path} ${hdfs_output_path}

4. 查看统计结果

hdfs dfs -cat ${hdfs_output_path}/输出文件名

以上的方式在未启动hadoop集群环境时,是以Local模式运行,此时HDFS和YARN都不起作用。下面是在伪分布式模式下执行mapreduce job时需要做的工作,先把官网上列的步骤摘录出来:

配置主机名

# vi /etc/sysconfig/network

例如:

NETWORKING=yes

HOSTNAME=master

vi /etc/hosts

填入以下内容

127.0.0.1 localhost

配置ssh免密码互通

ssh-keygen -t rsa

# cat?~/.ssh/id_rsa.pub?>>?~/.ssh/authorized_keys

配置core-site.xml文件(位于${HADOOP_HOME}/etc/hadoop/

fs.defaultFS

hdfs://localhost:9000

配置hdfs-site.xml文件

dfs.replication

1

下面的命令可以在单机伪分布模式下运行mapreduce的job

1.Format the filesystem:

$ bin/hdfs namenode -format

2.Start NameNode daemon and DataNode daemon:

$ sbin/start-dfs.sh

3.The hadoop daemon log output is written to the $HADOOP_LOG_DIR directory (defaults to $HADOOP_HOME/logs).

4.Browse the web interface for the NameNode; by default it is available at:

NameNode - http://localhost:50070/

Make the HDFS directories required to execute MapReduce jobs:

$ bin/hdfs dfs -mkdir /user

$ bin/hdfs dfs -mkdir /user/

5.Copy the input files into the distributed filesystem:

$ bin/hdfs dfs -put etc/hadoop input

6.Run some of the examples provided:

$ bin/hadoowqupMluJjRp jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.2.jar grep input output 'dfs[a-z.]+'

7.Examine the output files:

Copy the output files from the distributed filesystem to the local filesystem and examine them:

$ bin/hdfs dfs -get output output

$ cat output/*

or

View the output files on the distributed filesystem:

$ bin/hdfs dfs -cat output/*

8.When you're done, stop the daemons with:

$ sbin/stop-dfs.sh

总结

以上就是本文关于hadoop的wordcount实例代码的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:web接口测试工具(Web接口测试)
下一篇:springcloud 中 zuul 修改请求参数信息的方法
相关文章

 发表评论

暂时没有评论,来抢沙发吧~