Question: Read subprocess stdout line by line

My python script uses subprocess to call a linux utility that is very noisy. I want to store all of the output to a log file and show some of it to the user. I thought the following would work, but the output doesn’t show up in my application until the utility has produced a significant amount of output.

#fake_utility.py, just generates lots of output over time
import time
i = 0
while True:
   print hex(i)*512
   i += 1
   time.sleep(0.5)

#filters output
import subprocess
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
for line in proc.stdout:
   #the real code does filtering here
   print "test:", line.rstrip()

The behavior I really want is for the filter script to print each line as it is received from the subprocess. Sorta like what tee does but with python code.

What am I missing? Is this even possible?


Update:

If a sys.stdout.flush() is added to fake_utility.py, the code has the desired behavior in python 3.1. I’m using python 2.6. You would think that using proc.stdout.xreadlines() would work the same as py3k, but it doesn’t.


Update 2:

Here is the minimal working code.

#fake_utility.py, just generates lots of output over time
import sys, time
for i in range(10):
   print i
   sys.stdout.flush()
   time.sleep(0.5)

#display output line by line
import subprocess
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
#works in python 3.0+
#for line in proc.stdout:
for line in iter(proc.stdout.readline,''):
   print line.rstrip()

Answer 0

It’s been a long time since I last worked with Python, but I think the problem is with the statement for line in proc.stdout: in Python 2 the file iterator fills an internal read-ahead buffer before it yields the first line, so output arrives in large, delayed chunks. The solution is to use readline() instead:

#filters output
import subprocess
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
while True:
  line = proc.stdout.readline()
  if not line:
    break
  #the real code does filtering here
  print "test:", line.rstrip()

Of course you still have to deal with the subprocess’ buffering.

Note: according to the documentation the solution with an iterator should be equivalent to using readline(), except for the read-ahead buffer, but (or exactly because of this) the proposed change did produce different results for me (Python 2.5 on Windows XP).
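
As a rough Python 3 sketch of the same readline() loop, combined with the log file the question asks for: the helper name tee_filter, the keep callback, and the inline child program are all made up for illustration, and the child is started with -u so that its own buffering does not hide the effect.

```python
import os
import subprocess
import sys
import tempfile


def tee_filter(cmd, log_path, keep):
    """Write every output line of cmd to log_path; print and return the kept ones."""
    shown = []
    with open(log_path, "wb") as log, \
            subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
        # readline() returns as soon as one full line is available;
        # b"" signals that the child closed its stdout.
        for raw in iter(proc.stdout.readline, b""):
            log.write(raw)  # the log gets everything, unfiltered
            line = raw.decode("utf-8", errors="replace").rstrip()
            if keep(line):
                print("test:", line)
                shown.append(line)
    return shown


# Hypothetical stand-in for the noisy utility.
child = [sys.executable, "-u", "-c", "for i in range(5): print(hex(i))"]
log_path = os.path.join(tempfile.mkdtemp(), "utility.log")
shown = tee_filter(child, log_path, keep=lambda s: int(s, 16) % 2 == 0)
```

The log is written in binary mode so nothing is lost if the utility emits bytes that do not decode cleanly; only the copy shown to the user is decoded.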


Answer 1

Bit late to the party, but was surprised not to see what I think is the simplest solution here:

import io
import subprocess

proc = subprocess.Popen(["prog", "arg"], stdout=subprocess.PIPE)
for line in io.TextIOWrapper(proc.stdout, encoding="utf-8"):  # or another encoding
    print(line, end="")  # do something with line (the loop needs a body)

(This requires Python 3.)


Answer 2

Indeed, if you sorted out the iterator then buffering could now be your problem. You could tell the python in the sub-process not to buffer its output.

proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)

becomes

proc = subprocess.Popen(['python','-u', 'fake_utility.py'],stdout=subprocess.PIPE)

I have needed this when calling python from within python.
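
A small sketch of two ways to get an unbuffered Python child, in case it helps: the -u flag described here, and the PYTHONUNBUFFERED environment variable, which has the same effect. The inline child program and the read_lines helper are hypothetical, used only to keep the example self-contained.

```python
import os
import subprocess
import sys

# Hypothetical child program, passed inline so the sketch is self-contained.
CHILD_CODE = "import time\nfor i in range(3):\n    print(i)\n    time.sleep(0.1)\n"


def read_lines(cmd, env=None):
    """Run cmd and return its stdout lines, read one at a time as they arrive."""
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, env=env) as proc:
        return [raw.decode().strip() for raw in iter(proc.stdout.readline, b"")]


# Option 1: the -u flag.
with_flag = read_lines([sys.executable, "-u", "-c", CHILD_CODE])

# Option 2: PYTHONUNBUFFERED in the child's environment, handy when the
# command line itself cannot be changed.
unbuffered_env = dict(os.environ, PYTHONUNBUFFERED="1")
with_env = read_lines([sys.executable, "-c", CHILD_CODE], env=unbuffered_env)

print(with_flag, with_env)
```

Both options only affect a Python child; a non-Python utility decides its own buffering.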


Answer 3

You want to pass these extra parameters to subprocess.Popen:

bufsize=1, universal_newlines=True

Then you can iterate as in your example. (Tested with Python 3.5)
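
Put together, that might look like the sketch below. The stream_lines helper and the inline child are hypothetical; the child is run with -u because bufsize=1 only controls buffering on the parent's side of the pipe.

```python
import subprocess
import sys


def stream_lines(cmd):
    """Yield the child's stdout one line at a time, as text."""
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        bufsize=1,                # line-buffered; only meaningful in text mode
        universal_newlines=True,  # decode bytes to str (spelled text=True on 3.7+)
    ) as proc:
        for line in proc.stdout:
            yield line.rstrip("\n")


# Hypothetical stand-in for the noisy utility.
child = [sys.executable, "-u", "-c", "for i in range(3): print('line', i)"]
lines = list(stream_lines(child))
for line in lines:
    print("test:", line)
```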


Answer 4

A function that allows iterating over both stdout and stderr concurrently, in realtime, line by line

In case you need to get the output stream for both stdout and stderr at the same time, you can use the following function.

The function uses Queues to merge both Popen pipes into a single iterator.

Here we create the function read_popen_pipes():

from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()


def read_popen_pipes(p):

    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()

        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)

        while True:

            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break

            out_line = err_line = ''

            try:
                out_line = q_stdout.get_nowait()
            except Empty:
                pass
            try:
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            yield (out_line, err_line)

read_popen_pipes() in use:

import subprocess as sp


with sp.Popen(my_cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    for out_line, err_line in read_popen_pipes(p):

        # Do stuff with each line, e.g.:
        print(out_line, end='')
        print(err_line, end='')

    returncode = p.poll()  # exit status; a bare return is only valid inside a function
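
One possible variant, not the function above: read_popen_pipes() polls both queues in a tight loop and yields empty strings when nothing is available. The sketch below uses a single shared queue with a blocking get() and tags each line with its stream name instead. The names merged_lines and pump are made up for illustration.

```python
import subprocess
import sys
import threading
from queue import Queue


def merged_lines(proc):
    """Yield (stream_name, line) pairs from proc.stdout and proc.stderr as they arrive."""
    q = Queue()
    streams = {"stdout": proc.stdout, "stderr": proc.stderr}

    def pump(name, stream):
        for line in stream:
            q.put((name, line))
        q.put((name, None))  # sentinel: this stream reached EOF

    for name, stream in streams.items():
        threading.Thread(target=pump, args=(name, stream), daemon=True).start()

    open_streams = len(streams)
    while open_streams:
        name, line = q.get()  # blocks until a line or sentinel arrives
        if line is None:
            open_streams -= 1
        else:
            yield name, line


# Hypothetical child that writes one line to each stream.
child = [sys.executable, "-u", "-c",
         "import sys; print('out'); print('err', file=sys.stderr)"]
with subprocess.Popen(child, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                      text=True) as p:
    # Ordering between the two streams is not guaranteed, hence sorted().
    collected = sorted(merged_lines(p))
```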

Answer 5

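You can also read the lines without a loop. This works in Python 3.6, but readlines() blocks until the process closes its stdout, so you get the whole output at the end rather than line by line as it is produced.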

import os
import subprocess

process = subprocess.Popen(command, stdout=subprocess.PIPE)
list_of_byte_strings = process.stdout.readlines()

Answer 6

I tried this with Python 3 and it worked (adapted from another source):

import subprocess
import threading
import time


def output_reader(proc):
    # Runs in a background thread; b'' means the child closed its stdout.
    for line in iter(proc.stdout.readline, b''):
        print('got line: {0}'.format(line.decode('utf-8')), end='')


def main():
    proc = subprocess.Popen(['python', 'fake_utility.py'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

    t = threading.Thread(target=output_reader, args=(proc,))
    t.start()

    try:
        time.sleep(0.2)
        i = 0

        # The main thread keeps working while the reader thread prints.
        while True:
            print(hex(i)*512)
            i += 1
            time.sleep(0.5)
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=0.2)
            print('== subprocess exited with rc =', proc.returncode)
        except subprocess.TimeoutExpired:
            print('subprocess did not terminate in time')
    t.join()

Answer 7

The following modification of Rômulo’s answer works for me on Python 2 and 3 (2.7.12 and 3.6.1):

import os
import subprocess

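process = subprocess.Popen(command, stdout=subprocess.PIPE)
while True:
  line = process.stdout.readline()
  # EOF is '' on Python 2 and b'' on Python 3; comparing with '' alone
  # never matches on Python 3 and would loop forever.
  if not line:
    break
  os.write(1, line)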

Answer 8

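I don't know when this was added to the subprocess module, but with Python 3 you can use proc.stdout.splitlines(), provided proc is the CompletedProcess returned by subprocess.run() with stdout captured. A Popen pipe has no splitlines() method, and this gives you the lines only after the process has exited, not as they are produced:

for line in proc.stdout.splitlines():
   print("stdout:", line)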
