hadoop上传文件功能实例代码

网友投稿 361 2023-03-30


hadoop上传文件功能实例代码

hdfs上的文件是手动执行命令从本地linux上传至hdfs的。在真实的运行环境中,我们不可能每次手动执行命令上传的,这样太过繁琐。那么,我们可以使用hdfs提供的java api实现文件上传至hdfs,或者直接从ftp上传至hdfs。

然而,需要说明一点,之前笔者是要运行MR,都需要每次手动执行yarn jar,在实际的环境中也不可能每次手动执行。像我们公司是使用了索答的调度平台/任务监控平台,可以定时的以工作流执行我们的程序,包括普通java程序和MR。其实,这个调度平台就是使用了quartz。当然,这个调度平台也提供其它的一些功能,比如web展示、日志查看等,所以也不是免费的。

首先,给大家简单介绍一下hdfs。hdfs是以流式数据访问模式来存储超大文件,hdfs的构建思路是一次写入,多次读取,这样才是最高效的访问模式。hdfs是为高数据吞吐量应用优化的,所以会以提高时间延迟为代价。对于低延时的访问需求,我们可以使用hbase。

然后,还要知道hdfs中块(block)的概念,默认为64MB。块是hdfs的数据读写的最小单位,通常每个map任务一次只处理一个block,像我们对集群性能评估就会使用到这个概念,比如目前有多少节点,每个节点的磁盘空间、cpu以及所要处理的数据量、网络带宽,通过这些信息来进行性能评估。我们可以使用Hadoop fsck / -files -blocks列出文件系统中各个文件由哪些块构成。

在yarn环境中是可以有多个nameNode的。此环境中没有SecondaryNameNode,当然也可以有。

好了,关于hdfs的基本概念就讲到这儿了,下面来看看具体的代码。

一、java实现上传本地文件至hdfs

这里,可以直接使用hdfs提供的java api即可实现,代码如下:

package com.bjpowernode.hdfs.local;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

/**

* ClassName:UploadLocalFileToHdfs

* Function: 本地文件上传至hdfs.

* Date: 2016年3月28日 下午10:06:05

* @author qiyongkang

* @version

* @since JDK 1.6

* @see

*/

public class UploadLocalFileToHdfs {

public static void main(String[] args) {

Configuration conf = new Configuration();

String localDir = "/home/qiyongkang";

String hdfsDir = "/qiyongkang";

try{

Path localPath = new Path(localDir);

Path hdfsPath = new Path(hdfsDir);

FileSystem hdfs = FileSystem.get(conf);

hdfs.copyFromLocalFile(localPath, hdfsPath);

}catch(Exception e){

e.printStackTrace();

}

}

}

注意,这里hdfs上传目录如果不存在的话,hdfs会自动创建,比较智能。

打完包后,上传至服务器,执行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,然后执行hadoop fs -ls /qhttp://iyongkang便可看到:

二、java实现上传ftp上的文件至hdfs

首先,我们得准备一个ftp服务器,关于ftp服务器的搭建,大家可以查阅资料,笔者就不赘述了。

其实,从ftp上拉取文件上传到hdfs上,这个过程大家不要想复杂了,我们讲本地文件上传到hdfs,其实就是采用流的方式。因此,我们可以直接读取ftp上的文件流,然后以流的方式写入到hdfs。

下面,直接贴出代码:

package com.bjpowernode.hdfs.ftp;

import java.io.InputStream;

import org.apache.commons.net.ftp.FTP;

import org.apache.commons.net.ftp.FTPClient;

import org.apache.commons.net.ftp.FTPFile;

import org.apache.commons.net.ftp.FTPReply;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

/**

* ClassName:UploadFtpFileToHdfs

* Function: TODO ADD FUNCTION.

* Reason: TODO ADD REASON.

* Date: 2016年3月28日 下午10:50:37

*

* @author qiyongkang

* @version

* @since JDK 1.6

* @see

*/

public class UploadFtpFileToHdfs {

public static void main(String[] args) {

Configuration conf = new Configuration();

loadFromFtpToHdfs("172.31.26.200", "qiyongkang", "qyk123456", "/www/input/", "/qiyongkang/", conf);

}

/**

*

* loadFromFtpToHdfs:将数据从ftp上传到hdfs上. &http://lt;br/>

*

* @author qiyongkang

* @param ip

* @param username

* @param password

* @param filePath

* @param outputPath

* @param conf

* @return

* @since JDK 1.6

*/

private static boolean loadFromFtpToHdfs(String ip, String username, String password, String filePath,

String outputPath, Configuration conf) {

FTPClient ftp = new FTPClient();

InputStream inputStream = null;

FSDataOutputStream outputStream = null;

boolean flag = true;

try {

ftp.connect(ip);

ftp.login(username, password);

ftp.setFileType(FTP.BINARY_FILE_TYPE);

ftp.setControlEncoding("UTF-8");

int reply = ftp.getReplyCode();

if (!FTPReply.isPositiveCompletion(reply)) {

ftp.disconnect();

}

FTPFile[] files = ftp.listFiles(filePath);

FileSystem hdfs = FileSystem.get(conf);

for (FTPFile file : files) {

if (!(file.getName().equals(".") || file.getName().equals(".."))) {

inputStream = ftp.retrieveFileStream(filePath + file.getName());

outputStream = hdfs.create(new Path(outputPath + file.getName()));

IOUtils.copyBytes(inputStream, outputStream, conf, false);

if (inputStreamhttp:// != null) {

inputStream.close();

ftp.completePendingCommand();

}

}

}

ftp.disconnect();

} catch (Exception e) {

flag = false;

e.printStackTrace();

}

return flag;

}

}

然后同样打包上传后执行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,便可看到http://:

总结

以上所述是给大家介绍的hadoop上传文件功能实例代码,希望对大家有所帮助,如果大家有任何疑问请给我留言,会及时回复大家的。在此也非常感谢大家对我们网站的支持!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Spring Boot Admin 的使用详解
下一篇:接口测试用例非空校验(接口测试用例方法)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~