spark中worker 的主要工作是什么?

2020-08-18 09:23发布

6条回答
那些年很冒险的梦。
2楼 · 2020-08-18 09:45

主要功能:管理当前节点内存,CPU的使用状况,接收master分配过来的资源指令,通过ExecutorRunner启动程序分配任务,worker就类似于包工头,管理分配新进程,做计算的服务,相当于process服务。需要注意的是:

1)worker会不会汇报当前信息给master,worker心跳给master主要只有workid,它不会发送资源信息以心跳的方式给mater,master分配的时候就知道work,只有出现故障的时候才会发送资源。

2)worker不会运行代码,具体运行的是Executor是可以运行具体appliaction写的业务逻辑代码,操作代码的节点,它不会运行程序的代码的。


我是大脸猫
3楼 · 2020-08-18 09:47

Spark之Worker工作原理

当一个Spark上的Application要启动的时候,Master就会发送使用调度算法给Application分配资源,也就是将Application所需要的资源分摊到Worker上面以保证分布式的计算。同时Master还会给Worker发送消息让它去启动Driver(Yarn-Client模式下),和Executor。同时Executor的反向注册与Driver与Executor的状态改变也会通过Worker中的线程与Master进行通信。这就是Worker的重要作用①启动②信息的传递。

Worker启动与调度Driver

  • 如果一个Driver需要启动的话那么首先Master就会给Driver发送一个启动消息LaunchDriver,其中包括Driver的id与DriverDesc也就是Driver的描述信息。在接收到信息之后首先会启动DriverRunner,后续的启动过程中的一些步骤都是DriverRunner去调度。

  • Worker在接收到工作目录的时候会给当前的Driver创建一个目录,这个目录用来存放与这个Driver一些相关的文件。

  • 下载用户上传的jar到刚才的工作目录中。也就是我们计算的程序以及一些依赖的jar包。

  • 构建ProcessBuilder,ProcessBuilder是Java中启动一个线程的类。在启动Driver的时候我们会将Driver的启动命令,内存大小,是否可监控等信息作为参数。最后通过waitFor方法去启动Driver。

  • 当Driver进程终结时,DriverRunner会向Worker所属的线程发送DriverStateChanged事件。这时Worker也会向Master发送这个消息,Master再收到信息之后会将这个Driver从内存中移除。同时Worker也会释放Driver占用的资源同时改变自己是否可用的状态,也就是这个Worker又变得可用了。

 

Worker启动与调度Executor

这个过程与上面的Worker十分的相似。

  • 整个前提就是Master通过自己的调度算法将Application所需要的资源分摊到各个Worker上面。每个Worker上面对应的计算存储资源也就构成了Executor。

  • Master向Worker发送LaunchExecutor。当然发送的信息包括,想要构建的Executor的id号、这个Executor所对应的ApplicationId的id号与描述信息、所分配的计算资源的核数,所分配的内存的大小。

  • Worker创建Executor所对应的工作目录。

  • 创建ExecutorRunner,用来构建ProcessBuilder进而通过waitFor方法来启动真正的Executor。

  • 当Executor启动之后,ExecutorRunner会向Worker发送消息然后Worker会将这个Executor反向注册到它对应的Driver上。

  • 当Executor的状态发生改变,也会同⑤步骤一样告诉Master,如果执行失败那么会重新调度直到一定的次数。如果被判定为终结状态的话那么就将这个Executor所对应的资源释放也将这个Application判定为执行失败。

参考链接:

https://blog.csdn.net/qq_34993631/article/details/86667070?utm_medium=distribute.pc_aggpage_search_result.none-task-blog-2~all~first_rank_v2~rank_v25-2-86667070.nonecase&utm_term=spark的worker作用

Sophia
4楼 · 2020-08-18 09:51

当一个Spark上的Application要启动的时候,Master就会发送使用调度算法给Application分配资源,也就是将Application所需要的资源分摊到Worker上面以保证分布式的计算。同时Master还会给Worker发送消息让它去启动Driver(Yarn-Client模式下),和Executor。同时Executor的反向注册与Driver与Executor的状态改变也会通过Worker中的线程与Master进行通信。这就是Worker的重要作用①启动②信息的传递。

我的网名不再改
5楼 · 2020-08-21 13:23

Worker


Worker是一个基于AKKA Actor 的Actor模型,和Master,Driver,进行通信的时候
都是通过在receiver方法中进行样例类的时间匹配,支持Worker同时实现了ActorLogReceive的trait,ActorLogReceive里面复写receive方法,对子类暴露出receiveWithLogging方法,worker只需要复写这个方法就可以了,这里面涉及到的设计模式就模板设计模式

Driver 开启过程

  • 启动Driver方法,需要将要启动的Driver添加到相应的worker上面,
    同时需要向Driver发送启动Driver的时间通知 LaunchDriver(driver.id, driver.desc) ,更改driver状态为RUNNING


Master.scala
def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
worker.addDriver(driver)
driver.worker = Some(worker)
worker.actor ! LaunchDriver(driver.id, driver.desc)
driver.state = DriverState.RUNNING1234567

}

  • 来看一下Worker中的 LaunchDriver()方法,


  Worker.scala
case LaunchDriver(driverId, driverDesc) => {
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,   //配置文件  val conf: SparkConf,
    driverId, // DriverID 
    workDir,  // Drover 本地工作目录,如果没有工作,目录将会自动创建
    sparkHome,   //sparkHome
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),  //Driver 的描述信息
    self,     // this 
    akkaUrl)  //这个应该是Worker 与 Client 通信的链接
    将driver封装到hashMap中,进行管理,key是driverID
  drivers(driverId) = driver
  driver.start()
   启动起来之后,driver中记录描述信息中的资源信息,比如几个core,比如memory大小,记录一下
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
}123456789101112131415161718

来找找 driver.start()


DriverRunner.scala
/**
    DriverRunner是用来管理一个Driver进程,包括driver废掉的时候自动重启driver进程
    这种方式仅用在standalone 集群模式
 * Manages the execution of one driver, including automatically restarting the driver on failure.
 * This is currently only used in standalone cluster deploy mode.
 */
/** Starts a thread to run and manage the driver. */
def start() = {
new Thread("DriverRunner for " + driverId) {
  override def run() {
    try {
      val driverDir = createWorkingDirectory() //创建工作目录
        下载用户创建的jar ,也就是我们自编写的spark   application程序,最后把它打jar包,
        上传spark集群上面去执行,一般都是用maven  也有sbt,这种方式比较少用
         在提交driver的时候,把用户自定义的jar上传到hdfs  spark工作目录下面,在执行jar的时候
        就直接就近从hdfs上面下载jar执行,避免了大量的网络传输
      val localJarFilename = downloadUserJar(driverDir) 

      def substituteVariables(argument: String): String = argument match {
        case "{{WORKER_URL}}" => workerUrl
        case "{{USER_JAR}}" => localJarFilename
        case other => other
      }

      // TODO: If we add ability to submit multiple jars they should also be added here

        构建一个ProcessBuilder用来启动Driver进程
      val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem,
        sparkHome.getAbsolutePath, substituteVariables)
      launchDriver(builder, driverDir, driverDesc.supervise)
    }
    catch {
      case e: Exception => finalException = Some(e)
    }

    val state =
      if (killed) {
        DriverState.KILLED
      } else if (finalException.isDefined) {
        DriverState.ERROR
      } else {
        finalExitCode match {
          case Some(0) => DriverState.FINISHED
          case _ => DriverState.FAILED
        }
      }

    finalState = Some(state)

    worker ! DriverStateChanged(driverId, state, finalException)
  }
}.start()1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253

}

    createWorkingDirectory()方法
   /**
     创建一个工作目录 为这个driver
     如果创建失败,就报错
   * Creates the working directory for this driver.
   * Will throw an exception if there are errors preparing the directory.
   */
    private def createWorkingDirectory(): File = {
    val driverDir = new File(workDir, driverId)
    if (!driverDir.exists() && !driverDir.mkdirs()) {
      throw new IOException("Failed to create directory " + driverDir)
    }
    driverDir12345678910111213

}

private def downloadUserJar(driverDir: File): String = {

    val jarPath = new Path(driverDesc.jarUrl)

    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
    val jarFileSystem = jarPath.getFileSystem(hadoopConf)

    val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
    val jarFileName = jarPath.getName
    val localJarFile = new File(driverDir, jarFileName)
    val localJarFilename = localJarFile.getAbsolutePath
    判断是否下载成功,没成功抛出异常
    if (!localJarFile.exists()) { // May already exist if running multiple workers on one node
      logInfo(s"Copying user jar $jarPath to $destPath")
      FileUtil.copy(jarFileSystem, jarPath, destPath, false, hadoopConf)
    }

    if (!localJarFile.exists()) { // Verify copy succeeded
      throw new Exception(s"Did not see expected jar $jarFileName in $driverDir")
    }
    返回下载到本地jar的路径
    localJarFilename12345678910111213141516171819202122

}

上面DriverRunner中createWorkingDirectory()的创建,依托于Worker中createWorkDir()

Worker.scala
def createWorkDir() {
workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
try {
  // This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
  // So attempting to create and then check if directory was created or not.
  workDir.mkdirs()
  if ( !workDir.exists() || !workDir.isDirectory) {
    logError("Failed to create work directory " + workDir)
    System.exit(1)
  }
  assert (workDir.isDirectory)
} catch {
  case e: Exception =>
    logError("Failed to create work directory " + workDir, e)
    System.exit(1)
}1234567891011121314151617

}

DriverRunner.scala 

var keepTrying = !killed

while (keepTrying) {
  logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

  synchronized {
    if (killed) { return }
    process = Some(command.start())
    initialize(process.get)
  }

  val processStart = clock.getTimeMillis()
  val exitCode = process.get.waitFor()
  if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
    waitSeconds = 1
  }

  if (supervise && exitCode != 0 && !killed) {
    logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
    sleeper.sleep(waitSeconds)
    waitSeconds = waitSeconds * 2 // exponential back-off
  }

  keepTrying = supervise && exitCode != 0 && !killed
  finalExitCode = Some(exitCode)
}
 }1234567891011121314151617181920212223242526272829

- 当DriverRunner 把DriverDir 和 Jars包就近下载完后

 worker ! DriverStateChanged(driverId, state, finalException)1

通知worker, Driver状态已经改变,Worker收到之后,再将DriverState 改变的消息,通过Actor发送给Master

    Worker.scala
    case DriverStateChanged(driverId, state, exception) => {
      state match {
        case DriverState.ERROR =>
          logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
        case DriverState.FAILED =>
          logWarning(s"Driver $driverId exited with failure")
        case DriverState.FINISHED =>
          logInfo(s"Driver $driverId exited successfully")
        case DriverState.KILLED =>
          logInfo(s"Driver $driverId was killed by user")
        case _ =>
          logDebug(s"Driver $driverId changed state to $state")
      }
      将DriverState 改变的消息,通过Actor发送给Master
      master ! DriverStateChanged(driverId, state, exception)
      我们可以看出来drivers是正在开启的Driver的集合,完成开启,将会被移出dirvers
      val driver = drivers.remove(driverId).get
      finishedDrivers是存储已经完成开启动作的driver,将上一个刚完成的dirver放到这个集合    
      finishedDrivers(driverId) = driver
      将刚才开启driver所占用的内存 和  内核  释放
      memoryUsed -= driver.driverDesc.mem
      coresUsed -= driver.driverDesc.cores
    }123456789101112131415161718192021222324

Executor 开启过程

首先当然是看

Master.scala 中的launchExecutor()方法
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec) //根据传入的worker,为该worker传入Executor
    worker.actor ! LaunchExecutor(masterUrl, //利用actor通信,让传入的的worker 执行 LaunchExecutor()
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
    exec.application.driver ! ExecutorAdded( //同时Master把executor添加到driver 中的executor集合中
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }123456789

来看看Worker是如何处理launchExecutor()的

 case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.get(appId).getOrElse {
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          Utils.createDirectory(dir).getAbsolutePath()
        }.toSeq
      }
      appDirectories(appId) = appLocalDirs

      新建一个 ExecutorRunner
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        akkaUrl,
        conf,
        appLocalDirs, ExecutorState.LOADING)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
    } catch {
      case e: Exception => {
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
       通知 master  Executor状态改变
        master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None)
      }
    }
  }12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758

来看看ExecutorRunner.scala

 def start() {
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run() { fetchAndRunExecutor() }
    }   
    workerThread.start()
    // Shutdown hook that kills actors on shutdown.
    shutdownHook = new Thread() {
      override def run() {
        killProcess(Some("Worker shutting down"))
      }
    }
    Runtime.getRuntime.addShutdownHook(shutdownHook)
 }12345678910111213

ExecutorRunner.scala

  /**
   * Download and run the executor described in our ApplicationDescription
   */
  def fetchAndRunExecutor() {
try {
  // Launch the process
  val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory,
    sparkHome.getAbsolutePath, substituteVariables)
  val command = builder.command()
  logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))

  builder.directory(executorDir)
  builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
  // In case we are running this from within the Spark Shell, avoid creating a "scala"
  // parent process for the executor command
  builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

  // Add webUI log urls
  val baseUrl =
    s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
  builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
  builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

  process = builder.start()
  val header = "Spark Executor Command: %s\n%s\n\n".format(
    command.mkString("\"", "\" \"", "\""), "=" * 40)

  // Redirect its stdout and stderr to files
  val stdout = new File(executorDir, "stdout")
  stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

  val stderr = new File(executorDir, "stderr")
  Files.write(header, stderr, UTF_8)
  stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

  // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
  // or with nonzero exit code
  val exitCode = process.waitFor()
  state = ExecutorState.EXITED
  val message = "Command exited with code " + exitCode

  看到了。Executor 状态是先 通知worker,然后由worker通知master,
  worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
} catch {
  case interrupted: InterruptedException => {
    logInfo("Runner thread for executor " + fullId + " interrupted")
    state = ExecutorState.KILLED
    killProcess(None)
  }
  case e: Exception => {
    logError("Error running executor", e)
    state = ExecutorState.FAILED
    killProcess(Some(e.toString))
  }
}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455

}
}

然后当worker节点上的executor进程启动之后,会向Driver节点上相应的Driver进程反向注册,说明executor进程已经启动成功,等待执行task


猫的想法不敢猜
6楼 · 2021-03-12 15:16

当一个Spark上的Application要启动的时候,Master就会发送使用调度算法给Application分配资源,也就是将Application所需要的资源分摊到Worker上面以保证分布式的计算。同时Master还会给Worker发送消息让它去启动Driver(Yarn-Client模式下),和Executor。同时Executor的反向注册与Driver与Executor的状态改变也会通过Worker中的线程与Master进行通信。这就是Worker的重要作用①启动②信息的传递。

来源于网络仅供参考

pipi雪
7楼 · 2021-03-15 17:35

当一个Spark上的Application要启动的时候,Master就会发送使用调度算法给Application分配资源,也就是将Application所需要的资源分摊到Worker上面以保证分布式的计算。同时Master还会给Worker发送消息让它去启动Driver(Yarn-Client模式下),和Executor。同时Executor的反向注册与Driver与Executor的状态改变也会通过Worker中的线程与Master进行通信。这就是Worker的重要作用①启动②信息的传递。

相关问题推荐

  • spark的优点有哪些?2021-07-05 16:55
    回答 20

    首先, Spark 非常好用。由于高级 API 剥离了对集群本身的关注,你可以专注于你所要做的计算本身, 只需在自己的笔记本电脑上就可以开发 Spark 应用。其次, Spark 很快,支持交互式使用和复杂算法。最后, Spark 是一个通用引擎,可用它来完成各种各样的运算...

  • 回答 5

    现在企业中多数用的是相对稳定的Spark2.2.0版本。

  • 回答 4

        Spark,是一种One Stack to rule them all的大数据计算框架,是一种基于内存计算的框架,是一种通用的大数据快速处理引擎。    这一站式的计算框架,包含了Spark RDD(这也是Spark Core用于离线批处理)、Spark SQL(交互式查询)、Spark Streaming(实时...

  • 回答 10

    常用RDD算子(1)Action RDDforeach:遍历每个元素,无返回值,一般用在将结果存储到数据库中使用saveAsTextFile存储到hdfs,RDD每个partition存到hdfs的一个block块saveAsObjectFile:存储到hdfs,将每个partition的数据序列化后,以sequenceFile(序列化)...

  • RDD五大属性2020-07-15 13:45
    回答 3

    1、RDD是一个分片的数据集合;2、RDD的函数针对每个分片进行计算;3、RDD之间是个依赖的集合;4、可选:key-value型RDD是根据哈希来分区的;5、可选:数据本地性优先计算。

  • 回答 3

    在hadoop/bin目录下有yarn命令yarn application -kill 

  • 回答 3
    已采纳

    1.Spark SQLSpark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。Spark SQL支持多种数据源类型,例如Hive表、Parquet以及JSON等。Spark SQL不仅为Spark提供了一个SQL接口,还支持开发...

  • 回答 4

    Spark SQL 在 Spark1.x 中和传统 SQL 不完全一致。但是在 Spark2.x 版本中遵循的美国的ANSI的SQL2003完全标准sql 。oracle和mysql都有自己的语法规则,平时所使用的 SQL 语句都不是标准 SQL 。平时用的 mysql 和 oracle 以及 hive,都是部分遵循标准SQL 。...

  • 回答 3

    #!/bin/bash #队列名 根据yarn的队列提交 realtime_queue=root #提交的任务名 my_job_name=OrderQZspark-shell --master yarn --deploy-mode client \--queue $realtime_queue \ #总的executors数 根据数据量与自己的集群资源来分配--num-executors...

  • 回答 2

    Spark是一种快速、通用、可扩展的大数据分析引擎,于2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月成为Apache顶级项目。项目是用Scala进行编写。Spark的结构:Spark生态系统已经发展成为一个包含多个子项目的集...

  • 回答 1

    自己随意编写一份测试数据,所用的测试数据如下,需求是按照第一列的字母分组,然后按照第二列数据取出每一组内前N个数据,后面我分别列出了我使用的三种方案来实现该需求,不同方案在不同的场景下会有各自的优势 a 25b 36c 24d 45e 60a 33b 26c 47d 43e 62a...

  • 回答 1

    把数据从redis读出来放到kafka里呗,然后用spark-streaming去读kafka的数据,或者写个程序从redis把数据读出来用socket或文件的形式传给spark-streaming,spark-streaming支持很多种源的方式

没有解决我的问题,去提问