Proper way to create dynamic workflows in Airflow

Problem

Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until Task A has completed? I have looked at subdags, but it looks like they only work with a static set of tasks that has to be determined when the DAG is created.

Would DAG triggers work? If so, could you please provide an example?

The problem is that it is impossible to know how many Task B’s will be needed to calculate Task C until Task A has completed. Each Task B.* will take several hours to compute, and they cannot be combined.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|
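To make the requirement concrete, here is a plain-Python sketch of the behaviour being asked for. It is not Airflow code and not the asker's code. task_a decides N only when it runs, the N task_b calls run in parallel, and task_c waits for all of them. task_a, task_b, task_c and the sample data are hypothetical stand-ins.

```python
# Plain-Python sketch (no Airflow) of a run-time fan-out / fan-in.
# task_a, task_b and task_c are hypothetical stand-ins for the real work.
from concurrent.futures import ThreadPoolExecutor
from typing import List


def task_a() -> List[int]:
    # The number of work items is only known once this returns.
    return [3, 1, 4, 1, 5]


def task_b(item: int) -> int:
    # One independent unit of work (hours long in the real pipeline).
    return item * item


def task_c(partials: List[int]) -> int:
    # Fan-in: combine the B results once all of them have finished.
    return sum(partials)


def run_pipeline() -> int:
    items = task_a()                    # N = len(items), decided at run time
    with ThreadPoolExecutor() as pool:  # B.1 .. B.N run concurrently
        partials = list(pool.map(task_b, items))
    return task_c(partials)


if __name__ == "__main__":
    print(run_pipeline())
```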

Idea #1

I don’t like this solution because I have to create a blocking ExternalTaskSensor, and each Task B.* will take between 2 and 24 hours to complete. So I do not consider this a viable solution. Surely there is an easier way? Or was Airflow not designed for this?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG through python_callable in TriggerDagRunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

Edit 1:

As of now this question still does not have a great answer. I have been contacted by several people looking for a solution.


Answer 0

Here is how I handled a similar requirement without any subdags:

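First, create a method that returns whatever values you want:

def values_function(**kwargs):
    # Accept **kwargs: with provide_context=True, Airflow 1.x passes the
    # task context to the callable as keyword arguments.
    # `values` stands for whatever your own logic computes.
    return values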

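Next, create a method that generates the jobs dynamically:

from airflow.operators.bash_operator import BashOperator


def group(number, **kwargs):
    # Load the values if needed in the command you plan to execute. The Jinja
    # string is rendered at run time because bash_command is a templated field.
    dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
    # Assumes `dag` is the DAG object defined elsewhere in this file.
    return BashOperator(
        task_id='JOB_NAME_{}'.format(number),
        bash_command='script.sh {} {}'.format(dyn_value, number),
        dag=dag)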

And then combine them:

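from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

push_func = PythonOperator(
    task_id='push_func',
    provide_context=True,
    python_callable=values_function,
    dag=dag)

complete = DummyOperator(
    task_id='All_jobs_completed',
    dag=dag)

# One JOB_NAME_<i> task per value, fanned out between push_func and complete.
for i in values_function():
    push_func >> group(i) >> complete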
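Note that the for loop calls values_function() at module level. That happens whenever the scheduler parses the DAG file, not when push_func runs. The value pushed to XCom reaches the Bash commands only through the template; it does not change how many JOB_NAME_* tasks exist. Below is a plain-Python sketch of the dependency edges the loop builds. It uses no Airflow, and values_function and build_edges here are hypothetical stand-ins with fixed sample values.

```python
# Plain-Python sketch (no Airflow) of the edges built by the wiring loop.
# values_function is a hypothetical stand-in returning fixed sample values.
from typing import List, Tuple


def values_function() -> List[int]:
    return [0, 1, 2]


def build_edges(values: List[int]) -> List[Tuple[str, str]]:
    edges = []
    for i in values:
        job = "JOB_NAME_{}".format(i)
        # push_func >> group(i) >> complete, expressed as two edges
        edges.append(("push_func", job))
        edges.append((job, "All_jobs_completed"))
    return edges


# Evaluated when the file is parsed, so the fan-out width is fixed at parse time.
EDGES = build_edges(values_function())
```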

Answer 1

I have figured out a way to create workflows based on the results of previous tasks.
Basically, what you want is two subdags set up as follows:

  1. Push a list via XCom (or whatever you need to create the dynamic workflow later) in the subdag that gets executed first (see def return_list() in test1.py).
  2. Pass the main DAG object as a parameter to your second subdag.
  3. With the main DAG object, you can get a list of its task instances. From that list, you can pick a task instance of the current run with parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]; you could probably add more filters here.
  4. With that task instance, use xcom_pull to get the value you need, setting dag_id to that of the first subdag: dag_id='%s.%s' % (parent_dag_name, 'test1')
  5. Use the list/value to create your tasks dynamically.

I have tested this in my local Airflow installation and it works fine. I don’t know whether the XCom pull will cause problems if more than one instance of the DAG runs at the same time. In that case you would probably use a unique key or something similar to identify the XCom value you want. Step 3 could probably be tightened so that it is guaranteed to get a task of the current main DAG run, but for my use it works well enough. As far as I can tell, you only need one task_instance object to call xcom_pull.
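On the concurrency question: Airflow's XCom rows are stored with dag_id, task_id and execution_date. By default (include_prior_dates=False), TaskInstance.xcom_pull only matches the pulling task instance's own execution_date, and a subdag run shares its parent run's execution_date. So the main risk with concurrent runs is picking the wrong task instance in step 3, not the pull itself. The toy store below sketches that scoping. It is hypothetical and is not Airflow's API.

```python
# Toy in-memory stand-in for an XCom table (hypothetical, not Airflow's API).
# Rows are keyed by dag_id, task_id, execution_date and key, so two runs of the
# same DAG can push under the same task_id without overwriting each other.
from datetime import datetime
from typing import Any, Dict, Optional, Tuple

Key = Tuple[str, str, datetime, str]


class ToyXComStore:
    def __init__(self) -> None:
        self._rows: Dict[Key, Any] = {}

    def push(self, dag_id: str, task_id: str, execution_date: datetime,
             value: Any, key: str = "return_value") -> None:
        self._rows[(dag_id, task_id, execution_date, key)] = value

    def pull(self, dag_id: str, task_id: str, execution_date: datetime,
             key: str = "return_value") -> Optional[Any]:
        # Only the row for this exact run matches; other runs are invisible.
        return self._rows.get((dag_id, task_id, execution_date, key))


store = ToyXComStore()
run_1 = datetime(2018, 8, 24)
run_2 = datetime(2018, 8, 25)
# Two runs of the first subdag push a list under the same dag_id/task_id.
store.push("test-dag.test1", "list", run_1, ["test1", "test2"])
store.push("test-dag.test1", "list", run_2, ["test3"])
```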

Also I clean the xcoms for the first subdag before every execution, just to make sure that I don’t accidentally get any wrong value.

I’m pretty bad at explaining, so I hope the following code will make everything clear:

test1.py

from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator

log = logging.getLogger(__name__)


def test1(parent_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.test1' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    def return_list():
        return ['test1', 'test2']

    list_extract_folder = PythonOperator(
        task_id='list',
        dag=dag,
        python_callable=return_list
    )

    clean_xcoms = PostgresOperator(
        task_id='clean_xcoms',
        postgres_conn_id='airflow_db',
        sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
        dag=dag)

    clean_xcoms >> list_extract_folder

    return dag

test2.py

from airflow import settings
from airflow.models import DAG
import logging
from airflow.operators.dummy_operator import DummyOperator

log = logging.getLogger(__name__)


def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
    dag = DAG(
        '%s.test2' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date
    )

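    # Guard against parent_dag=None and against a DAG with no active run yet.
    if parent_dag is not None and len(parent_dag.get_active_runs()) > 0:
        # A task instance of the latest active run of the parent DAG.
        current_ti = parent_dag.get_task_instances(
            settings.Session,
            start_date=parent_dag.get_active_runs()[-1])[-1]
        test_list = current_ti.xcom_pull(
            dag_id='%s.%s' % (parent_dag_name, 'test1'),
            task_ids='list')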
        if test_list:
            for i in test_list:
                test = DummyOperator(
                    task_id=i,
                    dag=dag
                )

    return dag

and the main workflow:

test.py

from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2

DAG_NAME = 'test-dag'

dag = DAG(DAG_NAME,
          description='Test workflow',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 24))

test1 = SubDagOperator(
    subdag=test1(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval),
    task_id='test1',
    dag=dag
)

test2 = SubDagOperator(
    subdag=test2(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval,
                 parent_dag=dag),
    task_id='test2',
    dag=dag
)

test1 >> test2

Answer 2

Yes, this is possible. I’ve created an example DAG that demonstrates it:

import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator

main_dag_id = 'DynamicWorkflow2'

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG(
    main_dag_id,
    schedule_interval="@once",
    default_args=args)


def start(*args, **kwargs):

    value = Variable.get("DynamicWorkflow_Group1")
    logging.info("Current DynamicWorkflow_Group1 value is " + str(value))


def resetTasksStatus(task_id, execution_date):
    logging.info("Resetting: " + task_id + " " + execution_date)

    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[main_dag_id]
    session = settings.Session()

    my_task = check_dag.get_task(task_id)
    ti = TaskInstance(my_task, execution_date)
    state = ti.current_state()
    logging.info("Current state of " + task_id + " is " + str(state))
    ti.set_state(None, session)
    state = ti.current_state()
    logging.info("Updated state of " + task_id + " is " + str(state))


def bridge1(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 2

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    # Variable.set writes the same metadata-DB entry as the CLI, without spawning a shell.
    Variable.set("DynamicWorkflow_Group2", dynamicValue)

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))


def bridge2(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 3

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
    # Variable.set writes the same metadata-DB entry as the CLI, without spawning a shell.
    Variable.set("DynamicWorkflow_Group3", dynamicValue)

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))


def end(*args, **kwargs):
    logging.info("Ending")


def doSomeWork(name, index, *args, **kwargs):
    # Do whatever work you need to do
    # Here I will just create a new file
    os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')


starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[])

# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
    task_id='bridge1',
    dag=dag,
    provide_context=True,
    python_callable=bridge1,
    op_args=[])

DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))

for index in range(int(DynamicWorkflow_Group1)):
    dynamicTask = PythonOperator(
        task_id='firstGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['firstGroup', index])

    starting_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge1_task)

# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
    task_id='bridge2',
    dag=dag,
    provide_context=True,
    python_callable=bridge2,
    op_args=[])

DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow_Group2 value is " + str(DynamicWorkflow_Group2))

for index in range(int(DynamicWorkflow_Group2)):
    dynamicTask = PythonOperator(
        task_id='secondGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['secondGroup', index])

    bridge1_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge2_task)

ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[])

DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow_Group3 value is " + str(DynamicWorkflow_Group3))

for index in range(int(DynamicWorkflow_Group3)):

    # You can make this logic anything you'd like
    # I chose to use the PythonOperator for all tasks
    # except the last task will use the BashOperator
    if index < (int(DynamicWorkflow_Group3) - 1):
        dynamicTask = PythonOperator(
            task_id='thirdGroup_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=doSomeWork,
            op_args=['thirdGroup', index])
    else:
        dynamicTask = BashOperator(
            task_id='thirdGroup_' + str(index),
            bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
            dag=dag)

    bridge2_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(ending_task)

# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)
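The bridge tasks matter because the DAG's shape is rebuilt from the Variables every time the file is parsed. The function below is a plain-Python sketch, with no Airflow involved. It computes the edge set the file above builds for a given set of Variable values. The direct bridge links keep the chain ordered when a group is empty. build_layout is a hypothetical helper that mirrors the example's task ids.

```python
# Plain-Python sketch (no Airflow) of the edges the example DAG file builds
# for given Variable values. build_layout is a hypothetical helper.
from typing import Set, Tuple


def build_layout(group1: int, group2: int, group3: int) -> Set[Tuple[str, str]]:
    edges: Set[Tuple[str, str]] = set()
    stages = [("start", "firstGroup", group1, "bridge1"),
              ("bridge1", "secondGroup", group2, "bridge2"),
              ("bridge2", "thirdGroup", group3, "end")]
    for upstream, prefix, count, downstream in stages:
        for index in range(count):
            task = "{}_{}".format(prefix, index)
            edges.add((upstream, task))
            edges.add((task, downstream))
        # Direct link: keeps the stream ordered even when count == 0.
        edges.add((upstream, downstream))
    return edges


BEFORE = build_layout(1, 0, 0)  # Variables as created before the first run
AFTER = build_layout(1, 2, 3)   # after bridge1/bridge2 set Group2=2, Group3=3
```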

Before you run the DAG, create these three Airflow Variables (the commands use the Airflow 1.x CLI syntax):

airflow variables --set DynamicWorkflow_Group1 1

airflow variables --set DynamicWorkflow_Group2 0

airflow variables --set DynamicWorkflow_Group3 0

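You’ll see that the DAG goes from this (Group1 = 1, Group2 = 0, Group3 = 0):

[Figure: graph view of the DAG before the run]

to this after it has run (bridge1 and bridge2 have set Group2 = 2 and Group3 = 3):

[Figure: graph view of the DAG after the run]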

You can see more information on this DAG in my article on creating Dynamic Workflows On Airflow.


Answer 3

OP: “Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until completion of Task A?”

The short answer is no. Airflow builds the DAG structure before it starts running it.

That said, we came to a simple conclusion: we don’t actually need this. When you want to parallelize some work, you should size it by the resources you have available, not by the number of items to process.

Here is how we did it: we dynamically generate a fixed number of tasks, say 10, that split the job between them. For example, if we need to process 100 files, each task processes 10 of them. I will post the code later today.

Update

Here is the code, sorry for the delay.

from datetime import datetime, timedelta

import airflow
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
    'email': ['myemail@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

dag = airflow.DAG(
    'parallel_tasks_v1',
    schedule_interval="@daily",
    catchup=False,
    default_args=args)

# You can read this from variables
parallel_tasks_total_number = 10

start_task = DummyOperator(
    task_id='start_task',
    dag=dag
)


# Placeholder callable (not defined in the original post): replace the body with
# the code that processes chunk number current_task_number out of total_tasks.
def parallelTask(current_task_number, total_tasks, **kwargs):
    pass


# Creates the tasks dynamically.
# Each one processes one chunk of data.
def create_dynamic_task(current_task_number):
    # PythonOperator, not DummyOperator: only PythonOperator accepts
    # python_callable and op_args.
    return PythonOperator(
        provide_context=True,
        task_id='parallel_task_' + str(current_task_number),
        python_callable=parallelTask,
        # Your task takes the current task number and the total number of tasks
        # as input and uses them to process its own chunk of the elements.
        op_args=[current_task_number, int(parallel_tasks_total_number)],
        dag=dag)


end = DummyOperator(
    task_id='end',
    dag=dag)

for page in range(int(parallel_tasks_total_number)):
    created_task = create_dynamic_task(page)
    start_task >> created_task
    created_task >> end

Code explanation:

Here we have a single start task and a single end task (both dummies).

Then, in the for loop, we create 10 tasks downstream of the start task, all with the same Python callable. The tasks are created in the function create_dynamic_task.

To each Python callable we pass as arguments the current task index and the total number of parallel tasks.

Suppose you have 1000 items to process. The first task receives as input that it should process the first of 10 chunks. It divides the 1000 items into 10 chunks and processes the first one.
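Here is a minimal plain-Python sketch of the chunk selection such a callable could do. select_chunk and chunk_bounds are hypothetical helpers, not part of the answer, and their argument order mirrors op_args=[current_task_number, total]. The integer arithmetic spreads any remainder across the tasks, so every item is processed exactly once.

```python
# Hypothetical helpers: each of the N parallel tasks picks its own slice of items.
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def chunk_bounds(n_items: int, current_task_number: int,
                 total_tasks: int) -> Tuple[int, int]:
    # Floor-scaled boundaries: chunks are contiguous, non-overlapping and
    # together cover all n_items, even when n_items % total_tasks != 0.
    start = current_task_number * n_items // total_tasks
    end = (current_task_number + 1) * n_items // total_tasks
    return start, end


def select_chunk(current_task_number: int, total_tasks: int,
                 items: Sequence[T]) -> List[T]:
    # Argument order mirrors op_args=[current_task_number, total].
    start, end = chunk_bounds(len(items), current_task_number, total_tasks)
    return list(items[start:end])
```

For example, with 1000 items and 10 tasks, task 0 gets items 0 to 99. With 105 items, the last task (index 9) gets the 11 items from index 94 to 104.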


Answer 4

I think what you are looking for is creating a DAG dynamically. I encountered this type of situation a few days ago, and after some searching I found this blog.

Dynamic Task Generation

start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag)

def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # callableFunction is a string, but python_callable only accepts callable objects,
        # so look the function up by name in this module's globals (safer than eval)
        python_callable=globals()[callableFunction],
        op_kwargs=args,
        # PythonOperator pushes its callable's return value to XCom by default,
        # so no xcom_push argument is needed (PythonOperator does not accept one)
        dag=dag,
    )
    return task

Setting the DAG workflow

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # Use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table_entry in tables:
        # Each entry maps one table name to the fields to process for that table
        for table, fieldName in table_entry.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated, they should be linked with the
            # dummy operators generated in the start and end tasks.
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end

This is what our DAG looks like after putting the code together:

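import yaml
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

# The original snippet uses `dag` without defining it; the dag_id, start_date
# and schedule_interval below are placeholder values.
dag = DAG(
    dag_id='dynamic_etl',
    start_date=datetime(2019, 1, 1),
    schedule_interval=None,
)

start = DummyOperator(
    task_id='start',
    dag=dag
)


# getSQLData and uploadDataToS3, the callables referenced by name below, are not
# shown in the blog post and must be defined in this module before the loop runs.
def createDynamicETL(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # callableFunction is a string, but python_callable only accepts callable objects,
        # so look the function up by name in this module's globals (safer than eval)
        python_callable=globals()[callableFunction],
        op_kwargs=args,
        # PythonOperator pushes its callable's return value to XCom by default,
        # so no xcom_push argument is needed (PythonOperator does not accept one)
        dag=dag,
    )
    return task


end = DummyOperator(
    task_id='end',
    dag=dag)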

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load instead to load the YAML file
    configFile = yaml.safe_load(f)

    # Extract table names and fields to be processed
    tables = configFile['tables']

    # In this loop tasks are created for each table defined in the YAML file
    for table_entry in tables:
        # Each entry maps one table name to the fields to process for that table
        for table, fieldName in table_entry.items():
            # In our example, first step in the workflow for each table is to get SQL data from db.
            # Remember task id is provided in order to exchange data among tasks generated in dynamic way.
            get_sql_data_task = createDynamicETL('{}-getSQLData'.format(table),
                                                 'getSQLData',
                                                 {'host': 'host', 'user': 'user', 'port': 'port', 'password': 'pass',
                                                  'dbname': configFile['dbname']})

            # Second step is upload data to s3
            upload_to_s3_task = createDynamicETL('{}-uploadDataToS3'.format(table),
                                                 'uploadDataToS3',
                                                 {'previous_task_id': '{}-getSQLData'.format(table),
                                                  'bucket_name': configFile['bucket_name'],
                                                  'prefix': configFile['prefix']})

            # This is where the magic lies. The idea is that
            # once tasks are generated, they should be linked with the
            # dummy operators generated in the start and end tasks.
            # Then you are done!
            start >> get_sql_data_task
            get_sql_data_task >> upload_to_s3_task
            upload_to_s3_task >> end
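Note that this loop runs whenever the scheduler parses the DAG file, so the number of generated tasks is fixed by the YAML file, not by the result of an upstream task. As a sketch of just that planning step, the plain-Python function below (runnable without Airflow) lists the task ids the loop would create. CONFIG is a hypothetical stand-in for the parsed YAML, whose real layout the answer does not show.

```python
from typing import Dict, List, Tuple

# Hypothetical parsed form of dynamicDagConfigFile.yaml (the real file is not shown):
# 'tables' is assumed to be a list of one-key mappings from table name to fields.
CONFIG: Dict = {
    "dbname": "analytics",
    "bucket_name": "my-bucket",
    "prefix": "exports/",
    "tables": [
        {"customers": ["id", "name"]},
        {"orders": ["id", "customer_id", "total"]},
    ],
}


def plan_task_ids(config: Dict) -> List[Tuple[str, str]]:
    """Return one (extract, upload) task-id pair per table, in config order.

    Uses the same naming as createDynamicETL above; nothing here talks to
    Airflow, it only shows which tasks the parse-time loop would create.
    """
    pairs = []
    for table_entry in config["tables"]:
        for table, _fields in table_entry.items():
            pairs.append((f"{table}-getSQLData", f"{table}-uploadDataToS3"))
    return pairs


if __name__ == "__main__":
    for extract_id, upload_id in plan_task_ids(CONFIG):
        print(f"start >> {extract_id} >> {upload_id} >> end")
```

Running it prints one start-to-end chain per table. Adding a table to the YAML changes the DAG on the next parse, which suits config-driven fan-out but cannot react to what Task A computes during a run.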

It was very helpful; I hope it will also help someone else.


Answer 5

I think I have found a nicer solution to this at https://github.com/mastak/airflow_multi_dagrun, which uses simple enqueuing of DagRuns by triggering multiple DagRuns, similar to TriggerDagRunOperator. Most of the credit goes to https://github.com/mastak, although I had to patch some details to make it work with the most recent Airflow.

The solution uses a custom operator that triggers several DagRuns:

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone


class TriggerMultiDagRunOperator(TriggerDagRunOperator):
    CREATED_DAGRUN_KEY = 'created_dagrun_key'

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None,
                 *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):
        # Expose op_kwargs to python_callable alongside the regular task context
        context.update(self.op_kwargs)
        session = settings.Session()
        created_dr_ids = []
        # Parse the DAG folder once, rather than once per created DagRun
        dbag = DagBag(settings.DAGS_FOLDER)
        trigger_dag = dbag.get_dag(self.trigger_dag_id)
        # python_callable is expected to yield one item per DagRun to create
        for dro in self.python_callable(*self.op_args, **context):
            # A falsy item ends the loop early
            if not dro:
                break
            # Plain payloads (e.g. dicts) are wrapped in a DagRunOrder
            if not isinstance(dro, DagRunOrder):
                dro = DagRunOrder(payload=dro)

            now = timezone.utcnow()
            if dro.run_id is None:
                dro.run_id = 'trig__' + now.isoformat()

            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                execution_date=now,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True,
            )
            created_dr_ids.append(dr.id)
            self.log.info("Created DagRun %s, %s", dr, now)

        if created_dr_ids:
            session.commit()
            # Publish the created DagRun ids so downstream tasks can look them up
            context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids)
        else:
            self.log.info("No DagRun created")
        session.close()

You can then submit several DagRuns from the python_callable you pass to TriggerMultiDagRunOperator, for example:

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.models import DAG
# This import works when the operator above is installed as an Airflow 1.x plugin
from airflow.operators import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago


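def generate_dag_run(**kwargs):
    # Each yielded DagRunOrder becomes one run of the 'common_target' DAG. This runs
    # at execution time, so the bound could come from upstream results instead of 10.
    for i in range(10):
        order = DagRunOrder(payload={'my_variable': i})
        yield order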

args = {
    'start_date': days_ago(1),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='simple_trigger',
    max_active_runs=1,
    schedule_interval='@hourly',
    default_args=args,
)

gen_target_dag_run = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='common_target',
    python_callable=generate_dag_run
)
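The core of TriggerMultiDagRunOperator.execute is the loop that turns whatever the callable yields into DagRun orders. The sketch below isolates that loop in plain Python so its rules can be checked without Airflow or a metadata database. RunOrder is a hypothetical stand-in for Airflow 1.x's DagRunOrder, and, unlike the operator above, the sketch takes one timestamp per call and appends each item's index to generated run_ids so they stay unique.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Iterator, List, Optional


@dataclass
class RunOrder:
    """Hypothetical stand-in for DagRunOrder: an optional run_id plus a conf payload."""
    run_id: Optional[str] = None
    payload: Dict[str, Any] = field(default_factory=dict)


def normalise_orders(items: Iterable[Any]) -> List[RunOrder]:
    """Apply the operator's per-item rules without creating any DagRuns."""
    now = datetime.now(timezone.utc)
    orders: List[RunOrder] = []
    for index, item in enumerate(items):
        if not item:
            # A falsy item stops the loop early, like `if not dro: break`
            break
        # Plain payloads are wrapped, existing orders are kept as they are
        order = item if isinstance(item, RunOrder) else RunOrder(payload=item)
        if order.run_id is None:
            # The index suffix keeps run_ids unique even though `now` is shared
            order.run_id = f"trig__{now.isoformat()}__{index}"
        orders.append(order)
    return orders


def generate_dag_run(n_runs: int) -> Iterator[RunOrder]:
    """Yield one order per run; n_runs could come from Task A's result at run time."""
    for i in range(n_runs):
        yield RunOrder(payload={"my_variable": i})
```

In the real operator each payload becomes the conf of one run of the target DAG, so that DAG plays the role of a single Task B.*, and the number of runs is decided while the triggering task executes rather than when the file is parsed.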

I created a fork with the code at https://github.com/flinz/airflow_multi_dagrun


Answer 6

The job's graph is not generated at run time. Rather, the graph is built when Airflow picks it up from your dags folder. Therefore it isn't really possible to have a different graph for the job every time it runs. You can configure a job to build a graph based on a query at load time, but that graph will then remain the same for every run after that, which is probably not very useful.

You can design a graph which executes different tasks on every run based on query results by using a Branch Operator.

What I've done is to pre-configure a set of tasks and then take the query results and distribute them across the tasks. This is probably better anyhow, because if your query returns a lot of results, you probably don't want to flood the scheduler with a lot of concurrent tasks. To be even safer, I also used a pool to ensure my concurrency doesn't get out of hand with an unexpectedly large query.

"""
 - This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator

########################################################################

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 2, 19, 50, 00),
    'email': ['rotten@stackoverflow'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
}

# catchup and max_active_runs are DAG-level settings: default_args are only applied
# to operators, so these are passed to DAG() directly instead
dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None,
          catchup=False, max_active_runs=1)

totalBuckets = 5

get_orders_query = """
select 
    o.id,
    o.customer
from 
    orders o
where
    o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
    and
    o.is_test = false
    and
    o.is_processed = false
"""

###########################################################################################################

# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
    return PythonOperator( 
                           task_id=f'order_processing_task_{bucket_number}',
                           python_callable=runOrderProcessing,
                           pool='order_processing_pool',
                           op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
                           provide_context=True,
                           dag=dag
                          )


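# Placeholder for the real per-order work, which the original answer does not show
def doStuff(order_id, customer_id):
    logging.info(f"Doing stuff for order {order_id} (customer {customer_id})")


# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
    # The buckets are pushed by the task whose task_id is 'get_orders'
    orderList = context['ti'].xcom_pull(task_ids='get_orders', key=task_bucket)

    if orderList is not None:
        for order in orderList:
            logging.info(f"Processing Order with Order ID {order['order_id']}, customer ID {order['customer_id']}")
            doStuff(**order)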


# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
    myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')

    # initialize the task list buckets
    tasks = {}
    for task_number in range(0, totalBuckets):
        tasks[f'order_processing_task_{task_number}'] = []

    # populate the task list buckets
    # distribute them evenly across the set of buckets
    resultCounter = 0
    for record in myDatabaseHook.get_records(get_orders_query):

        resultCounter += 1
        bucket = (resultCounter % totalBuckets)

        tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})

    # push the order lists into xcom
    # (iterate over a copy of the keys, because empty buckets are deleted inside the loop)
    for task in list(tasks):
        if len(tasks[task]) > 0:
            logging.info(f'Task {task} has {len(tasks[task])} orders.')
            context['ti'].xcom_push(key=task, value=tasks[task])
        else:
            # if we didn't have enough tasks for every bucket
            # don't bother running that task - remove it from the list
            logging.info(f"Task {task} doesn't have any orders.")
            del(tasks[task])

    return list(tasks.keys())

###################################################################################################


# this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
    task_id='clean_xcoms',
    mysql_conn_id='airflow_db',
    sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
    dag=dag)


# Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don't try to run them all.
# Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
                                 task_id='get_orders',
                                 python_callable=getOpenOrders,
                                 provide_context=True,
                                 dag=dag
                                )
get_orders_task.set_upstream(clean_xcoms)

# set up the parallel tasks -- these are configured when the scheduler parses this file, not at run time:
for bucketNumber in range(0, totalBuckets):
    taskBucket = createOrderProcessingTask(bucketNumber)
    taskBucket.set_upstream(get_orders_task)


###################################################################################################
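The bucketing done by getOpenOrders can be tested on its own. Below is a plain-Python sketch of the same round-robin idea, with the Postgres query replaced by an in-memory list of (order_id, customer_id) tuples. Two deliberate differences from the code above: counting starts at 0, so the first record goes to bucket 0, and empty buckets are filtered into a new dictionary instead of being deleted during iteration.

```python
from typing import Dict, Iterable, List, Tuple

TOTAL_BUCKETS = 5


def distribute(records: Iterable[Tuple[object, object]],
               total_buckets: int = TOTAL_BUCKETS) -> Dict[str, List[Dict[str, str]]]:
    """Spread records round-robin over a fixed set of task buckets.

    Returns only the non-empty buckets, keyed by the task_id that should
    process them, so the keys double as the list of branches to follow.
    """
    buckets: Dict[str, List[Dict[str, str]]] = {
        f"order_processing_task_{n}": [] for n in range(total_buckets)
    }
    for counter, (order_id, customer_id) in enumerate(records):
        task_id = f"order_processing_task_{counter % total_buckets}"
        buckets[task_id].append({"order_id": str(order_id), "customer_id": str(customer_id)})
    # Build a new dict rather than deleting keys while iterating
    return {task_id: orders for task_id, orders in buckets.items() if orders}
```

The keys of the returned dictionary are the task_ids that actually have work, which is the list a BranchPythonOperator would need. Because the number of bucket tasks is fixed at parse time, TOTAL_BUCKETS caps the parallelism of a single run, and the pool adds a second, global cap.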

Answer 7

I don't understand what the problem is.

Here is a standard example. If, in the subdag function, you replace for i in range(5): with for i in range(random.randint(0, 10)):, everything will still work. Now imagine that the 'start' operator writes the data to a file and that, instead of a random value, the function reads this data. Then the 'start' operator will determine the number of tasks.

The only problem will be in the UI display: when you open the subdag, the number of tasks shown will equal whatever was last read from the file/database/XCom. This automatically imposes a restriction on running several instances of the same DAG at the same time.
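A minimal plain-Python sketch of the parse-time half of this idea, assuming the 'start' task writes a small JSON file such as {"n_tasks": 3}. The file path, the n_tasks key and the task_b_N naming scheme are all hypothetical. The subdag factory would call read_task_count each time the DAG file is parsed and create one task per returned id, which is also why the UI only ever reflects the last value read.

```python
import json
from pathlib import Path
from typing import List


def read_task_count(path: str, default: int = 0) -> int:
    """Return the task count last written by the upstream 'start' task.

    Falls back to `default` while the file does not exist yet (for example on
    the very first parse) or when its contents cannot be interpreted.
    """
    try:
        return int(json.loads(Path(path).read_text())["n_tasks"])
    except (FileNotFoundError, KeyError, TypeError, ValueError):
        # json.JSONDecodeError is a subclass of ValueError
        return default


def task_ids_for_subdag(n_tasks: int) -> List[str]:
    """Hypothetical naming scheme for the generated B.* tasks."""
    return [f"task_b_{i}" for i in range(1, n_tasks + 1)]
```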


Answer 8

I found this Medium post, which is very similar to this question. However, it is full of typos and did not work when I tried implementing it.

My answer to the above is as follows:

If you are creating tasks dynamically, you must do so by iterating over something that is not created by an upstream task, or that can be defined independently of that task. I learned that you can't pass execution dates or other Airflow variables to something outside of a template (e.g., a task), as many others have pointed out before. See also this post.


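Prefect - The easiest way to automate your data

Hello, world! 👋

We've rebuilt data engineering for the data science era.

Prefect is a new workflow management system, designed for modern infrastructure and powered by the open-source Prefect Core workflow engine. Users organize Tasks into Flows, and Prefect takes care of the rest.

Read the docs; get the code; ask us anything!

Welcome to Workflows

Prefect's Pythonic API should feel familiar for newcomers. Mark functions as tasks and call them on each other to build up a flow.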

from prefect import task, Flow, Parameter


@task(log_stdout=True)
def say_hello(name):
    print("Hello, {}!".format(name))


with Flow("My First Flow") as flow:
    name = Parameter('name')
    say_hello(name)


flow.run(name='world') # "Hello, world!"
flow.run(name='Marvin') # "Hello, Marvin!"

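For more detail, please see the Core docs.

UI and Server

In addition to the Prefect Cloud platform, Prefect includes an open-source backend for orchestrating and managing flows, consisting primarily of Prefect Server and Prefect UI. This local server stores flow metadata in a Postgres database and exposes a GraphQL API.

Before running the server for the first time, run prefect backend server to configure Prefect for local orchestration. Please note that the server requires Docker and Docker Compose to be running.

To start the server, UI, and all required infrastructure, run:

prefect server start

Once all components are running, you can view the UI by visiting http://localhost:8080.

Please note that executing flows from the server requires at least one Prefect Agent to be running: prefect agent local start

Finally, to register any flow with the server, call flow.register(). For more detail, please see the orchestration docs.

"...Prefect?"

From the Latin praefectus, meaning "one who is in charge", a prefect is an official who oversees a domain and makes sure that the rules are followed. Similarly, Prefect is responsible for making sure that workflows execute properly.

It also happens to be the name of a roving researcher for that wholly remarkable book, The Hitchhiker's Guide to the Galaxy.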

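Integrations

Thanks to Prefect's growing task library and deep ecosystem integrations, building data applications is easier than ever.

Something missing? Open a feature request or contribute a PR! Prefect was designed to make adding new functionality extremely easy, whether you build on top of the open-source package or maintain an internal task library for your team.

Task Library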

Airtable

Asana

AWS

Azure

Azure ML

Databricks

DBT

Docker

Dremio

Dropbox

Email

Fivetran

GitHub

Google Cloud

Google Sheets

Great Expectations

Jira

Jupyter

Kubernetes

Monday

MySQL

PostgreSQL

Python

Pushbullet

Redis

RSS

SendGrid

Shell

Slack

Snowflake

SpaCy

SQLite

SQL Server

Trello

Twitter

Deployment & Execution

Azure

AWS

Dask

Docker

Google Cloud

Kubernetes

Universal Deploy

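Resources

Prefect provides a variety of resources to help guide you to a successful outcome.

We are committed to ensuring a positive environment, and all interactions are governed by our Code of Conduct.

Documentation

Prefect's documentation, including concepts, tutorials, and a full API reference, is always available at docs.prefect.io.

Instructions for contributing to documentation can be found in the development guide.

Slack Community

Join our Slack to chat about Prefect, ask questions, and share tips.

Blog

Visit the Prefect Blog for updates and insights from the Prefect team.

Support

Prefect offers a variety of community and premium support options for users of both Prefect Core and Prefect Cloud.

Contributing

Read about Prefect's community or dive into the development guides for information about contributions, documentation, code style, and testing.

Installation

Requirements

Prefect requires Python 3.6+. If you're new to Python, we recommend installing the Anaconda distribution.

Latest Release

To install Prefect, run: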

pip install prefect

Or, if you prefer to use conda:

conda install -c conda-forge prefect

pipenv

pipenv install --pre prefect

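Bleeding Edge

For development or just to try out the latest features, you may want to install Prefect directly from source.

Please note that the master branch of Prefect is not guaranteed to be compatible with Prefect Cloud or the local server.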

git clone https://github.com/PrefectHQ/prefect.git
pip install ./prefect

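License

Prefect Core is licensed under the Apache Software License Version 2.0. Please note that Prefect Core includes utilities for running Prefect Server and the Prefect UI, which are themselves licensed under the Prefect Community License.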

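Airflow - Apache Airflow: a platform to programmatically author, schedule, and monitor workflows

Apache Airflow (or simply Airflow) is a platform to programmatically author, schedule, and monitor workflows.

When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.

Use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.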

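Project Focus

Airflow works best with workflows that are mostly static and slowly changing. When the DAG structure is similar from one run to the next, it clarifies the unit of work and continuity. Other similar projects include Luigi, Oozie and Azkaban.

Airflow is commonly used to process data, but has the opinion that tasks should ideally be idempotent (i.e., the results of the task will be the same, and it will not create duplicated data in a destination system), and should not pass large quantities of data from one task to the next (though tasks can pass metadata using Airflow's XCom feature). For high-volume, data-intensive tasks, a best practice is to delegate to external services specializing in that type of work.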
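To make "idempotent" concrete: if a load step appends rows, every retry or re-run of the same interval duplicates data, whereas replacing the whole partition for that interval leaves the same end state however often it runs. A toy sketch in plain Python, where a dict of lists is a hypothetical stand-in for a destination table partitioned by date:

```python
from typing import Dict, List

# Partition key (e.g. a date string) -> rows stored in that partition
Table = Dict[str, List[dict]]


def load_append(table: Table, partition: str, rows: List[dict]) -> None:
    """Not idempotent: every run adds the same rows again."""
    table.setdefault(partition, []).extend(rows)


def load_overwrite(table: Table, partition: str, rows: List[dict]) -> None:
    """Idempotent: every run replaces the partition with the same rows."""
    table[partition] = list(rows)
```

Running load_append twice for the same date doubles the partition, while load_overwrite can be retried safely.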

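Airflow is not a streaming solution, but it is often used to process real-time data, pulling data off streams in batches.

Principles

  • Dynamic: Airflow pipelines are configuration as code (Python), allowing for dynamic pipeline generation. This allows for writing code that instantiates pipelines dynamically.
  • Extensible: Easily define your own operators and executors, and extend the library so that it fits the level of abstraction that suits your environment.
  • Elegant: Airflow pipelines are lean and explicit. Parameterizing your scripts is built into the core of Airflow using the powerful Jinja templating engine.
  • Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers.

Requirements

Apache Airflow is tested with: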

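|                      | Main version (dev)  | Stable version (2.1.1) |
|----------------------|---------------------|------------------------|
| Python               | 3.6, 3.7, 3.8, 3.9  | 3.6, 3.7, 3.8          |
| Kubernetes           | 1.20, 1.19, 1.18    | 1.20, 1.19, 1.18       |
| PostgreSQL           | 9.6, 10, 11, 12, 13 | 9.6, 10, 11, 12, 13    |
| MySQL                | 5.7, 8              | 5.7, 8                 |
| SQLite               | 3.15.0+             | 3.15.0+                |
| MSSQL (experimental) | 2017, 2019          |                        |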

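Note: MySQL 5.x versions are unable to run multiple schedulers, or have limitations when doing so; please see the Scheduler docs. MariaDB is not tested/recommended.

Note: SQLite is used in Airflow tests. Do not use it in production. We recommend using the latest stable version of SQLite for local development.

Getting started

Visit the official Airflow website documentation (latest stable release) for help with installing Airflow, getting started, or walking through a more complete tutorial.

Note: If you're looking for documentation for the main branch (latest development branch), you can find it on s.apache.org/airflow-docs.

For more information on Airflow Improvement Proposals (AIPs), visit the Airflow Wiki.

Documentation for dependent projects, such as provider packages, the Docker image, and the Helm Chart, can be found in the documentation index.

Installing from PyPI

We publish Apache Airflow as the apache-airflow package in PyPI. Installing it, however, might sometimes be tricky, because Airflow is a bit of both a library and an application. Libraries usually keep their dependencies open, and applications usually pin them, but we should do neither and both simultaneously. We decided to keep our dependencies as open as possible (in setup.py) so users can install different versions of libraries if needed. This means that from time to time a plain pip install apache-airflow will not work or will produce an unusable Airflow installation.

In order to have a repeatable installation, however, we also keep a set of "known-to-be-working" constraint files in the orphan constraints-main and constraints-2-0 branches. We keep those "known-to-be-working" constraint files separately per major/minor Python version. You can use them as constraint files when installing Airflow from PyPI. Note that you have to specify the correct Airflow tag/version/branch and Python version in the URL.

  1. Installing just Airflow:

Note: only pip installation is currently officially supported.

While there have been some successes with using other tools like poetry or pip-tools, they do not share the same workflow as pip, especially when it comes to constraint vs. requirements management. Installing via Poetry or pip-tools is not currently supported.

If you wish to install Airflow using those tools, you should use the constraint files and convert them to the appropriate format and workflow that your tool requires.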

pip install apache-airflow==2.1.1 \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.1/constraints-3.7.txt"
  2. Installing with extras (for example postgres, google)
pip install apache-airflow[postgres,google]==2.1.1 \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.1/constraints-3.7.txt"
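Both constraint URLs follow the same pattern: the Airflow version selects the constraints-<version> tag and the Python MAJOR.MINOR version selects the file. The helper below is a hypothetical convenience, not part of Airflow or pip, that builds the same URL and command, defaulting to the running interpreter's Python version.

```python
import shlex
import sys
from typing import Optional

CONSTRAINTS_BASE = "https://raw.githubusercontent.com/apache/airflow"


def constraint_url(airflow_version: str, python_version: Optional[str] = None) -> str:
    """Return the constraints file URL for an Airflow release and a Python X.Y version."""
    if python_version is None:
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    return f"{CONSTRAINTS_BASE}/constraints-{airflow_version}/constraints-{python_version}.txt"


def pip_install_command(airflow_version: str, extras: str = "",
                        python_version: Optional[str] = None) -> str:
    """Build a pip command like the ones above, optionally with extras such as 'postgres,google'."""
    package = f"apache-airflow[{extras}]" if extras else "apache-airflow"
    return " ".join([
        "pip", "install", shlex.quote(f"{package}=={airflow_version}"),
        "--constraint", shlex.quote(constraint_url(airflow_version, python_version)),
    ])
```

For example, pip_install_command("2.1.1", "postgres,google", "3.7") builds an equivalent of the second command above; shlex.quote wraps the package spec in quotes because square brackets are special in some shells.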

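For information on installing provider packages, check providers.

Official source code

Apache Airflow is an Apache Software Foundation (ASF) project, and our official source code releases:

Following the ASF rules, the source packages released must be sufficient for a user to build and test the release, provided they have access to the appropriate platform and tools.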

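Convenience packages

There are other ways of installing and using Airflow. Those are "convenience" methods: they are not "official releases" as stated by the ASF Release Policy, but they can be used by users who do not want to build the software themselves.

Those are, in order of the most common ways people install Airflow:

  • PyPI releases to install Airflow using the standard pip tool
  • Docker Images to install Airflow via the docker tool, use them in Kubernetes, Helm Charts, docker-compose, docker swarm, etc. You can read more about using, customising, and extending the images in the Latest docs, and learn details on the internals in the IMAGES.rst document.
  • Tags in GitHub to retrieve the git project sources that were used to generate official source packages via git

All those artifacts are not official releases, but they are prepared using officially released sources. Some of those artifacts are "development" or "pre-release" ones, and they are clearly marked as such following the ASF Policy.

User Interface

  • DAGs: Overview of all DAGs in your environment.

  • Tree: Tree representation of a DAG that spans across time.

  • Graph: Visualization of a DAG's dependencies and their current status for a specific run.

  • Task Duration: Total time spent on different tasks over time.

  • Gantt: Duration and overlap of a DAG.

  • Code: Quick way to view the source code of a DAG.

Semantic versioning

As of Airflow 2.0.0, we support a strict SemVer approach for all packages released.

There are a few specific rules that we agreed to that define details of versioning of the different packages: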

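  • Airflow: SemVer rules apply to core Airflow only (excluding any changes to providers). Changing limits for versions of Airflow dependencies is not a breaking change on its own.
  • Airflow Providers: SemVer rules apply to changes in the particular provider's code only. SemVer MAJOR and MINOR versions for the packages are independent of the Airflow version. For example, the google 4.1.0 and amazon 3.0.3 providers can happily be installed with Airflow 2.1.1. If there are limits on cross-dependencies between providers and Airflow packages, they are present in providers as install_requires limitations. We aim to keep backwards compatibility of providers with all previously released Airflow 2 versions, but there will sometimes be breaking changes that might make some, or all, providers specify a minimum Airflow version. Changing that minimum supported Airflow version is a breaking change for a provider, because installing the new provider might automatically upgrade Airflow (which might be an undesired side effect of upgrading the provider).
  • Airflow Helm Chart: SemVer rules apply to changes in the chart only. SemVer MAJOR and MINOR versions for the chart are independent of the Airflow version. We aim to keep backwards compatibility of the Helm Chart with all released Airflow 2 versions, but some new features might only work starting from specific Airflow releases. We might, however, limit the Helm Chart to depend on a minimal Airflow version.
  • Airflow API clients: SemVer MAJOR and MINOR versions follow the MAJOR and MINOR versions of Airflow. The first MAJOR or MINOR X.Y.0 release of Airflow should always be followed by an X.Y.0 release of all clients. The clients can then release their own PATCH releases with bugfixes, independently of Airflow PATCH releases.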
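The API-client rule amounts to a simple check: a client release is aligned with an Airflow release when their MAJOR and MINOR components match, while PATCH numbers move independently. A hypothetical helper, assuming plain X.Y.Z version strings without pre-release suffixes:

```python
from typing import Tuple


def major_minor(version: str) -> Tuple[int, int]:
    """Parse 'X.Y.Z' into (X, Y); raises ValueError for any other shape."""
    major, minor, _patch = version.split(".")
    return int(major), int(minor)


def client_matches_airflow(client_version: str, airflow_version: str) -> bool:
    """True when the client follows the same MAJOR.MINOR line as Airflow."""
    return major_minor(client_version) == major_minor(airflow_version)
```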

Version Life Cycle

Apache Airflow version life cycle:

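| Version | Current Patch/Minor | State     | First Release | Limited Support | EOL/Terminated |
|---------|---------------------|-----------|---------------|-----------------|----------------|
| 2       | 2.1.1               | Supported | Dec 17, 2020  | Dec 2021        | TBD            |
| 1.10    | 1.10.15             | EOL       | Aug 27, 2018  | Dec 17, 2020    | June 17, 2021  |
| 1.9     | 1.9.0               | EOL       | Jan 03, 2018  | Aug 27, 2018    | Aug 27, 2018   |
| 1.8     | 1.8.2               | EOL       | Mar 19, 2017  | Jan 03, 2018    | Jan 03, 2018   |
| 1.7     | 1.7.1.2             | EOL       | Mar 28, 2016  | Mar 19, 2017    | Mar 19, 2017   |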

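Limited-support versions will receive security and critical bug fixes only. EOL versions will not get any fixes or support. We always recommend that all users run the latest available minor release for whatever major version is in use. We highly recommend upgrading to the latest Airflow major release at the earliest convenient time and before the EOL date.

Support for Python and Kubernetes versions

As of Airflow 2.0, we agreed to certain rules we follow for Python and Kubernetes support. They are based on the official release schedules of Python and Kubernetes, nicely summarized in the Python Developer's Guide and the Kubernetes version skew policy.

  1. We drop support for Python and Kubernetes versions when they reach EOL. We drop support for those EOL versions in main right after the EOL date, and it is effectively removed when we release the first new MINOR (or MAJOR, if there is no new MINOR version) release of Airflow. For example, for Python 3.6 this means that we drop support in main right after December 23, 2021, and the first MAJOR or MINOR version of Airflow released after that will not include it.
  2. The "oldest" supported version of Python/Kubernetes is the default one. "Default" is only meaningful in terms of the "smoke tests" in CI PRs, which are run using this default version and the default reference image available in DockerHub. Currently the apache/airflow:latest and apache/airflow:2.1.1 images are both Python 3.6 images; however, starting with the first MINOR/MAJOR release of Airflow after December 23, 2021, they will become Python 3.7 images.
  3. We support a new version of Python/Kubernetes in main after it is officially released: as soon as we make it work in our CI pipeline (which might not be immediate, mostly because dependencies need to catch up with new Python versions), we release new images and support in Airflow based on the working CI setup.

Additional notes on Python version requirements

  • Previous versions require at least Python 3.5.3 when using Python 3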

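Contributing

Want to help build Apache Airflow? Check out our contributing documentation.

Official Docker images (container) for Apache Airflow are described in IMAGES.rst.

Who uses Apache Airflow?

More than 400 organizations are using Apache Airflow in the wild.

Who maintains Apache Airflow?

Airflow is the work of the community, but the core committers/maintainers are responsible for reviewing and merging PRs as well as steering conversations around new feature requests. If you would like to become a maintainer, please review the Apache Airflow committer requirements.

Can I use the Apache Airflow logo in my presentation?

Yes! Be sure to abide by the Apache Foundation trademark policies and the Apache Airflow Brandbook. The most up-to-date logos are found in this repo and on the Apache Software Foundation website.

Airflow merchandise

If you would like to get yourself some Apache Airflow swag (stickers, t-shirts, etc.), check out the Redbubble Shop.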

Sponsors

The CI infrastructure for Apache Airflow has been sponsored by: