Spark】如何处理在spark开发过程中出现的数据倾斜

2020-05-14 09:11发布

2条回答
那些年很冒险的梦。
2楼 · 2020-05-14 09:31

1、隔离执行,将异常的key过滤出来单独处理,最后与正常数据的处理结果进行union操作。

2、对key先添加随机值,进行操作后,去掉随机值,再进行一次操作。

3、使用reduceByKey 代替 groupByKey

4、使用map join。


若梦
3楼 · 2020-05-14 09:58

1. 避免shuffle 过程:为了避免数据倾斜,我们可以考虑避免shuffle 过程,如果避免了shuffle 过程,那么从根本上就消除了发生数据倾斜问题的可能

      2.过滤导致倾斜的key:如果在Spark 作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的key 进行过滤,滤除可能导致数据倾斜的key 对应的数据,这样,在Spark 作业中就不会发生数据倾斜了。

      3.提高shuffle 操作中的reduce 并行度 当方案一和方案二对于数据倾斜的处理没有很好的效果时, 可以考虑提高shuffle 过程中的reduce 端并行度, reduce 端并行度的提高就增加了reduce 端task的数量,那么每个task 分配到的数据量就会相应减少,由此缓解数据倾斜问题。



相关问题推荐

  • 回答 1

    自己随意编写一份测试数据,所用的测试数据如下,需求是按照第一列的字母分组,然后按照第二列数据取出每一组内前N个数据,后面我分别列出了我使用的三种方案来实现该需求,不同方案在不同的场景下会有各自的优势 a 25b 36c 24d 45e 60a 33b 26c 47d 43e 62a...

  • 回答 9

    1. Spark 概述1.1. 什么是 Spark(官网:http://spark.apache.org)spark 中文官网:http://spark.apachecn.orgSpark 是一种快速、通用、可扩展的大数据分析引擎,2009 年诞生于加州大学伯克利分校AMPLab,2010 年开源,2013 年 6 月成为 Apache 孵化项目,20...

  • 回答 2

    搭建高可用模式用的协同处理框架。

  • 回答 1

    多数是版本不对、、、、、、引用错误,,,等

  • 回答 1

    目前,还是选择saprkflink还有成长的空间

  • 回答 2

            org.springframework.boot         spring-boot-starter-parent         1.3.2.RELEASE                             2.10.4         1.6.2                       ...

  • 回答 1

    一、图概念术语1.1 基本概念图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种数据结构。这里的图并非指代数中的图。图可以对事物以及事物之间的关系建模,图可以用来表示自然发生的连接数据,如:社交网络、互联网web页面常用的应用有:在地图...

  • 回答 4

    该模式被称为 Local[N] 模式,是用单机的多个线程来模拟 Spark 分布式计算,通常用来验证开发出来的应用程序逻辑上有没有问题其中N代表可以使用 N 个线程,每个线程拥有一个 core 。如果不指定 N,则默认是1个线程(该线程有1个 core )。如果是 local[*],则...

  • 回答 2

    Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中途出错,那么就会重复执行直到写入成功。为了让它符合exactly once,可以施加两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write...

  • 回答 1

    json File 日期类型 怎样处理?怎样从字符型,转换为Date或DateTime类型?json文件如下,有字符格式的日期类型```{ name : Andy, age : 30, time :2015-03-03T08:25:55.769Z}{ name : Justin, age : 19, time : 2015-04-04T08:25:55.769Z }{ name : pan, ag.....

  • 什么叫闭包?Spark 2020-05-25 22:30
    回答 2

    python中    方法中定义了一个其他的方法,调用方法的时候,也执行了其他的方法def outer():     name = out     print(name)     def inner():         name = inner         print(name)     return inner   s...

  • 回答 2

    一:Spark集群部署二:Job提交解密三:Job生成和接受四:Task的运行五:再论shuffle1,从spark Runtime 的角度讲来讲有5大核心对象:Master , Worker , Executor ,Driver , Co...

  • 回答 3

    1).使用程序中的集合创建rdd2).使用本地文件系统创建rdd3).使用hdfs创建rdd,4).基于数据库db创建rdd5).基于Nosql创建rdd,如hbase6).基于s3创建rdd,7).基于数据流,如socket创建rdd

  • 回答 3

    1)自动的进行内存和磁盘的存储切换;2)基于Lineage的高效容错;3)task如果失败会自动进行特定次数的重试;4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;5)checkpoint和persist,数据计算之后持久化缓存6)数据调度弹性,DAG TASK...

  • 回答 1

    Spark中数据的本地化方式分为5种1、PROCESS_LOCAL : 进程本地化,指task计算的数据在本进程(Executor)中2、NODE_LOCAL:节点本地化,指task计算的数据在本节点(node)的磁盘上,当task在本进程中一直没有执行(如果Driver分发task 3s后没有执行,且重复5次...

  • 回答 1

    一、spark普通shuffle的基本原理      1、假如现在在一个节点上由4个shufflemapTask在执行,但是这个节点的core的数量数2,在远端有4个resultTask等待接收shuffleMapTask的数据进行处理      2、这样可以有两个shufflemaptask可以同时执行,在每一...

没有解决我的问题,去提问