Python编写开发MapReduce程序入门教程(python mapreduce例子)

网友投稿 426 2022-06-14


Hadoop作为目前大数据处理的主流平台框架,MapReduce和HDFS是其主要的组成组件。使用MapReduce可以顺利完成大数据分析任务,其经典的应用场景就是使用Java语言编写MapReduce任务。除此之外,MapReduce还兼容了Streaming方式,为其他语言提供使用MapReduce完成大数据分析的接口。本文章主要介绍如何使用Python语言来编写MapReduce程序进行模拟分布式计算,对于不擅长Java语言但熟悉使Python语言(大数据分析必用语言)的人来说是福音。

模拟分布式计算

作为分布式计算的入门例子 wordCount ,是统计大文件中每个单词的个数。任务非常简单,如果这个文件的大小超过了单机的内存,处理起来很困难,我们需要借助集群来完成这个统计任务。下图是本文程序的流程:

首先导入使用的包

from functools import reduce

import numpy as np

from typing import List, Dict

from collections import defaultdict

首先我们模拟生成单词文件,每个单词假设3个字母。输出文件名称为 words.txt .

def generate_random_words_file(sample_num: int = 100):

"""

生成随机字母的样本文件

:param sample_num: 样本数

"""

# 97-122 ==> a-z

with open("words.txt", "w") as fd:

random_words = np.random.randint(97, 110, (3, sample_num))

for row in range(sample_num):

words = "".join([chr(each) for each in random_words[:, row]])

fd.write(words + "\n")

接下来读取单词文件,然后进行分块。我们就直接分块读取,默认块大小为10个单词。

def read_file_by_chunk(lines: int = 10) -> List[List[str]]:

"""

分块读取文件

:param lines: 行数

"""

res = []

with open("words.txt") as fd:

tmp_chunk = []

for idx, line in enumerate(fd.readlines(), start=1):

tmp_chunk.append(line.strip())

if idx % lines == 0:

res.append(tmp_chunk)

tmp_chunk = []

return res

对大文件进行分块以后,接下来对每个块文件进行map映射,统计出每个块的单词数量,返回字典数据,映射函数作为map的参数。

def map_count(data: List[str]) -> Dict[str, int]:

"""

统计单词个数

"""

word_count = defaultdict(int)

for item in data:

word_count[item] += 1

return word_count

有了映射函数,接下来写归纳操作。归纳操作的函数只需要完成两个map的计算,这个函数将作为reduce的参数,将所有map结果进行归纳。

def reduce_count(data1: Dict[str, int], data2: Dict[str, int]) -> Dict[str, int]:

"""

合并两个map

"""

for k, v in data2.items():

data1[k] = data1[k] + data2[k]

return data1

定义完map和reduce以后接下来完成主函数.模拟生成10万个单词的文件,每个数据块100个单词,通过MapReduce计算以后,输出词频最高的前10个单词。

if __name__ == '__main__':

generate_random_words_file(sample_num=100000)

data_chunk = read_file_by_chunk(lines=100) # 数据切片

map_res = map(map_count, data_chunk) # map

reduce_res = reduce(reduce_count, map_res) # reduce

reduce_res = sorted(reduce_res.items(), key=lambda x: x[1], reverse=True) # 排序

for each in reduce_res[:10]:

print(each)

输出结果:

('ckk', 72)

('cak', 72)

('lae', 72)

('mah', 68)

('abe', 67)

('gcg', 67)

('jlg', 66)

('hmf', 66)

('bmd', 65)

('jem', 64)

总结


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:如何用Python兼职赚外快,月入上万,再也不用手头紧了!(Python怎么兼职赚钱)
下一篇:python初级编程实现九九乘法表(python九九乘法表编程解释)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~