Question: Constantly print subprocess output while the process is running

To launch programs from my Python scripts, I’m using the following method:

def execute(command):
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    output = process.communicate()[0]
    exitCode = process.returncode

    if (exitCode == 0):
        return output
    else:
        raise ProcessException(command, exitCode, output)

So when I launch a process like Process.execute("mvn clean install"), my program waits until the process is finished, and only then do I get the complete output of the process. This is annoying if I’m running a process that takes a while to finish.

Can I let my program write the process output line by line, for example by polling the process output in a loop before it finishes?

**[EDIT] Sorry, I didn’t search very well before posting this question. Threading is actually the key. I found an example that shows how to do it:** Python Subprocess.Popen from a thread


Answer 0

You can use iter to process lines as soon as the command outputs them: lines = iter(fd.readline, ""). Here’s a full example showing a typical use case (thanks to @jfs for helping out):

from __future__ import print_function # Only Python 2.x
import subprocess

def execute(cmd):
    popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True)
    for stdout_line in iter(popen.stdout.readline, ""):
        yield stdout_line 
    popen.stdout.close()
    return_code = popen.wait()
    if return_code:
        raise subprocess.CalledProcessError(return_code, cmd)

# Example
for path in execute(["locate", "a"]):
    print(path, end="")

Answer 1

OK, I managed to solve it without threads (any suggestions as to why using threads would be better are appreciated) by using a snippet from this question: Intercepting stdout of a subprocess while it is running

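import subprocess
import sys

def execute(command):
    # universal_newlines=True makes readline() return str, so the '' check below works on Python 2 and 3
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
    lines = []

    # Poll process for new output until finished
    while True:
        nextline = process.stdout.readline()
        if nextline == '' and process.poll() is not None:
            break
        sys.stdout.write(nextline)
        sys.stdout.flush()
        lines.append(nextline)

    # The pipe has already been drained above, so communicate() would return nothing here
    output = ''.join(lines)
    exitCode = process.returncode

    if exitCode == 0:
        return output
    else:
        # ProcessException is the asker's own exception class (not shown)
        raise ProcessException(command, exitCode, output)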

Answer 2

To print subprocess’ output line-by-line as soon as its stdout buffer is flushed in Python 3:

from subprocess import Popen, PIPE, CalledProcessError

with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
    for line in p.stdout:
        print(line, end='') # process line here

if p.returncode != 0:
    raise CalledProcessError(p.returncode, p.args)

Note: you do not need p.poll(); the loop ends when EOF is reached. You also do not need iter(p.stdout.readline, ''), because the read-ahead bug is fixed in Python 3.

See also: Python: read streaming input from subprocess.communicate().


Answer 3

There is actually a really simple way to do this when you just want to print the output:

import subprocess
import sys

def execute(command):
    subprocess.check_call(command, stdout=sys.stdout, stderr=subprocess.STDOUT)

Here we’re simply pointing the subprocess to our own stdout and relying on the existing check_call behavior: it returns on success and raises CalledProcessError on a non-zero exit code.


Answer 4

@tokland

I tried your code and corrected it for Python 3.4 on Windows. dir.cmd is a simple dir command saved as a .cmd file.

import subprocess
c = "dir.cmd"

def execute(command):
    popen = subprocess.Popen(command, stdout=subprocess.PIPE,bufsize=1)
    lines_iterator = iter(popen.stdout.readline, b"")
    while popen.poll() is None:
        for line in lines_iterator:
            nline = line.rstrip()
            print(nline.decode("latin"), end = "\r\n",flush =True) # yield line

execute(c)

Answer 5

In case someone wants to read from both stdout and stderr at the same time using threads, this is what I came up with:

# Python 2 code (print statements, eager map, Queue module)
import threading
import subprocess
import Queue
from time import sleep

class AsyncLineReader(threading.Thread):
    def __init__(self, fd, outputQueue):
        threading.Thread.__init__(self)

        assert isinstance(outputQueue, Queue.Queue)
        assert callable(fd.readline)

        self.fd = fd
        self.outputQueue = outputQueue

    def run(self):
        map(self.outputQueue.put, iter(self.fd.readline, ''))

    def eof(self):
        return not self.is_alive() and self.outputQueue.empty()

    @classmethod
    def getForFd(cls, fd, start=True):
        queue = Queue.Queue()
        reader = cls(fd, queue)

        if start:
            reader.start()

        return reader, queue


process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdoutReader, stdoutQueue) = AsyncLineReader.getForFd(process.stdout)
(stderrReader, stderrQueue) = AsyncLineReader.getForFd(process.stderr)

# Keep checking queues until there is no more output.
while not stdoutReader.eof() or not stderrReader.eof():
   # Process all available lines from the stdout Queue.
   while not stdoutQueue.empty():
       line = stdoutQueue.get()
       print 'Received stdout: ' + repr(line)

       # Do stuff with stdout line.

   # Process all available lines from the stderr Queue.
   while not stderrQueue.empty():
       line = stderrQueue.get()
       print 'Received stderr: ' + repr(line)

       # Do stuff with stderr line.

   # Sleep for a short time to avoid excessive CPU use while waiting for data.
   sleep(0.05)

print "Waiting for async readers to finish..."
stdoutReader.join()
stderrReader.join()

# Close subprocess' file descriptors.
process.stdout.close()
process.stderr.close()

print "Waiting for process to exit..."
returnCode = process.wait()

if returnCode != 0:
   raise subprocess.CalledProcessError(returnCode, command)

I just wanted to share this, as I ended up on this question trying to do something similar, but none of the answers solved my problem. Hopefully it helps someone!

Note that in my use case, an external process kills the process that we Popen().
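
For readers on Python 3, the same idea can be sketched more compactly. This is not the code above but a rough equivalent under stated assumptions: the names _pump and stream_both are hypothetical, one thread per pipe pushes lines into a shared queue.Queue, and a None sentinel marks EOF so the consumer can block on get() instead of sleeping and polling.

```python
import queue
import subprocess
import sys
import threading


def _pump(stream, name, out_queue):
    """Forward each line of `stream` to the queue, tagged with the stream name."""
    for line in stream:
        out_queue.put((name, line))
    stream.close()
    out_queue.put((name, None))  # sentinel: this stream reached EOF


def stream_both(cmd):
    """Yield (stream_name, line) pairs as the child produces them."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)
    lines = queue.Queue()
    threads = [
        threading.Thread(target=_pump, args=(proc.stdout, "stdout", lines)),
        threading.Thread(target=_pump, args=(proc.stderr, "stderr", lines)),
    ]
    for t in threads:
        t.daemon = True
        t.start()

    open_streams = len(threads)
    while open_streams:
        name, line = lines.get()  # blocks until a line or a sentinel arrives
        if line is None:
            open_streams -= 1
        else:
            yield name, line

    for t in threads:
        t.join()
    return_code = proc.wait()
    if return_code:
        raise subprocess.CalledProcessError(return_code, cmd)


if __name__ == "__main__":
    # Hypothetical child command used only for demonstration.
    child = [sys.executable, "-c",
             "import sys; print('out'); sys.stderr.write('err\\n')"]
    for name, line in stream_both(child):
        print(name, line, end="")
```

The relative order of stdout and stderr lines is not guaranteed, since each pipe is read by its own thread.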


Answer 6

For anyone trying the answers to this question to get the stdout of a Python script: note that Python buffers its stdout, so it may take a while for the output to appear.

This can be rectified by adding the following after each stdout write in the target script:

sys.stdout.flush()
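
If the target script cannot be edited, one alternative is to start the child interpreter with the -u command-line flag, which makes its stdout and stderr unbuffered. The sketch below assumes the child is itself a Python program; stream_python_script is a hypothetical helper name, not part of any library.

```python
import subprocess
import sys


def stream_python_script(args):
    """Run a Python child with -u (unbuffered output) and yield its stdout lines."""
    cmd = [sys.executable, "-u"] + list(args)
    with subprocess.Popen(cmd, stdout=subprocess.PIPE,
                          universal_newlines=True) as proc:
        for line in proc.stdout:
            yield line
    # Leaving the with-block waits for the child, so returncode is set here.
    if proc.returncode:
        raise subprocess.CalledProcessError(proc.returncode, cmd)


if __name__ == "__main__":
    # Hypothetical child: prints three numbers slowly, without calling flush().
    code = "import time\nfor i in range(3):\n    print(i)\n    time.sleep(0.5)"
    for line in stream_python_script(["-c", code]):
        print(line, end="")
```

Setting the PYTHONUNBUFFERED environment variable for the child has the same effect as passing -u.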

Answer 7

In Python >= 3.5 using subprocess.run works for me:

import subprocess

cmd = 'echo foo; sleep 1; echo foo; sleep 2; echo foo'
subprocess.run(cmd, shell=True)

(getting the output during execution also works without shell=True) https://docs.python.org/3/library/subprocess.html#subprocess.run
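
A minimal sketch of the variant without shell=True, assuming the command is passed as an argument list. The child here is a hypothetical Python one-liner standing in for a long-running command.

```python
import subprocess
import sys

# Hypothetical child: prints a line, then sleeps, three times.
child_code = (
    "import time\n"
    "for i in range(3):\n"
    "    print('foo', flush=True)\n"
    "    time.sleep(0.1)\n"
)

# No stdout= argument is given, so the child inherits this process's stdout
# and its lines appear on the terminal while it is still running.
result = subprocess.run([sys.executable, "-c", child_code])
```

Because nothing is captured here, result.stdout is None; this approach only displays the output and does not make it available to the calling program.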


Answer 8

To answer the original question, the best way IMO is to redirect the subprocess’s stdout directly to your program’s stdout (optionally, the same can be done for stderr, as in the example below):

import sys
from subprocess import Popen

p = Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)
p.communicate()

Answer 9

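This PoC constantly reads the output of a process and makes it available when needed. Each call returns only the most recent line and the lines in between are ignored. Because the reader thread keeps draining it, the PIPE never fills up. Note that the LifoQueue itself is unbounded, so the skipped lines still accumulate in it: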

import subprocess
import time
import threading
import Queue


class FlushPipe(object):
    def __init__(self):
        self.command = ['python', './print_date.py']
        self.process = None
        self.process_output = Queue.LifoQueue(0)
        self.capture_output = threading.Thread(target=self.output_reader)

    def output_reader(self):
        for line in iter(self.process.stdout.readline, b''):
            self.process_output.put_nowait(line)

    def start_process(self):
        self.process = subprocess.Popen(self.command,
                                        stdout=subprocess.PIPE)
        self.capture_output.start()

    def get_output_for_processing(self):
        line = self.process_output.get()
        print ">>>" + line


if __name__ == "__main__":
    flush_pipe = FlushPipe()
    flush_pipe.start_process()

    now = time.time()
    while time.time() - now < 10:
        flush_pipe.get_output_for_processing()
        time.sleep(2.5)

    flush_pipe.capture_output.join(timeout=0.001)
    flush_pipe.process.kill()

print_date.py

#!/usr/bin/env python
import time

if __name__ == "__main__":
    while True:
        print str(time.time())
        time.sleep(0.01)

Output: you can clearly see that output is printed only at ~2.5 s intervals, with nothing in between.

>>>1520535158.51
>>>1520535161.01
>>>1520535163.51
>>>1520535166.01

Answer 10

This works at least in Python 3.4:

import subprocess

process = subprocess.Popen(cmd_list, stdout=subprocess.PIPE)
for line in process.stdout:
    print(line.decode().strip())

Answer 11

None of the answers here addressed all of my needs.

  1. No threads for stdout (no Queues, etc, either)
  2. Non-blocking as I need to check for other things going on
  3. Use PIPE as I needed to do multiple things, e.g. stream output, write to a log file and return a string copy of the output.

A little background: I am using a ThreadPoolExecutor to manage a pool of threads, each launching a subprocess, and running them concurrently. (This is in Python 2.7, but it should work in newer 3.x as well.) I don’t want to use threads just for output gathering, as I want as many as possible available for other things (a pool of 20 processes would use 40 threads just to run: 1 for the process thread and 1 for stdout, and more if you want stderr, I guess).

I’m stripping back a lot of exception handling and such here, so this is based on code that works in production. Hopefully I didn’t ruin it in the copy and paste. Also, feedback is very much welcome!

import fcntl
import os
import subprocess
import sys
import time

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

# Make stdout non-blocking when using read/readline
proc_stdout = proc.stdout
fl = fcntl.fcntl(proc_stdout, fcntl.F_GETFL)
fcntl.fcntl(proc_stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)

def handle_stdout(proc_stream, my_buffer, echo_streams=True, log_file=None):
    """A little inline function to handle the stdout business. """
    # fcntl makes readline non-blocking so it raises an IOError when empty
    try:
        for s in iter(proc_stream.readline, ''):   # replace '' with b'' for Python 3
            my_buffer.append(s)

            if echo_streams:
                sys.stdout.write(s)

            if log_file:
                log_file.write(s)
    except IOError:
        pass

# The main loop while subprocess is running
stdout_parts = []
while proc.poll() is None:
    handle_stdout(proc_stdout, stdout_parts)

    # ...Check for other things here...
    # For example, check a multiprocessing.Value('b') to decide whether to proc.kill()

    time.sleep(0.01)

# Not sure if this is needed, but run it again just to be sure we got it all?
handle_stdout(proc_stdout, stdout_parts)

stdout_str = "".join(stdout_parts)  # Just to demo

I’m sure there is overhead being added here but it is not a concern in my case. Functionally it does what I need. The only thing I haven’t solved is why this works perfectly for log messages but I see some print messages show up later and all at once.
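
For Python 3 on POSIX systems, a thread-free loop in the same spirit can be sketched with the standard selectors module instead of fcntl. This is a swapped-in technique rather than the production code above: run_and_collect and its parameters are hypothetical names, and the approach assumes a platform where pipes can be registered with a selector (so not Windows).

```python
import os
import selectors
import subprocess
import sys


def run_and_collect(cmd, echo=True, poll_interval=0.01):
    """Stream a child's output without threads; return (return_code, text)."""
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    sel = selectors.DefaultSelector()
    sel.register(proc.stdout, selectors.EVENT_READ)
    chunks = []
    eof = False
    while not eof:
        # Wait at most poll_interval seconds for output, then fall through.
        for key, _events in sel.select(timeout=poll_interval):
            # Read from the raw descriptor; this returns whatever is available.
            data = os.read(key.fileobj.fileno(), 4096)
            if not data:  # empty read means the child closed its end (EOF)
                eof = True
                break
            chunks.append(data)
            if echo:
                # Chunks may split a multi-byte character; replace undecodable bytes.
                sys.stdout.write(data.decode(errors="replace"))
                sys.stdout.flush()
        # ... check for other things here (e.g. a cancellation flag) ...
    sel.close()
    proc.stdout.close()
    return proc.wait(), b"".join(chunks).decode(errors="replace")


if __name__ == "__main__":
    # Hypothetical child command used only for demonstration.
    rc, text = run_and_collect([sys.executable, "-c", "print('a'); print('b')"])
    print("exit code:", rc)
```

The select timeout plays the role of the time.sleep(0.01) call in the original loop, so the caller still gets regular opportunities to do other work between reads.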


Answer 12

In Python 3.6 I used this:

import subprocess

cmd = "command"
# call() lets the child write directly to our stdout and returns its exit code
return_code = subprocess.call(cmd, shell=True)
print(return_code)
