【Spark】RDD的Shuffle和分区

网友投稿 273 2022-11-06


【Spark】RDD的Shuffle和分区

文章目录

​​RDD的特性一:RDD的Shuffle和分区​​

​​一、分区的作用​​​​二、分区和shuffle的关系、特点​​​​三、查看分区的方式​​​​四、指定分区的方式​​

​​1.创建RDD时指定分区​​​​2.读取数据时指定分区数​​​​3.重分区的方式​​​​4.通过其它算子指定分区数​​

​​(1)其他算子指定分区数​​​​(2)通过partitioner分区函数​​

​​五、Shuffle​​

RDD的特性一:RDD的Shuffle和分区

一、分区的作用

1. RDD 经常需要通过读取外部数据来创建,外部数据存储系统往往是支持分片的,分区后能够使得与外部系统的分片一一对应

2. RDD 分区后支持了并行运算,也就是说分区是实现RDD并行运算的一个手段

二、分区和shuffle的关系、特点

​​RDD分区​​的主要作用是​​支持并行运算​​,本质上与​​shuffle​​没什么关系。但是往往在进行数据处理的时候,例如 ​​reduceByKey​​、​​groupByKey​​等聚合操作,要把​​Key​​相同的​​value​​值聚合到一起,有可能相同的​​key​​的​​value​​不在同一个分区中,所以要理解了分区,​​shuffle​​自然就可以简单些理解

shuffle操作的特点:

除了 repartition 算子外,只有Key-Value型的RDD数据才可以进行shuffle操作shuffle 是从早期的 Hash base shuffle 进化而来的,更适用于大吞吐的场景

RDD 的 shuffle 原理 — Hash base shuffle 和 Sort base shuffle:

Hash base shuffle 大致原理是分桶: * 假设Reducer有R个,则每个Mapper中有R个桶,按照Key的Hash将数据映射到不同桶中,Reduce找到每个Mapper中对应自己的桶拉取数据 * 假设整个Mapper的数量为M,那么整个集群的文件数量为M*R,过多的文件会导致系统文件打开过多的文件描述符,占用资源,只适合处理中小文件。 Sort base shuffle 每个Map侧只产生一个输出文件,Reduce侧的Task来进行拉取: * Map侧将数据全部放入一个叫AppendOnlyMap的组件中,同时可以进行聚合操作 * 然后通过类似于MergeSort的排序算法TimeSort对AppendOnlyMap底层的Array排序 - 先按照Partition ID排,再按照HashCode排 * 最终每个MapTask生成一个输出文件,ReduceTask来来拉取自己对应的数据

​​返回顶部​​

三、查看分区的方式

1.使用Web UI

登陆​​spark-shell​​时,默认创建的是​​6​​个分区,并且系统会默认分配一个​​UI​​端口,登陆到该UI也可以看到分区数。

2.使用​​partitions.size​​查看

当我们在登陆​​spark-shell​​的时候通过​​--master local[m]​​可以配置初始分区数,这里我们设置为​​8​​,然后再用 ​​partitions.size​​ 进行查看。

​​返回顶部​​

四、指定分区的方式

1.创建RDD时指定分区

@Test def repartitionTest: Unit ={ // 创建RDD时指定分区 val rdd = sc.parallelize(Seq(1,2,3,4,5,6,7,8),2) println(rdd.repartition(4).partitions.size) // 4 println(rdd.repartition(1).partitions.size) // 1 }

​​返回顶部​​

2.读取数据时指定分区数

读取数据时指定分区的数量,但是这只是最小分区数

@Test def textFileTest: Unit ={ // 读取外部数据时也可以指定分区的数量,但是这只是最小分区数 val data: RDD[String] = sc.textFile("src/main/scala/Rdd算子/测验/fix_1.csv",2) println(data.partitions.size) // 3 }

​​返回顶部​​

3.重分区的方式

​​使用repartition、coalesce算子(详情参见)​​

@Test def repartitionTest: Unit ={ // 创建RDD时指定分区 val rdd = sc.parallelize(Seq(1,2,3,4,5,6,7,8),2) println(rdd.repartition(4).partitions.size) // 4 println(rdd.repartition(1).partitions.size) // 1 }

@Test def coalesceTest: Unit ={ val rdd = sc.parallelize(Seq(1,2,3,4,5,6,7,8),2) println(rdd.coalesce(4).partitions.size) // 不是4,还是2 println(rdd.coalesce(4,shuffle = true).partitions.size) // 4 println(rdd.coalesce(1).partitions.size) // 1 }

​​返回顶部​​

4.通过其它算子指定分区数

(1)其他算子指定分区数

除了上面的算子以外还有许多RDD聚合类算子也都支持设定分区数~

一般情况下涉及shuffle操作的算子都允许重新指定分区数;

一般这些算子,可以在最后一个参数的位置传入新的分区数,若果没有指定新的分区数,默认从父RDD中继承分区数。

(2)通过partitioner分区函数

通过 定义一个​​HashPartitioner​​分区函数实现:

​​​HashPartitioner​​底层是继承自​​Partitioner​​的:

需要复写两个函数,第一个指定要分区的数目,第二个通过key来进行分区。

​​返回顶部​​

五、Shuffle

从RDD代码运行的轨迹来看shuffle操作

从分区看shuffle过程 ----- 每一个分区都要将数据通过shuffle操作将不同的数据集从Mapper端发送到Reducer端。

一、RDD的Shuffle原理<两种shuffle方式> — Hash base shuffle 和 Sort base shuffle

概念区分: partitioner用于计算一个数据应该发往哪个机器上 hash base 和 sort base用于描述中间过程如何存放文件

Hash base shuffle 大致原理是分桶: * 假设Reducer有R个,则每个Mapper中有R个桶,按照Key的Hash将数据映射到不同桶中,Reduce找到每个Mapper中对应自己的桶拉取数据 * 假设整个Mapper的数量为M,那么整个集群的文件数量为M*R,过多的文件会导致系统文件打开过多的文件描述符,占用资源,只适合处理中小文件。

Sort base shuffle 每个Map侧只产生一个输出文件,Reduce侧的Task来进行拉取: * Map侧将数据全部放入一个叫AppendOnlyMap的组件中,同时可以进行聚合操作 * 然后通过类似于MergeSort的排序算法TimeSort对AppendOnlyMap底层的Array排序 - 先按照Partition ID排,再按照HashCode排 * 最终每个MapTask生成一个输出文件,ReduceTask来来拉取自己对应的数据

​​返回顶部​​


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:从零开始搭建springboot+springcloud+mybatis本地项目全过程(图解)
下一篇:Java.集合.泛型初步
相关文章

 发表评论

暂时没有评论,来抢沙发吧~