alchemy零基础请教,怎么才能使for i in xxx.query.这个过程快一点?

2021-02-07 09:54发布

for szh in xxx.query.filter(xxx.age == 16):#10条记录    for szy in xxx.query.filter(xxx.sex == 'boy...

for szh in xxx.query.filter(xxx.age == 16):#10条记录
   for szy in xxx.query.filter(xxx.sex == 'boy'):#14条记录
       for szs in xxx.query.filter(xxx.xxx == xxx):#11条记录
           for kzb in xxx.query.filter(xxx.xxx == xxx):#10条记录
               for lzg in xxx.query.filter(xxx.xxx == xxx):#10条记录
                   pass
return '结束'

代码差不多就上面那样,xxx就类似通讯录表单,上面5个循环用了36秒。五个for i in range(14)相套才不到1秒。

请教有什么办法改进吗?


1条回答
007
2楼 · 2021-02-25 14:58





Airflow是什么

Airflow是一个可编程,调度和监控的工作流平台,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行。airflow提供了丰富的命令行工具用于系统管控,而其web管理界面同样也可以方便的管控调度任务,并且对任务运行状态进行实时监控,方便了系统的运维和管理,可视化方面和易用性都是很好的。

2019年airflow已经成长为apache的顶级项目了,跟spark搭配是很常用的场景。

只要符合定时任务流的工作,都可以用Airflow来实现。我们主要用Airflow来实现定时的ETL处理。

我们经常会使用spark来做etl,但是etl的流程和步骤是很多的,比如先清洗什么,再清洗什么,有一个步骤需要上一个步骤清洗完了才能启动。最原始的肯定是人工监控和手动调度执行。但是这样太累了。

airflow能很好的实现自动化这部分的逻辑。

只需要我们编写好相关执行顺序以及依赖的DAG流程。airflow就能按照定好的流程自动进行调度运行。

airflow官网

airflow官网文档

github

相关概念和原理

1、DAGDAG意为有向无循环图,在Airflow中则定义了整个完整的作业。同一个DAG中的所有Task拥有相同的调度时间。

2、TaskTask为DAG中具体的作业任务,它必须存在于某一个DAG之中。Task在DAG中配置依赖关系,跨DAG的依赖是可行的,但是并不推荐。跨DAG依赖会导致DAG图的直观性降低,并给依赖管理带来麻烦。

3、DAGRun当一个DAG满足它的调度时间,或者被外部触发时,就会产生一个DAGRun。可以理解为由DAG实例化的实例。

4、TaskInstance当一个Task被调度启动时,就会产生一个TaskInstance。可以理解为由Task实例化的实例

5、执行器(Executor)Airflow本身是一个综合平台,它兼容多种组件,所以在使用的时候有多种方案可以选择。比如最关键的执行器就有四种选择:

SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试

LocalExecutor:多进程本地执行任务

DaskExecutor:动态任务调度,主要用于数据分析

CeleryExecutor:分布式调度,生产常用

celery是一个分布式调度框架,其本身无队列功能,需要使用第三方组件,比如redis或者rabbitmq,以rabbitmq为例,系统整体结构如下所示:


其中:

turing为外部系统

GDags服务帮助拼接成dag

master节点webui管理dags、日志等信息

scheduler负责调度,只支持单节点

worker负责执行具体dag中的task,worker支持多节点

在整个调度系统中,节点之间的传递介质是消息,而消息的本质内容是执行脚本的命令,也就是说,工作节点的dag文件必须和master节点的dag文件保持一致,不然任务的执行会出问题。

6、任务处理器

operator描述了工作流中的一个task,是一个抽象的概念,相当于抽象task定义

airflow内置了丰富的任务处理器,用于实现不同类型的任务:

BashOperator:执行bash命令

PythonOperator:调用python代码

EmailOperator:发送邮件

HTTPOperator:发送HTTP请求

SqlOperator:执行SQL命令

除了这些基本的构建块之外,还有更多的特定处理器:DockerOperator,HiveOperator,S3FileTransferOperator,PrestoToMysqlOperator,SlackOperator

7、角色

webserver:提供web端服务,以及会定时生成子进程去扫描对应的目录下的dags,并更新数据库

scheduler:任务调度服务,根据dags生成任务,并提交到消息中间件队列中(redis或rabbitMq)

celeryworker:分布在不同的机器上,作为任务真正的的执行节点。通过监听消息中间件:redis或rabbitMq领取任务

flower:监控worker进程的存活性,启动或关闭worker进程,查看运行的task

airflow特点

分布式任务调度:允许一个工作流的task在多台worker上同时执行

可构建任务依赖:以有向无环图的方式构建任务依赖关系

task原子性:工作流上每个task都是原子可重试的,一个工作流某个环节的task失败可自动或手动进行重试,不必从头开始任务

安装

#airflowneedsahome,~/airflowisthedefault,

#butyoucanlayfoundationsomewhereelseifyouprefer

#(optional)

exportAIRFLOW_HOME=~/airflow


#installfrompypiusingpip

pipinstallapache-airflow


#initializethedatabase

airflowinitdb


#startthewebserver,defaultportis8080

airflowwebserver-p8080

#nohupairflowwebserver-p8080>~/airflow/active.log2>&1&


#startthescheduler

airflowscheduler


#删除dag(需要先去~/airflow/dags中删除py文件)

airflowdelete_dag-y{dag_id}


使用pip安装的安装路径一般在python路径的site-packages中。

当在~/airflow/dags文件下添加py文件后,需要等待一会,才会在web中显示。重新运行airflowscheduler可以立刻在web中显示py文件。

显示了py文件之后我们就可以运行DAG了。

修改配置

基础配置airflow.cfg

安装好Airflow,第一次运行airflowinitdb之后,会在Airflow文件夹下面产生一个airflow.cfg文件,这个就是基础配置文件。我们以这个基础文件作为模板来修改成为我们需要的配置文件。以下的操作都是找到对应的配置字段,修改其字段内容。

修改默认时区:default_timezone=Asia/Shanghai,说明:修改时区之后,Airflow前端页面仍旧会使用UTC时区显示,但是配合主机/容器的时区,这样我们在写dag任务执行时间的时候就不需要转换时区了。

修改执行器类型:executor=CeleryExecutor

不加载范例dag:load_example=False

不让同个dag并行操作:max_active_runs_per_dag=1,说明:在ETL过程中,还是线性执行会比较好控制,如果里面需要批量操作,可以在ETL的具体处理过程中加入多线程或者多进程方式执行,不要在dag中体现

最高的dag并发数量:dag_concurrency=16,说明:一般配置成服务器的CPU核数,默认16也没问题。

最高的任务并发数量:worker_concurrency=16,说明:CeleryExecutor在Airflow的worker线程中执行的,这里配置的是启动多少个worker

数据库配置:sql_alchemy_conn=mysql://airflow:airflow@127.0.0.1:3306/airflow?charset=utf8,说明:我们一般是用MySQL来配合Airflow的运行

CeleryBroker:broker_url=redis://:password@127.0.0.1:6379/0,说明:默认配置中两个redis配置被分到两个redis区,我们也照做吧。

CeleryResultbackend:result_backend=redis://:password@127.0.0.1:6379/1,说明:默认配置中两个redis配置被分到两个redis区,我们也照做吧。

五、MySQL需要注意的地方

mysql的配置中需要加入以下内容,不然执行会报错。需要在initdb之前加入并重启。

[mysqld]innodb_large_prefix=onexplicit_defaults_for_timestamp=1

六、运行

由于使用的是CeleryExecutor,需要顺序执行三个进程:airflowwebserver-Dairflowscheduler-Dairflowworker-D

常用管理airflow命令

airflowtest dag_idtask_idexecution_date  测试task


示例: airflowtestexample_hello_world_daghello_task20200226


airflowrundag_idtask_idexecution_date运行task


airflowrun-Adag_idtask_idexecution_date忽略依赖task运行task


airflowtrigger_dagdag_id-rRUN_ID-eEXEC_DATE运行整个dag文件


airflowwebserver-D 守护进程运行webserver


airflowscheduler-D 守护进程运行调度


airflowworker-D守护进程运行celeryworker


airflowworker-c1-D守护进程运行celeryworker并指定任务并发数为1


airflowpausedag_id 暂停任务


airflowunpausedag_id取消暂停,等同于在管理界面打开off按钮


airflowlist_tasksdag_id查看task列表


airflowcleardag_id清空任务实例


web界面使用

启动web管控界面需要执行airflowwebserver-D命令,默认访问端口是8080

假设部署在192.168.30.11这台服务器上

在浏览器中访问http://192.168.30.11:8080/admin/


(1)任务启动暂停开关

(2)任务运行状态

(3)待执行,未分发的任务

(4)手动触发执行任务

(5)任务管控界面

选择对应dag栏目,点击(5)中的GraphView即可进入任务管控界面


点击对应的任务,会弹出一个任务管控台,主要几个功能如下:

ViewLog:查看任务日志

Run:运行选中任务

Clear:清空任务队列

MarkSuccess:标记任务为成功状态

在界面中配置参数

Menu->Admin->Variables



使用airflow

运行一个dag的流程

在~/airflow/dags文件下添加py文件,(需要等待一会,才会在web中显示,如果未开启webserver,也是可以运行的)

airflowunpausedag_id(取消暂停任务,任务会按照设定时间周期执行)

airflowtrigger_dagdag_id(立刻运行整个dag)


重启一个dag的流程

rm-rf~/airflow/dags/aml_sl_with_config.py

airflowdelete_dag-yaml_sl_with_config

ps-ef|grep"airflowscheduler"|awk'{print$2}'|xargskill-9

vi~/airflow/dags/aml_sl_with_config.py

nohupairflowscheduler&


通过DAG文件实现定时任务

crontab语法

crontab格式如下所示:

#┌─────────────minute(0-59)

#│┌─────────────hour(0-23)

#││┌─────────────dayofmonth(1-31)

#│││┌─────────────month(1-12)

#││││┌─────────────dayofweek(0-6)(SundaytoSaturday;

#│││││7isalsoSundayonsomesystems)

#│││││

#│││││

#*****commandtoexecute


域是否必须取值范围可用特殊符号备注MinutesYes0–59*,-HoursYes0–23*,-DayofmonthYes1–31*,-?LW?LW部分实现可用MonthYes1–12orJAN–DEC*,-DayofweekYes0–6orSUN–SAT*,-?L#?LW部分实现可用YearNo1970–2099*,-标准实现里无这一项

特殊符号功能说明:

逗号(,)

逗号用于分隔一个列表里的元素,比如"MON,WED,FRI"在第五域(dayofweek)表示Mondays,WednesdaysandFridays。


连字符(-)

连字符用于表示范围,比如2000–2010表示2000到2010之间的每年,包括这两年(闭区间)。


百分号(%)

用于命令(command)中的格式化


L

表示last,最后一个,比如第五域,5L表示当月最后一个星期五


W

W表示weekday(Monday-Friday),指离指定日期附近的工作日,比如第三域设置为15L,这表示临近当月15附近的工作日,假如15号是星期六,那么定时器会在14号执行,如果15号是星期天,那么定时器会在16号执行,也就是说只会在离指定日期最近的那天执行。


井号#

#用于第五域(dayofweek),#后面跟着一个1~5之间的数字,这个用于表示第几个星期,比如5#3表示第三个星期五


?

在有些实现里面,?与*的功能相同,还有一些实现里面?表示cron的启动时间,比如当cron服务在8:25am启动,则??****会更新为258****,直到下一次cron服务重新启动,定时器会再次更新。


/

/一般与*组合使用,后面跟着一个数字,表示频率,比如在第一域(Minutes)中*/5表示每5分钟,是普通列表表示5,10,15,20,25,30,35,40,45,50,55,00的缩写


普通任务–helloWorld

fromdatetimeimporttimedelta,datetime

importairflow

fromairflowimportDAG

fromairflow.operators.bash_operatorimportBashOperator

fromairflow.operators.dummy_operatorimportDummyOperator


default_args={#默认参数

'owner':'zzq',#dag拥有者,用于权限管控

'depends_on_past':False,#是否依赖上游任务

'start_date':datetime(2020,2,26),#任务开始时间,默认utc时间

'email':['123456789@.com'],#告警通知邮箱地址

'email_on_failure':False,

'email_on_retry':False,

'retries':1,

'retry_delay':timedelta(minutes=5),

}


dag=DAG(

'example_hello_world_dag',#dag的id

default_args=default_args,

description='myfirstDAG',#描述

schedule_interval='*/20****',#crontab

start_date=datetime(2020,2,26)#开始时间,覆盖默认参数

)


defprint_hello():

return'Helloworld!'


dummy_operator=DummyOperator(task_id='dummy_task',dag=dag)


hello_operator=BashOperator(#通过BashOperator定义执行bash命令的任务

task_id='sleep_task',

depends_on_past=False,

bash_command='echo`date`>>/home/py/test.txt',

dag=dag

)


dummy_operator>>hello_operator#设置任务依赖关系

#dummy_operator.set_downstream(hello_operator)


定义http任务并使用本地时间

importos

fromdatetimeimporttimedelta,datetime

importpytz

fromairflow.operators.http_operatorimportSimpleHttpOperator

fromairflow.modelsimportDAG


default_args={

'owner':'zzq',

#'depends_on_past':False,

'depends_on_past':True,

'wait_for_downstream':True,

'execution_timeout':timedelta(minutes=3),

'email':['123456789@.com'],

'email_on_failure':False,

'email_on_retry':False,

'retries':1,

'retry_delay':timedelta(minutes=5),

}


#将本地时间转换为utc时间,再设置为start_date

tz=pytz.timezone('Asia/Shanghai')

dt=datetime(2020,2,26,12,20,tzinfo=tz)

utc_dt=dt.astimezone(pytz.utc).replace(tzinfo=None)


os.environ['AIRFLOW_CONN_HTTP_TEST']='http://localhost:9090'


dag=DAG(

'testtag01',

default_args=default_args,

description='myDAG',

schedule_interval='*/2****',

start_date=utc_dt

)


#通过SimpleHttpOperator定义http任务

task1=SimpleHttpOperator(

task_id='get_op1',

http_conn_id='http_test',

method='GET',

endpoint='test1',

data={},

headers={},

dag=dag)


task2=SimpleHttpOperator(

task_id='get_op2',

http_conn_id='http_test',

method='GET',

endpoint='test2',

data={},

headers={},

dag=dag)


task1>>task2


参数细节

这里我们要特别注意一个关于调度执行时间的问题。在谈这个问题前,我们先确定几个名词:

startdate:在配置中,它是作业开始调度时间。而在谈论执行状况时,它是调度开始时间。scheduleinterval:调度执行周期。executiondate:执行时间,在Airflow中称之为执行时间,但其实它并不是真实的执行时间。那么现在,让我们看一下当一个新配置的DAG生效后第一次调度会在什么时候。很多人会很自然的认为,第一次的调度时间当然是在作业中配置的startdate,但其实并不是。

第一次调度时间是在作业中配置的startdate的第二个满足scheduleinterval的时间点

并且记录的executiondate为作业中配置的startdate的第一个满足scheduleinterval的时间点。

另外,当作业已经执行过之后,startdate的配置将不会再生效,这个作业的调度开始时间将直接按照上次调度所对应的executiondate来计算。

这个例子只是简要的介绍了一下DAG的配置,也只介绍了非常少量的配置参数。Airflow为DAG和作业提供了大量的可配置参数,详情可以参考Airflow官方文档。

跳过非最新DAGRun

假如有一个每小时调度的DAG出错了,我们把它的调度暂停,之后花了3个小时修复了它,修复完成后重新启动这个作业的调度。于是Airflow一下子创建了3个DAGRun并同时执行,这显然不是我们希望的,我们希望它只执行最新的DAGRun。

我们可以创建一个ShortCircuitOperator,并且让DAG中所有没有依赖的作业都依赖这个作业,然后在这个作业中进行判断,检测当前DAGRun是否为最新,不是最新的直接跳过整个DAG。

defskip_dag_not_latest_worker(ds,**context):

ifcontext['dag_run']andcontext['dag_run'].external_trigger:

logging.info('ExternallytriggeredDAG_Run:allowingexecutiontoproceed.')

returnTrue


skip=False

now=datetime.now()

left_window=context['dag'].following_schedule(context['execution_date'])

right_window=context['dag'].following_schedule(left_window)

logging.info('Checkinglatestonlywithleft_window:%sright_window:%snow:%s',left_window,right_window,now)


ifnotleft_window

skip=True

returnnotskip


ShortCircuitOperator(

task_id='skip_dag_not_latest',

provide_context=True,

python_callable=skip_dag_not_latest_worker,

dag=dag

)


当存在正在执行的DAGRun时跳过当前DAGRun

依旧是之前提到的每小时调度的DAG,假设它这次没有出错而是由于资源、网络或者其他问题导致执行时间变长,当下一个调度时间开始时Airflow依旧会启动一次新的DAGRun,这样就会同时出现2个DAGRun。如果我们想要避免这种情况,一个简单的方法是直接将DAG的max_active_runs设置为1。但这样会导致DAGRun堆积的问题,如果你配置的调度是早上9点至晚上9点,直至晚上9点之后Airflow可能依旧在处理堆积的DAGRun。这样就可能影响到我们原本安排在晚上9点之后的任务。

我们可以创建一个ShortCircuitOperator,并且让DAG中所有没有依赖的作业都依赖这个作业,然后在这个作业中进行判断,检测当前是否存在正在执行的DAGRun,存在时则直接跳过整个DAG。

defskip_dag_when_previous_running_worker(ds,**context):

ifcontext['dag_run']andcontext['dag_run'].external_trigger:

logging.info('ExternallytriggeredDAG_Run:allowingexecutiontoproceed.')

returnTrue


skip=False

session=settings.Session()

count=session.query(DagRun).filter(

DagRun.dag_id==context['dag'].dag_id,

DagRun.state.in_(['running']),

).count()

session.close()

logging.info('CheckingrunningDAGcount:%s'%count)

skip=count>1

returnnotskip


ShortCircuitOperator(

task_id='skip_dag_when_previous_running',

provide_context=True,

python_callable=skip_dag_when_previous_running_worker,

dag=dag

)


使用的最佳实践

1、利用provide_context和XCOM在任务间传递信息

在default_args里面配置’provide_context’:True,这样在每个任务执行完之后都可以返回一个信息(当你需要的时候),可以使用xcom在不同的operator间传递变量。

这样每个任务都可以获取到之前任务执行返回的信息,以进行自身的处理操作。

以下是一个简单的例子:



#任务1,获得数据并保存到文件中,返回文件名


defjob_get_datas(**kwargs):

filename=get_datas()#数据获取的函数,返回的是存储数据的文件名

returnfilename

    

operator_get_datas=PythonOperator(

task_id='task_get_datas',

    python_callable=job_get_datas,

    dag=dag

    )



#把存储文件的数据导入数据库


defjob_data_2_mysql(**kwargs):

filename=kwargs['task_instance'].xcom_pull(task_ids='task_get_datas')#获取task_get_datas任务返回的数据

result=data_2_mysql(filename)#数据入库的函数

returnresult

    

operator_data_2_mysql=PythonOperator(

task_id='task_data_2_mysql',

    python_callable=job_data_2_mysql,

    dag=dag)





#或者先push到xcom中再pull



defprocessing_data(**kwargs):

kwargs['ti'].xcom_push(key='X',value=X)

kwargs['ti'].xcom_push(key='str_with_trx_with_retail_with_corporate_with_account',value=str_with_trx_with_retail_with_corporate_with_account)



processing_data_operator=PythonOperator(

task_id='processing_data_operator',

provide_context=True,

python_callable=processing_data,

dag=dag,

)



defpredict(**kwargs):

ti=kwargs['ti']

X=ti.xcom_pull(key='X',task_ids='processing_data_operator')


predict_operator=PythonOperator(

task_id='predict_operator',

provide_context=True,

python_callable=predict,

dag=dag,

)



注意:由于这里的上下文信息(任务返回的数据)是存到Airflow的MySQL中,字段长度有限,所以不推荐返回具体数据,而是通过其他途径存储临时数据(例如临时文件形式),返回关键信息(例如临时文件的文件名),这样既不会因为异常断开导致整个任务流需要重跑,也不会因为数据量过大导致Airflow存储MySQL的时候报错。

2、处理逻辑与任务流执行分离

虽然在dag里面可以直接写python代码(Airflow本身也是用python实现的),但是不推荐将处理逻辑写在dag上面。这里有两方面的考虑:

在Airflow的前端界面中,是可以看到dag的代码的,将处理逻辑、特别是数据库或其他服务的用户密码暴露出来未必是好事;

如果将逻辑写在dag里面,那么在测试逻辑的时候,就太依赖Airflow了。这与解耦的开发逻辑思路相违背了,我们是需要一个松耦合的代码世界。

那么推荐在项目下面添加一个etl_utils目录(或者你喜欢的名称),用于存放处理逻辑。这个目录下一般分成三个子目录config、etl、system,分别是配置信息(数据库密码等)、逻辑代码、通用工具(如封装好的es操作类)。那么一般项目的目录结构如下:

-/dag_xxx.py-/test_xxx.py-/etl_utls/-/etl_utls/config/...-/etl_utls/etl/...-/etl_utls/system/...


所有的文件之间的调用层级以根目录为起点。我们在实现逻辑之后,就可以在根目录下编写测试代码,按顺序执行我们需要实现的流程。按这种方式测试完流程之后再组织dag。

3、关于中间数据

在处理逻辑中,我们尽量将每个处理过程细分出来,每个处理完成之后都将数据保存到临时文件中(中间处理过程,一般不要存数据库了,加大数据库的存取压力不是一件好事情,而且这些都是临时的信息),这些文件可以是同一个文件进行反复覆盖(每个任务流都取一个相对唯一的文件名,例如使用uuid,或者第一次处理的时间戳,加上任务流名字作为唯一辨识)。千万不要将这些信息放在内存里,万一挂了,就找不回来了,又要整个流程重新跑过。

4、临时文件

临时文件,注意同个任务流中保持一致,但是在不同任务流中需要能区分,有时候上一个任务流失败了,下一个任务流继续执行,那么如果没有区分能力,就会把上一个任务流的数据给覆盖掉了。注意在最后加上一个删除文件的处理,减少系统空间压力。

5、关于处理频率

机器的处理能力总是有限的,所以我们在条件允许的情况下,每次处理的数据量尽量减小。一般减小每次处理的数据量的方法,就是增加处理频率。但是加大处理频率,又会加大Airflow自身运行需要占用的资源。所以需要在数据量和频率之间找到一个平衡,这里每个项目可能有自己的特点,需要在每个项目的实际情况中找到适合项目的处理频率。

高可用airflow集群安装步骤

airflow单节点部署

将以所有上守护进程运行在同一台机器上即可完成airflow的单结点部署,架构如下图所示


airflow多节点(集群)部署

在稳定性要求较高的场景,如金融交易系统中,一般采用集群、高可用的方式来部署。ApacheAirflow同样支持集群、高可用的部署,airflow的守护进程可分布在多台机器上运行,架构如下图所示:

这样做有以下好处1)高可用如果一个worker节点崩溃或离线时,集群仍可以被控制的,其他worker节点的任务仍会被执行。

2)分布式处理如果你的工作流中有一些内存密集型的任务,任务最好是分布在多台机器上运行以便得到更快的执行。

扩展worker节点

水平扩展你可以通过向集群中添加更多worker节点来水平地扩展集群,并使这些新节点指向同一个元数据库,从而分发处理过程。由于worker不需要在任何守护进程注册即可执行任务,因此所以worker节点可以在不停机,不重启服务下的情况进行扩展,也就是说可以随时扩展。

垂直扩展你可以通过增加单个worker节点的守护进程数来垂直扩展集群。可以通过修改airflow的配置文件-{AIRFLOW_HOME}/airflow.cfg中celeryd_concurrency的值来实现,例如:

celeryd_concurrency=30


您可以根据实际情况,如集群上运行的任务性质,CPU的内核数量等,增加并发进程的数量以满足实际需求。

扩展Master节点

您还可以向集群中添加更多主节点,以扩展主节点上运行的服务。您可以扩展webserver守护进程,以防止太多的HTTP请求出现在一台机器上,或者您想为webserver的服务提供更高的可用性。需要注意的一点是,每次只能运行一个scheduler守护进程。如果您有多个scheduler运行,那么就有可能一个任务被执行多次。这可能会导致您的工作流因重复运行而出现一些问题。下图为扩展Master节点的架构图:


看到这里,可能有人会问,scheduler不能同时运行两个,那么运行scheduler的节点一旦出了问题,任务不就完全不运行了吗?

答案:这是个非常好的问题,不过已经有解决方案了,我们可以在两台机器上部署scheduler,只运行一台机器上的scheduler守护进程,一旦运行scheduler守护进程的机器出现故障,立刻启动另一台机器上的scheduler即可。我们可以借助第三方组件airflow-scheduler-failover-controller实现scheduler的高可用。

具体步骤如下所示:

下载failover

gitclonehttps://github.com/teamclairvoyant/airflow-scheduler-failover-controller


使用pip进行安装

cd{AIRFLOW_FAILOVER_CONTROLLER_HOME}

pipinstall-e.


初始化failover

scheduler_failover_controllerinit


注:初始化时,会向airflow.cfg中追加内容,因此需要先安装airflow并初始化。

更改failover配置

scheduler_nodes_in_cluster=host1,host2


注:hostname可以通过scheduler_failover_controllerget_current_host命令获得

配置安装failover的机器之间的免密登录,配置完成后,可以使用如下命令进行验证:

scheduler_failover_controllertest_connection


启动failover

scheduler_failover_controllerstart


因此更健壮的架构图如下所示:


队列服务及元数据库(Metestore)的高可用。

队列服务取决于使用的消息队列是否可以高用可部署,如RabbitMQ和Redis。

RabbitMQ集群并配置Mirrored模式见:http://blog.csdn.net/u010353408/article/details/77964190

元数据库(Metestore)取决于所使用的数据库,如Mysql等。

Mysql做主从备份见:http://blog.csdn.net/u010353408/article/details/77964157

airflow集群部署的具体步骤

前提条件节点运行的守护进程如下:

master1运行:webserver,scheduler

master2运行:webserver

worker1运行:worker

worker2运行:worker

队列服务处于运行中.(RabbitMQ,Redis,etc)

安装RabbitMQ方法参见:http://site.clairvoyantsoft.com/installing-rabbitmq/

如果正在使用RabbitMQ,推荐RabbitMQ也做成高可用的集群部署,并为RabbitMQ实例配置负载均衡。步骤

在所有需要运行守护进程的机器上安装ApacheAirflow。具体安装方法可参考上面的简单安装。

修改{AIRFLOW_HOME}/airflow.cfg文件,确保所有机器使用同一份配置文件。

修改Executor为CeleryExecutor

executor=CeleryExecutor


指定元数据库(metestore)

sql_alchemy_conn=mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow


设置中间人(broker)如果使用RabbitMQ

broker_url=amqp://guest:guest@{RABBITMQ_HOST}:5672/


如果使用Redis

broker_url=redis://{REDIS_HOST}:6379/0#使用数据库0


设定结果存储后端backend

celery_result_backend=db+mysql://{USERNAME}:{PASSWORD}@{MYSQL_HOST}:3306/airflow


#当然您也可以使用Redis:celery_result_backend=redis://{REDIS_HOST}:6379/1


在master1和master2上部署您的工作流(DAGs)。在master1,初始airflow的元数据库

$airflowinitdb


在master1,启动相应的守护进程

$airflowwebserver

$airflowscheduler


在master2,启动WebServer

$airflowwebserver


在worker1和worker2启动worker

$airflowworker


使用负载均衡处理webserver

可以使用nginx,AWS等服务器处理webserver的负载均衡,不在此详述至此,所有均已集群或高可用部署,apache-airflow系统已坚不可摧。

官方文档如下:Documentation:https://airflow.incubator.apache.org/InstallDocumentation:https://airflow.incubator.apache.org/installation.htmlGitHubRepo:https://github.com/apache/incubator-airflow

参考链接:

如何部署一个健壮的apache-airflow调度系统

优化–架构和高可用集群

airflow中webserver和worker都可以启动多个,但是scheduler只能启动一个,这样造成了airflow的单点,目前已经有第三方开源方案来解决这个问题:

AirflowSchedulerFailoverController

地址:https://github.com/teamclairvoyant/airflow-scheduler-failover-controller

实现原理

TheAirflowSchedulerFailoverController(ASFC)是一种保证机制,保证至少有一共scheduler在运行。

首先需要启动ASFC在每个我们规划的用于运行scheduler的实例中。当我们启动多个ASFC时,有一个会是启用状态,其他是备用状态。ASFC之间通过心跳机制跟踪确认scheduler可用,如果心跳丢失,则启用备用ASFC。

活动状态的ASFC每10秒会检查scheduler的状态。如果没有找到scheduler,会尝试重启schduler的daemon进程。如果还是无法启动,则会在其他节点启用scheduler的进程。

安装

#gitclonehttps://github.com/teamclairvoyant/airflow-scheduler-failover-controller

#cdairflow-scheduler-failover-controller

#pipinstall-e.


报错

Collectingairflow>=1.7.0(fromscheduler-failover-controller==1.0.1)

Couldnotfindaversionthatsatisfiestherequirementairflow>=1.7.0(fromscheduler-failover-controller==1.0.1)(fromversions:0.6)

Nomatchingdistributionfoundforairflow>=1.7.0(fromscheduler-failover-controller==1.0.1)


查看

#visetup.py

install_requires=[

'airflow>=1.7.0',

'kazoo>=2.2.1',

'coverage>=4.2',

'eventlet>=0.9.7',

],


#piplist|grepairflow

apache-airflow1.10.0


需要将setup.py中airflow改为apache-airflow,安装之后启动

#scheduler_failover_controller-h


会报错

pkg_resources.ContextualVersionConflict:(Flask-Login0.2.11(/usr/lib64/python2.7/site-packages),Requirement.parse('Flask-Login<0>=0.3'),set(['flask-appbuilder']))


重装Flask-Login

#pipuninstallFlask-Login

#pipinstallFlask-Login


重装之后是Flask-Login0.4.1,满足要求,但是又会报错

apache-airflow1.10.0hasrequirementflask-login==0.2.11,butyou'llhaveflask-login0.4.1whichisincompatible.


AirflowSchedulerFailoverController和airflow1.10.0不兼容;需要选用兼容的版本

优化–Sensor的替代方案

Airflow中有一类Operator被称为Sensor,Sensor可以感应预先设定的条件是否满足(如:某个时间点是否达到、某条MySQL记录是否被更新、某个DAG是否完成),当满足条件后Sensor作业变为Success使得下游的作业能够执行。Sensor的功能很强大但却带来一个问题,假如我们有一个Sensor用于检测某个MySQL记录是否被更新,在Sensor作业启动后3个小时这个MySQL记录才被更新。于是我们的这个Sensor占用了一个Worker整整3小时,这显然是一个极大的浪费。

因此我们需要一个Sensor的替代方案,既能满足Sensor原来的功能,又能节省Worker资源。有一个办法是不使用Sensor,直接使用PythonOperator判断预先设定的条件是否满足,如果不满足直接raiseException,然后将这个作业的retry_delay(重试间隔时间)设为每次检测的间隔时间,retries(重试次数)设为最长检测时间除以retry_delay,即满足:最长检测时间=retries*retry_delay。这样既不会长时间占用Worker资源,又可以满足Sensor原来的功能。

优化–AirflowDAGCreationManagerPlugin

Airflow虽然具有强大的功能,但是配置DAG并不是简单的工作,也有一些较为繁琐的概念,对于业务人员来说可能略显复杂。因此,笔者编写了AirflowDAGCreationManagerPlugin(https://github.com/lattebank/airflow-dag-creation-manager-plugin)以提供一个Web界面来让业务人员可视化的编写及管理DAG。具体的安装及使用方法请查看插件的README。

插件的Web界面中可以直接所见即所得的编写DAG图。

插件中尽量简化了一些繁琐的诸如上文所述的作业开始调度时间等一系列的概念,并提供了一些在实际工作中常常会用到的一些额外的功能(如上文提到的跳过非最新DAGRun、当存在正在执行的DAGRun时跳过当前DAGRun等),以及版本控制和权限管理。如果大家在使用Airflow的过程中也有类似的问题,欢迎尝试使用AirflowDAGCreationManagerPlugin。


相关问题推荐

  • 回答 3

    换行。比如,print hello\nworld效果就是helloworld\n就是一个换行符。\是转义的意思,&#39;\n&#39;是换行,&#39;\t&#39;是tab,&#39;\\&#39;是,\ 是在编写程序中句子太长百,人为换行后加上\但print出来是一整行。...

  • 回答 42

    十种常见排序算法一般分为以下几种:(1)非线性时间比较类排序:a. 交换类排序(快速排序、冒泡排序)b. 插入类排序(简单插入排序、希尔排序)c. 选择类排序(简单选择排序、堆排序)d. 归并排序(二路归并排序、多路归并排序)(2)线性时间非比较类排序:...

  • 回答 70
    已采纳

    前景很好,中国正在产业升级,工业机器人和人工智能方面都会是强烈的热点,而且正好是在3~5年以后的时间。难度,肯定高,要求你有创新的思维能力,高数中的微积分、数列等等必须得非常好,软件编程(基础的应用最广泛的语言:C/C++)必须得很好,微电子(数字电...

  • 回答 28

    迭代器与生成器的区别:(1)生成器:生成器本质上就是一个函数,它记住了上一次返回时在函数体中的位置。对生成器函数的第二次(或第n次)调用,跳转到函数上一次挂起的位置。而且记录了程序执行的上下文。生成器不仅记住了它的数据状态,生成器还记住了程序...

  • 回答 9

    python中title( )属于python中字符串函数,返回’标题化‘的字符串,就是单词的开头为大写,其余为小写

  • 回答 6

    第一种解释:代码中的cnt是count的简称,一种电脑计算机内部的数学函数的名字,在Excel办公软件中计算参数列表中的数字项的个数;在数据库( sq| server或者access )中可以用来统计符合条件的数据条数。函数COUNT在计数时,将把数值型的数字计算进去;但是...

  • 回答 1

    head是方法,所以需要取小括号,即dataset.head()显示的则是前5行。data[:, :-1]和data[:, -1]。另外,如果想通过位置取数据,请使用iloc,即dataset.iloc[:, :-1]和dataset.iloc[:, -1],前者表示的是取所有行,但不包括最后一列的数据,结果是个DataFrame。...

  • Python入门简单吗2021-09-23 13:21
    回答 45

    挺简单的,其实课程内容没有我们想象的那么难、像我之前同学,完全零基础,培训了半年,直接出来就工作了,人家还在北京大公司上班,一个月15k,实力老厉害了

  • 回答 4

    Python针对众多的类型,提供了众多的内建函数来处理(内建是相对于导入import来说的,后面学习到包package时,将会介绍),这些内建函数功用在于其往往可对多种类型对象进行类似的操作,即多种类型对象的共有的操作;如果某种操作只对特殊的某一类对象可行,Pyt...

  • 回答 8

     相当于 ... 这里不是注释

  • 回答 4

    还有FIXME

  • 回答 3

    python的两个库:xlrd和xlutils。 xlrd打开excel,但是打开的excel并不能直接写入数据,需要用xlutils主要是复制一份出来,实现后续的写入功能。

  • 回答 8

    单行注释:Python中的单行注释一般是以#开头的,#右边的文字都会被当做解释说明的内容,不会被当做执行的程序。为了保证代码的可读性,一般会在#后面加一两个空格然后在编写解释内容。示例:#  单行注释print(hello world)注释可以放在代码上面也可以放在代...

  • 回答 2

    主要是按行读取,然后就是写出判断逻辑来勘测行是否为注视行,空行,编码行其他的:import linecachefile=open(&#39;3_2.txt&#39;,&#39;r&#39;)linecount=len(file.readlines())linecache.getline(&#39;3_2.txt&#39;,linecount)这样做的过程中发现一个问题,...

  • 回答 4

    或许是里面有没被注释的代码

  • 回答 26

    自学的话要看个人情况,可以先在B站找一下视频看一下

没有解决我的问题,去提问