Java之Rsync并发迁移数据并校验详解

网友投稿 427 2022-10-05


Java之Rsync并发迁移数据并校验详解

java调用Rsync并发迁移数据并执行校验

java代码如下

RsyncFile.java

import lombok.NoArgsConstructor;

import lombok.SneakyThrows;

import java.io.*;

import java.util.ArrayList;

import java.util.Date;

import java.util.concurrent.*;

/**

* @ClassName RsyncFile

* @Descriptiom TODO rsync多线程同步迁移数据

* @Author KING

* @Date 2019/11/25 09:17

* @Version 1.2.2

* rsync -vzrtopg --progress --delete //镜像同步

**/

@NoArgsConstructor

public class RsyncFile implements Runnable{

private static final int availProcessors = Runtime.getRuntime().availableProcessors();

//构造以cpu核心数为核心池,cpu线程数为最大池,超时时间为1s,线程队列为大小为无界的安全阻塞线程队列,拒绝策略为DiscardOldestPolicy()的线程池。(同步数据当然不能丢下拒绝任务)

private ExecutorService ThreadPool = new ThreadPoolExecutor(availProcessors >> 1,

availProcessors,

1L,

TimeUnit.SECONDS,

new LinkedBlockingQueue<>(),Executors.defaultThreadFactory(),

new ThreadPoolExecutor.DiscardOldestPolicy());

//保存扫描得到的文件列表

private static ArrayList fileNameList = new ArrayList();

private String shellname;

private String filename;

private String userip;

private CountDownLatch countDownLatch;

private static int deep = 0;

public RsyncFile(String ShellName, String filename, String UserIP, CountDownLatch countDownLatch) {

this.shellname = ShellName;

this.filename = filename;

this.userip = UserIP;

this.countDownLatch = countDownLatch;

}

public static void main(String[] args) {

try {

new RsyncFile().Do(args[0],args[1],Integer.parseInt(args[2]));

}catch (ArrayIndexOutOfBoundsException e){

System.out.println(e);

System.out.println("Error , args send fault");

System.out.println("please send localAddress remote username @ remote IP or hostname and catalogue");

System.out.println("like this [ /home/test/ root@node1:/test/ 1 ]");

}catch(NumberFormatException e1){

System.out.println(e1);

http:// System.out.println("please input Right Directory depth, this number must be int");

System.out.println("like this [ /home/test/ root@node1:/test/ 1 ]");

}

}

@SneakyThrows

private void Do(String content,String UserIP,int setdeep){

System.out.println("开始执行");

System.out.println("开始时间:" + new Date());

Long a = System.nanoTime();

File file = new File(content);

System.out.println("开始扫描本地指定目录");

GetAllFile(file,setdeep);//按深度扫描非空文件夹和文件

System.out.println("扫描本地目录完成");

//给脚本赋予权限

String [] cmd={"/bin/sh","-c","chmod 755 ./do*"};

Runtime.getRuntime().exec(cmd);

//创建远端目录操作

System.out.println("开始创建远端目录结构");

//一次计数锁用于保证目录创建完成

CountDownLatch doDirLock = new CountDownLatch(1)wdblAQ;

ThreadPool.execute(new RsyncFile("./doDirc.sh",content,UserIP,doDirLock));

doDirLock.await();

System.out.println("创建远端目录结构完成");

//开始同步工作

System.out.println("开始执行同步工作");

System.out.println("同步的文件夹或文件总数: " + fileNameList.size());

System.out.println("正在同步。。。。。");

//fileNameList.size()次计数锁用于保证数据同步完成(保证计时准确)

CountDownLatch rsyncLock = new CountDownLatch(fileNameList.size());

System.out.println(fileNameList.size());

for (String fileName:fileNameList) {

//除去文件名中与UserIP重复的文件路径

String RemoteDir = UserIP.concat(fileName.replace(content, ""));

System.out.println("要同步的本地目录或文件: " + fileName);

System.out.println("要同步的远端目录或文件: " + RemoteDir);

ThreadPool.execute(new RsyncFile("./doRync.sh",fileName, RemoteDir,rsyncLock));

}

rsyncLock.await();

System.out.println("执行同步工作完成");

//开始文件校验工作

System.out.println("执行文件校验及重传");

//一次计数锁用于保证校验完成

CountDownLatch chechSumLock = new CountDownLatch(1);

ThreadPool.execute(new RsyncFile("./doChecksum.sh",content,UserIP,chechSumLock));

chechSumLock.await();

System.out.println("文件校验及重传完成");

ThreadPool.shutdown();

Long b = System.nanoTime();

Long aLong = (b - a)/1000000L;

System.out.println("处理时间" + aLong + "ms");

System.out.println("结束时间:" + new Date());

}

/**

* 执行rsync脚本的线程方法,使用PrintWriter来与linux Terminal交互

*/

@Override

public void run() {

try {

String command=shellname.concat(" ").concat(filename).concat(" ").concat(userip);

File wd = new File("/bin");

Process process = null;

process = Runtime.getRuntime().exec("/bin/bash", null, wd);

if (process != null) {

InputStream is = process.getInputStream();

BufferedReader reader = new BufferedReader(new InputStreamReader(is));

PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(process.getOutputStream())),

true);

//切换到当前class文件所在目录

out.println("cd " + System.getProperty("user.dir"));

out.println(command);

out.println("exit");

StringBuilder sb = new StringBuilder();

String line;

while ((line = reader.readLine()) != null) {

sb.append(line + System.lineSeparator());

}

process.waitFor();

reader.close();

out.close();

process.destroy();

System.out.println("result:" + sb.toString());

}else {

http://System.out.println("找不到系统bash工具,请检查系统是否异常,并为系统创建/bin/sh的bash工具软连接");

}

} catch (Exception e) {

System.err.println(e.getMessage());

}finally {

//倒记数锁释放一次

countDownLatch.countDown();

}

}

/**遍历指定的目录并能指定深度

* @param file 指定要遍历的目录

* @param setDeep 设定遍历深度

*/

@SneakyThrows

private static void GetAllFile(File file, int setDeep) {

if(file != null){

if(file.isDirectory() && deep

deep++;

File f[] = file.listFiles();

if(f != null) {

int length = f.length;

for(int i = 0; i < length; i++)

GetAllFile(f[i],setDeep);

deep--;

}

} else {

if(file.isDirectory()){

//如果为目录末尾添加 / 保证rsync正常处理

fileNameList.add(file.getAbsolutePath().concat("/"));

}else {

fileNameList.add(file.getAbsolutePath());

}

}

}

}

}

doDir.sh

rsync -av --include='*/' --exclude='*' $1 $2 |tee -a /tmp/rsync.log 2>&1

echo "创建目录结构操作"

doRsync.sh

rsync -avzi --stats --progress $1 $2 |tee -a /tmp/rsync.log 2>&1

doChecksum.sh

rsync -acvzi --stats --progress $1 $2 |tee -a /tmp/checksum.log 2>&1

附录

rsync输出日志说明如下

YXcstpoguax path/to/file

|||||||||||

||||||||||╰- x: The extended attribute information changed

|||||||||╰-- a: The ACL information changed

||||||||╰--- u: The u slot is reserved for future use

|||||||╰---- g: Group is different

||||||╰----- o: Owner is different

|||||╰------ p: Permission are different

||||╰------- t: Modification time is different

|||╰-------- s: Size is different

||╰--------- c: Different checksum (for regular files), or

|| changed value (for symlinks, devices, and special files)

|╰---------- the file type:

| f: for a file,

| d: for a directory,

| L: for a symlink,

| D: for a device,

| S: for a special file (e.g. named sockets and fifos)

╰----------- the type of update being done::

<: being="" file="" host="" is="" remote="" the="" to="" transferred="">

>: file is being transferred to the local host (received)

c: local change/creation for the item, such as:

- the creation of a directory

- the changing of a symlink,

- etc.

h: the item is a hard link to another item (requires

--hard-links).

.: the item is not being updated (though it might have

attributes that are being modified)

*: means that the rest of the itemized-output area contains

a message (e.g. "deleting")

deep++;

File f[] = file.listFiles();

if(f != null) {

int length = f.length;

for(int i = 0; i < length; i++)

GetAllFile(f[i],setDeep);

deep--;

}

} else {

if(file.isDirectory()){

//如果为目录末尾添加 / 保证rsync正常处理

fileNameList.add(file.getAbsolutePath().concat("/"));

}else {

fileNameList.add(file.getAbsolutePath());

}

}

}

}

}

doDir.sh

rsync -av --include='*/' --exclude='*' $1 $2 |tee -a /tmp/rsync.log 2>&1

echo "创建目录结构操作"

doRsync.sh

rsync -avzi --stats --progress $1 $2 |tee -a /tmp/rsync.log 2>&1

doChecksum.sh

rsync -acvzi --stats --progress $1 $2 |tee -a /tmp/checksum.log 2>&1

附录

rsync输出日志说明如下

YXcstpoguax path/to/file

|||||||||||

||||||||||╰- x: The extended attribute information changed

|||||||||╰-- a: The ACL information changed

||||||||╰--- u: The u slot is reserved for future use

|||||||╰---- g: Group is different

||||||╰----- o: Owner is different

|||||╰------ p: Permission are different

||||╰------- t: Modification time is different

|||╰-------- s: Size is different

||╰--------- c: Different checksum (for regular files), or

|| changed value (for symlinks, devices, and special files)

|╰---------- the file type:

| f: for a file,

| d: for a directory,

| L: for a symlink,

| D: for a device,

| S: for a special file (e.g. named sockets and fifos)

╰----------- the type of update being done::

<: being="" file="" host="" is="" remote="" the="" to="" transferred="">

>: file is being transferred to the local host (received)

c: local change/creation for the item, such as:

- the creation of a directory

- the changing of a symlink,

- etc.

h: the item is a hard link to another item (requires

--hard-links).

.: the item is not being updated (though it might have

attributes that are being modified)

*: means that the rest of the itemized-output area contains

a message (e.g. "deleting")

>: file is being transferred to the local host (received)

c: local change/creation for the item, such as:

- the creation of a directory

- the changing of a symlink,

- etc.

h: the item is a hard link to another item (requires

--hard-links).

.: the item is not being updated (though it might have

attributes that are being modified)

*: means that the rest of the itemized-output area contains

a message (e.g. "deleting")


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:DDOS攻击分为几大类?网络安全入门教程(ddos攻击有几种)
下一篇:漏洞优先级技术(VPT)导论
相关文章

 发表评论

暂时没有评论,来抢沙发吧~