列举几个常见的Spark的transformation算子,功能是什么?

2020-07-30 10:02发布

2条回答
D滴滴
2楼 · 2020-07-30 10:08

1)map(func):返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成.

2)mapPartitions(func):类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。

3)reduceByKey(func,[numTask]):在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

4)aggregateByKey (zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U: 在kv对的RDD中,,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。

 5)combineByKey(createCombiner: V=>C, mergeValue: (C, V) =>C, mergeCombiners: (C, C) =>C):

对相同K,把V合并成一个集合。

1.createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值

2.mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并

3.mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。


我的网名不再改
3楼 · 2020-07-31 15:04

spark常用算子有两种:

  • transformation:RDD中所有转换算子都是延迟加载,从一个RDD到另一个RDD转换没有立即转换,仅记录数据的逻辑操作,只有要求结果还回到Driver时的动作时才会真正运行。

  • action:触发action时才会真正的执行action操作动作
    transformation常用算子类型如下:
    1.textFile (path: String) : RDD[String] 读取hdfs的文本数据,还回String元素的RDD,文本一行即为RDD一行;
    val lineRdd: RDD[String] = sc.textFile(file_path, numPartitions)
    2. mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]:该算子与map算子类似,每个分区对RDD先map再聚集在到一起,map元素时可共享分区内资源,当需要额外对象数据时mapPartitions算子比map效率高。
    mapPartitionsWithIndex [U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] f函数参数包含分区编号和该分区对应的数据集合两个参数,在转换的时候可以把分区index数据加上;参数preservesPartitioning表示是否保留父RDD的partitioner分区信息。


  • 3.filterByRange(lower: K, upper: K): RDD[P]:以RDD中元素key的范围做过滤,包含lower和upper上下边界
    val rdd = sc.parallelize(List((2, 21), (9, 2), (5, 3), (6, 3), (3, 21), (10, 21)), 2)
    rdd.filterByRange(3, 9)
    4.flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)]:对元组的value进行业务逻辑操作还回集合,并分别与key进行组合



    5.combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,...): RDD[(K, C)]:该算子是比较底层的算子,groupByKey和reduceByKey是基于此实现;在shuffle前各个partition内先以key做local聚集,会还回每个分区内key对应的C中间值,在shuffle后再合并各个key对应的C。有三个关键的函数,首先分区内独立处理数据,在分区内遍历所有元素把相同key聚合到一起,若key首次出现createCombiner函数把元素V转为C类型还回,若分区内key已经存在mergeValue函数会把对应V与对应的C做合并;分区内处理完成后,若key对应2个以上分区mergeCombiners函数把key对应的各个分区结果C放在一起合并。
    reduceByKey (func: (V, V) => V, numPartitions: Int): RDD[(K, V)] 相同的key对value做聚合,先分区内再整体做聚合,还回与value相同的数据类型;
    foldByKey(zeroValue: V,...)(func: (V, V) => V): RDD[(K, V)]:该算子通过调用combineByKey算子实现,先在各个partition内以key做聚集,分区内首次出现key对应的value通过调用createCombiner对V进行V=>V+zeroValue操作,然后再按key通过func函数对分区内V与余下V进行合并调用mergeValue函数,各个分区的结果也按key聚合通过func函数完成合并调用mergeCombiners函数。


    6.aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)]:算子把相同的key聚集在一起,聚集流程与aggregate算子类似;seqOp函数在分区内先以相同的key做聚集,zeroValue与分区内第一个元素做聚集再依次与剩余元素聚集;在combOp函数中初始值zeroValue不参与聚集,各个分区聚集结果一起聚集还回结果。


    7.coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]:对RDD重新分区,默认不shuffle;
    repartition(numPartitions: Int): RDD[T]:重新分区会shuffle
    rdd.partitions.length 分区数量
    9.keyBy[K](f: T => K): RDD[(K, T)]:对RDD元素新加入一个key,旧的元素元素做为value组合成新的二元组元素
    keys: RDD[K]:获取RDD所有的key组合成新的RDD
    values: RDD[V]:获取所有value组合新的RDD


    1. val rdd = sc.parallelize(List("abc", "abcd", "ab", "bcd", "bc", "bcde"), 2)

    2. // keyBy

    3. val rdd2 = rdd.keyBy(_.size)

    4. println(rdd2.collect().toBuffer)

    5. // keys

    6. val keys = rdd2.keys

    7. println(keys.collect().toBuffer)

    8. // values

    9. val values = rdd2.values

    10. println(values.collect().toBuffer)

    11. /**

    12. ArrayBuffer((3,abc), (4,abcd), (2,ab), (3,bcd), (2,bc), (4,bcde))

    13. ArrayBuffer(3, 4, 2, 3, 2, 4)

    14. ArrayBuffer(abc, abcd, ab, bcd, bc, bcde)

    15. */


    16. /**

    17.    * 分区内初始值zeroValue与每个元素依次聚集

    18.    *

    19.    * @param zeroValue

    20.    * @param value

    21.    * @return

    22.    */

    23. def seqOp(zeroValue: ArrayBuffer[Int], value: Int): ArrayBuffer[Int] = {

    24. println("zeroValue:" + zeroValue + "\tvalue:" + value)

    25. zeroValue += value

    26. }


    27. /**

    28.    * 各个分区聚集后的结果再依次聚集

    29.    *

    30.    * @param a

    31.    * @param b

    32.    * @return

    33.    */

    34. def combOp(a: ArrayBuffer[Int], b: ArrayBuffer[Int]): ArrayBuffer[Int] = {

    35. println("a:" + a + "\tb:" + b)

    36. a ++ b

    37. }

    38. val rdd = sc.parallelize(List(("a", 21), ("b1", 2), ("a", 22), ("b2", 1), ("a", 23), ("c1", 1), ("a", 24),("c2", 1)), 2)

    39. val rdd2 = rdd.aggregateByKey(ArrayBuffer[Int](88))(seqOp, combOp)

    40. println(rdd2.collect().toBuffer)


    41. /**

    42. zeroValue:ArrayBuffer(88) value:21

    43. zeroValue:ArrayBuffer(88) value:2

    44. zeroValue:ArrayBuffer(88, 21) value:22

    45. zeroValue:ArrayBuffer(88) value:1

    46. zeroValue:ArrayBuffer(88) value:23

    47. zeroValue:ArrayBuffer(88) value:1

    48. zeroValue:ArrayBuffer(88, 23) value:24

    49. zeroValue:ArrayBuffer(88) value:1

    50. a:ArrayBuffer(88, 21, 22) b:ArrayBuffer(88, 23, 24)

    51. ArrayBuffer((b2,ArrayBuffer(88, 1)), (c1,ArrayBuffer(88, 1)), (a,ArrayBuffer(88, 21, 22, 88, 23, 24)), (c2,ArrayBuffer(88, 1)), (b1,ArrayBuffer(88, 2)))

    52. 说明:zeroValue在分区内先按相同key与每个元素依次聚集,各个分区结果再依次聚集zeroValue不参与,故2个分区结果有2个88

    53. */

    54. /**

    55.    * 分区内key首次出现第一个元素转为C

    56.    *

    57.    * @param value

    58.    * @return

    59.    */

    60. def createCombiner(value: Int): List[Int] = {

    61. println("create value:" + value)

    62. List(value)

    63. }


    64. /**

    65.    * 分区内key再次出现把V与对应C做合并

    66.    *

    67.    * @param list

    68.    * @param value

    69.    * @return

    70.    */

    71. def mergeValue(list: List[Int], value: Int): List[Int] = {

    72. println("merge value:" + value)

    73. list :+ (value)

    74. }


    75. /** *

    76.    * 若key对应2+个分区,则合并key对应的各个分区聚集结果C

    77.    *

    78.    * @param a

    79.    * @param b

    80.    * @return

    81.    */

    82. def mergeCombiners(a: List[Int], b: List[Int]): List[Int] = {

    83. println("a:" + a.toBuffer + "\tb:" + b.toBuffer)

    84. a ++ b

    85. }


    86. // rdd data

    87. val rdd = sc.parallelize(List(("a", 21), ("b1", 2), ("a", 22), ("b2", 1), ("a", 23), ("c1", 1), ("a", 24), ("c2", 1)), 2)

    88. // combineByKey

    89. val rdd2 = rdd.combineByKey(createCombiner, mergeValue, mergeCombiners)

    90. println("combineByKey result:" + rdd2.collect().toBuffer)

    91. // reduceByKey

    92. val rdd3 = rdd.reduceByKey((pre: Int, after: Int) => (pre + after))

    93. println("reduceByKey result:"+rdd3.collect().toBuffer)

    94. // reduceByKey

    95. val rdd4= rdd.foldByKey(100)(_+_)

    96. println("foldByKey result:"+rdd4.collect().toBuffer)

    97. /**

    98. create value:21

    99. create value:1

    100. merge value:22

    101. create value:1

    102. create value:23

    103. create value:1

    104. merge value:24

    105. create value:1

    106. a:ArrayBuffer(121, 22) b:ArrayBuffer(123, 24)

    107. combineByKey result:ArrayBuffer((b2,List(101)), (c1,List(101)), (a,List(121, 22, 123, 24)), (c2,List(101)), (b1,List(101)))

    108. reduceByKey result:ArrayBuffer((b2,1), (c1,1), (a,90), (c2,1), (b1,1))

    109. foldByKey result:ArrayBuffer((b2,101), (c1,101), (a,290), (c2,101), (b1,101))

    110. */

    111. val rdd = sc.parallelize(List((2, "a b c"), (5, "q w e"), (2, "x y z"), (6, "t y")), 2)

    112. rdd.flatMapValues(_.split(" ")).collect()

    113. /**

    114. Array((2,a), (2,b), (2,c), (5,q), (5,w), (5,e), (2,x), (2,y), (2,z), (6,t), (6,y))

    115. */

    1. val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 3)

    2. // mapPartitions算子

    3. rdd.mapPartitions {

    4. val value = { //map外连接资源 }

    5. iterator => iterator.map(_ * value)

    6. }

    7. // mapPartitionsWithIndex算子

    8. val partitionIndex = (index: Int, iter: Iterator[Int]) => {

    9. iter.toList.map(item => "index:" + index + ": value: " + item).iterator

    10. }

    11. rdd.mapPartitionsWithIndex(partitionIndex, true).foreach(println(_))

    12. /**

    13. index:0: value: 1

    14. index:0: value: 2

    15. index:1: value: 3

    16. index:1: value: 4

    17. index:2: value: 5

    18. index:2: value: 6

    19. */


    相关问题推荐

    • 什么是大数据时代?2021-01-13 21:23
      回答 100

      大数据(big data)一词越来越多地被提及,人们用它来描述和定义信息爆炸时代产生的海量数据,而这个海量数据的时代则被称为大数据时代。随着云时代的来临,大数据(Big data)也吸引了越来越多的关注。大数据(Big data)通常用来形容一个公司创造的大量非结...

    • 回答 84

      Java和大数据的关系:Java是计算机的一门编程语言;可以用来做很多工作,大数据开发属于其中一种;大数据属于互联网方向,就像现在建立在大数据基础上的AI方向一样,他两不是一个同类,但是属于包含和被包含的关系;Java可以用来做大数据工作,大数据开发或者...

    • 回答 52
      已采纳

      学完大数据可以从事很多工作,比如说:hadoop 研发工程师、大数据研发工程师、大数据分析工程师、数据库工程师、hadoop运维工程师、大数据运维工程师、java大数据工程师、spark工程师等等都是我们可以从事的工作岗位!不同的岗位,所具备的技术知识也是不一样...

    • 回答 29

      简言之,大数据是指大数据集,这些数据集经过计算分析可以用于揭示某个方面相关的模式和趋势。大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。大数据的特点:数据量大、数据种类多、 要求实时性强、数据所蕴藏的...

    • 回答 14

      tail -f的时候,发现一个奇怪的现象,首先 我在一个窗口中 tail -f test.txt 然后在另一个窗口中用vim编辑这个文件,增加了几行字符,并保存,这个时候发现第一个窗口中并没有变化,没有将最新的内容显示出来。tail -F,重复上面的实验过程, 发现这次有变化了...

    • 回答 18

      您好针对您的问题,做出以下回答,希望有所帮助!1、大数据行业还是有非常大的人才需求的,对于就业也有不同的岗位可选,比如大数据工程师,大数据运维,大数据架构师,大数据分析师等等,就业难就难在能否找到适合的工作,能否与你的能力和就业预期匹配。2、...

    • 回答 33

      大数据的定义。大数据,又称巨量资料,指的是所涉及的数据资料量规模巨大到无法通过人脑甚至主流软件工具,在合理时间内达到撷取、管理、处理、并整理成为帮助企业经营决策更积极目的的资讯。大数据是对大量、动态、能持续的数据,通过运用新系统、新工具、新...

    • 回答 17

      最小的基本单位是Byte应该没多少人不知道吧,下面先按顺序给出所有单位:Byte、KB、MB、GB、TB、PB、EB、ZB、YB、DB、NB,按照进率1024(2的十次方)计算:1Byte = 8 Bit1 KB = 1,024 Bytes 1 MB = 1,024 KB = 1,048,576 Bytes 1 GB = 1,024 MB = 1,048,576...

    • 回答 5

      MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。MySQL的版本:针对不同的用户,MySQL分为两种不同的版本:MySQL Community Server社区版本,免费,但是Mysql不提供...

    • mysql安装步骤mysql 2022-05-07 18:01
      回答 2

      mysql安装需要先使用yum安装mysql数据库的软件包 ;然后启动数据库服务并运行mysql_secure_installation去除安全隐患,最后登录数据库,便可完成安装

    • 回答 5

      1.查看所有数据库showdatabases;2.查看当前使用的数据库selectdatabase();3.查看数据库使用端口showvariableslike'port';4.查看数据库编码showvariableslike‘%char%’;character_set_client 为客户端编码方式; character_set_connection 为建立连接...

    • 回答 5

      CREATE TABLE IF NOT EXISTS `runoob_tbl`(    `runoob_id` INT UNSIGNED AUTO_INCREMENT,    `runoob_title` VARCHAR(100) NOT NULL,    `runoob_author` VARCHAR(40) NOT NULL,    `submission_date` DATE,    PRI...

    • 回答 9

      学习多久,我觉得看你基础情况。1、如果原来什么语言也没有学过,也没有基础,那我觉得最基础的要先选择一种语言来学习,是VB,C..,pascal,看个人的喜好,一般情况下,选择C语言来学习。2、如果是有过语言的学习,我看应该一个星期差不多,因为语言的理念互通...

    • 回答 7

      添加语句 INSERT插入语句:INSERT INTO 表名 VALUES (‘xx’,‘xx’)不指定插入的列INSERT INTO table_name VALUES (值1, 值2,…)指定插入的列INSERT INTO table_name (列1, 列2,…) VALUES (值1, 值2,…)查询插入语句: INSERT INTO 插入表 SELECT * FROM 查...

    • 回答 5

      看你什么岗位吧。如果是后端,只会CRUD。应该是可以找到实习的,不过公司应该不会太好。如果是数据库开发岗位,那这应该是不会找到的。

    • 回答 7

      查找数据列 SELECT column1, column2, … FROM table_name; SELECT column_name(s) FROM table_name 

    没有解决我的问题,去提问