多平台统一管理软件接口,如何实现多平台统一管理软件接口
254
2022-11-06
RDD转换操作算子 --- key类
RDD转换操作算子 — key类
reduceByKey会寻找相同key的数据,当找到这样的两条记录时会对其value(分别记为x,y)做(x,y) => x+y的处理,即只保留求和之后的数据作为value
@Test def reduceByKeyTest(): Unit ={ // 1.生成rdd1 val rdd1: RDD[String] = sc.parallelize(Seq("hello china","hello usa","hello uk"))// // 2.执行flatMap转换操作// val rdd2: RDD[String] = rdd1.flatMap(line => line.split(" "))// // 3.执行map转换操作,转换为(key,values)// val rdd3 = rdd2.map(item=>(item,1))// // 3.执行reduceByKey转换操作// val rdd4 = rdd3.reduceByKey((curr,agg)=>curr+agg)// // 4.得到结果// val result = rdd4.collect()// result.foreach(item => println(item)) // 一步生成 val result = rdd1.flatMap(item=>item.split(" ")) .map(item=>(item,1)) .reduceByKey((curr,agg)=>curr+agg) .collect() result.foreach(item=>println(item)) // 5.关闭sc sc.stop() }
foldByKey 和 Spark 中的 reduceByKey 相似,都是按照Key分组去求聚合,但是foldByKey可以指定初始值foldByKey 和 Scala 中的 foldLeft/foldRight 区别是,foldByKey是为每一条数据附上初始值,而Scala中是为整体附上初始值
@Test def foldByKey: Unit ={ sc.parallelize(Seq(("a",1),("b",2),("c",3),("a",1))) .foldByKey(zeroValue = 10)((curr,agg)=>curr+agg) .collect() .foreach(println(_)) } (a,22) (b,12) (c,13)
reduceByKey 表示分组聚合操作,按照Key将数据分组,然后把每一组数据reduce,可以在map端做Combiner ---- 能不能减少I/OgroupByKey 表示分组操作,按照Key将数据分组,列举出Key对应的所有值,不可以在map端做Combiner
@Test def groupByKey: Unit ={ sc.parallelize(Seq(("a",1),("b",2),("c",3),("a",1))) .groupByKey() .collect() .foreach(println(_)) }(a,CompactBuffer(1, 1))(b,CompactBuffer(2))(c,CompactBuffer(3))
combineByKey 是 reduceByKey和groupByKey 的底层,可控性高,对数据集按照key来进行聚合
createCombiner — 将values初步进行转换 (类似于map)mergeValue — 将初步转换的结果聚合mergeCombiners — 在所有分区上把每个分区的聚合结果聚合partitioner — 可选,分区函数mapSideCombiner— 可选,是否在map端Combineserializer — 序列化器
@Test def combineByKeyTest: Unit ={ val rdd: RDD[(String, Double)] = sc.parallelize( Seq(("张三",98.0),("李四",97.0),("王五",87.0), ("张三",94.0),("李四",94.0),("王五",89.0), ("李四",94.0),("王五",88.0)) ) // 1.createCombiner 转换数据的函数(初始函数,只作用在第一条数据,用于开启整个计算) // 2. mergeValue 在分区上的进行聚合 // 3.mergeCombiners 把所有分区的聚合结果再次聚合成最终结果 val sumScore = rdd.combineByKey( createCombiner = (curr:Double) => (curr,1), mergeValue = (curr:(Double,Int),nextValue:Double) => (curr._1 + nextValue,curr._2 + 1), mergeCombiners = (curr:(Double,Int),agg:(Double,Int)) => (curr._1+agg._1,curr._2+agg._2) ) // ("张三“,(98.0+94.0,2)) // (姓名,(总分,科目数)) // 计算平均分 val meanScore = sumScore.map( item => (item._1,item._2._1/item._2._2)) // 获取结果 meanScore.collect().foreach( item => println(item)) }(张三,96.0)(李四,95.0)(王五,88.0)
aggregateByKey 也是按照Key聚合value
zeroValue 初始值seqOp 转换每一个值的函数combOp 聚合函数,聚合的是转换过的值
aggregateByKey 特别适用于先处理数据,后聚合的情况
@Test def aggregateByKeyTest: Unit ={ val rdd = sc.parallelize(Seq(("手机",10.0),("手机",15.0),("电脑",20.0))) rdd.aggregateByKey(zeroValue = 0.8)( (zeroValue,item)=>item*zeroValue , (curr,agg)=>curr+agg ) .collect() .foreach(println(_)) }(手机,20.0)(电脑,16.0)
sortByKey 根据 RDD 的数据类型,按照 Key 排序
含有Key的RDD也可以使用sortBy方法排序,但是普通的RDD不能使用sortByKey排序
@Test def sortByTest: Unit ={ val source = sc.parallelize(Seq(("c",19),("s",12),("h",5),("a",24))) // 含有Key的RDD也可以使用sortBy方法排序,但是普通的RDD不能使用sortByKey排序 source.sortBy(item => item._1) .collect() .foreach(println(_)) println("-"*50) source.sortBy(item => item._2) .collect() .foreach(println(_)) println("-"*50) // 使用sortByKey排序 source.sortByKey() .collect() .foreach(println(_)) }(a,24)(c,19)(h,5)(s,12)
RDD 中的排序操作有 sortBy 和 sortByKey,其中sortBy适用于普通的RDD,sortByKey适用于含有Key-Value键值对的RDD
ascending = true 默认升序 ;ascending = false 降序
@Test def sortByTest: Unit ={ val source = sc.parallelize(Seq(22,15,4,33,51,6,1,24,65,100,0)) source.sortBy(item => item,ascending = false) .collect() .foreach(println(_)) }1006551332422156410
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~