标签归档:scheduler

如何从Python异步运行外部命令?

问题:如何从Python异步运行外部命令?

我需要从Python脚本异步运行Shell命令。我的意思是,我希望我的Python脚本能够在外部命令关闭并继续执行所需操作的同时继续运行。

我读了这篇文章:

在Python中调用外部命令

然后我os.system()去做了一些测试,如果我&在命令末尾使用它,看起来就可以完成这项工作,这样我就不必等待它返回。我想知道的是,这是否是完成此任务的正确方法?我试过了,commands.call()但是对我来说不起作用,因为它会阻塞外部命令。

请告诉我是否os.system()建议这样做,或者我应该尝试其他方法。

I need to run a shell command asynchronously from a Python script. By this I mean that I want my Python script to continue running while the external command goes off and does whatever it needs to do.

I read this post:

Calling an external command in Python

I then went off and did some testing, and it looks like os.system() will do the job provided that I use & at the end of the command so that I don’t have to wait for it to return. What I am wondering is if this is the proper way to accomplish such a thing? I tried commands.call() but it will not work for me because it blocks on the external command.

Please let me know if using os.system() for this is advisable or if I should try some other route.


回答 0

subprocess.Popen正是您想要的。

from subprocess import Popen
p = Popen(['watch', 'ls']) # something long running
# ... do other stuff while subprocess is running
p.terminate()

(编辑以完成评论的答案)

Popen实例可以执行其他各种操作,例如可以poll()查看它是否仍在运行,还可以communicate()使用它在stdin上发送数据,并等待其终止。

subprocess.Popen does exactly what you want.

from subprocess import Popen
p = Popen(['watch', 'ls']) # something long running
# ... do other stuff while subprocess is running
p.terminate()

(Edit to complete the answer from comments)

The Popen instance can do various other things like you can poll() it to see if it is still running, and you can communicate() with it to send it data on stdin, and wait for it to terminate.


回答 1

如果要并行运行许多进程,然后在它们产生结果时进行处理,则可以使用轮询,如下所示:

from subprocess import Popen, PIPE
import time

running_procs = [
    Popen(['/usr/bin/my_cmd', '-i %s' % path], stdout=PIPE, stderr=PIPE)
    for path in '/tmp/file0 /tmp/file1 /tmp/file2'.split()]

while running_procs:
    for proc in running_procs:
        retcode = proc.poll()
        if retcode is not None: # Process finished.
            running_procs.remove(proc)
            break
        else: # No process is done, wait a bit and check again.
            time.sleep(.1)
            continue

    # Here, `proc` has finished with return code `retcode`
    if retcode != 0:
        """Error handling."""
    handle_results(proc.stdout)

控制流有些混乱,因为我正试图将其缩小—您可以根据自己的口味进行重构。:-)

这具有先为早期处理请求提供服务的优势。如果您调用communicate第一个正在运行的进程,而事实证明运行时间最长,则其他正在运行的进程在可能已经处理完它们的结果时将一直闲置在那里。

If you want to run many processes in parallel and then handle them when they yield results, you can use polling like in the following:

from subprocess import Popen, PIPE
import time

running_procs = [
    Popen(['/usr/bin/my_cmd', '-i %s' % path], stdout=PIPE, stderr=PIPE)
    for path in '/tmp/file0 /tmp/file1 /tmp/file2'.split()]

while running_procs:
    for proc in running_procs:
        retcode = proc.poll()
        if retcode is not None: # Process finished.
            running_procs.remove(proc)
            break
        else: # No process is done, wait a bit and check again.
            time.sleep(.1)
            continue

    # Here, `proc` has finished with return code `retcode`
    if retcode != 0:
        """Error handling."""
    handle_results(proc.stdout)

The control flow there is a little bit convoluted because I’m trying to make it small — you can refactor to your taste. :-)

This has the advantage of servicing the early-finishing requests first. If you call communicate on the first running process and that turns out to run the longest, the other running processes will have been sitting there idle when you could have been handling their results.


回答 2

我想知道的是[os.system()]是否是完成此类任务的正确方法?

os.system()不是正确的方法。这就是每个人都说要使用的原因subprocess

有关更多信息,请阅读http://docs.python.org/library/os.html#os.system

子流程模块提供了更强大的功能来生成新流程并检索其结果。使用该模块优于使用此功能。使用子流程模块。尤其要检查“子过程模块”部分的“替换旧功能”。

What I am wondering is if this [os.system()] is the proper way to accomplish such a thing?

No. os.system() is not the proper way. That’s why everyone says to use subprocess.

For more information, read http://docs.python.org/library/os.html#os.system

The subprocess module provides more powerful facilities for spawning new processes and retrieving their results; using that module is preferable to using this function. Use the subprocess module. Check especially the Replacing Older Functions with the subprocess Module section.


回答 3

我使用asyncproc模块取得了成功,该模块很好地处理了流程的输出。例如:

import os
from asynproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll is not None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

I’ve had good success with the asyncproc module, which deals nicely with the output from the processes. For example:

import os
from asynproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll is not None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

回答 4

pexpect与非阻塞阅读行结合使用是另一种方法。Pexpect解决了死锁问题,使您可以轻松地在后台运行进程,并在进程吐出预定义的字符串时提供简便的方法来进行回调,并且通常使与进程的交互更加容易。

Using pexpect with non-blocking readlines is another way to do this. Pexpect solves the deadlock problems, allows you to easily run the processes in the background, and gives easy ways to have callbacks when your process spits out predefined strings, and generally makes interacting with the process much easier.


回答 5

考虑到“我不必等待它返回”,最简单的解决方案之一就是:

subprocess.Popen( \
    [path_to_executable, arg1, arg2, ... argN],
    creationflags = subprocess.CREATE_NEW_CONSOLE,
).pid

但是…据我所读,这不是“ subprocess.CREATE_NEW_CONSOLE标记完成此事的正确方法”,因为标志会产生安全风险。

这里发生的关键事情是使用subprocess.CREATE_NEW_CONSOLE来创建新的控制台,并.pid(返回进程ID,以便以后可以检查程序是否需要),以免等待程序完成其工作。

Considering “I don’t have to wait for it to return”, one of the easiest solutions will be this:

subprocess.Popen( \
    [path_to_executable, arg1, arg2, ... argN],
    creationflags = subprocess.CREATE_NEW_CONSOLE,
).pid

But… From what I read this is not “the proper way to accomplish such a thing” because of security risks created by subprocess.CREATE_NEW_CONSOLE flag.

The key things that happen here is use of subprocess.CREATE_NEW_CONSOLE to create new console and .pid (returns process ID so that you could check program later on if you want to) so that not to wait for program to finish its job.


回答 6

我在使用Python中的s3270脚本软件尝试连接到3270终端时遇到相同的问题。现在,我在这里找到的Process子类解决了这个问题:

http://code.activestate.com/recipes/440554/

这是从文件中获取的示例:

def recv_some(p, t=.1, e=1, tr=5, stderr=0):
    if tr < 1:
        tr = 1
    x = time.time()+t
    y = []
    r = ''
    pr = p.recv
    if stderr:
        pr = p.recv_err
    while time.time() < x or r:
        r = pr()
        if r is None:
            if e:
                raise Exception(message)
            else:
                break
        elif r:
            y.append(r)
        else:
            time.sleep(max((x-time.time())/tr, 0))
    return ''.join(y)

def send_all(p, data):
    while len(data):
        sent = p.send(data)
        if sent is None:
            raise Exception(message)
        data = buffer(data, sent)

if __name__ == '__main__':
    if sys.platform == 'win32':
        shell, commands, tail = ('cmd', ('dir /w', 'echo HELLO WORLD'), '\r\n')
    else:
        shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n')

    a = Popen(shell, stdin=PIPE, stdout=PIPE)
    print recv_some(a),
    for cmd in commands:
        send_all(a, cmd + tail)
        print recv_some(a),
    send_all(a, 'exit' + tail)
    print recv_some(a, e=0)
    a.wait()

I have the same problem trying to connect to an 3270 terminal using the s3270 scripting software in Python. Now I’m solving the problem with an subclass of Process that I found here:

http://code.activestate.com/recipes/440554/

And here is the sample taken from file:

def recv_some(p, t=.1, e=1, tr=5, stderr=0):
    if tr < 1:
        tr = 1
    x = time.time()+t
    y = []
    r = ''
    pr = p.recv
    if stderr:
        pr = p.recv_err
    while time.time() < x or r:
        r = pr()
        if r is None:
            if e:
                raise Exception(message)
            else:
                break
        elif r:
            y.append(r)
        else:
            time.sleep(max((x-time.time())/tr, 0))
    return ''.join(y)

def send_all(p, data):
    while len(data):
        sent = p.send(data)
        if sent is None:
            raise Exception(message)
        data = buffer(data, sent)

if __name__ == '__main__':
    if sys.platform == 'win32':
        shell, commands, tail = ('cmd', ('dir /w', 'echo HELLO WORLD'), '\r\n')
    else:
        shell, commands, tail = ('sh', ('ls', 'echo HELLO WORLD'), '\n')

    a = Popen(shell, stdin=PIPE, stdout=PIPE)
    print recv_some(a),
    for cmd in commands:
        send_all(a, cmd + tail)
        print recv_some(a),
    send_all(a, 'exit' + tail)
    print recv_some(a, e=0)
    a.wait()

回答 7

接受的答案旧。

我在这里找到了一个更好的现代答案:

https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/

并进行了一些更改:

  1. 使它在Windows上工作
  2. 使它与多个命令一起工作
import sys
import asyncio

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())


async def _read_stream(stream, cb):
    while True:
        line = await stream.readline()
        if line:
            cb(line)
        else:
            break


async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )

        await asyncio.wait(
            [
                _read_stream(process.stdout, stdout_cb),
                _read_stream(process.stderr, stderr_cb),
            ]
        )
        rc = await process.wait()
        return process.pid, rc
    except OSError as e:
        # the program will hang if we let any exception propagate
        return e


def execute(*aws):
    """ run the given coroutines in an asyncio loop
    returns a list containing the values returned from each coroutine.
    """
    loop = asyncio.get_event_loop()
    rc = loop.run_until_complete(asyncio.gather(*aws))
    loop.close()
    return rc


def printer(label):
    def pr(*args, **kw):
        print(label, *args, **kw)

    return pr


def name_it(start=0, template="s{}"):
    """a simple generator for task names
    """
    while True:
        yield template.format(start)
        start += 1


def runners(cmds):
    """
    cmds is a list of commands to excecute as subprocesses
    each item is a list appropriate for use by subprocess.call
    """
    next_name = name_it().__next__
    for cmd in cmds:
        name = next_name()
        out = printer(f"{name}.stdout")
        err = printer(f"{name}.stderr")
        yield _stream_subprocess(cmd, out, err)


if __name__ == "__main__":
    cmds = (
        [
            "sh",
            "-c",
            """echo "$SHELL"-stdout && sleep 1 && echo stderr 1>&2 && sleep 1 && echo done""",
        ],
        [
            "bash",
            "-c",
            "echo 'hello, Dave.' && sleep 1 && echo dave_err 1>&2 && sleep 1 && echo done",
        ],
        [sys.executable, "-c", 'print("hello from python");import sys;sys.exit(2)'],
    )

    print(execute(*runners(cmds)))

示例命令不可能在您的系统上完美地工作,也不可能处理奇怪的错误,但是此代码确实演示了一种使用asyncio运行多个子进程并输出输出的方法。

The accepted answer is very old.

I found a better modern answer here:

https://kevinmccarthy.org/2016/07/25/streaming-subprocess-stdin-and-stdout-with-asyncio-in-python/

and made some changes:

  1. make it work on windows
  2. make it work with multiple commands
import sys
import asyncio

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())


async def _read_stream(stream, cb):
    while True:
        line = await stream.readline()
        if line:
            cb(line)
        else:
            break


async def _stream_subprocess(cmd, stdout_cb, stderr_cb):
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )

        await asyncio.wait(
            [
                _read_stream(process.stdout, stdout_cb),
                _read_stream(process.stderr, stderr_cb),
            ]
        )
        rc = await process.wait()
        return process.pid, rc
    except OSError as e:
        # the program will hang if we let any exception propagate
        return e


def execute(*aws):
    """ run the given coroutines in an asyncio loop
    returns a list containing the values returned from each coroutine.
    """
    loop = asyncio.get_event_loop()
    rc = loop.run_until_complete(asyncio.gather(*aws))
    loop.close()
    return rc


def printer(label):
    def pr(*args, **kw):
        print(label, *args, **kw)

    return pr


def name_it(start=0, template="s{}"):
    """a simple generator for task names
    """
    while True:
        yield template.format(start)
        start += 1


def runners(cmds):
    """
    cmds is a list of commands to excecute as subprocesses
    each item is a list appropriate for use by subprocess.call
    """
    next_name = name_it().__next__
    for cmd in cmds:
        name = next_name()
        out = printer(f"{name}.stdout")
        err = printer(f"{name}.stderr")
        yield _stream_subprocess(cmd, out, err)


if __name__ == "__main__":
    cmds = (
        [
            "sh",
            "-c",
            """echo "$SHELL"-stdout && sleep 1 && echo stderr 1>&2 && sleep 1 && echo done""",
        ],
        [
            "bash",
            "-c",
            "echo 'hello, Dave.' && sleep 1 && echo dave_err 1>&2 && sleep 1 && echo done",
        ],
        [sys.executable, "-c", 'print("hello from python");import sys;sys.exit(2)'],
    )

    print(execute(*runners(cmds)))

It is unlikely that the example commands will work perfectly on your system, and it doesn’t handle weird errors, but this code does demonstrate one way to run multiple subprocesses using asyncio and stream the output.


回答 8

这里有几个答案,但是没有一个满足我的以下要求:

  1. 我不想等待命令完成或用子进程输出污染我的终端。

  2. 我想使用重定向运行bash脚本。

  3. 我想在我的bash脚本中支持管道(例如find ... | tar ...)。

满足以上要求的唯一组合是:

subprocess.Popen(['./my_script.sh "arg1" > "redirect/path/to"'],
                 stdout=subprocess.PIPE, 
                 stderr=subprocess.PIPE,
                 shell=True)

There are several answers here but none of them satisfied my below requirements:

  1. I don’t want to wait for command to finish or pollute my terminal with subprocess outputs.

  2. I want to run bash script with redirects.

  3. I want to support piping within my bash script (for example find ... | tar ...).

The only combination that satiesfies above requirements is:

subprocess.Popen(['./my_script.sh "arg1" > "redirect/path/to"'],
                 stdout=subprocess.PIPE, 
                 stderr=subprocess.PIPE,
                 shell=True)

回答 9

Python 3子过程示例在“等待命令异步终止”下对此进行了介绍:

import asyncio

proc = await asyncio.create_subprocess_exec(
    'ls','-lha',
    stdout=asyncio.subprocess.PIPE,
    stderr=asyncio.subprocess.PIPE)

# do something else while ls is working

# if proc takes very long to complete, the CPUs are free to use cycles for 
# other processes
stdout, stderr = await proc.communicate()

该过程完成后将立即开始运行await asyncio.create_subprocess_exec(...)。如果在您调用时还没有完成await proc.communicate(),它将在那儿等待,以便为您提供输出状态。如果完成,proc.communicate()将立即返回。

要点类似于Terrels的答案,但我认为Terrels的答案似乎使事情复杂化了。

请参阅asyncio.create_subprocess_exec以获取更多信息。

This is covered by Python 3 Subprocess Examples under “Wait for command to terminate asynchronously”:

import asyncio

proc = await asyncio.create_subprocess_exec(
    'ls','-lha',
    stdout=asyncio.subprocess.PIPE,
    stderr=asyncio.subprocess.PIPE)

# do something else while ls is working

# if proc takes very long to complete, the CPUs are free to use cycles for 
# other processes
stdout, stderr = await proc.communicate()

The process will start running as soon as the await asyncio.create_subprocess_exec(...) has completed. If it hasn’t finished by the time you call await proc.communicate(), it will wait there in order to give you your output status. If it has finished, proc.communicate() will return immediately.

The gist here is similar to Terrels answer but I think Terrels answer appears to overcomplicate things.

See asyncio.create_subprocess_exec for more information.


Airflow-Apache Airflow 以编程方式创作、调度和监控工作流的平台

Apache Airflow(或简称Airflow)是以编程方式创作、计划和监控工作流的平台

将工作流定义为代码后,它们将变得更具可维护性、可版本化、可测试性和协作性

使用Airflow将工作流创作为任务的有向无环图(DAG)。气流调度器在遵循指定依赖关系的同时对一组工作人员执行任务。丰富的命令行实用程序使在DAG上执行复杂操作变得轻而易举。丰富的用户界面使您可以轻松地可视化生产中运行的管道、监控进度以及在需要时排除问题故障

目录

项目重点

气流与主要为静电且变化缓慢的工作流配合使用效果最好。当DAG结构从一次运行到下一次运行类似时,它允许围绕工作单元和连续性保持清晰。其他类似的项目包括LuigiOozieAzkaban

Airflow通常用于处理数据,但认为任务理想上应该是幂等的(即,任务的结果将是相同的,并且不会在目标系统中创建重复数据),并且不应该将大量数据从一个任务传递到下一个任务(尽管任务可以使用Airflow的传递元数据Xcom feature)。对于大容量、数据密集型任务,最佳做法是将任务委托给专门从事该类型工作的外部服务

Airflow不是一种流解决方案,但它通常用于处理实时数据,成批地将数据从流中拉出

原则

  • 动态的:气流管道配置为代码(Python),允许动态生成管道。这允许编写动态实例化管道的代码
  • 可扩展的:轻松定义您自己的运算符、执行器和扩展库,使其符合适合您的环境的抽象级别
  • 优雅的:气流管道是精干而清晰的。将脚本参数化内置到气流的核心中,使用功能强大的金家模板引擎
  • 可扩展:Airflow采用模块化架构,并使用消息队列来编排任意数量的工作人员

要求

Apache Airflow使用以下各项进行测试:

主版本(Dev) 稳定版本(2.1.1)
python 3.6、3.7、3.8、3.9 3.6、3.7、3.8
库伯内斯 1.20、1.19、1.18 1.20、1.19、1.18
PostgreSQL 9.6、10、11、12、13 9.6、10、11、12、13
MySQL 5.7,8 5.7,8
SQLite 3.15.0+ 3.15.0+
MSSQL(试验性) 2017、2019年

注:MySQL5.x版本不能运行多个调度程序或有限制–请参阅Scheduler docs未测试/推荐使用MariaDB

注:SQLite用于气流测试。请不要在生产中使用它。我们建议使用SQLite的最新稳定版本进行本地开发

快速入门

访问Airflow官方网站文档(最新稳定版本)以获取以下方面的帮助installing Airflowgetting started,或通过更完整的tutorial

注意:如果您正在查找主分支(最新开发分支)的文档:您可以在s.apache.org/airflow-docs

有关气流改善建议(AIP)的更多信息,请访问Airflow Wiki

相关项目的文档,如提供程序包、Docker映像、Helm Chart,您可以在the documentation index

从PyPI安装

我们将Apache Airflow发布为apache-airflowPyPI格式的包。然而,安装它有时可能会很棘手,因为Airflow既是一个库,也是一个应用程序。库通常使它们的依赖项保持打开,而应用程序通常将它们钉住,但是我们不应该同时做这两件事。我们决定使我们的依赖项尽可能地开放(在setup.py),因此如果需要,用户可以安装不同版本的库。这就是说,时不时地,明摆着pip install apache-airflow将无法工作或将产生无法使用的气流安装

但是,为了实现可重复安装,我们还在孤儿中保留了一组“已知可以工作”的约束文件constraints-mainconstraints-2-0树枝。对于每个主要/次要Python版本,我们将那些“已知正在工作”的约束文件分开保存。从PyPI安装气流时,可以将它们用作约束文件。请注意,您必须在URL中指定正确的气流标签/版本/分支和Python版本

  1. 仅安装气流:

注:仅限pip目前官方支持安装

虽然他们使用其他工具(如poetrypip-tools,它们共享的工作流不同于pip-尤其是在约束与需求管理方面。通过以下方式安装Poetrypip-tools当前不支持

如果要使用这些工具安装气流,则应使用约束文件并将其转换为工具所需的适当格式和工作流

pip install apache-airflow==2.1.1 \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.1/constraints-3.7.txt"
  1. 使用附加软件安装(例如Postgres、Google)
pip install apache-airflow[postgres,google]==2.1.1 \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.1/constraints-3.7.txt"

有关安装提供程序包的信息,请查看providers

官方源代码

阿帕奇气流是一种Apache Software Foundation(ASF)项目,我们的官方源代码发布:

遵循ASF规则,发布的源包必须足以让用户构建和测试版本,前提是他们可以访问适当的平台和工具

便利套餐

还有其他安装和使用气流的方法。这些都是“方便”的方法–它们不是“官方发布”,正如ASF Release Policy,但是它们可以由不想自己构建软件的用户使用

这些是-按照人们安装气流的最常见方式的顺序:

  • PyPI releases使用标准安装风量的步骤pip工具
  • Docker Images要通过以下方式安装风量,请执行以下操作docker工具,在Kubernetes,Helm图表中使用它们,docker-composedocker swarm等。您可以在中阅读有关使用、自定义和扩展图像的更多信息Latest docs中的内部结构的详细信息。IMAGES.rst文档
  • Tags in GitHub要检索用于通过git生成官方源码包的git项目源代码,请执行以下操作:

所有这些工件都不是官方发布的,但它们都是使用官方发布的来源准备的。其中一些工件是“开发”或“预发布”的,它们按照ASF政策被清楚地标记为“开发”或“预发布”

用户界面

  • DAGS:环境中所有DAG的概述

  • :跨时间的DAG的树表示形式

  • 图表:可视化特定运行的DAG依赖项及其当前状态

  • 任务工期:一段时间内花在不同任务上的总时间

  • 甘特图:DAG的持续时间和重叠

  • 代码:查看DAG源代码的快捷方式

语义版本化

从Airflow 2.0.0开始,我们支持严格SemVer适用于已发布的所有软件包的方法

我们几乎没有达成一致的特定规则来定义不同包的版本控制细节:

  • 气流:SemVer规则仅适用于核心气流(不包括对供应商的任何更改)。更改气流依赖性版本的限制本身并不是突破性的更改
  • 气流供应器:SemVer规则仅适用于特定提供程序代码中的更改。软件包的SemVer主要版本和次要版本独立于气流版本。例如google 4.1.0amazon 3.0.3提供程序可以随同安装在一起Airflow 2.1.1如果提供程序和气流包之间存在交叉依赖限制,则它们在提供程序中显示为install_requires限制。我们的目标是保持供应商与之前发布的所有Airflow 2版本的向后兼容性,但有时会有突破性的更改,这可能会使一些或所有供应商指定最低气流版本。更改最低支持气流版本对提供商来说是一项突破性更改,因为安装新提供商可能会自动升级气流(这可能是升级提供商的不良副作用)
  • 气流舵图:SemVer规则仅适用于图表中的更改。该图表的SemVer主要版本和次要版本独立于气流版本。我们的目标是保持头盔图表与所有发布的Airflow 2版本的向后兼容性,但一些新功能可能只能从特定的Airlfow版本开始工作。但是,我们可能会将舵图限制为依赖于最小气流版本
  • Airflow API客户端:SemVer主要版本和次要版本跟随气流的主要版本和次要版本。气流的第一个主要或次要X.Y.0版本之后应始终跟随所有客户端的X.Y.0版本。然后,客户端可以独立于气流补丁版本,发布其自己的带有错误修复的补丁版本

版本生命周期

Apache Airflow版本生命周期:

版本 当前修补程序/次要修补程序 状态 第一个版本 有限的支持 停产/终止
2个 2.1.1 支持 2020年12月17日 2021年12月 待定
1.10 1.10.15 停产 2018年8月27日 2020年12月17日 2021年6月17日
1.9 1.9.0 停产 2018年1月03日 2018年8月27日 2018年8月27日
1.8 1.8.2 停产 2017年3月19日 2018年1月03日 2018年1月03日
1.7 1.7.1.2 停产 2016年3月28日 2017年3月19日 2017年3月19日

有限的支持版本将仅受安全和关键错误修复支持。EOL版本将不会得到任何修复,也不会获得支持。我们始终建议所有用户针对正在使用的任何主要版本运行最新的可用次要版本。我们高度建议在最早方便的时间并在停产日期之前升级到最新的Airflow主要版本

支持Python和Kubernetes版本

从AirFlow2.0开始,我们同意使用某些规则来支持Python和Kubernetes。它们基于Python和Kubernetes的官方发布时间表,在Python Developer’s GuideKubernetes version skew policy

  1. 当Python和Kubernetes版本达到EOL时,我们不再支持它们。我们在EOL日期之后立即在Main中取消对这些EOL版本的支持,当我们发布第一个新的次要版本(或如果没有新的次要版本,则为主要版本)时,它将被有效地删除。例如,对于Python 3.6,这意味着我们在2021年12月23日之后立即在Main中停止支持,并且在此之后发布的第一个主要或次要版本将不会包含它
  2. 支持Python/Kubernetes的“最旧”版本是默认版本。“默认”仅在使用DockerHub中提供的此默认版本和默认参考图像运行的配置项PR中的“冒烟测试”方面有意义。目前apache/airflow:latestapache/airflow:2.1.1图像都是Python 3.6图像,但是,2021年12月23日之后的Airflow版本的第一个次要/主要版本将成为Python 3.7图像
  3. 正式发布后,我们在Main中支持新版本的Python/Kubernetes,一旦我们让它们在我们的CI管道中工作(这可能不是立即进行的,因为大多数情况下依赖于Python的新版本),我们就会基于工作CI设置发布新的图像/Airflow中的支持

有关Python版本要求的其他说明

  • 以前的版本requires使用Python 3时至少使用Python 3.5.3

贡献

想要帮助构建Apache Airflow吗?请查看我们的contributing documentation

中描述了Apache Airflow的官方Docker(容器)图像IMAGES.rst

谁使用Apache Airflow?

400多个组织正在使用Apache Airflowin the wild

谁维护阿帕奇气流?

气流是community,但是core committers/maintainers负责审核和合并PR,并围绕新功能请求指导对话。如果您想成为一名维护员,请查看Apache Airflowcommitter requirements

我可以在演示文稿中使用Apache Airflow徽标吗?

是!一定要遵守阿帕奇基金会trademark policies和阿帕奇气流Brandbook有关最新徽标的信息,请参阅this repo和Apache软件基金会website

气流商品

如果你想要阿帕奇气流贴纸、t恤等,那就去看看吧。Redbubble Shop

链接

赞助商

Apache Airflow的CI基础设施由以下各方赞助: