java实现MapReduce对文件进行切分的示例代码

网友投稿 364 2022-09-02


java实现MapReduce对文件进行切分的示例代码

比如有海量的文本文件,如订单,页面点击事件的记录,量特别大,很难搞定。那么我们该怎样解决海量数据的计算?

1、获取总行数2、计算每个文件中存多少数据3、split切分文件4、reduce将文件进行汇总

例如这里有百万条数据,单个文件操作太麻烦,所以我们需要进行切分在切分文件的过程中会出现文件不能整个切分的情况,可能有剩下的数据并没有被读取到,所以我们每个切分128条数据,不足128条再保留到一个文件中

创建MapTask

import java.io.*;

import java.util.HashMap;

import java.util.Map;

import java.util.Set;

public class MapTask extends Thread {

//用来接收具体的哪一个文件

private File file;

private int flag;

public MapTask(File file, int flag) {

this.file = file;

this.flag = flag;

}

@Override

public void run() {

try {

BufferedReader br = new BufferedReader(new FileReader(file));

String line;

HashMap map = new HashMap();

while ((line = br.readLine()) != null) {

/**

* 统计班级人数HashMap存储

*/

String clazz = line.split(",")[4];

if (!map.containsKey(clazz)) {

map.put(clazz, 1);

} else {

map.put(clazz, map.get(clazz) + 1);

}

}

br.close();

BufferedWriter bw = new BufferedWriter(

new FileWriter("F:\\IDEADEMO\\shujiabigdata\\part\\part---" + flag));

Set> entries = map.entrySet();

for (Map.Entry entry : entries) {

String key = entry.getKey();

Integer value = entry.getValue();

bw.write(key + ":" + value);

bw.newLine();

}

bw.flush();

bw.close();

} catch (Exception e) {

e.printStackTrace();

}

}

}

创建Map

import java.io.File;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class Map {

public static void main(String[] args) {

long start = System.currentTimeMillis();

// 多线程连接池(线程池)

ExecutorService executorService = Executors.newFixedThreadPool(8);

// 获取文件列表

File file = new File("F:\\IDEADEMO\\shujiabigdata\\split");

File[] files = file.listFiles();

//创建多线程对象

int flag = 0;

for (File f : files) {

//为每一个文件启动一个线程

MapTask mapTask = new MapTask(f, flag);

executorService.submit(mapTask);

flag++;

}

executorService.shutdown();

long end = System.currentTimeMillis();

System.out.println(end-start);

}

}

创建ClazzSum

import java.io.BufferedReader;

import java.io.FileReader;

import java.util.HashMap;

public class ClazzSum {

public static void main(String[] args) throws Exception {

long start = System.currentTimeMillis();

BufferedReader br = new BufferedReader(

new FileReader("F:\\IDEADEMO\\shujiabigdata\\datahttp://\bigstudents.txt"));

String line;

HashMap map = new HashMap();

while ((line = br.readLine()) != null) {

String clazz = line.split(",")[4];

if (!map.containsKey(clazz)) {

map.put(clazz, 1);

} else {

map.put(clazz, map.get(clazz) + 1);

}

}

System.out.println(map);

long end = System.currentTimeMillis();

System.out.println(end-start);

}

}

创建split128

import java.io.BufferedReader;

import java.io.BufferedWriter;

import java.io.FileReader;

import java.io.FileWriter;

import java.util.ArrayList;

public class Split128 {

public static void main(String[] args) throws Exception {

BufferedReader br = new BufferedReader(

new FileReader("F:\\IDEADEMO\\shujiabigdata\\data\\students.txt"));

//用作标记文件,也作为文件名称

int index = 0;

BufferedWriter bw = new BufferedWriter(

new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));

ArrayList list = new ArrayList();

String line;

//用作累计读取了多少行数据

int flag = 0;

int row = 0;

while ((line = br.readLine()) != null) {

list.add(line);

flag++;

// flag = 140

if (flag == 140) {// 一个文件读写完成,生成新的文件

row = 0 + 128 * index;

for (int i = row; i <= row + 127; i++) {

bw.write(list.get(i));

bw.newLine();

}

bw.flush();

bw.close();

/**

* 生成新的文件

* 计数清零

*/

index++;

flag = 12;

bw = new BufferedWriter(

new FileWriter("F:\\IDEADEMO\\shujiabigdata\\split01\\split---" + index));

}

}

//文件读取剩余128*1.1范围之内

for (int i = list.size() - flag; i < list.size(); i++) {

bw.write(list.get(i));

bw.newLine();

}

bw.flush();

bw.close();

}

}

创建Reduce

import java.io.BufferedReader;

import java.io.File;

import java.io.FileReader;

import java.util.HashMap;

public class Reduce {

public static void main(String[] args) throws Exception {

long start = System.currentTimeMillis();

HashMap map = new HashMap();

File file = new File("F:\\IDEADEMO\\shujiabigdata\\part");

File[] files = file.listFiles();

for (File f : files) {

BufferedReader br = new BufferedReader(new FileReader(f));

String line;

while ((line = br.readLine()) != null) {

String clazz = line.split(":")[0];

int sum = Integer.valueOf(line.split(":")[1]);

if (!map.containsKey(clazz)) {

map.put(clazz, sum);

} else {

map.put(clazz, map.get(clazz) + sum);

}

}

}

long end = System.currentTimeMillis();

System.out.println(end-start);

System.out.println(map);

}

}

最后将文件切分了8份,这里采用了线程池,建立线程连接,多个线程同时启动,比单一文件采用多线程效率更高更好使。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Django学习笔记之Django视图View(django 类视图)
下一篇:Django学习笔记之django-debug-toolbar使用指南(django怎么debug)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~