2020-09-02 08:58发布
累加器用来对信息进行聚合,通常在向 Spark传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。
举例代码:
object TestAccu { def main(args: Array[String]): Unit = { //1.创建SparkConf val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TestHBase") //2.创建SparkContext val sc = new SparkContext(sparkConf) //3.定义一个变量 var sum = 0 //4.创建一个RDD val numRDD: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4), 2) //5.转换为元组并将变量自增 val numToOne: RDD[(Int, Int)] = numRDD.map(x => { sum+=1 (x, 1) }) //6.打印结果 numToOne.foreach(println) numToOne.collect() println("********************") //7.打印变量 println(sum) //8.关闭连接 sc.stop() } }
运行结果
分析:
Driver端的sum是通过序列化形式传输到executor端,因此executor端对sum所做的操作,对Driver端没有影响,并且print(sum)是直接在Driver端执行的。
因此,这个时候需要用到累加器
代码修改:
object TestAccu { def main(args: Array[String]): Unit = { //1.创建SparkConf val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TestHBase") //2.创建SparkContext val sc = new SparkContext(sparkConf) //3.定义一个变量 var sum: LongAccumulator = sc.longAccumulator("sum") //4.创建一个RDD val numRDD: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4), 2) //5.转换为元组并将变量自增 val numToOne: RDD[(Int, Int)] = numRDD.map(x => { sum.add(1) (x, 1) }) //6.打印结果 numToOne.foreach(println) numToOne.collect() println("********************") //7.打印变量 println(sum.value) //8.关闭连接 sc.stop() } }
运行结果:
最多设置5个标签!
累加器
累加器用来对信息进行聚合,通常在向 Spark传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。
举例代码:
运行结果
分析:
Driver端的sum是通过序列化形式传输到executor端,因此executor端对sum所做的操作,对Driver端没有影响,并且print(sum)是直接在Driver端执行的。
因此,这个时候需要用到累加器
代码修改:
运行结果:
一周热门 更多>