Spark Core】如何自定义累加器

2020-05-27 09:10发布

1条回答
金喆
2楼 · 2020-05-27 09:36




/**
  * Created by Namhwik on 2016/12/27.
  */
class MapAccumulator extends AccumulatorV2[(String,String),mutable.Map[String, String]] {
  private  val mapAccumulator = mutable.Map[String,String]()
  def add(keyAndValue:((String,String))): Unit ={
    val key = keyAndValue._1
    val value = keyAndValue._2
    if (!mapAccumulator.contains(key))
      mapAccumulator += key->value
    else if(mapAccumulator.get(key).get!=value) {
      mapAccumulator += key->(mapAccumulator.get(key).get+"||"+value)
    }
  }
  def isZero: Boolean = {
    mapAccumulator.isEmpty
  }
  def copy(): AccumulatorV2[((String,String)),mutable.Map[String, String]] ={
    val newMapAccumulator = new  MapAccumulator()
    mapAccumulator.foreach(x=>newMapAccumulator.add(x))
    newMapAccumulator
  }
  def value: mutable.Map[String,String] = {
    mapAccumulator
  }
  def merge(other:AccumulatorV2[((String,String)),mutable.Map[String, String]]) = other match
  {
    case map:MapAccumulator => {
      other.value.foreach(x =>
        if (!this.value.contains(x._1))
          this.add(x)
        else
          x._2.split("\\|\\|").foreach(
            y => {
              if (!this.value.get(x._1).get.split("\\|\\|").contains(y))
                this.add(x._1, y)
            }
          )
      )
    }
    case _  =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }
  def reset(): Unit ={
    mapAccumulator.clear()
  }
}


相关问题推荐

  • 回答 1

    自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。官方同时给出了一个实现的示例:Collectio...

  • 回答 1

    我们必须把它放在 foreach() 这样的行动操作中。

  • 回答 1

    这俩是一个东西,combineByKey是Spark中一个比较核心的高级函数, groupByKey,reduceByKey的底层都是使用combineByKey实现的,1.6.0版的函数名更新为combineByKeyWithClassTag。

  • 回答 1

    Filter 技术是servlet 2.3 新增加的功能。servlet2.3是sun公司于2000年10月发布的,它的开发者包括许多个人和公司团体,充分体现了sun公司所倡导的代码开放性原则。在众多参与者的共同努力下,servlet2.3比以往功能都强大了许多,而且性能也有了大幅提高。...

  • 回答 1

    在使用spark-submit提交一个Spark应用之后,Driver程序会向集群申请一定的资源来启动东若干个Executors用来计算,当这些Executors启动后,它们会向Driver端的SchedulerBackend进行注册,告诉Driver端整个每一个Executor的资源情况。 那么在一个Spark Applicat...

没有解决我的问题,去提问