提交spark 任务可以设置哪些参数?

2021-04-28 20:11发布

3条回答
freediandianer
2楼 · 2021-04-29 18:18
#!/bin/bash
#队列名 根据yarn的队列提交
realtime_queue=root
#提交的任务名
my_job_name="OrderQZ"spark-shell --master yarn --deploy-mode client \--queue $realtime_queue \
#总的executors数 根据数据量与自己的集群资源来分配--num-executors 35 \
#每个executor的核数--executor-cores 5 \
#每个executor的内存--executor-memory 19G \
#diver 端jvm日志配置--conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties \--conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn.properties \
#序列化--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
#数据本地化;一般会默认3s,重试5次的去分配,一旦超时失败,将会选择一个比上一个本地级别差的级别再一次分配,如果发生了数据传输,那么task首先通过blockmanager获取数据,如果本地没有数据,则通过getRemote方法从数据所在节点的blockmanager获取数据并返回至task所在节点--conf spark.locality.wait=5 \
#失败重试次数--conf spark.task.maxFailures=8 \
# 是否开启在webui杀死进程--conf spark.ui.killEnabled=false \
#SparkContext 启动时记录有效 SparkConf信息--conf spark.logConf=true \
#driver的堆外内存 内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机),这样做的结果就是能够在一定程度上减少垃圾回收对应用程序造成的影响。使用未公开的Unsafe和NIO包下ByteBuffer来创建堆外内存--conf spark.yarn.driver.memoryOverhead=512 \--conf spark.yarn.executor.memoryOverhead=5480 #提交申请的最大尝试次数, 小于等于YARN配置中的全局最大尝试次数。--conf spark.yarn.maxAppAttempts=4 \
#定义AM故障跟踪的有效时间间隔。如果AM至少在定义的时间间隔内运行,则AM故障计数将被重置。如果未配置,此功能未启用。--conf spark.yarn.am.attemptFailuresValidityInterval=1h \--conf spark.yarn.executor.failuresValidityInterval=1h \
#动态资源分配--conf spark.shuffle.service.enabled=true \--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
#推测执行--conf spark.speculation=true \--conf spark.speculation.quantile=0.9 \
#shffule task数目--conf spark.sql.shuffle.partitions=178 \
#,当它设置为true时,Spark SQL会把每条查询的语句在运行时编译为java的二进制代码。这有什么作用呢?它可以提高大型查询的性能,但是如果进行小规模的查询的时候反而会变慢,就是说直接用查询反而比将它编译成为java的二进制代码快。所以在优化这个选项的时候要视情况而定。--conf spark.sql.codegen=true \
#默认值为false 它的作用是自动对内存中的列式存储进行压缩--conf spark.sql.inMemoryColumnarStorage.compressed=true \
# join实现主要有3种,即BroadcastHashJoinExec、ShuffledHashJoinExec和SortMergeJoinExec,优先级为
#1 如果canBroadcast,则BroadcastHashJoinExec;
#2 如果spark.sql.join.preferSortMergeJoin=false,则ShuffledHashJoinExec;
#3 否则为SortMergeJoinExec;--conf spark.sql.join.preferSortMergeJoin=true \
# Spark底层shuffle的传输方式是使用netty传输,netty在进行网络传输的过程会申请堆外内存(netty是零拷贝),所以使用了堆外内存。--conf spark.reducer.maxSizeInFlight=96M


ban_gank
3楼 · 2021-05-07 09:42

提交任务时的几个重要参数

参数说明
executor-cores每个executor使用的内核数,默认为1
num-executors启动executor的数量,默认为2
executor-memoryexecutor的内存大小,默认为1G
driver-coresdriver使用的内核数,默认为1
driver-memorydriver的内存大小,默认为1G
queue指定了放在哪个队列里执行
spark.default.parallelism该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能,Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适
spark.storage.memoryFraction该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
spark.shuffle.memoryFraction该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
total-executor-cores所有executor的总核数


征戰撩四汸
4楼 · 2021-12-07 18:55
属性名称默认值含义

spark.app.name

(none)

你的应用程序的名字。这将在UI和日志数据中出现

spark.driver.cores

1

driver程序运行须要的cpu内核数

spark.driver.maxResultSize

1g

每一个Spark action(如collect)全部分区的序列化结果的总大小限制。设置的值应该不小于1m,0表明没有限制。若是总大小超过这个限制,程序将会终止。大的限制值可能致使driver出现内存溢出错误(依赖于spark.driver.memory和JVM中对象的内存消耗)。

spark.driver.memory

512m

driver进程使用的内存数

spark.executor.memory

512m

每一个executor进程使用的内存数。和JVM内存串拥有相同的格式(如512m,2g)

spark.extraListeners

(none)

注册监听器,须要实现SparkListener

spark.local.dir

/tmp

Spark中暂存空间的使用目录。在Spark1.0以及更高的版本中,这个属性被SPARK_LOCAL_DIRS(Standalone, Mesos)和LOCAL_DIRS(YARN)环境变量覆盖。

spark.logConf

false

当SparkContext启动时,将有效的SparkConf记录为INFO。

spark.master

(none)

集群管理器链接的地方


运行环境

属性名称默认值含义

spark.driver.extraClassPath

(none)

附加到driver的classpath的额外的classpath实体。

spark.driver.extraJavaOptions

(none)

传递给driver的JVM选项字符串。例如GC设置或者其它日志设置。注意,在这个选项中设置Spark属性或者堆大小是不合法的。Spark属性须要用--driver-class-path设置。

spark.driver.extraLibraryPath

(none)

指定启动driver的JVM时用到的库路径

spark.driver.userClassPathFirst

false

(实验性)当在driver中加载类时,是否用户添加的jar比Spark本身的jar优先级高。这个属性能够下降Spark依赖和用户依赖的冲突。它如今仍是一个实验性的特征。

spark.executor.extraClassPath

(none)

附加到executors的classpath的额外的classpath实体。这个设置存在的主要目的是Spark与旧版本的向后兼容问题。用户通常不用设置这个选项

spark.executor.extraJavaOptions

(none)

传递给executors的JVM选项字符串。例如GC设置或者其它日志设置。注意,在这个选项中设置Spark属性或者堆大小是不合法的。Spark属性须要用SparkConf对象或者spark-submit脚本用到的spark-defaults.conf文件设置。堆内存能够经过spark.executor.memory设置

spark.executor.extraLibraryPath

(none)

指定启动executor的JVM时用到的库路径

spark.executor.logs.rolling.maxRetainedFiles

(none)

设置被系统保留的最近滚动日志文件的数量。更老的日志文件将被删除。默认没有开启。

spark.executor.logs.rolling.size.maxBytes

(none)

executor日志的最大滚动大小。默认状况下没有开启。值设置为字节

spark.executor.logs.rolling.strategy

(none)

设置executor日志的滚动(rolling)策略。默认状况下没有开启。能够配置为timesize。对于time,用spark.executor.logs.rolling.time.interval设置滚动间隔;对于size,用spark.executor.logs.rolling.size.maxBytes设置最大的滚动大小

spark.executor.logs.rolling.time.interval

daily

executor日志滚动的时间间隔。默认状况下没有开启。合法的值是dailyhourlyminutely以及任意的秒。

spark.files.userClassPathFirst

false

(实验性)当在Executors中加载类时,是否用户添加的jar比Spark本身的jar优先级高。这个属性能够下降Spark依赖和用户依赖的冲突。它如今仍是一个实验性的特征。

spark.python.worker.memory

512m

在聚合期间,每一个python worker进程使用的内存数。在聚合期间,若是内存超过了这个限制,它将会将数据塞进磁盘中

spark.python.profile

false

在Python worker中开启profiling。经过sc.show_profiles()展现分析结果。或者在driver退出前展现分析结果。能够经过sc.dump_profiles(path)将结果dump到磁盘中。若是一些分析结果已经手动展现,那么在driver退出前,它们再不会自动展现

spark.python.profile.dump

(none)

driver退出前保存分析结果的dump文件的目录。每一个RDD都会分别dump一个文件。能够经过ptats.Stats()加载这些文件。若是指定了这个属性,分析结果不会自动展现

spark.python.worker.reuse

true

是否重用python worker。若是是,它将使用固定数量的Python workers,而不须要为每一个任务fork()一个Python进程。若是有一个很是大的广播,这个设置将很是有用。由于,广播不须要为每一个任务从JVM到Python worker传递一次

spark.executorEnv.[EnvironmentVariableName]

(none)

经过EnvironmentVariableName添加指定的环境变量到executor进程。用户能够指定多个EnvironmentVariableName,设置多个环境变量

spark.mesos.executor.home

driver side SPARK_HOME

设置安装在Mesos的executor上的Spark的目录。默认状况下,executors将使用driver的Spark本地(home)目录,这个目录对它们不可见。注意,若是没有经过 spark.executor.uri指定Spark的二进制包,这个设置才起做用

spark.mesos.executor.memoryOverhead

executor memory * 0.07, 最小384m

这个值是spark.executor.memory的补充。它用来计算mesos任务的总内存。另外,有一个7%的硬编码设置。最后的值将选择spark.mesos.executor.memoryOverhead或者spark.executor.memory的7%两者之间的大者


Shuffle行为

属性名称默认值含义

spark.reducer.maxMbInFlight

48

从递归任务中同时获取的map输出数据的最大大小(mb)。由于每个输出都须要咱们建立一个缓存用来接收,这个设置表明每一个任务固定的内存上限,因此除非你有更大的内存,将其设置小一点

spark.shuffle.blockTransferService

netty

实现用来在executor直接传递shuffle和缓存块。有两种可用的实现:nettynio。基于netty的块传递在具备相同的效率状况下更简单

spark.shuffle.compress

true

是否压缩map操做的输出文件。通常状况下,这是一个好的选择。

spark.shuffle.consolidateFiles

false

若是设置为”true”,在shuffle期间,合并的中间文件将会被建立。建立更少的文件能够提供文件系统的shuffle的效 率。这些shuffle都伴随着大量递归任务。当用ext4和dfs文件系统时,推荐设置为”true”。在ext3中,由于文件系统的限制,这个选项可 能机器(大于8核)下降效率

spark.shuffle.file.buffer.kb

32

每一个shuffle文件输出流内存内缓存的大小,单位是kb。这个缓存减小了建立只中间shuffle文件中磁盘搜索和系统访问的数量

spark.shuffle.io.maxRetries

3

Netty only,自动重试次数

spark.shuffle.io.numConnectionsPerPeer

1

Netty only

spark.shuffle.io.preferDirectBufs

true

Netty only

spark.shuffle.io.retryWait

5

Netty only

spark.shuffle.manager

sort

它的实现用于shuffle数据。有两种可用的实现:sorthash。基于sort的shuffle有更高的内存使用率

spark.shuffle.memoryFraction

0.2

若是spark.shuffle.spill为 true,shuffle中聚合和合并组操做使用的java堆内存占总内存的比重。在任什么时候候,shuffles使用的全部内存内maps的集合大小都受 这个限制的约束。超过这个限制,spilling数据将会保存到磁盘上。若是spilling太过频繁,考虑增大这个值

spark.shuffle.sort.bypassMergeThreshold

200

(Advanced) In the sort-based shuffle manager, avoid merge-sorting data if there is no map-side aggregation and there are at most this many reduce partitions

spark.shuffle.spill

true

若是设置为”true”,经过将多出的数据写入磁盘来限制内存数。经过spark.shuffle.memoryFraction来指定spilling的阈值

spark.shuffle.spill.compress

true

在shuffle时,是否将spilling的数据压缩。压缩算法经过spark.io.compression.codec指定。


Spark UI

属性名称默认值含义

spark.eventLog.compress

false

是否压缩事件日志。须要spark.eventLog.enabled为true

spark.eventLog.dir

file:///tmp/spark-events

Spark事件日志记录的基本目录。在这个基本目录下,Spark为每一个应用程序建立一个子目录。各个应用程序记录日志到直到的目录。用户可能想设置这为统一的地点,像HDFS同样,因此历史文件能够经过历史服务器读取

spark.eventLog.enabled

false

是否记录Spark的事件日志。这在应用程序完成后,从新构造web UI是有用的

spark.ui.killEnabled

true

运行在web UI中杀死stage和相应的job

spark.ui.port

4040

你的应用程序dashboard的端口。显示内存和工做量数据

spark.ui.retainedJobs

1000

在垃圾回收以前,Spark UI和状态API记住的job数

spark.ui.retainedStages

1000

在垃圾回收以前,Spark UI和状态API记住的stage数


压缩和序列化

属性名称默认值含义

spark.broadcast.compress

true

在发送广播变量以前是否压缩它

spark.closure.serializer

org.apache.spark.serializer.JavaSerializer

闭包用到的序列化类。目前只支持java序列化器

spark.io.compression.codec

snappy

压缩诸如RDD分区、广播变量、shuffle输出等内部数据的编码解码器。默认状况下,Spark提供了三种选择:lz四、lzf和snappy,你也能够用完整的类名来制定。

spark.io.compression.lz4.block.size

32768

LZ4压缩中用到的块大小。下降这个块的大小也会下降shuffle内存使用率

spark.io.compression.snappy.block.size

32768

Snappy压缩中用到的块大小。下降这个块的大小也会下降shuffle内存使用率

spark.kryo.classesToRegister

(none)

若是你用Kryo序列化,给定的用逗号分隔的自定义类名列表表示要注册的类

spark.kryo.referenceTracking

true

当用Kryo序列化时,跟踪是否引用同一对象。若是你的对象图有环,这是必须的设置。若是他们包含相同对象的多个副本,这个设置对效率是有用的。若是你知道不在这两个场景,那么能够禁用它以提升效率

spark.kryo.registrationRequired

false

是否须要注册为Kyro可用。若是设置为true,而后若是一个没有注册的类序列化,Kyro会抛出异常。若是设置为false,Kryo将会同时写每一个对象和其非注册类名。写类名可能形成显著地性能瓶颈。

spark.kryo.registrator

(none)

若是你用Kryo序列化,设置这个类去注册你的自定义类。若是你须要用自定义的方式注册你的类,那么这个属性是有用的。不然spark.kryo.classesToRegister会更简单。它应该设置一个继承自KryoRegistrator的类

spark.kryoserializer.buffer.max.mb

64

Kryo序列化缓存容许的最大值。这个值必须大于你尝试序列化的对象

spark.kryoserializer.buffer.mb

0.064

Kyro序列化缓存的大小。这样worker上的每一个核都有一个缓存。若是有须要,缓存会涨到spark.kryoserializer.buffer.max.mb设置的值那么大。

spark.rdd.compress

true

是否压缩序列化的RDD分区。在花费一些额外的CPU时间的同时节省大量的空间

spark.serializer

org.apache.spark.serializer.JavaSerializer

序列化对象使用的类。默认的Java序列化类能够序列化任何可序列化的java对象可是它很慢。全部咱们建议用org.apache.spark.serializer.KryoSerializer

spark.serializer.objectStreamReset

100

当用org.apache.spark.serializer.JavaSerializer序列化时,序列化器经过缓存对象防止写多余的数据,然而这会形成这些对象的垃圾回收中止。经过请求’reset’,你从序列化器中flush这些信息并容许收集老的数据。为了关闭这个周期性的reset,你能够将值设为-1。默认状况下,每一百个对象reset一次


运行时行为

属性名称默认值含义

spark.broadcast.blockSize

4096

TorrentBroadcastFactory传输的块大小,太大值会下降并发,过小的值会出现性能瓶颈

spark.broadcast.factory

org.apache.spark.broadcast.TorrentBroadcastFactory

broadcast实现类

spark.cleaner.ttl

(infinite)

spark记录任何元数据(stages生成、task生成等)的持续时间。按期清理能够确保将超期的元数据丢弃,这在运行长时间任务是颇有用的,如运行7*24的sparkstreaming任务。RDD持久化在内存中的超期数据也会被清理

spark.default.parallelism

本地模式:机器核数;Mesos:8;其余:max(executor的core,2)

若是用户不设置,系统使用集群中运行shuffle操做的默认任务数(groupByKey、 reduceByKey等)

spark.executor.heartbeatInterval

10000

executor 向 the driver 汇报心跳的时间间隔,单位毫秒

spark.files.fetchTimeout

60

driver 程序获取经过SparkContext.addFile()添加的文件时的超时时间,单位秒

spark.files.useFetchCache

true

获取文件时是否使用本地缓存

spark.files.overwrite

false

调用SparkContext.addFile()时候是否覆盖文件

spark.hadoop.cloneConf

false

每一个task是否克隆一份hadoop的配置文件

spark.hadoop.validateOutputSpecs

true

是否校验输出

spark.storage.memoryFraction

0.6

Spark内存缓存的堆大小占用总内存比例,该值不能大于老年代内存大小,默认值为0.6,可是,若是你手动设置老年代大小,你能够增长该值

spark.storage.memoryMapThreshold

2097152

内存块大小

spark.storage.unrollFraction

0.2

Fraction of spark.storage.memoryFraction to use for unrolling blocks in memory.

spark.tachyonStore.baseDir

System.getProperty(“java.io.tmpdir”)

Tachyon File System临时目录

spark.tachyonStore.url

tachyon://localhost:19998

Tachyon File System URL


网络

属性名称默认值含义

spark.driver.host

(local hostname)

driver监听的主机名或者IP地址。这用于和executors以及独立的master通讯

spark.driver.port

(random)

driver监听的接口。这用于和executors以及独立的master通讯

spark.fileserver.port

(random)

driver的文件服务器监听的端口

spark.broadcast.port

(random)

driver的HTTP广播服务器监听的端口

spark.replClassServer.port

(random)

driver的HTTP类服务器监听的端口

spark.blockManager.port

(random)

块管理器监听的端口。这些同时存在于driver和executors

spark.executor.port

(random)

executor监听的端口。用于与driver通讯

spark.port.maxRetries

16

当绑定到一个端口,在放弃前重试的最大次数

spark.akka.frameSize

10

在”control plane”通讯中容许的最大消息大小。若是你的任务须要发送大的结果到driver中,调大这个值

spark.akka.threads

4

通讯的actor线程数。当driver有不少CPU核时,调大它是有用的

spark.akka.timeout

100

Spark节点之间的通讯超时。单位是秒

spark.akka.heartbeat.pauses

6000

This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). Acceptable heart beat pause in seconds for akka. This can be used to control sensitivity to gc pauses. Tune this in combination of spark.akka.heartbeat.interval and spark.akka.failure-detector.threshold if you need to.

spark.akka.failure-detector.threshold

300.0

This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). This maps to akka’s akka.remote.transport-failure-detector.threshold. Tune this in combination of spark.akka.heartbeat.pauses and spark.akka.heartbeat.interval if you need to.

spark.akka.heartbeat.interval

1000

This is set to a larger value to disable failure detector that comes inbuilt akka. It can be enabled again, if you plan to use this feature (Not recommended). A larger interval value in seconds reduces network overhead and a smaller value ( ~ 1 s) might be more informative for akka’s failure detector. Tune this in combination of spark.akka.heartbeat.pauses and spark.akka.failure-detector.threshold if you need to. Only positive use case for using failure detector can be, a sensistive failure detector can help evict rogue executors really quick. However this is usually not the case as gc pauses and network lags are expected in a real Spark cluster. Apart from that enabling this leads to a lot of exchanges of heart beats between nodes leading to flooding the network with those.


调度相关属性

属性名称默认值含义

spark.task.cpus

1

为每一个任务分配的内核数

spark.task.maxFailures

4

Task的最大重试次数

spark.scheduler.mode

FIFO

Spark的任务调度模式,还有一种Fair模式

spark.cores.max


当应用程序运行在Standalone集群或者粗粒度共享模式Mesos集群时,应用程序向集群请求的最大CPU内核总数(不是指每 台机器,而是整个集群)。若是不设置,对于Standalone集群将使用spark.deploy.defaultCores中数值,而Mesos将使 用集群中可用的内核

spark.mesos.coarse

False

若是设置为true,在Mesos集群中运行时使用粗粒度共享模式

spark.speculation

False

如下几个参数是关于Spark推测执行机制的相关参数。此参数设定是否使用推测执行机制,若是设置为true则spark使用推测执行机制,对于Stage中拖后腿的Task在其余节点中从新启动,并将最早完成的Task的计算结果最为最终结果

spark.speculation.interval

100

Spark多长时间进行检查task运行状态用以推测,以毫秒为单位

spark.speculation.quantile


推测启动前,Stage必需要完成总Task的百分比

spark.speculation.multiplier

1.5

比已完成Task的运行速度中位数慢多少倍才启用推测

spark.locality.wait

3000

如下几个参数是关于Spark数据本地性的。本参数是以毫秒为单位启动本地数据task的等待时间,若是超出就启动下一本地优先级别 的task。该设置一样能够应用到各优先级别的本地性之间(本地进程 -> 本地节点 -> 本地机架 -> 任意节点 ),固然,也能够经过spark.locality.wait.node等参数设置不一样优先级别的本地性

spark.locality.wait.process

spark.locality.wait

本地进程级别的本地等待时间

spark.locality.wait.node

spark.locality.wait

本地节点级别的本地等待时间

spark.locality.wait.rack

spark.locality.wait

本地机架级别的本地等待时间

spark.scheduler.revive.interval

1000

复活从新获取资源的Task的最长时间间隔(毫秒),发生在Task由于本地资源不足而将资源分配给其余Task运行后进入等待时间,若是这个等待时间内从新获取足够的资源就继续计算


Dynamic Allocation

属性名称默认值含义

spark.dynamicAllocation.enabled

false

是否开启动态资源搜集

spark.dynamicAllocation.executorIdleTimeout

600


spark.dynamicAllocation.initialExecutors

spark.dynamicAllocation.minExecutors


spark.dynamicAllocation.maxExecutors

Integer.MAX_VALUE


spark.dynamicAllocation.minExecutors

0


spark.dynamicAllocation.schedulerBacklogTimeout

5


spark.dynamicAllocation.sustainedSchedulerBacklogTimeout

schedulerBacklogTimeout



安全

属性名称默认值含义

spark.authenticate

false

是否Spark验证其内部链接。若是不是运行在YARN上,请看spark.authenticate.secret

spark.authenticate.secret

None

设置Spark两个组件之间的密匙验证。若是不是运行在YARN上,可是须要验证,这个选项必须设置

spark.core.connection.auth.wait.timeout

30

链接时等待验证的实际。单位为秒

spark.core.connection.ack.wait.timeout

60

链接等待回答的时间。单位为秒。为了不不但愿的超时,你能够设置更大的值

spark.ui.filters

None

应用到Spark web UI的用于过滤类名的逗号分隔的列表。过滤器必须是标准的javax servlet Filter。经过设置java系统属性也能够指定每一个过滤器的参数。spark..params='param1=value1,param2=value2'。例如-Dspark.ui.filters=com.test.filter1-Dspark.com.test.filter1.params='param1=foo,param2=testing'

spark.acls.enable

false

是否开启Spark acls。若是开启了,它检查用户是否有权限去查看或修改job。UI利用使用过滤器验证和设置用户

spark.ui.view.acls

empty

逗号分隔的用户列表,列表中的用户有查看Spark web UI的权限。默认状况下,只有启动Spark job的用户有查看权限

spark.modify.acls

empty

逗号分隔的用户列表,列表中的用户有修改Spark job的权限。默认状况下,只有启动Spark job的用户有修改权限

spark.admin.acls

empty

逗号分隔的用户或者管理员列表,列表中的用户或管理员有查看和修改全部Spark job的权限。若是你运行在一个共享集群,有一组管理员或开发者帮助debug,这个选项有用


加密

属性名称默认值含义

spark.ssl.enabled

false

是否开启ssl

spark.ssl.enabledAlgorithms

Empty

JVM支持的加密算法列表,逗号分隔

spark.ssl.keyPassword

None


spark.ssl.keyStore

None


spark.ssl.keyStorePassword

None


spark.ssl.protocol

None


spark.ssl.trustStore

None


spark.ssl.trustStorePassword

None



Spark Streaming

属性名称默认值含义

spark.streaming.blockInterval

200

在这个时间间隔(ms)内,经过Spark Streaming receivers接收的数据在保存到Spark以前,chunk为数据块。推荐的最小值为50ms

spark.streaming.receiver.maxRate

infinite

每秒钟每一个receiver将接收的数据的最大记录数。有效的状况下,每一个流将消耗至少这个数目的记录。设置这个配置为0或者-1将会不做限制

spark.streaming.receiver.writeAheadLogs.enable

false

Enable write ahead logs for receivers. All the input data received through receivers will be saved to write ahead logs that will allow it to be recovered after driver failures

spark.streaming.unpersist

true

强制经过Spark Streaming生成并持久化的RDD自动从Spark内存中非持久化。经过Spark Streaming接收的原始输入数据也将清除。设置这个属性为false容许流应用程序访问原始数据和持久化RDD,由于它们没有被自动清除。可是它会 形成更高的内存花费


集群管理 

Spark On YARN

属性名称默认值含义

spark.yarn.am.memory

512m

client 模式时,am的内存大小;cluster模式时,使用spark.driver.memory变量

spark.driver.cores

1

claster模式时,driver使用的cpu核数,这时候driver运行在am中,其实也就是am和核数;client模式时,使用spark.yarn.am.cores变量

spark.yarn.am.cores

1

client 模式时,am的cpu核数

spark.yarn.am.waitTime

100000

启动时等待时间

spark.yarn.submit.file.replication

3

应用程序上传到HDFS的文件的副本数

spark.yarn.preserve.staging.files

False

若为true,在job结束后,将stage相关的文件保留而不是删除

spark.yarn.scheduler.heartbeat.interval-ms

5000

Spark AppMaster发送心跳信息给YARN RM的时间间隔

spark.yarn.max.executor.failures

2倍于executor数,最小值3

致使应用程序宣告失败的最大executor失败次数

spark.yarn.applicationMaster.waitTries

10

RM等待Spark AppMaster启动重试次数,也就是SparkContext初始化次数。超过这个数值,启动失败

spark.yarn.historyServer.address


Spark history server的地址(不要加 http://)。这个地址会在Spark应用程序完成后提交给YARN RM,而后RM将信息从RM UI写到history server UI上。

spark.yarn.dist.archives

(none)


spark.yarn.dist.files

(none)


spark.executor.instances

2

executor实例个数

spark.yarn.executor.memoryOverhead

executorMemory * 0.07, with minimum of 384

executor的堆内存大小设置

spark.yarn.driver.memoryOverhead

driverMemory * 0.07, with minimum of 384

driver的堆内存大小设置

spark.yarn.am.memoryOverhead

AM memory * 0.07, with minimum of 384

am的堆内存大小设置,在client模式时设置

spark.yarn.queue

default

使用yarn的队列

spark.yarn.jar

(none)


spark.yarn.access.namenodes

(none)


spark.yarn.appMasterEnv.[EnvironmentVariableName]

(none)

设置am的环境变量

spark.yarn.containerLauncherMaxThreads

25

am启动executor的最大线程数

spark.yarn.am.extraJavaOptions

(none)


spark.yarn.maxAppAttempts

yarn.resourcemanager.am.max-attempts in YARN

am重试次数

Spark on Mesos

使用较少,参考Running Spark on Mesos。

Spark Standalone Mode

参考Spark Standalone Mode。

Spark History Server

当你运行Spark Standalone Mode或者Spark on Mesos模式时,你能够经过Spark History Server来查看job运行状况。

Spark History Server的环境变量:

属性名称含义

SPARK_DAEMON_MEMORY

Memory to allocate to the history server (default: 512m).

SPARK_DAEMON_JAVA_OPTS

JVM options for the history server (default: none).

SPARK_PUBLIC_DNS


SPARK_HISTORY_OPTS

配置 spark.history.* 属性

Spark History Server的属性:

属性名称默认含义

spark.history.provider

org.apache.spark.deploy.history.FsHistoryProvide

应用历史后端实现的类名。 目前只有一个实现, 由Spark提供, 它查看存储在文件系统里面的应用日志

spark.history.fs.logDirectory

file:/tmp/spark-events


spark.history.updateInterval

10

以秒为单位,多长时间Spark history server显示的信息进行更新。每次更新都会检查持久层事件日志的任何变化。

spark.history.retainedApplications

50

在Spark history server上显示的最大应用程序数量,若是超过这个值,旧的应用程序信息将被删除。

spark.history.ui.port

18080

官方版本中,Spark history server的默认访问端口

spark.history.kerberos.enabled

false

是否使用kerberos方式登陆访问history server,对于持久层位于安全集群的HDFS上是有用的。若是设置为true,就要配置下面的两个属性。

spark.history.kerberos.principal

用于Spark history server的kerberos主体名称

spark.history.kerberos.keytab

用于Spark history server的kerberos keytab文件位置

spark.history.ui.acls.enable

false

受权用户查看应用程序信息的时候是否检查acl。若是启用,只有应用程序全部者和spark.ui.view.acls指定的用户能够查看应用程序信息;若是禁用,不作任何检查。


环境变量

经过环境变量配置肯定的Spark设置。环境变量从Spark安装目录下的conf/spark-env.sh脚本读取(或者windows的conf/spark-env.cmd)。在独立的或者Mesos模式下,这个文件能够给机器肯定的信息,如主机名。当运行本地应用程序或者提交脚本时,它也起做用。

注意,当Spark安装时,conf/spark-env.sh默认是不存在的。你能够复制conf/spark-env.sh.template建立它。

能够在spark-env.sh中设置以下变量:

环境变量含义

JAVA_HOME

Java安装的路径

PYSPARK_PYTHON

PySpark用到的Python二进制执行文件路径

SPARK_LOCAL_IP

机器绑定的IP地址

SPARK_PUBLIC_DNS

你Spark应用程序通知给其余机器的主机名


相关问题推荐

  • spark的优点有哪些?2021-07-05 16:55
    回答 20

    首先, Spark 非常好用。由于高级 API 剥离了对集群本身的关注,你可以专注于你所要做的计算本身, 只需在自己的笔记本电脑上就可以开发 Spark 应用。其次, Spark 很快,支持交互式使用和复杂算法。最后, Spark 是一个通用引擎,可用它来完成各种各样的运算...

  • 回答 5

    现在企业中多数用的是相对稳定的Spark2.2.0版本。

  • 回答 4

        Spark,是一种One Stack to rule them all的大数据计算框架,是一种基于内存计算的框架,是一种通用的大数据快速处理引擎。    这一站式的计算框架,包含了Spark RDD(这也是Spark Core用于离线批处理)、Spark SQL(交互式查询)、Spark Streaming(实时...

  • 回答 10

    常用RDD算子(1)Action RDDforeach:遍历每个元素,无返回值,一般用在将结果存储到数据库中使用saveAsTextFile存储到hdfs,RDD每个partition存到hdfs的一个block块saveAsObjectFile:存储到hdfs,将每个partition的数据序列化后,以sequenceFile(序列化)...

  • 回答 6

    主要功能:管理当前节点内存,CPU的使用状况,接收master分配过来的资源指令,通过ExecutorRunner启动程序分配任务,worker就类似于包工头,管理分配新进程,做计算的服务,相当于process服务。需要注意的是:1)worker会不会汇报当前信息给master,worker心...

  • RDD五大属性2020-07-15 13:45
    回答 3

    1、RDD是一个分片的数据集合;2、RDD的函数针对每个分片进行计算;3、RDD之间是个依赖的集合;4、可选:key-value型RDD是根据哈希来分区的;5、可选:数据本地性优先计算。

  • 回答 3

    在hadoop/bin目录下有yarn命令yarn application -kill 

  • 回答 3
    已采纳

    1.Spark SQLSpark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。Spark SQL支持多种数据源类型,例如Hive表、Parquet以及JSON等。Spark SQL不仅为Spark提供了一个SQL接口,还支持开发...

  • 回答 4

    Spark SQL 在 Spark1.x 中和传统 SQL 不完全一致。但是在 Spark2.x 版本中遵循的美国的ANSI的SQL2003完全标准sql 。oracle和mysql都有自己的语法规则,平时所使用的 SQL 语句都不是标准 SQL 。平时用的 mysql 和 oracle 以及 hive,都是部分遵循标准SQL 。...

  • 回答 2

    Spark是一种快速、通用、可扩展的大数据分析引擎,于2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。项目是用Scala进行编写。Spark的结构:Spark生态系统已经发展成为一个包含多个子项目的集...

  • 回答 1

    自己随意编写一份测试数据,所用的测试数据如下,需求是按照第一列的字母分组,然后按照第二列数据取出每一组内前N个数据,后面我分别列出了我使用的三种方案来实现该需求,不同方案在不同的场景下会有各自的优势 a 25b 36c 24d 45e 60a 33b 26c 47d 43e 62a...

  • 回答 1

    把数据从redis读出来放到kafka里呗,然后用spark-streaming去读kafka的数据,或者写个程序从redis把数据读出来用socket或文件的形式传给spark-streaming,spark-streaming支持很多种源的方式

没有解决我的问题,去提问