Spark】spark job运行原理是什么?

2020-05-25 08:50发布

2条回答
息红泪
2楼 · 2020-05-25 09:06

一:Spark集群部署

二:Job提交解密

三:Job生成和接受

四:Task的运行

五:再论shuffle


1,从spark Runtime 的角度讲来讲有5大核心对象:Master , Worker , Executor ,Driver , CoarseGrainedExecutorbacked ;


2,Spark 在做分布式集群系统的设计的时候,最大化功能的独立,模块化封装具体的独立的对象,强内聚低耦合   (耦合性也称块间联系,指软件系统结构中各模块间相互联系紧密程度的一种度量。模块之间联系越紧密,其耦合性就越强,模块的独立性则越差。模块间耦合高低取决于模块间接口的复杂性、调用的方式及传递的信息。内聚性又称块内联系。指模块的功能强度的度量,即一个模块内部各个元素彼此结合的紧密程度的度量。若一个模块内各元素(语名之间、程序段之间)联系的越紧密,则它的内聚性就越高。)




3,当Driver中的sparkContext 初始化的时候会提交程序给Master,Master如果接受该程序在spark中运行的话,就会为当前程序分配AppID ,同时分配计算资源,需要特备注意的是: Master是根据当前程序的配置信息来给集群中的Worker发指令来分配具体的计算资源。但是,Master发指令后并不关心具体的计算资源是否已经分配,转过来说,Master发出指令后就记录了分配的资源,以后客户端再次提交其他程序的话就不能使用该资源啦,其弊端是可能会导致其他要提交的程序无法分配到本来应该可以分配到的计算资源。最终优势在spark分布式系统功能弱耦合的基础上最快的运行系统(否则如果Master要等到计算资源最终分配成功后才通知Driver的话,会造成Driver的阻塞,不能够最大化的并行计算资源的利用率)  (低耦合 : 不关心指令发送成功还是失败)  (快是对Driver 而言)


补充说明的是: Spark默认程序是排队的,Spark默认的情况下由于集群中一般都只有一个Application在运行,所有Master分配计算资源策略就没有那么明显啦)


二 : Job提交过程 源码解密


1,一个非常重要的技巧通过在Spark-shell 中运行一个Job来了解Job提交的过程,然后再次用源码验证。

这个过程  : 

  sc.textFile("library/data1").flatMap(_.split("")).map(word => (word,1)).reduceByKey(_+_)saveAsTextFile("/library/data2")


2,在Spark中所有的Action都会触发一个至少一个Job,在上述代码中通过savaAsTextFile来触发Job的


3.SparkContext 在实例化的时候会构造SparkDeployShedulerBackend(deploy : 配置,部署),DAGScheduler,TaskShedulerImpl(Impl : 接口),MapOutputTrackerMaster(Tracker : 追踪)等对象:

(1)SparkDeploySchedulerBackend负责集群计算资源的管理和调度。

(2)DAGScheduler : 负责高层调度(例如: Job中stage的划分,数据本地性等内容)

(3)TaskShedulerImpl : 负责具体stage内部的底层调度(例如: 每个Task的调度 ,Task容错等等)

(4)MapOutputTrackerMaster: 负责shuffle中数据的输出和读取的管理。


4,TaskSchedulerImpl内部的调度:



三:Task 的运行解密:

1,Task运行在Executor中,而Executor又是位于CoarseGrainedExecutorBackend中的且CoarseGrainedExecutorBackend和Executor是一一对应的:


2,单CoarseGrainedExecutorBackend接受到TaskSetManager发过来的LaunchTask的消息后会反序列化TaskDescription,然后使用CoarseGrainedExecutorBackend中唯一的Executor来执行任务


case LaunchTask(data) =>

if (executor == null) {

logError(“Received LaunchTask command but executor was null”)

System.exit(1)

} else {

val taskDesc = ser.deserializeTaskDescription

logInfo(“Got assigned task ” + taskDesc.taskId)

executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,

taskDesc.name, taskDesc.serializedTask)

}


发消息要么是case class 或者 case object(是唯一的)每次生成类的事例



小冰块儿
3楼 · 2020-05-25 09:20

Spark作业的运行基本原理如下图所示:

我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。提交作业的节点称为Master节点,Driver进程就是开始执行你Spark程序的那个Main函数(Driver进程不一定在Master节点上)。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。

Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core。而Driver进程要做的第一件事情,就是向集群管理器申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点Worker上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。

在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。


相关问题推荐

  • 回答 1

    自己随意编写一份测试数据,所用的测试数据如下,需求是按照第一列的字母分组,然后按照第二列数据取出每一组内前N个数据,后面我分别列出了我使用的三种方案来实现该需求,不同方案在不同的场景下会有各自的优势 a 25b 36c 24d 45e 60a 33b 26c 47d 43e 62a...

  • 回答 9

    1. Spark 概述1.1. 什么是 Spark(官网:http://spark.apache.org)spark 中文官网:http://spark.apachecn.orgSpark 是一种快速、通用、可扩展的大数据分析引擎,2009 年诞生于加州大学伯克利分校AMPLab,2010 年开源,2013 年 6 月成为 Apache 孵化项目,20...

  • 回答 2

    搭建高可用模式用的协同处理框架。

  • 回答 1

    多数是版本不对、、、、、、引用错误,,,等

  • 回答 1

    目前,还是选择saprkflink还有成长的空间

  • 回答 2

            org.springframework.boot         spring-boot-starter-parent         1.3.2.RELEASE                             2.10.4         1.6.2                       ...

  • 回答 1

    一、图概念术语1.1 基本概念图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种数据结构。这里的图并非指代数中的图。图可以对事物以及事物之间的关系建模,图可以用来表示自然发生的连接数据,如:社交网络、互联网web页面常用的应用有:在地图...

  • 回答 4

    该模式被称为 Local[N] 模式,是用单机的多个线程来模拟 Spark 分布式计算,通常用来验证开发出来的应用程序逻辑上有没有问题其中N代表可以使用 N 个线程,每个线程拥有一个 core 。如果不指定 N,则默认是1个线程(该线程有1个 core )。如果是 local[*],则...

  • 回答 2

    Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中途出错,那么就会重复执行直到写入成功。为了让它符合exactly once,可以施加两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write...

  • 回答 1

    json File 日期类型 怎样处理?怎样从字符型,转换为Date或DateTime类型?json文件如下,有字符格式的日期类型```{ name : Andy, age : 30, time :2015-03-03T08:25:55.769Z}{ name : Justin, age : 19, time : 2015-04-04T08:25:55.769Z }{ name : pan, ag.....

  • 什么叫闭包?Spark 2020-05-25 22:30
    回答 2

    python中    方法中定义了一个其他的方法,调用方法的时候,也执行了其他的方法def outer():     name = out     print(name)     def inner():         name = inner         print(name)     return inner   s...

  • 回答 3

    1).使用程序中的集合创建rdd2).使用本地文件系统创建rdd3).使用hdfs创建rdd,4).基于数据库db创建rdd5).基于Nosql创建rdd,如hbase6).基于s3创建rdd,7).基于数据流,如socket创建rdd

  • 回答 3

    1)自动的进行内存和磁盘的存储切换;2)基于Lineage的高效容错;3)task如果失败会自动进行特定次数的重试;4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;5)checkpoint和persist,数据计算之后持久化缓存6)数据调度弹性,DAG TASK...

  • 回答 1

    Spark中数据的本地化方式分为5种1、PROCESS_LOCAL : 进程本地化,指task计算的数据在本进程(Executor)中2、NODE_LOCAL:节点本地化,指task计算的数据在本节点(node)的磁盘上,当task在本进程中一直没有执行(如果Driver分发task 3s后没有执行,且重复5次...

  • 回答 1

    一、spark普通shuffle的基本原理      1、假如现在在一个节点上由4个shufflemapTask在执行,但是这个节点的core的数量数2,在远端有4个resultTask等待接收shuffleMapTask的数据进行处理      2、这样可以有两个shufflemaptask可以同时执行,在每一...

没有解决我的问题,去提问