Question: Live output from subprocess command
I’m using a python script as a driver for a hydrodynamics code. When it comes time to run the simulation, I use subprocess.Popen
to run the code, collect the output from stdout and stderr into a subprocess.PIPE
— then I can print (and save to a log-file) the output information, and check for any errors. The problem is, I have no idea how the code is progressing. If I run it directly from the command line, it gives me output about which iteration it's at, what the current time is, what the next time-step is, etc.
Is there a way to both store the output (for logging and error checking), and also produce a live-streaming output?
The relevant section of my code:
ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
print "RUN failed\n\n%s\n\n" % (errors)
success = False
if( errors ): log_file.write("\n\n%s\n\n" % errors)
Originally I was piping the run_command
through tee
so that a copy went directly to the log-file, and the stream still output directly to the terminal — but that way I can't store any errors (to my knowledge). A sketch of that variant follows.
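For reference, the tee variant looked roughly like this (a reconstruction for illustration, not the exact code); stderr bypasses tee entirely, which is why the errors were lost:

ret_val = subprocess.Popen( run_command + ' | tee log.txt', shell=True )
ret_val.wait()  # stdout is duplicated to the terminal and to log.txt; stderr goes straight to the terminal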
Edit:
Temporary solution:
ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while ret_val.poll() is None:  # poll() returns None while the process is still running
log_file.flush()
then, in another terminal, run tail -f log.txt
(s.t. log_file = 'log.txt'
).
Answer 0
You have two ways of doing this. The first is to create an iterator from the read or readline functions and do:
import subprocess
import sys
with open('test.log', 'w') as f: # replace 'w' with 'wb' for Python 3
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
for c in iter(lambda: process.stdout.read(1), ''): # replace '' with b'' for Python 3
sys.stdout.write(c)
f.write(c)
or
import subprocess
import sys
with open('test.log', 'w') as f: # replace 'w' with 'wb' for Python 3
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
for line in iter(process.stdout.readline, ''): # replace '' with b'' for Python 3
sys.stdout.write(line)
f.write(line)
Alternatively, you can open the same file twice, as a writer and a reader: pass the writer to Popen and read from the reader:
import io
import time
import subprocess
import sys
filename = 'test.log'
with io.open(filename, 'wb') as writer, io.open(filename, 'rb', 1) as reader:
process = subprocess.Popen(command, stdout=writer)
while process.poll() is None:
sys.stdout.write(reader.read())
time.sleep(0.5)
# Read the remaining
sys.stdout.write(reader.read())
This way, you will have the data written to test.log as well as to standard output.
The only advantage of the file approach is that your code doesn't block, so you can do whatever you want in the meantime and read from the reader whenever you want, in a non-blocking way. When you use PIPE, the read and readline functions will block until either a character or a line, respectively, has been written to the pipe.
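Putting this together for a driver script like the one in the question (a sketch assuming run_command from the question, with stderr merged into stdout so errors are captured too):

import subprocess
import sys

process = subprocess.Popen(run_command, stdout=subprocess.PIPE,
                           stderr=subprocess.STDOUT, shell=True)
with open('test.log', 'w') as f:
    for line in iter(process.stdout.readline, ''):  # b'' for Python 3
        sys.stdout.write(line)
        f.write(line)
process.wait()  # make sure returncode is populated
if process.returncode:
    sys.stderr.write('RUN failed\n')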
Answer 1
Executive Summary (or “tl;dr” version): it’s easy when there’s at most one subprocess.PIPE
, otherwise it’s hard.
It may be time to explain a bit about how subprocess.Popen
does its thing.
(Caveat: this is for Python 2.x, although 3.x is similar; and I’m quite fuzzy on the Windows variant. I understand the POSIX stuff much better.)
The Popen
function needs to deal with zero-to-three I/O streams, somewhat simultaneously. These are denoted stdin
, stdout
, and stderr
as usual.
You can provide any of the following (a quick sketch showing the options in use follows the list):
- None
, indicating that you don’t want to redirect the stream. It will inherit these as usual instead. Note that on POSIX systems, at least, this does not mean it will use Python’s sys.stdout
, just Python’s actual stdout; see demo at end.
- An
int
value. This is a “raw” file descriptor (in POSIX at least). (Side note: PIPE
and STDOUT
are actually int
s internally, but are “impossible” descriptors, -1 and -2.)
- A stream—really, any object with a
fileno
method. Popen
will find the descriptor for that stream, using stream.fileno()
, and then proceed as for an int
value.
- subprocess.PIPE
, indicating that Python should create a pipe.
- subprocess.STDOUT
(for stderr
only): tell Python to use the same descriptor as for stdout
. This only makes sense if you provided a (non-None
) value for stdout
, and even then, it is only needed if you set stdout=subprocess.PIPE
. (Otherwise you can just provide the same argument you provided for stdout
, e.g., Popen(..., stdout=stream, stderr=stream)
.)
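A quick sketch of those options in use (the command name is a placeholder):

import subprocess

log = open('run.log', 'w')
proc = subprocess.Popen(
    ['some_command'],   # placeholder
    stdin=None,         # None: don't redirect; inherit the real stdin
    stdout=log,         # a stream: any object with a fileno() method
    stderr=log,         # same stream again, per the note above
)
proc.wait()
log.close()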
The easiest cases (no pipes)
If you redirect nothing (leave all three as the default None
value or supply explicit None
), Popen
has it quite easy. It just needs to spin off the subprocess and let it run. Or, if you redirect to a non-PIPE
—an int
or a stream’s fileno()
—it’s still easy, as the OS does all the work. Python just needs to spin off the subprocess, connecting its stdin, stdout, and/or stderr to the provided file descriptors.
The still-easy case: one pipe
If you redirect only one stream, Popen
still has things pretty easy. Let’s pick one stream at a time and watch.
Suppose you want to supply some stdin
, but let stdout
and stderr
go un-redirected, or go to a file descriptor. As the parent process, your Python program simply needs to use write()
to send data down the pipe. You can do this yourself, e.g.:
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
proc.stdin.write('here, have some data\n') # etc
or you can pass the stdin data to proc.communicate()
, which then does the stdin.write
shown above. There is no output coming back so communicate()
has only one other real job: it also closes the pipe for you. (If you don’t call proc.communicate()
you must call proc.stdin.close()
to close the pipe, so that the subprocess knows there is no more data coming through.)
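For illustration, the communicate() form of the same thing, using cat as a stand-in command:

import subprocess

proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE)
proc.communicate('here, have some data\n')  # writes the data, closes stdin, and waits (use b'...' on Python 3)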
Suppose you want to capture stdout
but leave stdin
and stderr
alone. Again, it’s easy: just call proc.stdout.read()
(or equivalent) until there is no more output. Since proc.stdout
is a normal Python I/O stream, you can use all the normal constructs on it, like:
for line in proc.stdout:
or, again, you can use proc.communicate()
, which simply does the read()
for you.
If you want to capture only stderr
, it works the same as with stdout
.
There’s one more trick before things get hard. Suppose you want to capture stdout
, and also capture stderr
but on the same pipe as stdout:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
In this case, subprocess
“cheats”! Well, it has to do this, so it’s not really cheating: it starts the subprocess with both its stdout and its stderr directed into the (single) pipe-descriptor that feeds back to its parent (Python) process. On the parent side, there’s again only a single pipe-descriptor for reading the output. All the “stderr” output shows up in proc.stdout
, and if you call proc.communicate()
, the stderr result (second value in the tuple) will be None
, not a string.
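To make that concrete, a small sketch (cmd is whatever command you are running):

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out, err = proc.communicate()
# out holds the interleaved stdout+stderr text; err is None here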
The hard cases: two or more pipes
The problems all come about when you want to use at least two pipes. In fact, the subprocess
code itself has this bit:
def communicate(self, input=None):
...
# Optimization: If we are only using one pipe, or no pipe at
# all, using select() or threads is unnecessary.
if [self.stdin, self.stdout, self.stderr].count(None) >= 2:
But, alas, here we’ve made at least two, and maybe three, different pipes, so the count(None)
returns either 1 or 0. We must do things the hard way.
On Windows, this uses threading.Thread
to accumulate results for self.stdout
and self.stderr
, and has the parent thread deliver self.stdin
input data (and then close the pipe).
On POSIX, this uses poll
if available, otherwise select
, to accumulate output and deliver stdin input. All this runs in the (single) parent process/thread.
Threads or poll/select are needed here to avoid deadlock. Suppose, for instance, that we’ve redirected all three streams to three separate pipes. Suppose further that there’s a small limit on how much data can be stuffed into a pipe before the writing process is suspended, waiting for the reading process to “clean out” the pipe from the other end. Let’s set that small limit to a single byte, just for illustration. (This is in fact how things work, except that the limit is much bigger than one byte.)
If the parent (Python) process tries to write several bytes—say, 'go\n'
to proc.stdin
, the first byte goes in and then the second causes the Python process to suspend, waiting for the subprocess to read the first byte, emptying the pipe.
Meanwhile, suppose the subprocess decides to print a friendly “Hello! Don’t Panic!” greeting. The H
goes into its stdout pipe, but the e
causes it to suspend, waiting for its parent to read that H
, emptying the stdout pipe.
Now we’re stuck: the Python process is asleep, waiting to finish saying “go”, and the subprocess is also asleep, waiting to finish saying “Hello! Don’t Panic!”.
The subprocess.Popen
code avoids this problem with threading-or-select/poll. When bytes can go over the pipes, they go. When they can’t, only a thread (not the whole process) has to sleep—or, in the case of select/poll, the Python process waits simultaneously for “can write” or “data available”, writes to the process’s stdin only when there is room, and reads its stdout and/or stderr only when data are ready. The proc.communicate()
code (actually _communicate
where the hairy cases are handled) returns once all stdin data (if any) have been sent and all stdout and/or stderr data have been accumulated.
If you want to read both stdout
and stderr
on two different pipes (regardless of any stdin
redirection), you will need to avoid deadlock too. The deadlock scenario here is different—it occurs when the subprocess writes something long to stderr
while you’re pulling data from stdout
, or vice versa—but it’s still there.
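If you don't need the output live, the simple way through these hard cases is to let communicate() do the thread or poll/select work for you (a sketch):

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()  # threads or poll/select under the hood, so no deadlock

If you do need both streams live, a thread per pipe (as in some of the other answers here) is the usual workaround.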
The Demo
I promised to demonstrate that, un-redirected, Python subprocess
es write to the underlying stdout, not sys.stdout
. So, here is some code:
from cStringIO import StringIO
import os
import subprocess
import sys
def show1():
print 'start show1'
save = sys.stdout
sys.stdout = StringIO()
print 'sys.stdout being buffered'
proc = subprocess.Popen(['echo', 'hello'])
proc.wait()
in_stdout = sys.stdout.getvalue()
sys.stdout = save
print 'in buffer:', in_stdout
def show2():
print 'start show2'
save = sys.stdout
sys.stdout = open(os.devnull, 'w')
print 'after redirect sys.stdout'
proc = subprocess.Popen(['echo', 'hello'])
proc.wait()
sys.stdout = save
show1()
show2()
When run:
$ python out.py
start show1
hello
in buffer: sys.stdout being buffered
start show2
hello
Note that the first routine will fail if you add stdout=sys.stdout
, as a StringIO
object has no fileno
. The second will omit the hello
if you add stdout=sys.stdout
since sys.stdout
has been redirected to os.devnull
.
(If you redirect Python’s file-descriptor-1, the subprocess will follow that redirection. The open(os.devnull, 'w')
call produces a stream whose fileno()
is greater than 2.)
Answer 2
We can also use the default file iterator to read stdout, instead of using the iter construct with readline().
import subprocess
import sys
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
for line in process.stdout:
sys.stdout.write(line)
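One caveat: in Python 3, process.stdout yields bytes by default, so the iteration above produces bytes objects. Passing universal_newlines=True (or text=True on Python 3.7+) makes it yield strings instead; a small sketch with your_command as a placeholder:

import subprocess
import sys

process = subprocess.Popen(your_command, stdout=subprocess.PIPE,
                           universal_newlines=True, bufsize=1)  # line-buffered text mode
for line in process.stdout:
    sys.stdout.write(line)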
Answer 3
If you’re able to use third-party libraries, You might be able to use something like sarge
(disclosure: I’m its maintainer). This library allows non-blocking access to output streams from subprocesses – it’s layered over the subprocess
module.
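A rough sketch of what that might look like; the exact call signatures here are assumptions (for instance, the async_ keyword was async in older sarge releases), so check the sarge documentation for your version:

from sarge import run, Capture

# launch without blocking, capturing stdout
p = run('some_long_command', stdout=Capture(), async_=True)
for line in iter(p.stdout.readline, b''):  # Capture supports readline()
    print(line)
p.wait()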
Answer 4
Solution 1: Log stdout AND stderr concurrently in realtime
A simple solution that logs both stdout and stderr concurrently, line-by-line in realtime, into a single log file.
import subprocess as sp
from concurrent.futures import ThreadPoolExecutor

def log_popen_pipe(p, stdfile, f):
    # f is a single, shared log-file handle. Opening the same path twice
    # with "w" would truncate it and the two writers would clobber each
    # other, so the file is opened once, outside the workers.
    while p.poll() is None:
        f.write(stdfile.readline())
        f.flush()
    # Write the rest from the buffer
    f.write(stdfile.read())

with open("mylog.txt", "w") as f:
    with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:
        with ThreadPoolExecutor(2) as pool:
            r1 = pool.submit(log_popen_pipe, p, p.stdout, f)
            r2 = pool.submit(log_popen_pipe, p, p.stderr, f)
            r1.result()
            r2.result()
Solution 2: A function read_popen_pipes() that lets you iterate over both pipes (stdout/stderr) concurrently, in realtime
import subprocess as sp
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor
def enqueue_output(file, queue):
for line in iter(file.readline, ''):
queue.put(line)
file.close()
def read_popen_pipes(p):
with ThreadPoolExecutor(2) as pool:
q_stdout, q_stderr = Queue(), Queue()
pool.submit(enqueue_output, p.stdout, q_stdout)
pool.submit(enqueue_output, p.stderr, q_stderr)
while True:
if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
break
out_line = err_line = ''
try:
out_line = q_stdout.get_nowait()
err_line = q_stderr.get_nowait()
except Empty:
pass
yield (out_line, err_line)
# The function in use:
with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:
for out_line, err_line in read_popen_pipes(p):
print(out_line, end='')
print(err_line, end='')
p.poll()
Answer 5
A good but “heavyweight” solution is to use Twisted – see the bottom.
If you’re willing to live with only stdout something along those lines should work:
import subprocess
import sys
popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE)
while True:
    stdoutdata = popenobj.stdout.readline()
    if stdoutdata:
        sys.stdout.write(stdoutdata)
    else:
        break  # readline() returns '' at EOF
popenobj.wait()  # make sure returncode is populated
print "Return code", popenobj.returncode
(If you use read(), it tries to read the entire “file”, which isn’t useful; what we really need here is something that reads whatever data is in the pipe right now — a sketch of one way to do that on POSIX follows.)
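One POSIX-only way to get “whatever is in the pipe right now” is to put the descriptor into non-blocking mode (a sketch, reusing popenobj from above):

import fcntl
import os

fd = popenobj.stdout.fileno()
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# read() now returns only the data currently buffered; when the pipe is
# empty it raises IOError (EAGAIN) on Python 2, or returns None on Python 3.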
One might also try to approach this with threading, e.g.:
import subprocess
import sys
import threading
popenobj = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True)
def stdoutprocess(o):
while True:
stdoutdata = o.stdout.readline()
if stdoutdata:
sys.stdout.write(stdoutdata)
else:
break
t = threading.Thread(target=stdoutprocess, args=(popenobj,))
t.start()
popenobj.wait()
t.join()
print "Return code", popenobj.returncode
Now we could potentially add stderr as well by having two threads.
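A minimal sketch of that two-thread variant (untested, same pattern as above):

import subprocess
import sys
import threading

popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

def drain(stream, sink):
    # copy a pipe line-by-line until EOF
    for line in iter(stream.readline, ''):
        sink.write(line)

threads = [threading.Thread(target=drain, args=(popenobj.stdout, sys.stdout)),
           threading.Thread(target=drain, args=(popenobj.stderr, sys.stderr))]
for t in threads:
    t.start()
popenobj.wait()
for t in threads:
    t.join()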
Note, however, that the subprocess docs discourage using these files directly and recommend communicate() (mostly out of concern for deadlocks, which I think isn’t an issue above), and the solutions are a little clunky, so it really seems like the subprocess module isn’t quite up to the job (also see: http://www.python.org/dev/peps/pep-3145/) and we need to look at something else.
A more involved solution is to use Twisted as shown here: https://twistedmatrix.com/documents/11.1.0/core/howto/process.html
The way you do this with Twisted is to create your process using reactor.spawnprocess()
and providing a ProcessProtocol
that then processes output asynchronously. The Twisted sample Python code is here: https://twistedmatrix.com/documents/11.1.0/core/howto/listings/process/process.py
Answer 6
In addition to all these answers, one simple approach is as follows:
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
while process.stdout.readable():
line = process.stdout.readline()
if not line:
break
print(line.strip())
Loop over the stream as long as it’s readable, and stop when it yields an empty result. The key here is that readline() returns a line (with \n at the end) as long as there is output, and an empty string once the stream is really at its end.
Hope this helps someone.
Answer 7
Based on all the above I suggest a slightly modified version (python3):
- a while loop calling readline (the suggested iter solution seemed to block forever for me – Python 3, Windows 7)
- structured so that handling of the read data does not need to be duplicated after poll returns non-None
- stderr piped into stdout so both outputs are read
- added code to get the exit value of cmd.
Code:
import subprocess
import time
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, universal_newlines=True)
while True:
rd = proc.stdout.readline()
print(rd, end='') # and whatever you want to do...
if not rd: # EOF
returncode = proc.poll()
if returncode is not None:
break
time.sleep(0.1) # cmd closed stdout, but not exited yet
# You may want to check on ReturnCode here
Answer 8
It looks like line-buffered output will work for you, in which case something like the following might suit. (Caveat: it’s untested.) This will only give the subprocess’s stdout in real time. If you want to have both stderr and stdout in real time, you’ll have to do something more complex with select
.
proc = subprocess.Popen(run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
while proc.poll() is None:
line = proc.stdout.readline()
print line
log_file.write(line + '\n')
# Might still be data on stdout at this point. Grab any
# remainder.
for line in proc.stdout.read().split('\n'):
print line
log_file.write(line + '\n')
# Do whatever you want with proc.stderr here...
Answer 9
Why not set stdout
directly to sys.stdout
? And if you need to output to a log as well, then you can simply override the write method of f.
import sys
import subprocess
class SuperFile(file):  # Python 2's built-in file type; open.__class__ is not a subclassable file class
def write(self, data):
sys.stdout.write(data)
super(SuperFile, self).write(data)
f = SuperFile("log.txt","w+")
process = subprocess.Popen(command, stdout=f, stderr=f)
Answer 10
All of the solutions above that I tried either failed to separate stderr and stdout output (multiple pipes), or blocked forever when the OS pipe buffer was full, which happens when the command you are running produces output too fast (there is a warning about this in the Python subprocess documentation for poll()). The only reliable way I found was through select, but this is a POSIX-only solution:
import subprocess
import sys
import os
import select
from errno import EINTR
# returns command exit status, stdout text, stderr text
# rtoutput: show realtime output while running
def run_script(cmd,rtoutput=0):
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
poller = select.poll()
poller.register(p.stdout, select.POLLIN)
poller.register(p.stderr, select.POLLIN)
coutput=''
cerror=''
fdhup={}
fdhup[p.stdout.fileno()]=0
fdhup[p.stderr.fileno()]=0
while sum(fdhup.values()) < len(fdhup):
try:
r = poller.poll(1)
except select.error, err:
if err.args[0] != EINTR:
raise
r=[]
for fd, flags in r:
if flags & (select.POLLIN | select.POLLPRI):
c = os.read(fd, 1024)
if rtoutput:
sys.stdout.write(c)
sys.stdout.flush()
if fd == p.stderr.fileno():
cerror+=c
else:
coutput+=c
else:
fdhup[fd]=1
return p.poll(), coutput.strip(), cerror.strip()
Answer 11
Similar to the previous answers, but the following solution worked for me on Windows, using Python 3, to provide a common method to print and log in realtime (getting-realtime-output-using-python):
import subprocess

def print_and_log(command, logFile):
with open(logFile, 'wb') as f:
command = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
while True:
output = command.stdout.readline()
if not output and command.poll() is not None:
f.close()
break
if output:
f.write(output)
print(str(output.strip(), 'utf-8'), flush=True)
return command.poll()
Answer 12
I think that the subprocess.communicate
method is a bit misleading: it actually fills the stdout and stderr that you specify in the subprocess.Popen
.
Yet, reading from the subprocess.PIPE that you can provide to subprocess.Popen's stdout and stderr parameters will eventually fill up the OS pipe buffers and deadlock your app (especially if you have multiple processes/threads that must use subprocess).
My proposed solution is to provide the stdout and stderr with files – and read the files’ content instead of reading from the deadlocking PIPE
. These files can be tempfile.NamedTemporaryFile()
– which can also be accessed for reading while they’re being written into by subprocess.communicate
.
Below is a sample usage:
try:
with ProcessRunner(('python', 'task.py'), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
for out in process_runner:
print(out)
except ProcessError as e:
print(e.error_message)
raise
And this is the source code, ready to be used, with as many comments as I could provide to explain what it does:
If you’re using python 2, please make sure to first install the latest version of the subprocess32 package from pypi.
import os
import sys
import threading
import time
import tempfile
import logging
if os.name == 'posix' and sys.version_info[0] < 3:
# Support python 2
import subprocess32 as subprocess
else:
# Get latest and greatest from python 3
import subprocess
logger = logging.getLogger(__name__)
class ProcessError(Exception):
"""Base exception for errors related to running the process"""
class ProcessTimeout(ProcessError):
"""Error that will be raised when the process execution will exceed a timeout"""
class ProcessRunner(object):
def __init__(self, args, env=None, timeout=None, bufsize=-1, seconds_to_wait=0.25, **kwargs):
"""
Constructor facade to subprocess.Popen that receives parameters which are more specifically required for the
Process Runner. This is a class that should be used as a context manager - and that provides an iterator
for reading captured output from subprocess.communicate in near realtime.
Example usage:
try:
with ProcessRunner(('python', task_file_path), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
for out in process_runner:
print(out)
except ProcessError as e:
print(e.error_message)
raise
:param args: same as subprocess.Popen
:param env: same as subprocess.Popen
:param timeout: same as subprocess.communicate
:param bufsize: same as subprocess.Popen
:param seconds_to_wait: time to wait between each readline from the temporary file
:param kwargs: same as subprocess.Popen
"""
self._seconds_to_wait = seconds_to_wait
self._process_has_timed_out = False
self._timeout = timeout
self._process_done = False
self._std_file_handle = tempfile.NamedTemporaryFile()
self._process = subprocess.Popen(args, env=env, bufsize=bufsize,
stdout=self._std_file_handle, stderr=self._std_file_handle, **kwargs)
self._thread = threading.Thread(target=self._run_process)
self._thread.daemon = True
def __enter__(self):
self._thread.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._thread.join()
self._std_file_handle.close()
def __iter__(self):
# read all output from stdout file that subprocess.communicate fills
with open(self._std_file_handle.name, 'r') as stdout:
# while process is alive, keep reading data
while not self._process_done:
out = stdout.readline()
out_without_trailing_whitespaces = out.rstrip()
if out_without_trailing_whitespaces:
# yield stdout data without trailing \n
yield out_without_trailing_whitespaces
else:
# if there is nothing to read, then please wait a tiny little bit
time.sleep(self._seconds_to_wait)
# this is a hack: terraform seems to write to buffer after process has finished
out = stdout.read()
if out:
yield out
if self._process_has_timed_out:
raise ProcessTimeout('Process has timed out')
if self._process.returncode != 0:
raise ProcessError('Process has failed')
def _run_process(self):
try:
# Start gathering information (stdout and stderr) from the opened process
self._process.communicate(timeout=self._timeout)
# Graceful termination of the opened process
self._process.terminate()
except subprocess.TimeoutExpired:
self._process_has_timed_out = True
# Force termination of the opened process
self._process.kill()
self._process_done = True
@property
def return_code(self):
return self._process.returncode
Answer 13
Here is a class which I’m using in one of my projects. It redirects the output of a subprocess to a log. At first I tried simply overriding the write method, but that doesn’t work, as the subprocess will never call it (redirection happens at the file-descriptor level). So I’m using my own pipe, similar to how it’s done in the subprocess module. This has the advantage of encapsulating all logging/printing logic in the adapter, and you can simply pass instances of the logger to Popen: subprocess.Popen("/path/to/binary", stderr=LogAdapter("foo"))
import logging
import os
import threading

class LogAdapter(threading.Thread):
def __init__(self, logname, level = logging.INFO):
super().__init__()
self.log = logging.getLogger(logname)
self.readpipe, self.writepipe = os.pipe()
logFunctions = {
logging.DEBUG: self.log.debug,
logging.INFO: self.log.info,
logging.WARN: self.log.warn,
logging.ERROR: self.log.warn,
}
try:
self.logFunction = logFunctions[level]
except KeyError:
self.logFunction = self.log.info
def fileno(self):
#when fileno is called this indicates the subprocess is about to fork => start thread
self.start()
return self.writepipe
def finished(self):
"""If the write-filedescriptor is not closed this thread will
prevent the whole program from exiting. You can use this method
to clean up after the subprocess has terminated."""
os.close(self.writepipe)
def run(self):
inputFile = os.fdopen(self.readpipe)
while True:
line = inputFile.readline()
if len(line) == 0:
#no new data was added
break
self.logFunction(line.strip())
If you don’t need logging but simply want to use print()
you can obviously remove large portions of the code and keep the class shorter. You could also expand it by an __enter__
and __exit__
method and call finished
in __exit__
so that you could easily use it as context.
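A minimal sketch of that context-manager extension (my addition, following the suggestion above):

class LogAdapterContext(LogAdapter):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # close the write end so the reader thread sees EOF and exits
        self.finished()
        self.join()

# Hypothetical usage:
# with LogAdapterContext("foo") as log:
#     subprocess.Popen(["/path/to/binary"], stderr=log).wait()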
Answer 14
None of the Pythonic solutions worked for me.
It turned out that proc.stdout.read()
or similar may block forever.
Therefore, I use tee
like this:
subprocess.run('./my_long_running_binary 2>&1 | tee -a my_log_file.txt && exit ${PIPESTATUS}', shell=True, check=True, executable='/bin/bash')
This solution is convenient if you are already using shell=True
.
${PIPESTATUS}
captures the success status of the entire command chain (only available in Bash).
If I omitted the && exit ${PIPESTATUS}
, then this would always return zero since tee
never fails.
unbuffer might be necessary for printing each line immediately into the terminal, instead of waiting way too long until the “pipe buffer” gets filled; a sketch of where it slots in follows below. However, unbuffer swallows the exit status of assert (SIGABRT).
2>&1 also logs stderr to the file.
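For completeness, a sketch of where unbuffer (from the expect package) slots into the pipeline; treat this as an assumption to verify on your system, given the exit-status caveat above:

subprocess.run('unbuffer ./my_long_running_binary 2>&1 | tee -a my_log_file.txt && exit ${PIPESTATUS}',
               shell=True, check=True, executable='/bin/bash')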