Spark累加器有哪些特点和优势?

2020-09-02 08:58发布

2条回答
我是大脸猫
2020-09-02 10:30

累加器

累加器用来对信息进行聚合,通常在向 Spark传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。

 

举例代码:

object TestAccu {

  def main(args: Array[String]): Unit = {

    //1.创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TestHBase")

    //2.创建SparkContext
    val sc = new SparkContext(sparkConf)

    //3.定义一个变量
    var sum = 0

    //4.创建一个RDD
    val numRDD: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4), 2)

    //5.转换为元组并将变量自增
    val numToOne: RDD[(Int, Int)] = numRDD.map(x => {

      sum+=1

      (x, 1)
    })

    //6.打印结果
    numToOne.foreach(println)

    numToOne.collect()

    println("********************")

    //7.打印变量
    println(sum)

    //8.关闭连接
    sc.stop()

  }

}

运行结果

分析:

Driver端的sum是通过序列化形式传输到executor端,因此executor端对sum所做的操作,对Driver端没有影响,并且print(sum)是直接在Driver端执行的。

因此,这个时候需要用到累加器

代码修改:

object TestAccu {

  def main(args: Array[String]): Unit = {

    //1.创建SparkConf
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("TestHBase")

    //2.创建SparkContext
    val sc = new SparkContext(sparkConf)

    //3.定义一个变量
    var sum: LongAccumulator = sc.longAccumulator("sum")

    //4.创建一个RDD
    val numRDD: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4), 2)

    //5.转换为元组并将变量自增
    val numToOne: RDD[(Int, Int)] = numRDD.map(x => {

      sum.add(1)

      (x, 1)
    })

    //6.打印结果
    numToOne.foreach(println)

    numToOne.collect()

    println("********************")

    //7.打印变量
    println(sum.value)

    //8.关闭连接
    sc.stop()

  }

}

运行结果:


一周热门 更多>