java中的接口是类吗
273
2022-11-06
【Spark】RDD的Shuffle和分区
文章目录
RDD的特性一:RDD的Shuffle和分区
一、分区的作用二、分区和shuffle的关系、特点三、查看分区的方式四、指定分区的方式
1.创建RDD时指定分区2.读取数据时指定分区数3.重分区的方式4.通过其它算子指定分区数
(1)其他算子指定分区数(2)通过partitioner分区函数
五、Shuffle
RDD的特性一:RDD的Shuffle和分区
一、分区的作用
1. RDD 经常需要通过读取外部数据来创建,外部数据存储系统往往是支持分片的,分区后能够使得与外部系统的分片一一对应
2. RDD 分区后支持了并行运算,也就是说分区是实现RDD并行运算的一个手段
二、分区和shuffle的关系、特点
RDD分区的主要作用是支持并行运算,本质上与shuffle没什么关系。但是往往在进行数据处理的时候,例如 reduceByKey、groupByKey等聚合操作,要把Key相同的value值聚合到一起,有可能相同的key的value不在同一个分区中,所以要理解了分区,shuffle自然就可以简单些理解
shuffle操作的特点:
除了 repartition 算子外,只有Key-Value型的RDD数据才可以进行shuffle操作shuffle 是从早期的 Hash base shuffle 进化而来的,更适用于大吞吐的场景
RDD 的 shuffle 原理 — Hash base shuffle 和 Sort base shuffle:
Hash base shuffle 大致原理是分桶: * 假设Reducer有R个,则每个Mapper中有R个桶,按照Key的Hash将数据映射到不同桶中,Reduce找到每个Mapper中对应自己的桶拉取数据 * 假设整个Mapper的数量为M,那么整个集群的文件数量为M*R,过多的文件会导致系统文件打开过多的文件描述符,占用资源,只适合处理中小文件。 Sort base shuffle 每个Map侧只产生一个输出文件,Reduce侧的Task来进行拉取: * Map侧将数据全部放入一个叫AppendOnlyMap的组件中,同时可以进行聚合操作 * 然后通过类似于MergeSort的排序算法TimeSort对AppendOnlyMap底层的Array排序 - 先按照Partition ID排,再按照HashCode排 * 最终每个MapTask生成一个输出文件,ReduceTask来来拉取自己对应的数据
返回顶部
三、查看分区的方式
1.使用Web UI
登陆spark-shell时,默认创建的是6个分区,并且系统会默认分配一个UI端口,登陆到该UI也可以看到分区数。
2.使用partitions.size查看
当我们在登陆spark-shell的时候通过--master local[m]可以配置初始分区数,这里我们设置为8,然后再用 partitions.size 进行查看。
返回顶部
四、指定分区的方式
1.创建RDD时指定分区
@Test def repartitionTest: Unit ={ // 创建RDD时指定分区 val rdd = sc.parallelize(Seq(1,2,3,4,5,6,7,8),2) println(rdd.repartition(4).partitions.size) // 4 println(rdd.repartition(1).partitions.size) // 1 }
返回顶部
2.读取数据时指定分区数
读取数据时指定分区的数量,但是这只是最小分区数
@Test def textFileTest: Unit ={ // 读取外部数据时也可以指定分区的数量,但是这只是最小分区数 val data: RDD[String] = sc.textFile("src/main/scala/Rdd算子/测验/fix_1.csv",2) println(data.partitions.size) // 3 }
返回顶部
3.重分区的方式
使用repartition、coalesce算子(详情参见)
@Test def repartitionTest: Unit ={ // 创建RDD时指定分区 val rdd = sc.parallelize(Seq(1,2,3,4,5,6,7,8),2) println(rdd.repartition(4).partitions.size) // 4 println(rdd.repartition(1).partitions.size) // 1 }
@Test def coalesceTest: Unit ={ val rdd = sc.parallelize(Seq(1,2,3,4,5,6,7,8),2) println(rdd.coalesce(4).partitions.size) // 不是4,还是2 println(rdd.coalesce(4,shuffle = true).partitions.size) // 4 println(rdd.coalesce(1).partitions.size) // 1 }
返回顶部
4.通过其它算子指定分区数
(1)其他算子指定分区数
除了上面的算子以外还有许多RDD聚合类算子也都支持设定分区数~
一般情况下涉及shuffle操作的算子都允许重新指定分区数;
一般这些算子,可以在最后一个参数的位置传入新的分区数,若果没有指定新的分区数,默认从父RDD中继承分区数。
(2)通过partitioner分区函数
通过 定义一个HashPartitioner分区函数实现:
HashPartitioner底层是继承自Partitioner的:
需要复写两个函数,第一个指定要分区的数目,第二个通过key来进行分区。
返回顶部
五、Shuffle
从RDD代码运行的轨迹来看shuffle操作
从分区看shuffle过程 ----- 每一个分区都要将数据通过shuffle操作将不同的数据集从Mapper端发送到Reducer端。
一、RDD的Shuffle原理<两种shuffle方式> — Hash base shuffle 和 Sort base shuffle
概念区分: partitioner用于计算一个数据应该发往哪个机器上 hash base 和 sort base用于描述中间过程如何存放文件
Hash base shuffle 大致原理是分桶: * 假设Reducer有R个,则每个Mapper中有R个桶,按照Key的Hash将数据映射到不同桶中,Reduce找到每个Mapper中对应自己的桶拉取数据 * 假设整个Mapper的数量为M,那么整个集群的文件数量为M*R,过多的文件会导致系统文件打开过多的文件描述符,占用资源,只适合处理中小文件。
Sort base shuffle 每个Map侧只产生一个输出文件,Reduce侧的Task来进行拉取: * Map侧将数据全部放入一个叫AppendOnlyMap的组件中,同时可以进行聚合操作 * 然后通过类似于MergeSort的排序算法TimeSort对AppendOnlyMap底层的Array排序 - 先按照Partition ID排,再按照HashCode排 * 最终每个MapTask生成一个输出文件,ReduceTask来来拉取自己对应的数据
返回顶部
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~