Worker是一个基于AKKA Actor 的Actor模型,和Master,Driver,进行通信的时候 都是通过在receiver方法中进行样例类的时间匹配,支持Worker同时实现了ActorLogReceive的trait,ActorLogReceive里面复写receive方法,对子类暴露出receiveWithLogging方法,worker只需要复写这个方法就可以了,这里面涉及到的设计模式就模板设计模式
Worker.scala
case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(
conf, //配置文件 val conf: SparkConf,
driverId, // DriverID
workDir, // Drover 本地工作目录,如果没有工作,目录将会自动创建
sparkHome, //sparkHome
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)), //Driver 的描述信息
self, // this
akkaUrl) //这个应该是Worker 与 Client 通信的链接
将driver封装到hashMap中,进行管理,key是driverID
drivers(driverId) = driver
driver.start()
启动起来之后,driver中记录描述信息中的资源信息,比如几个core,比如memory大小,记录一下
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
}123456789101112131415161718
来找找 driver.start()
DriverRunner.scala
/**
DriverRunner是用来管理一个Driver进程,包括driver废掉的时候自动重启driver进程
这种方式仅用在standalone 集群模式
* Manages the execution of one driver, including automatically restarting the driver on failure.
* This is currently only used in standalone cluster deploy mode.
*/
/** Starts a thread to run and manage the driver. */
def start() = {
new Thread("DriverRunner for " + driverId) {
override def run() {
try {
val driverDir = createWorkingDirectory() //创建工作目录
下载用户创建的jar ,也就是我们自编写的spark application程序,最后把它打jar包,
上传spark集群上面去执行,一般都是用maven 也有sbt,这种方式比较少用
在提交driver的时候,把用户自定义的jar上传到hdfs spark工作目录下面,在执行jar的时候
就直接就近从hdfs上面下载jar执行,避免了大量的网络传输
val localJarFilename = downloadUserJar(driverDir)
def substituteVariables(argument: String): String = argument match {
case "{{WORKER_URL}}" => workerUrl
case "{{USER_JAR}}" => localJarFilename
case other => other
}
// TODO: If we add ability to submit multiple jars they should also be added here
构建一个ProcessBuilder用来启动Driver进程
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
sparkHome.getAbsolutePath, substituteVariables)
launchDriver(builder, driverDir, driverDesc.supervise)
}
catch {
case e: Exception => finalException = Some(e)
}
val state =
if (killed) {
DriverState.KILLED
} else if (finalException.isDefined) {
DriverState.ERROR
} else {
finalExitCode match {
case Some(0) => DriverState.FINISHED
case _ => DriverState.FAILED
}
}
finalState = Some(state)
worker ! DriverStateChanged(driverId, state, finalException)
}
}.start()1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
}
createWorkingDirectory()方法
/**
创建一个工作目录 为这个driver
如果创建失败,就报错
* Creates the working directory for this driver.
* Will throw an exception if there are errors preparing the directory.
*/
private def createWorkingDirectory(): File = {
val driverDir = new File(workDir, driverId)
if (!driverDir.exists() && !driverDir.mkdirs()) {
throw new IOException("Failed to create directory " + driverDir)
}
driverDir12345678910111213
}
private def downloadUserJar(driverDir: File): String = {
val jarPath = new Path(driverDesc.jarUrl)
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val jarFileSystem = jarPath.getFileSystem(hadoopConf)
val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
val jarFileName = jarPath.getName
val localJarFile = new File(driverDir, jarFileName)
val localJarFilename = localJarFile.getAbsolutePath
判断是否下载成功,没成功抛出异常
if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
logInfo(s"Copying user jar $jarPath to $destPath")
FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf)
}
if (!localJarFile.exists()) { // Verify copy succeeded
throw new Exception(s"Did not see expected jar $jarFileName in $driverDir")
}
返回下载到本地jar的路径
localJarFilename12345678910111213141516171819202122
Worker.scala
def createWorkDir() {
workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
try {
// This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
// So attempting to create and then check if directory was created or not.
workDir.mkdirs()
if ( !workDir.exists() || !workDir.isDirectory) {
logError("Failed to create work directory " + workDir)
System.exit(1)
}
assert (workDir.isDirectory)
} catch {
case e: Exception =>
logError("Failed to create work directory " + workDir, e)
System.exit(1)
}1234567891011121314151617
}
DriverRunner.scala
var keepTrying = !killed
while (keepTrying) {
logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))
synchronized {
if (killed) { return }
process = Some(command.start())
initialize(process.get)
}
val processStart = clock.getTimeMillis()
val exitCode = process.get.waitFor()
if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
waitSeconds = 1
}
if (supervise && exitCode != 0 && !killed) {
logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
sleeper.sleep(waitSeconds)
waitSeconds = waitSeconds * 2 // exponential back-off
}
keepTrying = supervise && exitCode != 0 && !killed
finalExitCode = Some(exitCode)
}
}1234567891011121314151617181920212223242526272829
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
// Create local dirs for the executor. These are passed to the executor via the
// SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
// application finishes.
val appLocalDirs = appDirectories.get(appId).getOrElse {
Utils.getOrCreateLocalRootDirs(conf).map { dir =>
Utils.createDirectory(dir).getAbsolutePath()
}.toSeq
}
appDirectories(appId) = appLocalDirs
新建一个 ExecutorRunner
val manager = new ExecutorRunner(
appId,
execId,
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
akkaUrl,
conf,
appLocalDirs, ExecutorState.LOADING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
memoryUsed += memory_
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
} catch {
case e: Exception => {
logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
}
通知 master Executor状态改变
master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
Some(e.toString), None)
}
}
}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
来看看ExecutorRunner.scala
def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
override def run() {
killProcess(Some("Worker shutting down"))
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
}12345678910111213
ExecutorRunner.scala
/**
* Download and run the executor described in our ApplicationDescription
*/
def fetchAndRunExecutor() {
try {
// Launch the process
val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
builder.directory(executorDir)
builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
// Add webUI log urls
val baseUrl =
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
// Redirect its stdout and stderr to files
val stdout = new File(executorDir, "stdout")
stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
val exitCode = process.waitFor()
state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
看到了。Executor 状态是先 通知worker,然后由worker通知master,
worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
} catch {
case interrupted: InterruptedException => {
logInfo("Runner thread for executor " + fullId + " interrupted")
state = ExecutorState.KILLED
killProcess(None)
}
case e: Exception => {
logError("Error running executor", e)
state = ExecutorState.FAILED
killProcess(Some(e.toString))
}
}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
主要功能:管理当前节点内存,CPU的使用状况,接收master分配过来的资源指令,通过ExecutorRunner启动程序分配任务,worker就类似于包工头,管理分配新进程,做计算的服务,相当于process服务。需要注意的是:
1)worker会不会汇报当前信息给master,worker心跳给master主要只有workid,它不会发送资源信息以心跳的方式给mater,master分配的时候就知道work,只有出现故障的时候才会发送资源。
2)worker不会运行代码,具体运行的是Executor是可以运行具体appliaction写的业务逻辑代码,操作代码的节点,它不会运行程序的代码的。
Spark之Worker工作原理
当一个Spark上的Application要启动的时候,Master就会发送使用调度算法给Application分配资源,也就是将Application所需要的资源分摊到Worker上面以保证分布式的计算。同时Master还会给Worker发送消息让它去启动Driver(Yarn-Client模式下),和Executor。同时Executor的反向注册与Driver与Executor的状态改变也会通过Worker中的线程与Master进行通信。这就是Worker的重要作用①启动②信息的传递。
Worker启动与调度Driver
如果一个Driver需要启动的话那么首先Master就会给Driver发送一个启动消息LaunchDriver,其中包括Driver的id与DriverDesc也就是Driver的描述信息。在接收到信息之后首先会启动DriverRunner,后续的启动过程中的一些步骤都是DriverRunner去调度。
Worker在接收到工作目录的时候会给当前的Driver创建一个目录,这个目录用来存放与这个Driver一些相关的文件。
下载用户上传的jar到刚才的工作目录中。也就是我们计算的程序以及一些依赖的jar包。
构建ProcessBuilder,ProcessBuilder是Java中启动一个线程的类。在启动Driver的时候我们会将Driver的启动命令,内存大小,是否可监控等信息作为参数。最后通过waitFor方法去启动Driver。
当Driver进程终结时,DriverRunner会向Worker所属的线程发送DriverStateChanged事件。这时Worker也会向Master发送这个消息,Master再收到信息之后会将这个Driver从内存中移除。同时Worker也会释放Driver占用的资源同时改变自己是否可用的状态,也就是这个Worker又变得可用了。
Worker启动与调度Executor
这个过程与上面的Worker十分的相似。
整个前提就是Master通过自己的调度算法将Application所需要的资源分摊到各个Worker上面。每个Worker上面对应的计算存储资源也就构成了Executor。
Master向Worker发送LaunchExecutor。当然发送的信息包括,想要构建的Executor的id号、这个Executor所对应的ApplicationId的id号与描述信息、所分配的计算资源的核数,所分配的内存的大小。
Worker创建Executor所对应的工作目录。
创建ExecutorRunner,用来构建ProcessBuilder进而通过waitFor方法来启动真正的Executor。
当Executor启动之后,ExecutorRunner会向Worker发送消息然后Worker会将这个Executor反向注册到它对应的Driver上。
当Executor的状态发生改变,也会同⑤步骤一样告诉Master,如果执行失败那么会重新调度直到一定的次数。如果被判定为终结状态的话那么就将这个Executor所对应的资源释放也将这个Application判定为执行失败。
参考链接:
https://blog.csdn.net/qq_34993631/article/details/86667070?utm_medium=distribute.pc_aggpage_search_result.none-task-blog-2~all~first_rank_v2~rank_v25-2-86667070.nonecase&utm_term=spark的worker作用
当一个Spark上的Application要启动的时候,Master就会发送使用调度算法给Application分配资源,也就是将Application所需要的资源分摊到Worker上面以保证分布式的计算。同时Master还会给Worker发送消息让它去启动Driver(Yarn-Client模式下),和Executor。同时Executor的反向注册与Driver与Executor的状态改变也会通过Worker中的线程与Master进行通信。这就是Worker的重要作用①启动②信息的传递。
Worker
Worker是一个基于AKKA Actor 的Actor模型,和Master,Driver,进行通信的时候
都是通过在receiver方法中进行样例类的时间匹配,支持Worker同时实现了ActorLogReceive的trait,ActorLogReceive里面复写receive方法,对子类暴露出receiveWithLogging方法,worker只需要复写这个方法就可以了,这里面涉及到的设计模式就模板设计模式
Driver 开启过程
启动Driver方法,需要将要启动的Driver添加到相应的worker上面,
同时需要向Driver发送启动Driver的时间通知 LaunchDriver(driver.id, driver.desc) ,更改driver状态为RUNNING
}
来看一下Worker中的 LaunchDriver()方法,
来找找 driver.start()
}
}
}
上面DriverRunner中createWorkingDirectory()的创建,依托于Worker中createWorkDir()
}
- 当DriverRunner 把DriverDir 和 Jars包就近下载完后
通知worker, Driver状态已经改变,Worker收到之后,再将DriverState 改变的消息,通过Actor发送给Master
Executor 开启过程
首先当然是看
来看看Worker是如何处理launchExecutor()的
来看看ExecutorRunner.scala
ExecutorRunner.scala
}
}
然后当worker节点上的executor进程启动之后,会向Driver节点上相应的Driver进程反向注册,说明executor进程已经启动成功,等待执行task
当一个Spark上的Application要启动的时候,Master就会发送使用调度算法给Application分配资源,也就是将Application所需要的资源分摊到Worker上面以保证分布式的计算。同时Master还会给Worker发送消息让它去启动Driver(Yarn-Client模式下),和Executor。同时Executor的反向注册与Driver与Executor的状态改变也会通过Worker中的线程与Master进行通信。这就是Worker的重要作用①启动②信息的传递。
来源于网络仅供参考
当一个Spark上的Application要启动的时候,Master就会发送使用调度算法给Application分配资源,也就是将Application所需要的资源分摊到Worker上面以保证分布式的计算。同时Master还会给Worker发送消息让它去启动Driver(Yarn-Client模式下),和Executor。同时Executor的反向注册与Driver与Executor的状态改变也会通过Worker中的线程与Master进行通信。这就是Worker的重要作用①启动②信息的传递。
相关问题推荐
首先, Spark 非常好用。由于高级 API 剥离了对集群本身的关注,你可以专注于你所要做的计算本身, 只需在自己的笔记本电脑上就可以开发 Spark 应用。其次, Spark 很快,支持交互式使用和复杂算法。最后, Spark 是一个通用引擎,可用它来完成各种各样的运算...
现在企业中多数用的是相对稳定的Spark2.2.0版本。
Spark,是一种One Stack to rule them all的大数据计算框架,是一种基于内存计算的框架,是一种通用的大数据快速处理引擎。 这一站式的计算框架,包含了Spark RDD(这也是Spark Core用于离线批处理)、Spark SQL(交互式查询)、Spark Streaming(实时...
常用RDD算子(1)Action RDDforeach:遍历每个元素,无返回值,一般用在将结果存储到数据库中使用saveAsTextFile存储到hdfs,RDD每个partition存到hdfs的一个block块saveAsObjectFile:存储到hdfs,将每个partition的数据序列化后,以sequenceFile(序列化)...
1、RDD是一个分片的数据集合;2、RDD的函数针对每个分片进行计算;3、RDD之间是个依赖的集合;4、可选:key-value型RDD是根据哈希来分区的;5、可选:数据本地性优先计算。
在hadoop/bin目录下有yarn命令yarn application -kill
1.Spark SQLSpark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。Spark SQL支持多种数据源类型,例如Hive表、Parquet以及JSON等。Spark SQL不仅为Spark提供了一个SQL接口,还支持开发...
Spark SQL 在 Spark1.x 中和传统 SQL 不完全一致。但是在 Spark2.x 版本中遵循的美国的ANSI的SQL2003完全标准sql 。oracle和mysql都有自己的语法规则,平时所使用的 SQL 语句都不是标准 SQL 。平时用的 mysql 和 oracle 以及 hive,都是部分遵循标准SQL 。...
#!/bin/bash #队列名 根据yarn的队列提交 realtime_queue=root #提交的任务名 my_job_name=OrderQZspark-shell --master yarn --deploy-mode client \--queue $realtime_queue \ #总的executors数 根据数据量与自己的集群资源来分配--num-executors...
Spark是一种快速、通用、可扩展的大数据分析引擎,于2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。项目是用Scala进行编写。Spark的结构:Spark生态系统已经发展成为一个包含多个子项目的集...
自己随意编写一份测试数据,所用的测试数据如下,需求是按照第一列的字母分组,然后按照第二列数据取出每一组内前N个数据,后面我分别列出了我使用的三种方案来实现该需求,不同方案在不同的场景下会有各自的优势 a 25b 36c 24d 45e 60a 33b 26c 47d 43e 62a...
把数据从redis读出来放到kafka里呗,然后用spark-streaming去读kafka的数据,或者写个程序从redis把数据读出来用socket或文件的形式传给spark-streaming,spark-streaming支持很多种源的方式