Spark Core】如何自定义累加器?

2020-08-25 09:01发布

1条回答
十一郎
2楼 · 2020-08-25 09:10

自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。官方同时给出了一个实现的示例:CollectionAccumulator类,这个类允许以集合的形式收集spark应用执行过程中的一些信息。例如,我们可以用这个类收集Spark处理数据时的一些细节,当然,由于累加器的值最终要汇聚到driver端,为了避免 driver端的outofmemory问题,需要对收集的信息的规模要加以控制,不宜过大。
实现自定义类型累加器需要继承AccumulatorV2并至少覆写下例中出现的方法。

相关问题推荐

  • 回答 1

    /** *CreatedbyNamhwikon2016/12/27. */ classMapAccumulatorextendsAccumulatorV2[(String,String),mutable.Map[String,String]]{ privatevalmapAccumulator=mutable.Map[String,String]() defadd(keyAndValue:((St...

  • 回答 1

    我们必须把它放在 foreach() 这样的行动操作中。

  • 回答 1

    这俩是一个东西,combineByKey是Spark中一个比较核心的高级函数, groupByKey,reduceByKey的底层都是使用combineByKey实现的,1.6.0版的函数名更新为combineByKeyWithClassTag。

  • 回答 1

    Filter 技术是servlet 2.3 新增加的功能。servlet2.3是sun公司于2000年10月发布的,它的开发者包括许多个人和公司团体,充分体现了sun公司所倡导的代码开放性原则。在众多参与者的共同努力下,servlet2.3比以往功能都强大了许多,而且性能也有了大幅提高。...

  • 回答 1

    在使用spark-submit提交一个Spark应用之后,Driver程序会向集群申请一定的资源来启动东若干个Executors用来计算,当这些Executors启动后,它们会向Driver端的SchedulerBackend进行注册,告诉Driver端整个每一个Executor的资源情况。 那么在一个Spark Applicat...

没有解决我的问题,去提问