Java多线程实现快速切分文件的程序

网友投稿 243 2023-07-14


Java多线程实现快速切分文件的程序

前段时间需要进行大批量数据导入,DBA提供的是CSV文件,但每个CSV文件都有好几个GB大小,直接进行load不仅数据库很慢,还会出现内存不足的问题。为了解决这个问题,写了个快速切分文件的程序。

import org.apache.log4j.LogManager;

import org.apache.log4j.Logger;

import java.io.*;

import java.util.*;

import java.util.concurrent.*;

public class FileSplitUtil {

/** Class-wide log4j logger. */
private static final Logger log = LogManager.getLogger(FileSplitUtil.class);

/** Size threshold in bytes (100 MiB); main() only splits files at least this large. */
private static final long originFileSize = 100L * 1024 * 1024;

/**
 * Size in bytes of each part file (64 MiB).
 *
 * <p>Original author's note: the value must be a power of two to avoid garbled Chinese text.
 * NOTE(review): nothing in this file enforces or relies on that property - confirm before
 * changing the value.
 */
private static final int blockFileSize = 64 * 1024 * 1024;

/** Field separator used in the CSV files (spelled "cvs" throughout this class). */
private static final char cvsSeparator = '^';

/**
 * Command-line entry point: splits a large CSV file into parts of {@code blockFileSize} bytes
 * and prints the name of every part plus the elapsed time.
 *
 * <p>The file to split is taken from {@code args[0]} when supplied; otherwise the original
 * hard-coded sample path is used. Files smaller than {@code originFileSize} are not split.
 *
 * @param args optional; {@code args[0]} is the path of the file to split
 */
public static void main(String[] args) {
    long start = System.currentTimeMillis();
    try {
        String fileName = (args != null && args.length > 0) ? args[0] : "D:\\csvtest\\aa.csv";
        File sourceFile = new File(fileName);
        if (sourceFile.length() >= originFileSize) {
            // Turn Windows separators into forward slashes before handing the path on.
            String cvsFileName = fileName.replace('\\', '/');
            FileSplitUtil fileSplitUtil = new FileSplitUtil();
            List<String> parts = fileSplitUtil.splitBySize(cvsFileName, blockFileSize);
            for (String part : parts) {
                System.out.println("partName is:" + part);
            }
        }
        System.out.println("总文件长度"+sourceFile.length()+",拆分文件耗时:" + (System.currentTimeMillis() - start) + "ms.");
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up can still observe it.
        Thread.currentThread().interrupt();
        log.error("File splitting was interrupted", e);
    } catch (Exception e) {
        // Pass the exception itself so the stack trace is logged (the original logged an array reference).
        log.error("File splitting failed", e);
    }
}

/**
 * Splits a file into consecutive parts of at most {@code byteSize} bytes each.
 *
 * <p>Parts are named {@code <file path>.<zero-padded index>.cvs}. The byte ranges are copied
 * by a small pool of worker threads. Once every part is written, {@code mergeRow} post-processes
 * the parts, because a byte-based cut can fall in the middle of a row.
 *
 * @param fileName full name of the file to split
 * @param byteSize size of each part in bytes; must be positive
 * @return names of the part files, in split order
 * @throws IOException if the source is not a readable file or a worker dies unexpectedly
 * @throws InterruptedException if the calling thread is interrupted while waiting for workers
 * @throws IllegalArgumentException if {@code byteSize} is not positive
 */
public List<String> splitBySize(String fileName, int byteSize)
        throws IOException, InterruptedException {
    if (byteSize <= 0) {
        throw new IllegalArgumentException("byteSize must be positive: " + byteSize);
    }
    File file = new File(fileName);
    if (!file.isFile()) {
        // The original failed here too (via RandomAccessFile); keep that contract.
        throw new FileNotFoundException(fileName);
    }

    long totalLen = file.length();
    int count = (int) Math.ceil(totalLen / (double) byteSize);
    int countLen = String.valueOf(count).length();
    List<String> parts = new ArrayList<String>();
    if (count == 0) {
        return parts; // empty source file: nothing to split or merge
    }

    // Each worker buffers one whole part in memory, so keep the pool small.
    final int maxWorkers = 4;
    int poolSize = Math.min(count, Math.min(maxWorkers, Runtime.getRuntime().availableProcessors()));
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    // SplitRunnable's constructor requires a latch. Completion is tracked through the futures
    // instead, so a task that dies with an unchecked exception cannot block this method forever.
    CountDownLatch latch = new CountDownLatch(count);
    List<Future<?>> pending = new ArrayList<Future<?>>();
    try {
        for (int i = 0; i < count; i++) {
            String partFileName = file.getPath() + "."
                    + leftPad(String.valueOf(i + 1), countLen, '0') + ".cvs";
            long startPos = (long) i * byteSize;
            // The last part may be shorter than byteSize.
            int readSize = (int) Math.min(byteSize, totalLen - startPos);
            pending.add(pool.submit(new SplitRunnable(readSize, startPos, partFileName, file, latch)));
            parts.add(partFileName);
        }
        // Wait until every part has been written.
        for (Future<?> task : pending) {
            try {
                task.get();
            } catch (ExecutionException e) {
                throw new IOException("Failed to write a part of " + fileName, e.getCause());
            }
        }
    } finally {
        pool.shutdownNow();
    }

    // The cut may have severed rows; post-process the parts to join them.
    mergeRow(parts);
    return parts;
}

/**

* 分割处理Runnable

*

* @author supeidong

*/

private class SplitRunnable implements Runnable {

int byteSize;

String partFileName;

File originFile;

long startPos;

CountDownLatch latch;

public SplitRunnable(int byteSize, long startPos, String partFileName,

File originFile, CountDownLatch latch) {

this.startPos = startPos;

this.byteSize = byteSize;

this.partFileName = partFileName;

this.originFile = originFile;

this.latch = latch;

}

public void run() {

RandomAccessFile rFile;

OutputStream os;

try {

rFile = new RandomAccessFile(originFile, "r");

byte[] b = new byte[byteSize];

rFile.seek(startPos);// 移动指针到每“段”开头

int s = rFile.read(b);

os = new FileOutputStream(partFileName);

os.write(b, 0, s);

os.flush();

os.close();

latch.countDown();

} catch (IOException e) {

log.error(e.getMessage());

latch.countDown();

}

}

}

/**

* 合并被切断的行

*

* @param parts

*/

private void mergeRow(List parts) {

List partFiles = new ArrayList();

try {

//组装被切分表对象

for (int i=0;i

String partFileName=parts.get(i);

File splitFileTemp = new File(partFileName);

if (splitFileTemp.exists()) {

PartFile partFile = new PartFile();

BufferedReader reader=new BufferedReader(new InputStreamReader(new FileInputStream(splitFileTemp),"gbk"));

String firstRow = reader.readLine();

String secondRow = reader.readLine();

String endRow = readLastLine(partFileName);

partFile.setPartFileName(partFileName);

partFile.setFirstRow(firstRow);

partFile.setEndRow(endRow);

if(i>=1){

String prePartFile=parts.get(i - 1);

String preEndRow = readLastLine(prePartFile);

partFile.setFirstIsFull(getCharCount(firstRow+preEndRow)>getCharCount(secondRow));

}

partFiles.add(partFile);

reader.close();

}

}

//进行需要合并的行的写入

for (int i = 0; i < partFiles.size() - 1; i++) {

PartFile partFile = partFiles.get(i);

PartFile partFileNext = partFiles.get(i + 1);

StringBuilder sb = new StringBuilder();

if (partFileNext.getFirstIsFull()) {

sb.append("\r\n");

sb.append(partFileNext.getFirstRow());

} else {

sb.append(partFileNext.getVcgOmyquFirstRow());

}

writeLastLine(partFile.getPartFileName(),sb.toString());

}

} catch (Exception e) {

log.error(e.getMessage());

}

}

/**
 * Counts how often the field separator {@code cvsSeparator} occurs in a string.
 *
 * @param s text to scan; must not be {@code null}
 * @return number of separator characters found
 */
private int getCharCount(String s) {
    int hits = 0;
    for (char c : s.toCharArray()) {
        if (c == cvsSeparator) {
            hits++;
        }
    }
    return hits;
}

/**
 * Counts the rows of a file by counting its {@code '\n'} bytes through a buffered stream.
 *
 * <p>A final line that is not terminated by {@code '\n'} is therefore not counted.
 *
 * @param filename file to scan
 * @return number of {@code '\n'} bytes in the file
 * @throws IOException if the file cannot be opened or read
 */
public int getFileRow(String filename) throws IOException {
    int count = 0;
    // try-with-resources closes the stream even when read() throws.
    try (InputStream is = new BufferedInputStream(new FileInputStream(filename))) {
        byte[] chunk = new byte[1024];
        int readBytes;
        while ((readBytes = is.read(chunk)) != -1) {
            for (int i = 0; i < readBytes; i++) {
                if (chunk[i] == '\n') {
                    count++;
                }
            }
        }
    }
    return count;
}

/**
 * Reads the last line of a file, decoding it as GBK.
 *
 * <p>The file is scanned backwards for the line break preceding the last line. The final byte
 * is skipped on purpose, so a trailing {@code '\n'} does not yield an empty last line. If the
 * file has no earlier line break, its single line is returned.
 *
 * @param filename file to read
 * @return the last line without its line terminator, or {@code ""} for an empty file
 * @throws IOException if the file cannot be opened or read
 */
private String readLastLine(String filename) throws IOException {
    // try-with-resources releases the file handle on every path.
    try (RandomAccessFile raf = new RandomAccessFile(filename, "r")) {
        long len = raf.length();
        if (len == 0L) {
            return "";
        }
        long lineStart = 0L; // no earlier '\n' found: the whole file is one line
        for (long pos = len - 2; pos >= 0; pos--) {
            raf.seek(pos);
            if (raf.readByte() == '\n') {
                lineStart = pos + 1;
                break;
            }
        }
        raf.seek(lineStart);
        String raw = raf.readLine();
        if (raw == null) {
            return "";
        }
        // RandomAccessFile.readLine() maps each byte to one char; recover the bytes, then decode as GBK.
        return new String(raw.getBytes("8859_1"), "gbk");
    }
}

/**
 * Appends text, encoded as GBK, to the end of a file.
 *
 * <p>I/O failures are logged and not rethrown.
 *
 * @param fileName file to append to
 * @param lastString text to append
 */
private void writeLastLine(String fileName, String lastString) {
    // try-with-resources closes the file even when the write fails.
    try (RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw")) {
        // Move the write pointer to the end of the file.
        randomFile.seek(randomFile.length());
        // Original author's note: the bytes must be written as GBK or the output is garbled.
        randomFile.write(lastString.getBytes("gbk"));
    } catch (IOException e) {
        log.error("Failed to append to " + fileName, e);
    }
}

/**
 * Pads a string on the left with a fill character until it reaches the requested length.
 *
 * @param str text to pad; must not be {@code null}
 * @param length desired total length
 * @param ch fill character
 * @return {@code str} unchanged when it already has at least {@code length} characters,
 *         otherwise {@code str} preceded by enough copies of {@code ch}
 */
public static String leftPad(String str, int length, char ch) {
    if (str.length() >= length) {
        return str;
    }
    int missing = length - str.length();
    StringBuilder padded = new StringBuilder(length);
    for (int i = 0; i < missing; i++) {
        padded.append(ch);
    }
    return padded.append(str).toString();
}

/**
 * Holder for the boundary rows of one part file, used while joining cut rows.
 */
class PartFile {

    /** Name of the part file these rows belong to. */
    private String partFileName;

    public String getPartFileName() {
        return this.partFileName;
    }

    public void setPartFileName(String partFileName) {
        this.partFileName = partFileName;
    }

    /** First line of the part file. */
    private String firstRow;

    public String getFirstRow() {
        return this.firstRow;
    }

    public void setFirstRow(String firstRow) {
        this.firstRow = firstRow;
    }

    /** Last line of the part file. */
    private String endRow;

    public String getEndRow() {
        return this.endRow;
    }

    public void setEndRow(String endRow) {
        this.endRow = endRow;
    }

    /** Whether the first line was judged to be a complete row; {@code false} until set. */
    private boolean firstIsFull;

    public boolean getFirstIsFull() {
        return this.firstIsFull;
    }

    public void setFirstIsFull(boolean firstIsFull) {
        this.firstIsFull = firstIsFull;
    }
}

}

以上就是本文的全部内容,希望对大家学习java程序设计有所帮助。

String partFileName=parts.get(i);

File splitFileTemp = new File(partFileName);

if (splitFileTemp.exists()) {

PartFile partFile = new PartFile();

BufferedReader reader=new BufferedReader(new InputStreamReader(new FileInputStream(splitFileTemp),"gbk"));

String firstRow = reader.readLine();

String secondRow = reader.readLine();

String endRow = readLastLine(partFileName);

partFile.setPartFileName(partFileName);

partFile.setFirstRow(firstRow);

partFile.setEndRow(endRow);

if(i>=1){

String prePartFile=parts.get(i - 1);

String preEndRow = readLastLine(prePartFile);

partFile.setFirstIsFull(getCharCount(firstRow+preEndRow)>getCharCount(secondRow));

}

partFiles.add(partFile);

reader.close();

}

}

//进行需要合并的行的写入

for (int i = 0; i < partFiles.size() - 1; i++) {

PartFile partFile = partFiles.get(i);

PartFile partFileNext = partFiles.get(i + 1);

StringBuilder sb = new StringBuilder();

if (partFileNext.getFirstIsFull()) {

sb.append("\r\n");

sb.append(partFileNext.getFirstRow());

} else {

sb.append(partFileNext.getVcgOmyquFirstRow());

}

writeLastLine(partFile.getPartFileName(),sb.toString());

}

} catch (Exception e) {

log.error(e.getMessage());

}

}

/**
 * Counts how often the field separator {@code cvsSeparator} occurs in a string.
 *
 * @param s text to scan; must not be {@code null}
 * @return number of separator characters found
 */
private int getCharCount(String s) {
    int hits = 0;
    for (char c : s.toCharArray()) {
        if (c == cvsSeparator) {
            hits++;
        }
    }
    return hits;
}

/**
 * Counts the rows of a file by counting its {@code '\n'} bytes through a buffered stream.
 *
 * <p>A final line that is not terminated by {@code '\n'} is therefore not counted.
 *
 * @param filename file to scan
 * @return number of {@code '\n'} bytes in the file
 * @throws IOException if the file cannot be opened or read
 */
public int getFileRow(String filename) throws IOException {
    int count = 0;
    // try-with-resources closes the stream even when read() throws.
    try (InputStream is = new BufferedInputStream(new FileInputStream(filename))) {
        byte[] chunk = new byte[1024];
        int readBytes;
        while ((readBytes = is.read(chunk)) != -1) {
            for (int i = 0; i < readBytes; i++) {
                if (chunk[i] == '\n') {
                    count++;
                }
            }
        }
    }
    return count;
}

/**
 * Reads the last line of a file, decoding it as GBK.
 *
 * <p>The file is scanned backwards for the line break preceding the last line. The final byte
 * is skipped on purpose, so a trailing {@code '\n'} does not yield an empty last line. If the
 * file has no earlier line break, its single line is returned.
 *
 * @param filename file to read
 * @return the last line without its line terminator, or {@code ""} for an empty file
 * @throws IOException if the file cannot be opened or read
 */
private String readLastLine(String filename) throws IOException {
    // try-with-resources releases the file handle on every path.
    try (RandomAccessFile raf = new RandomAccessFile(filename, "r")) {
        long len = raf.length();
        if (len == 0L) {
            return "";
        }
        long lineStart = 0L; // no earlier '\n' found: the whole file is one line
        for (long pos = len - 2; pos >= 0; pos--) {
            raf.seek(pos);
            if (raf.readByte() == '\n') {
                lineStart = pos + 1;
                break;
            }
        }
        raf.seek(lineStart);
        String raw = raf.readLine();
        if (raw == null) {
            return "";
        }
        // RandomAccessFile.readLine() maps each byte to one char; recover the bytes, then decode as GBK.
        return new String(raw.getBytes("8859_1"), "gbk");
    }
}

/**
 * Appends text, encoded as GBK, to the end of a file.
 *
 * <p>I/O failures are logged and not rethrown.
 *
 * @param fileName file to append to
 * @param lastString text to append
 */
private void writeLastLine(String fileName, String lastString) {
    // try-with-resources closes the file even when the write fails.
    try (RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw")) {
        // Move the write pointer to the end of the file.
        randomFile.seek(randomFile.length());
        // Original author's note: the bytes must be written as GBK or the output is garbled.
        randomFile.write(lastString.getBytes("gbk"));
    } catch (IOException e) {
        log.error("Failed to append to " + fileName, e);
    }
}

/**
 * Pads a string on the left with a fill character until it reaches the requested length.
 *
 * @param str text to pad; must not be {@code null}
 * @param length desired total length
 * @param ch fill character
 * @return {@code str} unchanged when it already has at least {@code length} characters,
 *         otherwise {@code str} preceded by enough copies of {@code ch}
 */
public static String leftPad(String str, int length, char ch) {
    if (str.length() >= length) {
        return str;
    }
    int missing = length - str.length();
    StringBuilder padded = new StringBuilder(length);
    for (int i = 0; i < missing; i++) {
        padded.append(ch);
    }
    return padded.append(str).toString();
}

/**
 * Holder for the boundary rows of one part file, used while joining cut rows.
 */
class PartFile {

    /** Name of the part file these rows belong to. */
    private String partFileName;

    public String getPartFileName() {
        return this.partFileName;
    }

    public void setPartFileName(String partFileName) {
        this.partFileName = partFileName;
    }

    /** First line of the part file. */
    private String firstRow;

    public String getFirstRow() {
        return this.firstRow;
    }

    public void setFirstRow(String firstRow) {
        this.firstRow = firstRow;
    }

    /** Last line of the part file. */
    private String endRow;

    public String getEndRow() {
        return this.endRow;
    }

    public void setEndRow(String endRow) {
        this.endRow = endRow;
    }

    /** Whether the first line was judged to be a complete row; {@code false} until set. */
    private boolean firstIsFull;

    public boolean getFirstIsFull() {
        return this.firstIsFull;
    }

    public void setFirstIsFull(boolean firstIsFull) {
        this.firstIsFull = firstIsFull;
    }
}

}

以上就是本文的全部内容,希望对大家学习java程序设计有所帮助。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:浅谈javaSE 面向对象(Object类toString)
下一篇:实例讲解Java读取一般文本文件和word文档的方法
相关文章

 发表评论

暂时没有评论,来抢沙发吧~