java结合HADOOP集群文件上传下载

网友投稿 250 2023-08-03


java结合HADOOP集群文件上传下载

对HDFS上的文件进行上传和下载是对集群的基本操作,在《HADOOP权威指南》一书中,对文件的上传和下载都有代码的实例,但是对如何配置HADOOP客户端却是没有讲得很清楚,经过长时间的搜索和调试,总结了一下,如何配置使用集群的方法,以及自己测试可用的对集群上的文件进行操作的程序。首先,需要配置对应的环境变量:

复制代码 代码如下:

hadoop_HOME="/home/work/tools/java/hadoop-client/hadoop"

for f in $hadoop_HOME/hadoop-*.jar; do

        hadoop_CLASSPATH=${hadoop_CLASSPATH}:$f

done

for f in $hadoop_HOME/lib/*.jar; do

        hadoop_CLASSPATH=${hadoop_CLASSPATH}:$f

done

hadoopvfs_HOME="/home/work/tools/java/hadoop-client/hadoop-vfs"

for f in $hadoopvfs_HOME/lib/*.jar; do

        hadoop_CLASSPATH=${hadoop_CLASSPATH}:$f

done

export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/work/tools/java/hadoop-client/hadoop/lib/native/linux-amd64-64/

其中LD_LIBRARY_PATH是在调用时需要用到的库的路径,hadoop_CLASSPATH则是我们hadoop客户端里各种jar包

有一点需要注意的是最好不要使用HADOOP_HOME这个变量,这个是一个系统使用的环境变量,最好不要和它冲突

编译类的方法:

复制代码 代码如下:

javac -classpath $CLASSPATH:$hadoop_CLASSPATH HDFSUtil.java

运行的方法:

复制代码 代码如下:

java -classpath $CLASSPATH:$hadoop_CLASSPATH HDFSUtil

但是在实际的使用过程中,会报No Permission之类的错误,或者你能保证代码没有问题的情况下,在运行的时候也会报一些奇奇怪怪的错误

那么问题来了,这是什么鬼?

答案:这是因为没有配置对应集群的配置文件

因为在《HADOOP权威指南》一书中,弱化了配置的东西,所以在具体使用集群的时候就会出现问题,如何解决呢,这样子:

复制代码 代码如下:

this.conf = new Configuration(false);

conf.addResource("./hadoop-site.xml");

conf.addResource("./hadoop-default.xml");

conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());conf.set("fsdlcMqOXA.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

为什么会这样,书上只是很简单的:

this.conf = new Configuration();

那是因为默认你的集群在本地,所以不需要做配置,但是在实际使用的过程中,各个集群的配置是不同的,所以我们要引入集群的配置

这是非常重要的一点,因为实际使用的过程中我们都是使用的HADOOP的客户端,而且是已经搭好环境的集群,所以我们需要做好本地的配置

hadoop-site.xml和hadoop-default.xml这两个文件在所使用的客户端的conf目录下,在addResource的时候指定好目录就行了

将以上所提到的配置,全部配完之后,这个程序才能真正运行起来,所以配置是非常重要的一环。

以下是对应的工具的代码,有兴趣的看一下吧,使用的是文件流的方式来搞的,这样子也可以打通FTP和HDFS之间文件的互传:

import java.io.BufferedInputStream;

import java.io.FileInputStream;

import java.io.FileNotFoundException;

import java.io.FileOutputStream;

import java.io.IOException;

import java.io.InputStream;

import java.io.OutputStream;

import java.net.URI;

import java.net.URL;

import java.io.*;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.util.Progressable;

public class HDFSUtil {

private String hdfs_node = "";

private String hdfs_path = "";

private String file_path = "";

private String hadoop_site = "";

private String hadoop_default = "";

private Configuration conf = null;

public HDFSUtil(String hdfs_node) {

this.hdfs_node = hdfs_node;

}

public String getHdfsNode() {

return this.hdfs_node;

}

public void setHdfsPath(String hdfs_path){

this.hdfs_path = hdfs_path;

}

public String getHdfsPath(){

return this.hdfs_path;

}

public void setFilePath(String file_path){

this.file_path = file_path;

}

public String getFilePath(){

return this.file_path;

}

public void setHadoopSite(String hadoop_site){

this.hadoop_site = hadoop_site;

}

public String getHadoopSite(){

return this.hadoop_site;

}

public void setHadoopDefault(String hadoop_default){

this.hadoop_default = hadoop_default;

}

public String getHadoopDefault(){

return this.hadoop_default;

}

public int setConfigure(boolean flag) {

if (flag == false){

if (this.getHadoopSite() == "" || this.getHadoopDefault() == ""){

return -1;

}

else {

this.conf = new Configuration(false);

conf.addResource(this.getHadoopDefault());

conf.addResource(this.getHadoopSite());

conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());

conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

return 0;

}

}

this.conf = new Configuration();

return 0;

}

public Configuration getConfigure() {

return this.conf;

}

public int upLoad(String localName, String remoteName) throws FileNotFoundException, IOException {

InputStream inStream = null;

FileSystem fs = null;

try{

inStream = new BufferedInputStream(new FileInputStream(localName));

fs = FileSystem.get(URI.create(this.hdfs_node), this.conf);

OutputStream outStream = fs.create(new Path(remoteName) ,new Progressable() {

public void progress(){

System.out.print('.');

}

});

IOUtils.copyBytes(inStream, outStream, 4096, true);

inStream.close();

return 0;

} catch (IOException e){

inStream.close();

e.printStackTrace();

return -1;

}

}

public int upLoad(InputStream inStream, String remoteName) throws FileNotFoundException, IOException {

FileSystem fs = null;

try{

fs = FileSystem.get(URI.create(this.hdfs_node), this.conf);

OutputStream outStream = fs.create(new Path(remoteName) ,new Progressable() {

public void progress(){

System.out.print('.');

}

});

IOUtils.copyBytes(inStream, outStream, 4096, true);

inStream.close();

return 0;

} catch (IOException e){

inStream.close();

e.printStackTrace();

return -1;

}

}

public int donwLoad(String remoteName, String localName, int lines) throws FileNotFoundException, IOException {

FileOutputStream fos = null;

InputStreamReader isr = null;

BufferedReader br = null;

String str = null;

OutputStreamWriter osw = null;

BufferedWriter buffw = null;

PrintWriter pw = null;

FileSystem fs = null;

InputStream inStream = null;

try {

fs = FileSystem.get(URI.create(this.hdfs_node + remoteName), this.conf);

inStream = fs.open(new Path(this.hdfs_node + remoteName));

fos = new FileOutputStream(localName);

osw = new OutputStreamWriter(fos, "UTF-8");

buffw = new BufferedWriter(osw);

pw = new PrintWriter(buffw);

isr = new InputStreamReader(inStream, "UTF-8");

br = new BufferedReader(isr);

while((str = br.readLine()) != null && lines > 0){

lines--;

pw.println(str);

}

} catch (IOException e){

throw new IOException("Couldn't write.", e);

} finally {

pw.close();

buffw.close();

osw.close();

fos.close();

inStream.close()

}

return 0;

}

//main to test

public static void main(String[] args){

String hdfspath = null;

String localname = null;

String hdfsnode = null;

int lines = 0;

if (args.length == 4){

hdfsnode = args[0];

hdfspath = args[1];

localname = args[2];

lines = Integer.parseInt(args[3]);

}

else{

hdfsnode = "hdfs://nj01-nanling-hdfs.dmop.baidu.com:54310";

hdfspath = "/app/ps/spider/wdmqa/wangweilong/test/HDFSUtil.java";

localname = "/home/work/workspace/project/dhc2-0/dhc/base/ftp/papapa";

lines = 5;

}

HDFSUtil hdfsutil = new HDFSUtil(hdfsnode);

hdfsutil.setFilePath(hdfsutil.getHdfsNode()+hdfspath);

hdfsutil.setHadoopSite("./hadoop-site.xml");

hdfsutil.setHadoopDefault("./hadoop-default.xml");

hdfsutil.setConfigure(false);

try {

hdfsutil.donwLoad(hdfspath, localname, lines);

} catch (IOException e){

e.printStackTrace();

}

}

如果想要了解FTP上文件的下载,请参考这篇文章:

ftp下载工具

如果想要打通FTP和HDFS文件互传,只要创建一个类,调用这两篇文章中的工具的接口就可以搞定,自己写的代码,实测有效。

以上就是本文的全部内容了,希望能够对大家熟练掌握java有所帮助。

请您花一点时间将文章分享给您的朋友或者留下评论。我们将会由衷感谢您的支持!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:java自动生成ID号的方法
下一篇:Java程序员必须知道的5个JVM命令行标志
相关文章

 发表评论

暂时没有评论,来抢沙发吧~