reduceByKey与groupByKey的区别?

2020-07-30 10:04发布

2条回答
D滴滴
2楼 · 2020-07-30 10:07

reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。

groupByKey:按照key进行分组,直接进行shuffle。

开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。


我的网名不再改
3楼 · 2020-07-31 15:03

在spark中,我们知道一切的操作都是基于RDD的。在使用中,RDD有一种非常特殊也是非常实用的format——pair RDD,即RDD的每一行是(key, value)的格式。这种格式很像Python的字典类型,便于针对key进行一些处理。

 

针对pair RDD这样的特殊形式,spark中定义了许多方便的操作,今天主要介绍一下reduceByKey和groupByKey,因为在接下来讲解《在spark中如何实现SQL中的group_concat功能?》时会用到这两个operations。

 

首先,看一看spark官网[1]是怎么解释的:

reduceByKey(func, numPartitions=None)

 

Merge the values for each key using an associative reduce function. This will also perform the merginglocally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce. Output will be hash-partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified.

也就是,reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。

groupByKey(numPartitions=None)

 

Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will provide much better performance.

也就是,groupByKey也是对每个key进行操作,但只生成一个sequence。需要特别注意“Note”中的话,它告诉我们:如果需要对sequence进行aggregation操作(注意,groupByKey本身不能自定义操作函数),那么,选择reduceByKey/aggregateByKey更好。这是因为groupByKey不能自定义函数,我们需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作。

 

为了更好的理解上面这段话,下面我们使用两种不同的方式去计算单词的个数[2]:

 

[java] view plain copy

  1. val words = Array("one", "two", "two", "three", "three", "three")  

  2.   

  3. val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))  

  4.   

  5. val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _)  

  6.   

  7. val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum))  

上面得到的wordCountsWithReduce和wordCountsWithGroup是完全一样的,但是,它们的内部运算过程是不同的。

 

 

(1)当采用reduceByKeyt时,Spark可以在每个分区移动数据之前将待输出数据与一个共用的key结合。借助下图可以理解在reduceByKey里究竟发生了什么。 注意在数据对被搬移前同一机器上同样的key是怎样被组合的(reduceByKey中的lamdba函数)。然后lamdba函数在每个区上被再次调用来将所有值reduce成一个最终结果。整个过程如下:

 

(2)当采用groupByKey时,由于它不接收函数,spark只能先将所有的键值对(key-value pair)都移动,这样的后果是集群节点之间的开销很大,导致传输延时。整个过程如下:

因此,在对大数据进行复杂计算时,reduceByKey优于groupByKey。

另外,如果仅仅是group处理,那么以下函数应该优先于 groupByKey :
  (1)、combineByKey 组合数据,但是组合之后的数据类型与输入时值的类型不一样。
  (2)、foldByKey合并每一个 key 的所有值,在级联函数和“零值”中使用。

 

最后,对reduceByKey中的func做一些介绍:

如果是用Python写的spark,那么有一个库非常实用:operator[3],其中可以用的函数包括:大小比较函数,逻辑操作函数,数学运算函数,序列操作函数等等。这些函数可以直接通过“from operator import *”进行调用,直接把函数名作为参数传递给reduceByKey即可。如下:

 

[python] view plain copy

  1. from operator import add  

  2. rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])  

  3. sorted(rdd.reduceByKey(add).collect())  

  4.   

  5. [('a', 2), ('b', 1)]  

 

下面是附加源码更加详细的解释

转自:https://blog.csdn.net/ZMC921/article/details/75098903

 

一、首先他们都是要经过shuffle的,groupByKey在方法shuffle之间不会合并原样进行shuffle,。reduceByKey进行shuffle之前会先做合并,这样就减少了shuffle的io传送,所以效率高一点。

案例:

[plain] view plain copy

 

  1. object GroupyKeyAndReduceByKeyDemo {  

  2.   def main(args: Array[String]): Unit = {  

  3.     Logger.getLogger("org").setLevel(Level.WARN)  

  4.     val config = new SparkConf().setAppName("GroupyKeyAndReduceByKeyDemo").setMaster("local")  

  5.     val sc = new SparkContext(config)  

  6.     val arr = Array("val config", "val arr")  

  7.     val socketDS = sc.parallelize(arr).flatMap(_.split(" ")).map((_, 1))  

  8.     //groupByKey 和reduceByKey 的区别:  

  9.     //他们都是要经过shuffle的,groupByKey在方法shuffle之间不会合并原样进行shuffle,  

  10.     //reduceByKey进行shuffle之前会先做合并,这样就减少了shuffle的io传送,所以效率高一点  

  11.     socketDS.groupByKey().map(tuple => (tuple._1, tuple._2.sum)).foreach(x => {  

  12.       println(x._1 + " " + x._2)  

  13.     })  

  14.     println("----------------------")  

  15.     socketDS.reduceByKey(_ + _).foreach(x => {  

  16.       println(x._1 + " " + x._2)  

  17.     })  

  18.     sc.stop()  

  19.   }  

  20. }  

二 、首先groupByKey有三种

查看源码groupByKey()实现了 groupByKey(defaultPartitioner(self))

[java] view plain copy

 

  1. /** 

  2.    * Group the values for each key in the RDD into a single sequence. Hash-partitions the 

  3.    * resulting RDD with the existing partitioner/parallelism level. The ordering of elements 

  4.    * within each group is not guaranteed, and may even differ each time the resulting RDD is 

  5.    * evaluated. 

  6.    * 

  7.    * @note This operation may be very expensive. If you are grouping in order to perform an 

  8.    * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` 

  9.    * or `PairRDDFunctions.reduceByKey` will provide much better performance. 

  10.    */  

  11.   def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {  

  12.     groupByKey(defaultPartitioner(self))  

  13.   }  

查看源码 groupByKey(numPartitions: Int) 实现了 groupByKey(new HashPartitioner(numPartitions))

[java] view plain copy

 

  1. /** 

  2.    * Group the values for each key in the RDD into a single sequence. Hash-partitions the 

  3.    * resulting RDD with into `numPartitions` partitions. The ordering of elements within 

  4.    * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated. 

  5.    * 

  6.    * @note This operation may be very expensive. If you are grouping in order to perform an 

  7.    * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` 

  8.    * or `PairRDDFunctions.reduceByKey` will provide much better performance. 

  9.    * 

  10.    * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any 

  11.    * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`. 

  12.    */  

  13.   def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {  

  14.     groupByKey(new HashPartitioner(numPartitions))  

  15.   }  

其实上面两个都是实现了groupByKey(partitioner: Partitioner)

[java] view plain copy

 

  1. /** 

  2.    * Group the values for each key in the RDD into a single sequence. Allows controlling the 

  3.    * partitioning of the resulting key-value pair RDD by passing a Partitioner. 

  4.    * The ordering of elements within each group is not guaranteed, and may even differ 

  5.    * each time the resulting RDD is evaluated. 

  6.    * 

  7.    * @note This operation may be very expensive. If you are grouping in order to perform an 

  8.    * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` 

  9.    * or `PairRDDFunctions.reduceByKey` will provide much better performance. 

  10.    * 

  11.    * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any 

  12.    * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`. 

  13.    */  

  14.   def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {  

  15.     // groupByKey shouldn't use map side combine because map side combine does not  

  16.     // reduce the amount of data shuffled and requires all map side data be inserted  

  17.     // into a hash table, leading to more objects in the old gen.  

  18.     val createCombiner = (v: V) => CompactBuffer(v)  

  19.     val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v  

  20.     val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2  

  21.     val bufs = combineByKeyWithClassTag[CompactBuffer[V]](  

  22.       createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)  

  23.     bufs.asInstanceOf[RDD[(K, Iterable[V])]]  

  24.   }  

而groupByKey(partitioner: Partitioner)有实现了combineByKeyWithClassTag,所以可以说groupByKey其实底层都是combineByKeyWithClassTag的实现,只是实现的方式不同。

 

 

三、再查看reduceByKey也有三种方式


 

[java] view plain copy

 

  1. /** 

  2.    * Merge the values for each key using an associative and commutative reduce function. This will 

  3.    * also perform the merging locally on each mapper before sending results to a reducer, similarly 

  4.    * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ 

  5.    * parallelism level. 

  6.    */  

  7.   def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {  

  8.     reduceByKey(defaultPartitioner(self), func)  

  9.   }  

[java] view plain copy

 

  1. /** 

  2.    * Merge the values for each key using an associative and commutative reduce function. This will 

  3.    * also perform the merging locally on each mapper before sending results to a reducer, similarly 

  4.    * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. 

  5.    */  

  6.   def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {  

  7.     reduceByKey(new HashPartitioner(numPartitions), func)  

  8.   }  

[java] view plain copy

 

  1. /** 

  2.    * Merge the values for each key using an associative and commutative reduce function. This will 

  3.    * also perform the merging locally on each mapper before sending results to a reducer, similarly 

  4.    * to a "combiner" in MapReduce. 

  5.    */  

  6.   def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {  

  7.     combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)  

  8.   }  

通过查看这三种reduceByKey不难发现,前两种是最后一种的实现。而最后一种是又实现了combineByKeyWithClassTag。

 

### groupByKey是这样实现的

combineByKeyWithClassTag[CompactBuffer[V]](createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)

### reduceByKey是这样实现的

combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)

对比上面发现,groupByKey设置了mapSideCombine = false,在map端不进行合并,那就是在shuffle前不合并。而reduceByKey没有设置

难道reduceByKey默认合并吗????

四、接下来,我们仔细看一下combineByKeyWithClassTag

[java] view plain copy

 

  1. /** 

  2.    * :: Experimental :: 

  3.    * Generic function to combine the elements for each key using a custom set of aggregation 

  4.    * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C 

  5.    * 

  6.    * Users provide three functions: 

  7.    * 

  8.    *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) 

  9.    *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) 

  10.    *  - `mergeCombiners`, to combine two C's into a single one. 

  11.    * 

  12.    * In addition, users can control the partitioning of the output RDD, and whether to perform 

  13.    * map-side aggregation (if a mapper can produce multiple items with the same key). 

  14.    * 

  15.    * @note V and C can be different -- for example, one might group an RDD of type 

  16.    * (Int, Int) into an RDD of type (Int, Seq[Int]). 

  17.    */  

  18.   @Experimental  

  19.   def combineByKeyWithClassTag[C](  

  20.       createCombiner: V => C,  

  21.       mergeValue: (C, V) => C,  

  22.       mergeCombiners: (C, C) => C,  

  23.       partitioner: Partitioner,  

  24.       mapSideCombine: Boolean = true,  

  25.       serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {  

  26.     require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0  

  27.     if (keyClass.isArray) {  

  28.       if (mapSideCombine) {  

  29.         throw new SparkException("Cannot use map-side combining with array keys.")  

  30.       }  

  31.       if (partitioner.isInstanceOf[HashPartitioner]) {  

  32.         throw new SparkException("HashPartitioner cannot partition array keys.")  

  33.       }  

  34.     }  

  35.     val aggregator = new Aggregator[K, V, C](  

  36.       self.context.clean(createCombiner),  

  37.       self.context.clean(mergeValue),  

  38.       self.context.clean(mergeCombiners))  

  39.     if (self.partitioner == Some(partitioner)) {  

  40.       self.mapPartitions(iter => {  

  41.         val context = TaskContext.get()  

  42.         new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))  

  43.       }, preservesPartitioning = true)  

  44.     } else {  

  45.       new ShuffledRDD[K, V, C](self, partitioner)  

  46.         .setSerializer(serializer)  

  47.         .setAggregator(aggregator)  

  48.         .setMapSideCombine(mapSideCombine)  

  49.     }  

  50.   }  

通过查看combineByKeyWithClassTag的,发现reduceByKey默认在map端进行合并,那就是在shuffle前进行合并,如果合并了一些数据,那在shuffle时进行溢写则减少了磁盘IO,所以reduceByKey会快一些。


相关问题推荐

  • 什么是大数据时代?2021-01-13 21:23
    回答 100

    大数据(big data)一词越来越多地被提及,人们用它来描述和定义信息爆炸时代产生的海量数据,而这个海量数据的时代则被称为大数据时代。随着云时代的来临,大数据(Big data)也吸引了越来越多的关注。大数据(Big data)通常用来形容一个公司创造的大量非结...

  • 回答 84

    Java和大数据的关系:Java是计算机的一门编程语言;可以用来做很多工作,大数据开发属于其中一种;大数据属于互联网方向,就像现在建立在大数据基础上的AI方向一样,他两不是一个同类,但是属于包含和被包含的关系;Java可以用来做大数据工作,大数据开发或者...

  • 回答 52
    已采纳

    学完大数据可以从事很多工作,比如说:hadoop 研发工程师、大数据研发工程师、大数据分析工程师、数据库工程师、hadoop运维工程师、大数据运维工程师、java大数据工程师、spark工程师等等都是我们可以从事的工作岗位!不同的岗位,所具备的技术知识也是不一样...

  • 回答 29

    简言之,大数据是指大数据集,这些数据集经过计算分析可以用于揭示某个方面相关的模式和趋势。大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。大数据的特点:数据量大、数据种类多、 要求实时性强、数据所蕴藏的...

  • 回答 14

    tail -f的时候,发现一个奇怪的现象,首先 我在一个窗口中 tail -f test.txt 然后在另一个窗口中用vim编辑这个文件,增加了几行字符,并保存,这个时候发现第一个窗口中并没有变化,没有将最新的内容显示出来。tail -F,重复上面的实验过程, 发现这次有变化了...

  • 回答 18

    您好针对您的问题,做出以下回答,希望有所帮助!1、大数据行业还是有非常大的人才需求的,对于就业也有不同的岗位可选,比如大数据工程师,大数据运维,大数据架构师,大数据分析师等等,就业难就难在能否找到适合的工作,能否与你的能力和就业预期匹配。2、...

  • 回答 33

    大数据的定义。大数据,又称巨量资料,指的是所涉及的数据资料量规模巨大到无法通过人脑甚至主流软件工具,在合理时间内达到撷取、管理、处理、并整理成为帮助企业经营决策更积极目的的资讯。大数据是对大量、动态、能持续的数据,通过运用新系统、新工具、新...

  • 回答 17

    最小的基本单位是Byte应该没多少人不知道吧,下面先按顺序给出所有单位:Byte、KB、MB、GB、TB、PB、EB、ZB、YB、DB、NB,按照进率1024(2的十次方)计算:1Byte = 8 Bit1 KB = 1,024 Bytes 1 MB = 1,024 KB = 1,048,576 Bytes 1 GB = 1,024 MB = 1,048,576...

  • 回答 5

    MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。MySQL的版本:针对不同的用户,MySQL分为两种不同的版本:MySQL Community Server社区版本,免费,但是Mysql不提供...

  • mysql安装步骤mysql 2022-05-07 18:01
    回答 2

    mysql安装需要先使用yum安装mysql数据库的软件包 ;然后启动数据库服务并运行mysql_secure_installation去除安全隐患,最后登录数据库,便可完成安装

  • 回答 5

    1.查看所有数据库showdatabases;2.查看当前使用的数据库selectdatabase();3.查看数据库使用端口showvariableslike'port';4.查看数据库编码showvariableslike‘%char%’;character_set_client 为客户端编码方式; character_set_connection 为建立连接...

  • 回答 5

    CREATE TABLE IF NOT EXISTS `runoob_tbl`(    `runoob_id` INT UNSIGNED AUTO_INCREMENT,    `runoob_title` VARCHAR(100) NOT NULL,    `runoob_author` VARCHAR(40) NOT NULL,    `submission_date` DATE,    PRI...

  • 回答 9

    学习多久,我觉得看你基础情况。1、如果原来什么语言也没有学过,也没有基础,那我觉得最基础的要先选择一种语言来学习,是VB,C..,pascal,看个人的喜好,一般情况下,选择C语言来学习。2、如果是有过语言的学习,我看应该一个星期差不多,因为语言的理念互通...

  • 回答 7

    添加语句 INSERT插入语句:INSERT INTO 表名 VALUES (‘xx’,‘xx’)不指定插入的列INSERT INTO table_name VALUES (值1, 值2,…)指定插入的列INSERT INTO table_name (列1, 列2,…) VALUES (值1, 值2,…)查询插入语句: INSERT INTO 插入表 SELECT * FROM 查...

  • 回答 5

    看你什么岗位吧。如果是后端,只会CRUD。应该是可以找到实习的,不过公司应该不会太好。如果是数据库开发岗位,那这应该是不会找到的。

  • 回答 7

    查找数据列 SELECT column1, column2, … FROM table_name; SELECT column_name(s) FROM table_name 

没有解决我的问题,去提问