java实现对Hadoop的操作

网友投稿 288 2022-10-14


java实现对Hadoop的操作

基本操作

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.*;

import org.junit.Test;

import org.junit.jupiter.api.BeforeEach;

import org.junit.jupiter.api.DisplayName;

import org.junit.ruhttp://nner.RunWith;

import org.junit.runners.JUnit4;

import java.io.IOException;

import java.net.URI;

import java.net.URISyntaxException;

import java.util.Arrays;

@RunWith(JUnit4.class)

@DisplayName("Test using junit4")

public class HadoopClientTest {

private FileSystem fileSystem = null;

@BeforeEach

public void init() throws URISyntaxException, IOException, InterruptedException {

Configuration configuration = new Configuration();

configuration.set("dfs.replication", "1");

configuration.set("dfs.blocksize", "64m");

fileSystem = FileSystem.get(new URI("hdfs://hd-even-01:9000"), configuration, "root");

}

/**

* 从本地复制文件到Hadoop

*

* @throws URISyntaxException

* @throws IOException

* @throws InterruptedException

*/

@Test

public void copyFileFromLocal() throws URISyntaxException, IOException, InterruptedException {

// 上传文件

fileSystem.copyFromLocalFile(new Path("C:\\Users\\Administrator\\Desktop\\win10激活.txt"), new Path("/even1"));

// 关闭流,报错winUtils,因为使用了linux的tar包,如果windows要使用,则需要编译好这个winUtils包才能使用

fileSystem.close();

}

/**

* 从Hadoop下载文件到本地,下载需要配置Hadoop环境,并添加winutils到bin目录

*

* @throws URISyntaxException

* @throws IOException

* @throws InterruptedException

*/

@Test

public void copyFileToLocal() throws URISyntaxException, IOException, InterruptedException {

// 下载文件

fileSystem.copyToLocalFile(new Path("/win10激活.txt"), new Path("E:/"));

// 关闭流,报错winUtils,因为使用了linux的tar包,如果windows要使用,则需要编译好这个winUtils包才能使用

fileSystem.close();

}

/**

* 创建文件夹

*

* @throws IOException

*/

@Test

public void hdfsMkdir() throws IOException {

// 调用创建文件夹方法

fileSystem.mkdirs(new Path("/even1"));

// 关闭方法

fileSystem.close();

}

/**

* 移动文件/修改文件名

*/

public void hdfsRename() throws IOException {

fileSystem.rename(new Path(""), new Path(""));

fileSystem.close();

}

/**

* 删除文件/文件夹

*

* @throws IOException

*/

@Test

public void hdfsRm() throws IOException {

// fileSystem.delete(new Path(""));

// 第二个参数表示递归删除

fileSystem.delete(new Path(""), true);

fileSystem.close();

}

/**

* 查看hdfs指定目录的信息

*

* @throws IOException

*/

@Test

public void hdfsLs() throws IOException {

// 调用方法返回远程迭代器,第二个参数是把目录文件夹内的文件也列出来

RemoteIterator listFiles = fileSystem.listFiles(new Path("/"), true);

while (listFiles.hasNext()) {

LocatedFileStatus locatedFileStatus = listFiles.next();

System.out.println("文件路径:" + locatedFileStatus.getPath());

System.out.println("块大小:" + locatedFileStatus.getBlockSize());

System.out.println("文件长度:" + locatedFileStatus.getLen());

System.out.println("副本数量:" + locatedFileStatus.getReplication());

System.out.println("块信息:" + Arrays.toString(locatedFileStatus.getBlockLocations()));

}

fileSystem.close();

}

/**

* 判断是文件还是文件夹

*/

@Test

public void findHdfs() throws IOException {

// 1,展示状态信息

FileStatus[] listStatus = fileSystem.listStatus(new Path("/"));

// 2,遍历所有文件

for (FileStatus fileStatus : listStatus) {

if (fileStatus.isFile())

System.out.println("是文件:" + fileStatus.getPath().getName());

else if (fileStatus.isDirectory())

System.out.println("是文件夹:" + fileStatus.getPath().getName());

}

fileSystem.close();

}

}

文件读写

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.junit.Before;

import org.junit.Test;

import org.junit.jupiter.api.DisplayName;

import org.junit.runner.RunWith;

import org.junit.runners.JUnit4;

import java.io.*;

import java.net.URI;

import java.net.URISyntaxException;

import java.nio.charset.StandardCharsets;

import java.util.Arrays;

@RunWith(JUnit4.class)

@DisplayName("this is read write test!")

public class HadoopReadWriteTest {

FileSystem fileSystem = null;

Configuration configuration = null;

@Before

public void init() throws URISyntaxException, IOException, InterruptedException {

// 1,加载配置

configuration = new Configuration();

// 2,构建客户端

fileSystem = FileSystem.get(new URI("hdfs://hd-even-01:9000/"), configuration, "root");

}

@Test

public void testReadData() throws IOException {

// 1,获取hdfs文件流

FSDataInputStream open = fileSystem.open(new Path("/win10激活.txt"));

// 2,设置一次获取的大小

byte[] bytes = new byte[1024];

// 3,读取数据

while (open.read(bytes) != -1)

System.out.println(Arrays.toString(bytes));

open.close();

fileSystem.close();

}

/**

* 使用缓存流

*

* @throws IOException

*/

@Test

public void testReadData1() throws IOException {

FSDataInputStream open = fileSystem.open(new Path("/win10激活.txt"));

// 使用缓冲流会快点

BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open, StandardCharsets.UTF_8));

String line = "";

while ((line = bufferedReader.readLine()) != null) {

System.out.println(line);

}

bufferedReader.close();

open.close();

fileSystem.close();

}

/**

* 指定偏移量来实现只读部分内容

*/

@Test

public void readSomeData() throws IOException {

FSDataInputStream open = fileSystem.open(new Path("/win10激活.txt"));

// 指定开始的index

open.seek(14);

// 指定读的多少

byte[] bytes = new byte[5];

while (open.read(bytes) != -1)

System.out.println(new String(bytes));

open.close();

fileSystem.close();

}

/**

* 流方式写数据

http://* @throws IOException

*/

@Test

public void writeData() throws IOException {

// 1,获取输出流

FSDataOutputStream out = fileSystem.create(new Path("/win11.txt"), false);

// 2,获取需要写的文件输入流

FileInputStream in = new FileInputStream(new File("C:\\Users\\Administrator\\Desktop\\xixi.txt"));

byte[] b = new byte[1024];

int read = 0;

while ((read = in.read(b)) != -1) {

out.write(b, 0, read);

}

in.close();

out.close();

fileSystem.close();

}

/**

* 直接写字符串

*/

@Test

public void writeData1() throws IOException {

// 1,创建输出流

FSDataOutputStream out = fileSystem.create(new Path("/aibaobao.txt"), false);

// 2,写数据

out.write("wochaoaibaobao".getBytes());

// 3,关闭流

IOUtils.closeStream(out);

fileSystem.close();

}

/**

* IOUtils方式上传

*

* @throws IOException

*/

@Test

public void putToHdfs() throws IOException {

// 1,获取输入流

FileInputStream in = new FileInputStream(new File("C:\\Users\\Administrator\\Desktop\\xixi.txt"));

// 2,获取输出流

FSDataOutputStream out = fileSystem.create(new Path("/haddopPut.txt"), false);

// 3,拷贝

IOUtils.copyBytes(in, out, configuration);

// 4,关闭流

IOUtils.closeStream(in);

IOUtils.closeStream(out);

fileSystem.close();

}

/**

* IOUtils方式下载

* @throws IOException

*/

@Test

public void getFromHdfs() throws IOException {

// 1,获取输入流

FSDataInputStream open = fileSystem.open(new Path("/haddopPut.txt"));

// 2,获取输出流

FileOutputStream out = new FileOutputStream(new File("C:\\Users\\Administrator\\Desktop\\haddopPut.txt"));

// 3,拷贝

IOUtils.copyBytes(open, out, configuration);

// 4,关闭流

IOUtils.closeStream(open);

IOUtils.closeStream(out);

fileSystem.close();

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:IRF链型堆叠
下一篇:三层架构学习笔记
相关文章

 发表评论

暂时没有评论,来抢沙发吧~