使用子流程获取实时输出

问题:使用子流程获取实时输出

我正在尝试为命令行程序(svnadmin verify)编写包装脚本,该脚本将显示该操作的良好进度指示器。这要求我能够立即看到包装程序输出的每一行。

我认为我只是使用subprocess.Popen,use 来执行程序stdout=PIPE,然后读取其中的每一行并据此进行操作。但是,当我运行以下代码时,输​​出似乎被缓冲在某处,导致它出现在两个块中,第1到332行,然后是333到439行(输出的最后一行)

from subprocess import Popen, PIPE, STDOUT

p = Popen('svnadmin verify /var/svn/repos/config', stdout = PIPE, 
        stderr = STDOUT, shell = True)
for line in p.stdout:
    print line.replace('\n', '')

在稍微了解一下子流程的文档之后,我发现了bufsize参数Popen,因此我尝试将bufsize设置为1(缓冲每行)和0(没有缓冲),但是两个值似乎都没有改变行的传递方式。

在这一点上,我开始精通吸管,因此编写了以下输出循环:

while True:
    try:
        print p.stdout.next().replace('\n', '')
    except StopIteration:
        break

但是得到了相同的结果。

是否可以获取使用子进程执行的程序的“实时”程序输出?Python中还有其他向前兼容的选项(不是exec*)吗?

I am trying to write a wrapper script for a command line program (svnadmin verify) that will display a nice progress indicator for the operation. This requires me to be able to see each line of output from the wrapped program as soon as it is output.

I figured that I’d just execute the program using subprocess.Popen, use stdout=PIPE, then read each line as it came in and act on it accordingly. However, when I ran the following code, the output appeared to be buffered somewhere, causing it to appear in two chunks, lines 1 through 332, then 333 through 439 (the last line of output)

from subprocess import Popen, PIPE, STDOUT

p = Popen('svnadmin verify /var/svn/repos/config', stdout = PIPE, 
        stderr = STDOUT, shell = True)
for line in p.stdout:
    print line.replace('\n', '')

After looking at the documentation on subprocess a little, I discovered the bufsize parameter to Popen, so I tried setting bufsize to 1 (buffer each line) and 0 (no buffer), but neither value seemed to change the way the lines were being delivered.

At this point I was starting to grasp for straws, so I wrote the following output loop:

while True:
    try:
        print p.stdout.next().replace('\n', '')
    except StopIteration:
        break

but got the same result.

Is it possible to get ‘realtime’ program output of a program executed using subprocess? Is there some other option in Python that is forward-compatible (not exec*)?


回答 0

我尝试了这个,由于某种原因,代码

for line in p.stdout:
  ...

积极地缓冲,变体

while True:
  line = p.stdout.readline()
  if not line: break
  ...

才不是。显然,这是一个已知的错误:http : //bugs.python.org/issue3907(从2018年8月29日开始,此问题已“关闭”)

I tried this, and for some reason while the code

for line in p.stdout:
  ...

buffers aggressively, the variant

while True:
  line = p.stdout.readline()
  if not line: break
  ...

does not. Apparently this is a known bug: http://bugs.python.org/issue3907 (The issue is now “Closed” as of Aug 29, 2018)


回答 1

p = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1)
for line in iter(p.stdout.readline, b''):
    print line,
p.stdout.close()
p.wait()
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1)
for line in iter(p.stdout.readline, b''):
    print line,
p.stdout.close()
p.wait()

回答 2

您可以将子流程的输出直接定向到流。简化示例:

subprocess.run(['ls'], stderr=sys.stderr, stdout=sys.stdout)

You can direct the subprocess output to the streams directly. Simplified example:

subprocess.run(['ls'], stderr=sys.stderr, stdout=sys.stdout)

回答 3

您可以尝试以下方法:

import subprocess
import sys

process = subprocess.Popen(
    cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

while True:
    out = process.stdout.read(1)
    if out == '' and process.poll() != None:
        break
    if out != '':
        sys.stdout.write(out)
        sys.stdout.flush()

如果您使用readline而不是read,则在某些情况下不会打印输入消息。尝试使用需要内联输入的命令并亲自查看。

You can try this:

import subprocess
import sys

process = subprocess.Popen(
    cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

while True:
    out = process.stdout.read(1)
    if out == '' and process.poll() != None:
        break
    if out != '':
        sys.stdout.write(out)
        sys.stdout.flush()

If you use readline instead of read, there will be some cases where the input message is not printed. Try it with a command the requires an inline input and see for yourself.


回答 4

流子stdin和stdout与ASYNCIO在Python的博客文章凯文·麦卡锡显示了如何ASYNCIO做到这一点:

import asyncio
from asyncio.subprocess import PIPE
from asyncio import create_subprocess_exec


async def _read_stream(stream, callback):
    while True:
        line = await stream.readline()
        if line:
            callback(line)
        else:
            break


async def run(command):
    process = await create_subprocess_exec(
        *command, stdout=PIPE, stderr=PIPE
    )

    await asyncio.wait(
        [
            _read_stream(
                process.stdout,
                lambda x: print(
                    "STDOUT: {}".format(x.decode("UTF8"))
                ),
            ),
            _read_stream(
                process.stderr,
                lambda x: print(
                    "STDERR: {}".format(x.decode("UTF8"))
                ),
            ),
        ]
    )

    await process.wait()


async def main():
    await run("docker build -t my-docker-image:latest .")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

The Streaming subprocess stdin and stdout with asyncio in Python blog post by Kevin McCarthy shows how to do it with asyncio:

import asyncio
from asyncio.subprocess import PIPE
from asyncio import create_subprocess_exec


async def _read_stream(stream, callback):
    while True:
        line = await stream.readline()
        if line:
            callback(line)
        else:
            break


async def run(command):
    process = await create_subprocess_exec(
        *command, stdout=PIPE, stderr=PIPE
    )

    await asyncio.wait(
        [
            _read_stream(
                process.stdout,
                lambda x: print(
                    "STDOUT: {}".format(x.decode("UTF8"))
                ),
            ),
            _read_stream(
                process.stderr,
                lambda x: print(
                    "STDERR: {}".format(x.decode("UTF8"))
                ),
            ),
        ]
    )

    await process.wait()


async def main():
    await run("docker build -t my-docker-image:latest .")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

回答 5

实时输出问题已解决:在捕获c程序的实时输出时,我在Python中确实遇到了类似的问题。我添加了“ fflush(stdout) ;” 在我的C代码中 它为我工作。这是代码片段

<< C程序>>

#include <stdio.h>
void main()
{
    int count = 1;
    while (1)
    {
        printf(" Count  %d\n", count++);
        fflush(stdout);
        sleep(1);
    }
}

<< Python程序>>

#!/usr/bin/python

import os, sys
import subprocess


procExe = subprocess.Popen(".//count", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

while procExe.poll() is None:
    line = procExe.stdout.readline()
    print("Print:" + line)

<<输出>>打印:计数1打印:计数2打印:计数3

希望能帮助到你。

〜塞拉姆

Real Time Output Issue resolved: I encountered a similar issue in Python, while capturing the real time output from C program. I added fflush(stdout); in my C code. It worked for me. Here is the code.

C program:

#include <stdio.h>
void main()
{
    int count = 1;
    while (1)
    {
        printf(" Count  %d\n", count++);
        fflush(stdout);
        sleep(1);
    }
}

Python program:

#!/usr/bin/python

import os, sys
import subprocess


procExe = subprocess.Popen(".//count", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

while procExe.poll() is None:
    line = procExe.stdout.readline()
    print("Print:" + line)

Output:

Print: Count  1
Print: Count  2
Print: Count  3

回答 6

我不久前遇到了同样的问题。我的解决方案read是放弃对该方法的迭代,即使您的子流程未完成执行,该方法也会立即返回,等等。

I ran into the same problem awhile back. My solution was to ditch iterating for the read method, which will return immediately even if your subprocess isn’t finished executing, etc.


回答 7

根据使用情况,您可能还想禁用子流程本身中的缓冲。

如果子进程将是Python进程,则可以在调用之前执行此操作:

os.environ["PYTHONUNBUFFERED"] = "1"

或者将其作为env参数传递给Popen

否则,如果您使用的是Linux / Unix,则可以使用该stdbuf工具。例如:

cmd = ["stdbuf", "-oL"] + cmd

另请参见这里stdbuf或其他选项。

(有关相同答案,请参见此处。)

Depending on the use case, you might also want to disable the buffering in the subprocess itself.

If the subprocess will be a Python process, you could do this before the call:

os.environ["PYTHONUNBUFFERED"] = "1"

Or alternatively pass this in the env argument to Popen.

Otherwise, if you are on Linux/Unix, you can use the stdbuf tool. E.g. like:

cmd = ["stdbuf", "-oL"] + cmd

See also here about stdbuf or other options.

(See also here for the same answer.)


回答 8

我使用此解决方案在子流程上获得实时输出。该过程完成后,该循环将立即停止,不再需要break语句或可能的无限循环。

sub_process = subprocess.Popen(my_command, close_fds=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

while sub_process.poll() is None:
    out = sub_process.stdout.read(1)
    sys.stdout.write(out)
    sys.stdout.flush()

I used this solution to get realtime output on a subprocess. This loop will stop as soon as the process completes leaving out a need for a break statement or possible infinite loop.

sub_process = subprocess.Popen(my_command, close_fds=True, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

while sub_process.poll() is None:
    out = sub_process.stdout.read(1)
    sys.stdout.write(out)
    sys.stdout.flush()

回答 9

在此处找到此“即插即用”功能。像魅力一样工作!

import subprocess

def myrun(cmd):
    """from http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html
    """
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdout = []
    while True:
        line = p.stdout.readline()
        stdout.append(line)
        print line,
        if line == '' and p.poll() != None:
            break
    return ''.join(stdout)

Found this “plug-and-play” function here. Worked like a charm!

import subprocess

def myrun(cmd):
    """from http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html
    """
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    stdout = []
    while True:
        line = p.stdout.readline()
        stdout.append(line)
        print line,
        if line == '' and p.poll() != None:
            break
    return ''.join(stdout)

回答 10

您可以在子进程的输出中的每个字节上使用迭代器。这允许从子进程进行内联更新(以’\ r’结尾的行覆盖先前的输出行):

from subprocess import PIPE, Popen

command = ["my_command", "-my_arg"]

# Open pipe to subprocess
subprocess = Popen(command, stdout=PIPE, stderr=PIPE)


# read each byte of subprocess
while subprocess.poll() is None:
    for c in iter(lambda: subprocess.stdout.read(1) if subprocess.poll() is None else {}, b''):
        c = c.decode('ascii')
        sys.stdout.write(c)
sys.stdout.flush()

if subprocess.returncode != 0:
    raise Exception("The subprocess did not terminate correctly.")

You may use an iterator over each byte in the output of the subprocess. This allows inline update (lines ending with ‘\r’ overwrite previous output line) from the subprocess:

from subprocess import PIPE, Popen

command = ["my_command", "-my_arg"]

# Open pipe to subprocess
subprocess = Popen(command, stdout=PIPE, stderr=PIPE)


# read each byte of subprocess
while subprocess.poll() is None:
    for c in iter(lambda: subprocess.stdout.read(1) if subprocess.poll() is None else {}, b''):
        c = c.decode('ascii')
        sys.stdout.write(c)
sys.stdout.flush()

if subprocess.returncode != 0:
    raise Exception("The subprocess did not terminate correctly.")

回答 11

在Python 3.x中,该过程可能会挂起,因为输出是字节数组而不是字符串。确保将其解码为字符串。

从Python 3.6开始,您可以使用Popen Constructor中的参数encoding来实现。完整的例子:

process = subprocess.Popen(
    'my_command',
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    shell=True,
    encoding='utf-8',
    errors='replace'
)

while True:
    realtime_output = process.stdout.readline()

    if realtime_output == '' and process.poll() is not None:
        break

    if realtime_output:
        print(realtime_output.strip(), flush=True)

请注意,此代码重定向 stderrstdout处理输出错误

In Python 3.x the process might hang because the output is a byte array instead of a string. Make sure you decode it into a string.

Starting from Python 3.6 you can do it using the parameter encoding in Popen Constructor. The complete example:

process = subprocess.Popen(
    'my_command',
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    shell=True,
    encoding='utf-8',
    errors='replace'
)

while True:
    realtime_output = process.stdout.readline()

    if realtime_output == '' and process.poll() is not None:
        break

    if realtime_output:
        print(realtime_output.strip(), flush=True)

Note that this code redirects stderr to stdout and handles output errors.


回答 12

将pexpect [ http://www.noah.org/wiki/Pexpect ]与非阻塞的阅读行一起使用将解决此问题。这是由于管道是缓冲的,因此您的应用程序的输出将被管道缓冲,因此,直到缓冲填满或进程终止,您才能获得该输出。

Using pexpect with non-blocking readlines will resolve this problem. It stems from the fact that pipes are buffered, and so your app’s output is getting buffered by the pipe, therefore you can’t get to that output until the buffer fills or the process dies.


回答 13

完整的解决方案:

import contextlib
import subprocess

# Unix, Windows and old Macintosh end-of-line
newlines = ['\n', '\r\n', '\r']
def unbuffered(proc, stream='stdout'):
    stream = getattr(proc, stream)
    with contextlib.closing(stream):
        while True:
            out = []
            last = stream.read(1)
            # Don't loop forever
            if last == '' and proc.poll() is not None:
                break
            while last not in newlines:
                # Don't loop forever
                if last == '' and proc.poll() is not None:
                    break
                out.append(last)
                last = stream.read(1)
            out = ''.join(out)
            yield out

def example():
    cmd = ['ls', '-l', '/']
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        # Make all end-of-lines '\n'
        universal_newlines=True,
    )
    for line in unbuffered(proc):
        print line

example()

Complete solution:

import contextlib
import subprocess

# Unix, Windows and old Macintosh end-of-line
newlines = ['\n', '\r\n', '\r']
def unbuffered(proc, stream='stdout'):
    stream = getattr(proc, stream)
    with contextlib.closing(stream):
        while True:
            out = []
            last = stream.read(1)
            # Don't loop forever
            if last == '' and proc.poll() is not None:
                break
            while last not in newlines:
                # Don't loop forever
                if last == '' and proc.poll() is not None:
                    break
                out.append(last)
                last = stream.read(1)
            out = ''.join(out)
            yield out

def example():
    cmd = ['ls', '-l', '/']
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        # Make all end-of-lines '\n'
        universal_newlines=True,
    )
    for line in unbuffered(proc):
        print line

example()

回答 14

这是我经常使用的基本骨架。它使实现超时变得容易,并且能够处理不可避免的挂起过程。

import subprocess
import threading
import Queue

def t_read_stdout(process, queue):
    """Read from stdout"""

    for output in iter(process.stdout.readline, b''):
        queue.put(output)

    return

process = subprocess.Popen(['dir'],
                           stdout=subprocess.PIPE,
                           stderr=subprocess.STDOUT,
                           bufsize=1,
                           cwd='C:\\',
                           shell=True)

queue = Queue.Queue()
t_stdout = threading.Thread(target=t_read_stdout, args=(process, queue))
t_stdout.daemon = True
t_stdout.start()

while process.poll() is None or not queue.empty():
    try:
        output = queue.get(timeout=.5)

    except Queue.Empty:
        continue

    if not output:
        continue

    print(output),

t_stdout.join()

This is the basic skeleton that I always use for this. It makes it easy to implement timeouts and is able to deal with inevitable hanging processes.

import subprocess
import threading
import Queue

def t_read_stdout(process, queue):
    """Read from stdout"""

    for output in iter(process.stdout.readline, b''):
        queue.put(output)

    return

process = subprocess.Popen(['dir'],
                           stdout=subprocess.PIPE,
                           stderr=subprocess.STDOUT,
                           bufsize=1,
                           cwd='C:\\',
                           shell=True)

queue = Queue.Queue()
t_stdout = threading.Thread(target=t_read_stdout, args=(process, queue))
t_stdout.daemon = True
t_stdout.start()

while process.poll() is None or not queue.empty():
    try:
        output = queue.get(timeout=.5)

    except Queue.Empty:
        continue

    if not output:
        continue

    print(output),

t_stdout.join()

回答 15

(此解决方案已通过Python 2.7.15进行了测试
),每行读/写后只需要sys.stdout.flush():

while proc.poll() is None:
    line = proc.stdout.readline()
    sys.stdout.write(line)
    # or print(line.strip()), you still need to force the flush.
    sys.stdout.flush()

(This solution has been tested with Python 2.7.15)
You just need to sys.stdout.flush() after each line read/write:

while proc.poll() is None:
    line = proc.stdout.readline()
    sys.stdout.write(line)
    # or print(line.strip()), you still need to force the flush.
    sys.stdout.flush()

回答 16

很少有建议使用python 3.x或pthon 2.x的答案,下面的代码对两者都适用。

 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,)
    stdout = []
    while True:
        line = p.stdout.readline()
        if not isinstance(line, (str)):
            line = line.decode('utf-8')
        stdout.append(line)
        print (line)
        if (line == '' and p.poll() != None):
            break

Few answers suggesting python 3.x or pthon 2.x , Below code will work for both.

 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,)
    stdout = []
    while True:
        line = p.stdout.readline()
        if not isinstance(line, (str)):
            line = line.decode('utf-8')
        stdout.append(line)
        print (line)
        if (line == '' and p.poll() != None):
            break