flink 是否允许任务共享slot?

2020-08-27 09:34发布

flink 是否允许任务共享slot?

flink 是否允许任务共享slot?

3条回答
aijingda
2楼 · 2020-08-27 10:11

flink允许任务共享slot,具体讲一下如何共享slot:

默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。

允许slot共享有以下两点好处:

1.Flink集群需要的任务槽与作业中使用的最高并行度正好相同(前提,保持默认SlotSharingGroup)。也就是说我们不需要再去计算一个程序总共会起多少个task了。

2.更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有slot共享,将task的2个并行度增加到6个,能充分利用slot资源,同时保证每个TaskManager能平均分配到重的subtasks。

2.2共享Slot实例

将 WordCount 的并行度从之前的2个增加到6个(Source并行度仍为1),并开启slot共享(所有operator都在default共享组),将得到如上图所示的slot分布图。

首先,我们不用去计算这个job会其多少个task,总之该任务最终会占用6个slots(最高并行度为6)。其次,我们可以看到密集型操作 keyAggregation/sink 被平均地分配到各个 TaskManager。

2.3 SlotSharingGroup(soft)

SlotSharingGroup是Flink中用来实现slot共享的类,它尽可能地让subtasks共享一个slot。

保证同一个group的并行度相同的sub-tasks 共享同一个slots。算子的默认group为default(即默认一个job下的subtask都可以共享一个slot)

为了防止不合理的共享,用户也能通过API来强制指定operator的共享组,比如:someStream.filter(...).slotSharingGroup("group1");就强制指定了filter的slot共享组为group1。怎么确定一个未做SlotSharingGroup设置算子的SlotSharingGroup什么呢(根据上游算子的group 和自身是否设置group共同确定)。适当设置可以减少每个slot运行的线程数,从而整体上减少机器的负载。

2.4 CoLocationGroup(强制)

CoLocationGroup可以保证所有的并行度相同的sub-tasks运行在同一个slot,主要用于迭代流(训练机器学习模型)。

3. Slot & parallelism的关系

3.1 Slots && parallelism

如上图所示,有两个TaskManager,每个TaskManager有3个槽位。假设source操作并行度为3,map操作的并行度为4,sink的并行度为4,所需的task slots数与job中task的最高并行度一致,最高并行度为4,那么使用的Slot也为4。

3.2如何计算Slot

如何计算一个应用需要多少slot?

 

如果不设置SlotSharingGroup,那么需要的Slot数为应用的最大并行度数。如果设置了SlotSharingGroup,那么需要的Slot数为所有SlotSharingGroup中的最大并行度之和。比如已经强制指定了map的slot共享组为test,那么map和map下游的组为test,map的上游source的组为默认的default,此时default组中最大并行度为10,test组中最大并行度为20,那么需要的Slot=10+20=30。


爱梦 - 拿来吧你
3楼 · 2021-09-17 09:14

为了实现并行执行,Flink应用会将算子划分为不同任务,然后将这些任务分配到集群中的不同进程上去执行。和很多其他分布式系统一样,Flink应用的性能很大程度上取决于任务的调度方式。任务被分配到的工作进程、任务间的共存情况以及工作进程中的任务数都会对应用的性能产生显著影响。本节中我们就讨论一下如何通过调整默认行为以及控制作业链与作业分配(处理槽共享组)来提高应用的性能。


其实这两个概念我们可以看作:资源共享链与资源共享组。当我们编写完一个Flink程序,从Client开始执行——>JobManager——>TaskManager——>Slot启动并执行Task的过程中,会对我们提交的执行计划进行优化,其中有两个比较重要的优化过程是:任务链与处理槽共享组,前者是对执行效率的优化,后者是对内存资源的优化。


作业链

一、执行过程

                                   


Chain:Flink会尽可能地将多个operator链接(chain)在一起形成一个task pipline。每个task pipline在一个线程中执行

优点:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换(即降低本地数据交换成本),减少了延迟的同时提高整体的吞吐量。

概述:在StreamGraph转换为JobGraph过程中,关键在于将多个 StreamNode 优化为一个 JobVertex,对应的 StreamEdge 则转化为 JobEdge,并且 JobVertex 和 JobEdge 之间通过 IntermediateDataSet (中间数据集)形成一个生产者和消费者的连接关系。每个JobVertex就是JobManger的一个任务调度单位(任务Task)。为了避免在这个过程中将关联性很强的几个StreamNode(算子)放到不同JobVertex(Task)中,从而导致因为Task执行产生的效率问题(数据交换(网络传输)、线程上下文切换),Flink会在StreamGraph转换为JobGraph过程中将可以优化的算子合并为一个算子链(也就是形成一个Task)。这样就可以把这条链上的算子放到一个线程中去执行,这样就提高了任务执行效率。

可见,StreamGraph转换为JobGraph过程中,实际上是逐条审查每一个StreamEdge和该SteamEdge两头连接的两个StreamNode的特性,来决定该StreamEdge两头的StreamNode是不是可以合并在一起形成算子链。这个判断过程flink给出了明确的规则,我们看一下StreamingJobGraphGenerator中的isChainable()方法:


                                      


该方法返回true时两个端点才可以合并到一起,根据源码我们可以得出形成作业链的规则如下:


上下游的并行度一致(槽一致)

该节点必须要有上游节点跟下游节点;

下游StreamNode的输入StreamEdge只能有一个) 

上下游节点都在同一个 slot group 中(下面会解释 slot group) 

下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS) 

上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD) 

上下游算子之间没有数据shuffle (数据分区方式是 forward) 

用户没有禁用 chain 

二、开启/禁用全局作业链

用户能够通过禁用全局作业链的操作来关闭整个Flink的作业链,但是这个操作会影响到这个作业的执行情况,除非我们非常清楚作业的执行过程,否则不建议这么做:StreamExecutionEnvironment.disableOperatorChaining()。全局作业链关闭之后,如果想创建对应Operator的作业链,可以使用startNewChain()方法:someStream.filter(...).map(...).startNewChain().map(...)。注意该方法只对当前操作符及之后的操作符有效,所以上述代码只对两个map进行链条绑定。


三、禁用局部作业链

如果我们只想对某个算子执行禁用作业链,只需调用disableChaining()方法:someSteam.map().disableChaining().filter(),该方法只会禁用当前算子的链条(上述代码中就是map),对其他算子操作不产生影响。


处理槽共享组(出于某中目的将多个Task放到同一个slot中执行)

一、Task Slot

 TaskManager 是一个 JVM 进程,并会以独立的线程来执行一个task。为了控制一个 TaskManager 能接受多少个 task,Flink 提出了 Task Slot 的概念,通过 Task Slot 来定义Flink 中的计算资源。solt 对TaskManager内存进行平均分配,每个solt内存都相同,加起来和等于TaskManager可用内存,但是仅仅对内存做了隔离,并没有对cpu进行隔离。将资源 slot 化意味着来自不同job的task不会为了内存而竞争,而是每个task都拥有一定数量的内存储备。


通过调整 task slot 的数量,用户可以定义task之间是如何相互隔离的。每个 TaskManager 有一个slot,也就意味着每个task运行在独立的 JVM 中。每个 TaskManager 有多个slot的话,也就是说多个task运行在同一个JVM中。而在同一个JVM进程中的task,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数据的网络传输。也能共享一些数据结构,一定程度上减少了每个task的消耗。


二、共享槽

问题:


一个TaskManager中至少有一个插槽slot,每个插槽均分内存并且之间是内存隔离的,但是共享CPU。算子根据计算复杂度可以分为资源密集型与非资源密集型算子(可以认为有的算子计算时内存需求大,有些算子内存需求小)。现在有这么个情况:某个Job下的Tasks中既有资源密集型Task(A),又有非资源密集型Task(B),他们被分到不同的slot上,这就会产生问题:

有的slot内存使用率大,有的slot内存使用率小,这样就很不公平,一个槽资源没有得到充分的利用;

对于槽资源有限的情况,任务并行度也不高。

解决方案


                           


默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。允许槽共享,会有以下两个方面的好处:


对于slot有限的场景,我们可以增大每个task的并行度。比如如果不设置SlotSharingGroup,默认所有task在同一个共享组(可以共享所有slot),那么Flink集群需要的任务槽与作业中使用的最高并行度正好相同。但是如上图所示,如果我们强制指定了map的slot共享组为test,那么map和map下游的组为test,map的上游source的共享组为默认的default,此时default组中最大并行度为10,test组中最大并行度为20,那么需要的Slot=10+20=30;

能更好的利用资源:如果没有slot共享,那些资源需求不大的map/source/flatmap子任务将和资源需求更大的window/sink占用相同的资源,槽资源没有充分利用(内存没有充分利用)。

具体共享机制实现


Flink决定哪些任务需要共享slot 以及哪些任务必须放入特定slot。虽然task共享Slot提升资源利用率,但是如果一个Slot中容纳过多task反而会造成资源低下(比如极端情况下所有task都分布在一个Slot内)。所以在Flink中task需要按照一定规则共享Slot ,主要通过SlotSharingGroup和CoLocationGroup定义:


CoLocationGroup:强制将subTasksk放到同一个slot中,是一种硬约束:

保证把JobVertices的第n个运行实例和其他相同组内的JobVertices第n个实例运作在相同的slot中(所有的并行度相同的subTasks运行在同一个slot );

主要用于迭代流(训练机器学习模型) ,用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上。

SlotSharingGroup: 它是Flink中用来实现slot共享的类,尽可能的允许不同的JobVertices部署在相同的Slot中,但这是一种宽约束,只是尽量做到不能完全保证。

算子的默认group为default,所有任务可以共享同一个slot; 

要想确定一个未做SlotSharingGroup设置的算子的group是什么,可以根据上游算子的 group 和自身是否设置 group共同确定(也就是说如果下游算子没有设置分组,它继承上游算子的分组); 

为了防止不合理的共享,用户可以通过提供的API强制指定operator的共享组。因为不合理的共享槽资源(比如默认情况下所有任务共享所有的slot)会导致每个槽中运行的线程述增多,增加了机器负载。所以适当设置可以减少每个slot运行的线程数,从而整体上减少机器的负载。比如: someStream.filter(...).slotSharingGroup("group1")就强制指定了filter的slot共享组为group1。

三、Slot共享以及task的调度过程

Flink在调度任务分配Slot的时候遵循两个重要原则:

同一个Job中的同一分组中的不同Task可以共享同一个Slot;

Flink是按照拓扑顺序依次从Source调度到sink。

假设有两个TM:TM1、TM2,每个TM有3个Slot:S1,S2,S3。假设source/map的并行度为2,keyBy/window/sink的并行度为4,那么调度的顺序依次为source/map[1] ->source/map[2] ->keyBy/window/sink[1]->keyBy/window/sink[2]->keyBy/window/sink[3]->keyBy/window/sink[4]。那么Flink调度任务时(使用默认共享分组):

首先调度子任务source/map[1]到TM1.S1;

然后调度子任务source/map[2] ,根据Flink的调度原则:source/map[1] 和source/map[2] 属于同一个Task下的两个SubTask,所以他们不能放到同一个Slot中,所以source/map[2]被调度到TM1.S2;

然后调度keyBy/window/sink,keyBy/window/sink的子任务会被依次调度到TM1.S1、TM1.S2、TM2.S1、TM2.S2。但是如果source/map与keyBy/window/sink属于不同分组,那么keyBy/window/sink会被调度到TM1.S3、TM2.S1、TM2.S2、TM2.S3。

总结

一个Task的子任务SubTask个数称为它的并行度;

一个Task的并行度等于分配给它的Slot个数(前提槽资源充足);

同一个Job下的不同Task可一个放到同一个Slot中——处理槽共享分组;


汽水味的小盆友
4楼 · 2021-09-23 09:47

Flink运行时主要角色有两个:JobManager和TaskManager,无论是standalone集群,flink on yarn都是要启动这两个角色。JobManager主要是负责接受客户端的job,调度job,协调checkpoint等。TaskManager执行具体的Task。TaskManager为了对资源进行隔离和增加允许的task数,引入了slot的概念,这个slot对资源的隔离仅仅是对内存进行隔离,策略是均分,比如taskmanager的管理内存是3GB,假如有两个个slot,那么每个slot就仅仅有1.5GB内存可用。Client这个角色主要是为job提交做些准备工作,比如构建jobgraph提交到jobmanager,提交完了可以立即退出,当然也可以用client来监控进度。

Jobmanager和TaskManager之间通信类似于Spark 的早期版本,采用的是actor系统。如下图



什么是task?
在spark中:

RDD中的一个分区对应一个task,task是单个分区上最小的处理流程单元。被送到某个Executor上的工作单元,和hadoopMR中的MapTask和ReduceTask概念一样,是运行Application的基本单位,多个Task组成一个Stage

上述引入spark的task主要是想带着大家搞明白,以下几个概念:

Flink的并行度由什么决定的?
Flink的task是什么?
Flink的并行度由什么决定的?

这个很简单,Flink每个算子都可以设置并行度,然后就是也可以设置全局并行度。

Api的设置

.map(new RollingAdditionMapper()).setParallelism(10)

全局配置在flink-conf.yaml文件中,parallelism.default,默认是1:可以设置默认值大一点

Flink的task是什么?

按理说应该是每个算子的一个并行度实例就是一个subtask-在这里为了区分暂时叫做substask。那么,带来很多问题,由于flink的taskmanager运行task的时候是每个task采用一个单独的线程,这就会带来很多线程切换开销,进而影响吞吐量。为了减轻这种情况,flink进行了优化,也即对subtask进行链式操作,链式操作结束之后得到的task,再作为一个调度执行单元,放到一个线程里执行。如下图的,source/map 两个算子进行了链式;keyby/window/apply有进行了链式,sink单独的一个。

说明:图中假设是source/map的并行度都是2,keyby/window/apply的并行度也都是2,sink的是1,总共task有五个,最终需要五个线程。

默认情况下,flink允许如果任务是不同的task的时候,允许任务共享slot,当然,前提是必须在同一个job内部。

结果就是,每个slot可以执行job的一整个pipeline,如上图。这样做的好处主要有以下几点:

1.Flink 集群所需的taskslots数与job中最高的并行度一致。也就是说我们不需要再去计算一个程序总共会起多少个task了。

2.更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作 keyAggregation/sink 一样多的资源。如果有slot共享,将基线的2个并行度增加到6个,能充分利用slot资源,同时保证每个TaskManager能平均分配到重的subtasks,比如keyby/window/apply操作就会均分到申请的所有slot里,这样slot的负载就均衡了。

链式的原则,也即是什么情况下才会对task进行链式操作呢?简单梗概一下:

上下游的并行度一致
下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
上下游节点都在同一个 slot group 中(下面会解释 slot group)
下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
两个节点间数据分区方式是 forward(参考理解数据流的分区)
用户没有禁用 chain

slot和parallelism

1.slot是指taskmanager的并发执行能力

在hadoop 1.x 版本中也有slot的概念,有兴趣的读者可以了解一下

taskmanager.numberOfTaskSlots:3

每一个taskmanager中的分配3个TaskSlot,3个taskmanager一共有9个TaskSlot

2.parallelism是指taskmanager实际使用的并发能力

parallelism.default:1

运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲。设置合适的并行度才能提高效率。

3.parallelism是可配置、可指定的

1.可以通过修改$FLINK_HOME/conf/flink-conf.yaml文件的方式更改并行度

2.可以通过设置$FLINK_HOME/bin/flink 的-p参数修改并行度

3.可以通过设置executionEnvironmentk的方法修改并行度

4.可以通过设置flink的编程API修改过并行度

5.这些并行度设置优先级从低到高排序,排序为api>env>p>file.

6.设置合适的并行度,能提高运算效率

7.parallelism不能多与slot个数。

slot和parallelism总结

1.slot是静态的概念,是指taskmanager具有的并发执行能力

2.parallelism是动态的概念,是指程序运行时实际使用的并发能力

3.设置合适的parallelism能提高运算效率,太多了和太少了都不行

4.设置parallelism有多中方式,优先级为api>env>p>file

相关问题推荐

  • 什么是大数据时代?2021-01-13 21:23
    回答 100

    大数据(big data)一词越来越多地被提及,人们用它来描述和定义信息爆炸时代产生的海量数据,而这个海量数据的时代则被称为大数据时代。随着云时代的来临,大数据(Big data)也吸引了越来越多的关注。大数据(Big data)通常用来形容一个公司创造的大量非结...

  • 回答 84

    Java和大数据的关系:Java是计算机的一门编程语言;可以用来做很多工作,大数据开发属于其中一种;大数据属于互联网方向,就像现在建立在大数据基础上的AI方向一样,他两不是一个同类,但是属于包含和被包含的关系;Java可以用来做大数据工作,大数据开发或者...

  • 回答 52
    已采纳

    学完大数据可以从事很多工作,比如说:hadoop 研发工程师、大数据研发工程师、大数据分析工程师、数据库工程师、hadoop运维工程师、大数据运维工程师、java大数据工程师、spark工程师等等都是我们可以从事的工作岗位!不同的岗位,所具备的技术知识也是不一样...

  • 回答 29

    简言之,大数据是指大数据集,这些数据集经过计算分析可以用于揭示某个方面相关的模式和趋势。大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。大数据的特点:数据量大、数据种类多、 要求实时性强、数据所蕴藏的...

  • 回答 14

    tail -f的时候,发现一个奇怪的现象,首先 我在一个窗口中 tail -f test.txt 然后在另一个窗口中用vim编辑这个文件,增加了几行字符,并保存,这个时候发现第一个窗口中并没有变化,没有将最新的内容显示出来。tail -F,重复上面的实验过程, 发现这次有变化了...

  • 回答 18

    您好针对您的问题,做出以下回答,希望有所帮助!1、大数据行业还是有非常大的人才需求的,对于就业也有不同的岗位可选,比如大数据工程师,大数据运维,大数据架构师,大数据分析师等等,就业难就难在能否找到适合的工作,能否与你的能力和就业预期匹配。2、...

  • 回答 17

    最小的基本单位是Byte应该没多少人不知道吧,下面先按顺序给出所有单位:Byte、KB、MB、GB、TB、PB、EB、ZB、YB、DB、NB,按照进率1024(2的十次方)计算:1Byte = 8 Bit1 KB = 1,024 Bytes 1 MB = 1,024 KB = 1,048,576 Bytes 1 GB = 1,024 MB = 1,048,576...

  • 回答 33

    大数据的定义。大数据,又称巨量资料,指的是所涉及的数据资料量规模巨大到无法通过人脑甚至主流软件工具,在合理时间内达到撷取、管理、处理、并整理成为帮助企业经营决策更积极目的的资讯。大数据是对大量、动态、能持续的数据,通过运用新系统、新工具、新...

  • 回答 5

    MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。MySQL的版本:针对不同的用户,MySQL分为两种不同的版本:MySQL Community Server社区版本,免费,但是Mysql不提供...

  • mysql安装步骤mysql 2022-05-07 18:01
    回答 2

    mysql安装需要先使用yum安装mysql数据库的软件包 ;然后启动数据库服务并运行mysql_secure_installation去除安全隐患,最后登录数据库,便可完成安装

  • 回答 5

    1.查看所有数据库showdatabases;2.查看当前使用的数据库selectdatabase();3.查看数据库使用端口showvariableslike'port';4.查看数据库编码showvariableslike‘%char%’;character_set_client 为客户端编码方式; character_set_connection 为建立连接...

  • 回答 5

    CREATE TABLE IF NOT EXISTS `runoob_tbl`(    `runoob_id` INT UNSIGNED AUTO_INCREMENT,    `runoob_title` VARCHAR(100) NOT NULL,    `runoob_author` VARCHAR(40) NOT NULL,    `submission_date` DATE,    PRI...

  • 回答 9

    学习多久,我觉得看你基础情况。1、如果原来什么语言也没有学过,也没有基础,那我觉得最基础的要先选择一种语言来学习,是VB,C..,pascal,看个人的喜好,一般情况下,选择C语言来学习。2、如果是有过语言的学习,我看应该一个星期差不多,因为语言的理念互通...

  • 回答 7

    添加语句 INSERT插入语句:INSERT INTO 表名 VALUES (‘xx’,‘xx’)不指定插入的列INSERT INTO table_name VALUES (值1, 值2,…)指定插入的列INSERT INTO table_name (列1, 列2,…) VALUES (值1, 值2,…)查询插入语句: INSERT INTO 插入表 SELECT * FROM 查...

  • 回答 5

    看你什么岗位吧。如果是后端,只会CRUD。应该是可以找到实习的,不过公司应该不会太好。如果是数据库开发岗位,那这应该是不会找到的。

  • 回答 7

    查找数据列 SELECT column1, column2, … FROM table_name; SELECT column_name(s) FROM table_name 

没有解决我的问题,去提问