标签归档:subprocess

子流程命令的实时输出

问题:子流程命令的实时输出

我正在使用python脚本作为流体力学代码的驱动程序。是时候运行模拟了,我subprocess.Popen用来运行代码,将stdout和stderr的输出收集到subprocess.PIPE—中,然后我可以打印(并保存到日志文件中)输出信息,并检查是否有错误。问题是,我不知道代码是如何进行的。如果直接从命令行运行它,它会向我输出有关它的迭代时间,时间,下一时间步长等的信息。

有没有办法既存储输出(用于日志记录和错误检查),又产生实时流输出?

我的代码的相关部分:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

最初,我是run_command通过管道传递数据,tee以便将副本直接发送到日志文件,并且流仍直接输出到终端-但是那样,我无法存储任何错误(据我所知)。


编辑:

临时解决方案:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
    log_file.flush()

然后,在另一个终端中,运行tail -f log.txt(st log_file = 'log.txt')。

I’m using a python script as a driver for a hydrodynamics code. When it comes time to run the simulation, I use subprocess.Popen to run the code, collect the output from stdout and stderr into a subprocess.PIPE — then I can print (and save to a log-file) the output information, and check for any errors. The problem is, I have no idea how the code is progressing. If I run it directly from the command line, it gives me output about what iteration its at, what time, what the next time-step is, etc.

Is there a way to both store the output (for logging and error checking), and also produce a live-streaming output?

The relevant section of my code:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

Originally I was piping the run_command through tee so that a copy went directly to the log-file, and the stream still output directly to the terminal — but that way I can’t store any errors (to my knowlege).


Edit:

Temporary solution:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
    log_file.flush()

then, in another terminal, run tail -f log.txt (s.t. log_file = 'log.txt').


回答 0

您可以通过两种方法执行此操作,或者通过从readreadline函数创建一个迭代器,然后执行:

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), ''):  # replace '' with b'' for Python 3
        sys.stdout.write(c)
        f.write(c)

要么

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for line in iter(process.stdout.readline, ''):  # replace '' with b'' for Python 3
        sys.stdout.write(line)
        f.write(line)

或者,您可以创建readerwriter文件。将传递writerPopen并从中读取reader

import io
import time
import subprocess
import sys

filename = 'test.log'
with io.open(filename, 'wb') as writer, io.open(filename, 'rb', 1) as reader:
    process = subprocess.Popen(command, stdout=writer)
    while process.poll() is None:
        sys.stdout.write(reader.read())
        time.sleep(0.5)
    # Read the remaining
    sys.stdout.write(reader.read())

这样,您就可以将数据写入 test.log在和标准输出中。

文件方法的唯一优点是您的代码不会阻塞。因此,您可以在此期间做任何您想做的事情,并reader以不阻塞的方式随时阅读。当使用PIPEreadreadline功能将阻塞,直到任一个字符被写入到管或线被分别写入到管道。

You have two ways of doing this, either by creating an iterator from the read or readline functions and do:

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), ''):  # replace '' with b'' for Python 3
        sys.stdout.write(c)
        f.write(c)

or

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for line in iter(process.stdout.readline, ''):  # replace '' with b'' for Python 3
        sys.stdout.write(line)
        f.write(line)

Or you can create a reader and a writer file. Pass the writer to the Popen and read from the reader

import io
import time
import subprocess
import sys

filename = 'test.log'
with io.open(filename, 'wb') as writer, io.open(filename, 'rb', 1) as reader:
    process = subprocess.Popen(command, stdout=writer)
    while process.poll() is None:
        sys.stdout.write(reader.read())
        time.sleep(0.5)
    # Read the remaining
    sys.stdout.write(reader.read())

This way you will have the data written in the test.log as well as on the standard output.

The only advantage of the file approach is that your code doesn’t block. So you can do whatever you want in the meantime and read whenever you want from the reader in a non-blocking way. When you use PIPE, read and readline functions will block until either one character is written to the pipe or a line is written to the pipe respectively.


回答 1

执行摘要(或“ tl; dr”版本):最多有一个很容易subprocess.PIPE,否则很难。

现在可能是时候解释一下它是如何subprocess.Popen工作的了。

(注意:这是针对Python 2.x的,尽管3.x相似;并且我对Windows变体很模糊。我对POSIX的了解要好得多。)

Popen功能需要同时处理零到三个I / O流。分别以stdinstdout和表示stderr

您可以提供:

  • None,表示您不想重定向流。它将照常继承这些。请注意,至少在POSIX系统上,这并不意味着它将使用Python的sys.stdout,而仅使用Python的实际标准输出。参见演示示例。
  • 一个int值。这是一个“原始”文件描述符(至少在POSIX中)。(附带说明:PIPESTDOUT实际上int是内部的,但是是“不可能的”描述符-1和-2。)
  • 流-实际上是具有fileno方法的任何对象。 Popen将使用来找到该流的描述符stream.fileno(),然后按照int值进行操作。
  • subprocess.PIPE,指示Python应该创建一个管道。
  • subprocess.STDOUTstderr仅适用):告诉Python使用与相同的描述符stdout。仅当您提供的(非None)值时才有意义stdout,即使如此,也只有在设置时才需要stdout=subprocess.PIPE。(否则,您可以只提供您提供的相同参数stdout,例如Popen(..., stdout=stream, stderr=stream)。)

最简单的情况(无管道)

如果不进行任何重定向(将所有三个都保留为默认None值或提供明确的None),Pipe则非常简单。它只需要剥离子流程并使其运行。或者,如果您重定向到一个非PIPE-an int或流是fileno()-它仍然很容易,因为OS做所有的工作。Python只需要剥离子进程,即可将其stdin,stdout和/或stderr连接到提供的文件描述符。

仍然很容易的情况:一根烟斗

如果仅重定向一个流,那么Pipe事情仍然很简单。让我们一次选择一个流并观看。

假设你想提供一些stdin,但让stdoutstderr去未重定向,或去文件描述符。作为父进程,您的Python程序只需要用于通过write()管道发送数据。您可以自己执行此操作,例如:

proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
proc.stdin.write('here, have some data\n') # etc

或者您可以将stdin数据传递到proc.communicate(),然后执行stdin.write上面所示的操作。没有输出返回,因此communicate()只有一项实际工作:它还会为您关闭管道。(如果不调用proc.communicate(),则必须调用proc.stdin.close()以关闭管道,以便子进程知道不再有数据通过。)

假设你想捕捉stdout,但休假stdinstderr孤独。同样,这很容易:只需调用proc.stdout.read()(或等效命令),直到没有更多输出为止。由于proc.stdout()是普通的Python I / O流,因此可以在其上使用所有普通的构造,例如:

for line in proc.stdout:

或者,您也可以使用proc.communicate(),它可以read()为您轻松完成。

如果只想捕获stderr,则它的功能与相同stdout

在事情变得艰难之前,还有另外一个技巧。假设您要捕获stdout,并且还捕获stderr与stdout在同一管道上:

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

在这种情况下,subprocess“作弊”!好吧,它必须这样做,所以它并不是真正的作弊:它使用其stdout和stderr引导到(单个)管道描述符中的子进程来启动子进程,该子进程描述符反馈给其父进程(Python)。在父端,只有一个管道描述符用于读取输出。所有“ stderr”输出都显示在中proc.stdout,如果调用proc.communicate(),stderr结果(元组中的第二个值)将是None,而不是字符串。

困难情况:两个或更多管道

当您要使用至少两个管道时,所有问题都会出现。实际上,subprocess代码本身具有以下功能:

def communicate(self, input=None):
    ...
    # Optimization: If we are only using one pipe, or no pipe at
    # all, using select() or threads is unnecessary.
    if [self.stdin, self.stdout, self.stderr].count(None) >= 2:

但是,可惜,在这里,我们至少制作了两个(也许三个)不同的管道,因此count(None)返回值为1或0。我们必须用困难的方式做事。

在Windows上,这用于threading.Thread累积self.stdout和的结果self.stderr,并让父线程传递self.stdin输入数据(然后关闭管道)。

在POSIX上,poll如果可用,则使用,否则select,使用累加输出并传递标准输入。所有这些都在(单个)父进程/线程中运行。

这里需要线程或轮询/选择以避免死锁。例如,假设我们已将所有三个流重定向到三个单独的管道。进一步假设在写入过程被挂起之前,等待读取过程从另一端“清除”管道之前,可以在管道中填充多少数据有一个很小的限制。为了说明起见,我们将这个较小的限制设置为一个字节。(实际上,这是工作原理,但限制远大于一个字节。)

如果父进程(Python)尝试写入多个字节(例如'go\n'到)proc.stdin,则第一个字节进入,然后第二个字节导致Python进程挂起,等待子进程读取第一个字节,从而清空管道。

同时,假设子流程决定打印一个友好的“ Hello!Do n’t Panic!”。问候。在H进入它的标准输出管道,但e导致其暂停,等待其家长阅读H,排空stdout管道。

现在我们陷入困境:Python进程处于睡眠状态,等待说完“ go”,而子进程也处于睡眠状态,等待说完“ Hello!Don Panic!”。

subprocess.Popen代码避免了线程化或选择/轮询的问题。当字节可以通过管道时,它们就会通过。如果不能,则只有一个线程(而不是整个进程)必须进入睡眠状态;或者,在选择/轮询的情况下,Python进程同时等待“可以写入”或“可用数据”,然后写入该进程的stdin仅在有空间时,并且仅在数据准备就绪时读取其stdout和/或stderr。一旦发送了所有标准输入数据(如果有的话)并且所有标准输出和/或标准错误数据都已存储,则该proc.communicate()代码(实际上_communicate是处理多毛案件的地方)返回。

如果你想同时读取stdoutstderr在两个不同的管道(无论任何的stdin重定向),则需要避免死锁了。此处的死锁情况有所不同-发生在子进程stderr从中提取数据时写入了很长时间stdout,反之亦然,但是这种情况仍然存在。


演示

我答应演示未经重定向的python subprocess写入底层标准输出,而不是sys.stdout。因此,这是一些代码:

from cStringIO import StringIO
import os
import subprocess
import sys

def show1():
    print 'start show1'
    save = sys.stdout
    sys.stdout = StringIO()
    print 'sys.stdout being buffered'
    proc = subprocess.Popen(['echo', 'hello'])
    proc.wait()
    in_stdout = sys.stdout.getvalue()
    sys.stdout = save
    print 'in buffer:', in_stdout

def show2():
    print 'start show2'
    save = sys.stdout
    sys.stdout = open(os.devnull, 'w')
    print 'after redirect sys.stdout'
    proc = subprocess.Popen(['echo', 'hello'])
    proc.wait()
    sys.stdout = save

show1()
show2()

运行时:

$ python out.py
start show1
hello
in buffer: sys.stdout being buffered

start show2
hello

请注意,如果添加stdout=sys.stdout,第一个例程将失败,因为StringIO对象没有filenohello如果已添加,第二个将省略,stdout=sys.stdout因为它sys.stdout已被重定向到os.devnull

(如果重定向Python的file-descriptor-1,则子进程遵循该重定向。该open(os.devnull, 'w')调用将产生一个fileno()大于2 的流。)

Executive Summary (or “tl;dr” version): it’s easy when there’s at most one subprocess.PIPE, otherwise it’s hard.

It may be time to explain a bit about how subprocess.Popen does its thing.

(Caveat: this is for Python 2.x, although 3.x is similar; and I’m quite fuzzy on the Windows variant. I understand the POSIX stuff much better.)

The Popen function needs to deal with zero-to-three I/O streams, somewhat simultaneously. These are denoted stdin, stdout, and stderr as usual.

You can provide:

  • None, indicating that you don’t want to redirect the stream. It will inherit these as usual instead. Note that on POSIX systems, at least, this does not mean it will use Python’s sys.stdout, just Python’s actual stdout; see demo at end.
  • An int value. This is a “raw” file descriptor (in POSIX at least). (Side note: PIPE and STDOUT are actually ints internally, but are “impossible” descriptors, -1 and -2.)
  • A stream—really, any object with a fileno method. Popen will find the descriptor for that stream, using stream.fileno(), and then proceed as for an int value.
  • subprocess.PIPE, indicating that Python should create a pipe.
  • subprocess.STDOUT (for stderr only): tell Python to use the same descriptor as for stdout. This only makes sense if you provided a (non-None) value for stdout, and even then, it is only needed if you set stdout=subprocess.PIPE. (Otherwise you can just provide the same argument you provided for stdout, e.g., Popen(..., stdout=stream, stderr=stream).)

The easiest cases (no pipes)

If you redirect nothing (leave all three as the default None value or supply explicit None), Pipe has it quite easy. It just needs to spin off the subprocess and let it run. Or, if you redirect to a non-PIPE—an int or a stream’s fileno()—it’s still easy, as the OS does all the work. Python just needs to spin off the subprocess, connecting its stdin, stdout, and/or stderr to the provided file descriptors.

The still-easy case: one pipe

If you redirect only one stream, Pipe still has things pretty easy. Let’s pick one stream at a time and watch.

Suppose you want to supply some stdin, but let stdout and stderr go un-redirected, or go to a file descriptor. As the parent process, your Python program simply needs to use write() to send data down the pipe. You can do this yourself, e.g.:

proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
proc.stdin.write('here, have some data\n') # etc

or you can pass the stdin data to proc.communicate(), which then does the stdin.write shown above. There is no output coming back so communicate() has only one other real job: it also closes the pipe for you. (If you don’t call proc.communicate() you must call proc.stdin.close() to close the pipe, so that the subprocess knows there is no more data coming through.)

Suppose you want to capture stdout but leave stdin and stderr alone. Again, it’s easy: just call proc.stdout.read() (or equivalent) until there is no more output. Since proc.stdout() is a normal Python I/O stream you can use all the normal constructs on it, like:

for line in proc.stdout:

or, again, you can use proc.communicate(), which simply does the read() for you.

If you want to capture only stderr, it works the same as with stdout.

There’s one more trick before things get hard. Suppose you want to capture stdout, and also capture stderr but on the same pipe as stdout:

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

In this case, subprocess “cheats”! Well, it has to do this, so it’s not really cheating: it starts the subprocess with both its stdout and its stderr directed into the (single) pipe-descriptor that feeds back to its parent (Python) process. On the parent side, there’s again only a single pipe-descriptor for reading the output. All the “stderr” output shows up in proc.stdout, and if you call proc.communicate(), the stderr result (second value in the tuple) will be None, not a string.

The hard cases: two or more pipes

The problems all come about when you want to use at least two pipes. In fact, the subprocess code itself has this bit:

def communicate(self, input=None):
    ...
    # Optimization: If we are only using one pipe, or no pipe at
    # all, using select() or threads is unnecessary.
    if [self.stdin, self.stdout, self.stderr].count(None) >= 2:

But, alas, here we’ve made at least two, and maybe three, different pipes, so the count(None) returns either 1 or 0. We must do things the hard way.

On Windows, this uses threading.Thread to accumulate results for self.stdout and self.stderr, and has the parent thread deliver self.stdin input data (and then close the pipe).

On POSIX, this uses poll if available, otherwise select, to accumulate output and deliver stdin input. All this runs in the (single) parent process/thread.

Threads or poll/select are needed here to avoid deadlock. Suppose, for instance, that we’ve redirected all three streams to three separate pipes. Suppose further that there’s a small limit on how much data can be stuffed into to a pipe before the writing process is suspended, waiting for the reading process to “clean out” the pipe from the other end. Let’s set that small limit to a single byte, just for illustration. (This is in fact how things work, except that the limit is much bigger than one byte.)

If the parent (Python) process tries to write several bytes—say, 'go\n'to proc.stdin, the first byte goes in and then the second causes the Python process to suspend, waiting for the subprocess to read the first byte, emptying the pipe.

Meanwhile, suppose the subprocess decides to print a friendly “Hello! Don’t Panic!” greeting. The H goes into its stdout pipe, but the e causes it to suspend, waiting for its parent to read that H, emptying the stdout pipe.

Now we’re stuck: the Python process is asleep, waiting to finish saying “go”, and the subprocess is also asleep, waiting to finish saying “Hello! Don’t Panic!”.

The subprocess.Popen code avoids this problem with threading-or-select/poll. When bytes can go over the pipes, they go. When they can’t, only a thread (not the whole process) has to sleep—or, in the case of select/poll, the Python process waits simultaneously for “can write” or “data available”, writes to the process’s stdin only when there is room, and reads its stdout and/or stderr only when data are ready. The proc.communicate() code (actually _communicate where the hairy cases are handled) returns once all stdin data (if any) have been sent and all stdout and/or stderr data have been accumulated.

If you want to read both stdout and stderr on two different pipes (regardless of any stdin redirection), you will need to avoid deadlock too. The deadlock scenario here is different—it occurs when the subprocess writes something long to stderr while you’re pulling data from stdout, or vice versa—but it’s still there.


The Demo

I promised to demonstrate that, un-redirected, Python subprocesses write to the underlying stdout, not sys.stdout. So, here is some code:

from cStringIO import StringIO
import os
import subprocess
import sys

def show1():
    print 'start show1'
    save = sys.stdout
    sys.stdout = StringIO()
    print 'sys.stdout being buffered'
    proc = subprocess.Popen(['echo', 'hello'])
    proc.wait()
    in_stdout = sys.stdout.getvalue()
    sys.stdout = save
    print 'in buffer:', in_stdout

def show2():
    print 'start show2'
    save = sys.stdout
    sys.stdout = open(os.devnull, 'w')
    print 'after redirect sys.stdout'
    proc = subprocess.Popen(['echo', 'hello'])
    proc.wait()
    sys.stdout = save

show1()
show2()

When run:

$ python out.py
start show1
hello
in buffer: sys.stdout being buffered

start show2
hello

Note that the first routine will fail if you add stdout=sys.stdout, as a StringIO object has no fileno. The second will omit the hello if you add stdout=sys.stdout since sys.stdout has been redirected to os.devnull.

(If you redirect Python’s file-descriptor-1, the subprocess will follow that redirection. The open(os.devnull, 'w') call produces a stream whose fileno() is greater than 2.)


回答 2

我们还可以使用默认的文件迭代器来读取stdout,而不是使用带有readline()的iter构造。

import subprocess
import sys
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
for line in process.stdout:
    sys.stdout.write(line)

We can also use the default file iterator for reading stdout instead of using iter construct with readline().

import subprocess
import sys
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
for line in process.stdout:
    sys.stdout.write(line)

回答 3

如果您可以使用第三方库,则可以使用类似的东西sarge(披露:我是它的维护者)。该库允许无阻塞地访问子流程的输出流-它位于subprocess模块之上。

If you’re able to use third-party libraries, You might be able to use something like sarge (disclosure: I’m its maintainer). This library allows non-blocking access to output streams from subprocesses – it’s layered over the subprocess module.


回答 4

解决方案1:实时并发记录stdoutstderr

一个简单的解决方案,可以同时逐行实时地同时将stdout和stderr 记录到日志文件中。

import subprocess as sp
from concurrent.futures import ThreadPoolExecutor


def log_popen_pipe(p, stdfile):

    with open("mylog.txt", "w") as f:

        while p.poll() is None:
            f.write(stdfile.readline())
            f.flush()

        # Write the rest from the buffer
        f.write(stdfile.read())


with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    with ThreadPoolExecutor(2) as pool:
        r1 = pool.submit(log_popen_pipe, p, p.stdout)
        r2 = pool.submit(log_popen_pipe, p, p.stderr)
        r1.result()
        r2.result()

解决方案2:read_popen_pipes()允许您同时并行访问两个管道(stdout / stderr)的功能

import subprocess as sp
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()


def read_popen_pipes(p):

    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()

        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)

        while True:

            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break

            out_line = err_line = ''

            try:
                out_line = q_stdout.get_nowait()
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            yield (out_line, err_line)

# The function in use:

with sp.Popen(my_cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    for out_line, err_line in read_popen_pipes(p):
        print(out_line, end='')
        print(err_line, end='')

    return p.poll()

Solution 1: Log stdout AND stderr concurrently in realtime

A simple solution which logs both stdout AND stderr concurrently, line-by-line in realtime into a log file.

import subprocess as sp
from concurrent.futures import ThreadPoolExecutor


def log_popen_pipe(p, stdfile):

    with open("mylog.txt", "w") as f:

        while p.poll() is None:
            f.write(stdfile.readline())
            f.flush()

        # Write the rest from the buffer
        f.write(stdfile.read())


with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    with ThreadPoolExecutor(2) as pool:
        r1 = pool.submit(log_popen_pipe, p, p.stdout)
        r2 = pool.submit(log_popen_pipe, p, p.stderr)
        r1.result()
        r2.result()

Solution 2: A function read_popen_pipes() that allows you to iterate over both pipes (stdout/stderr), concurrently in realtime

import subprocess as sp
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()


def read_popen_pipes(p):

    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()

        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)

        while True:

            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break

            out_line = err_line = ''

            try:
                out_line = q_stdout.get_nowait()
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            yield (out_line, err_line)

# The function in use:

with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    for out_line, err_line in read_popen_pipes(p):
        print(out_line, end='')
        print(err_line, end='')

    p.poll()


回答 5

一个好的但“重量级”的解决方案是使用Twisted-参见底部。

如果您只愿意接受标准输出,则应该遵循以下原则:

import subprocess
import sys
popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE)
while not popenobj.poll():
   stdoutdata = popenobj.stdout.readline()
   if stdoutdata:
      sys.stdout.write(stdoutdata)
   else:
      break
print "Return code", popenobj.returncode

(如果使用read(),它将尝试读取无用的整个“文件”,我们在这里真正可以使用的是读取管道中所有数据的东西)

一个人也可以尝试通过线程来解决这个问题,例如:

import subprocess
import sys
import threading

popenobj = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True)

def stdoutprocess(o):
   while True:
      stdoutdata = o.stdout.readline()
      if stdoutdata:
         sys.stdout.write(stdoutdata)
      else:
         break

t = threading.Thread(target=stdoutprocess, args=(popenobj,))
t.start()
popenobj.wait()
t.join()
print "Return code", popenobj.returncode

现在,我们可以通过两个线程来添加stderr。

但是请注意,子流程文档不建议直接使用这些文件,建议使用communicate()(主要涉及死锁,我认为这不是上面的问题),解决方案有点笨拙,因此看来子流程模块似乎还不够用工作(另请参见:http : //www.python.org/dev/peps/pep-3145/),我们需要查看其他内容。

一个更复杂的解决方案是使用Twisted,如下所示:https : //twistedmatrix.com/documents/11.1.0/core/howto/process.html

使用Twisted进行此操作的方法是使用reactor.spawnprocess()并提供ProcessProtocol,然后异步处理输出来创建您的流程。Twisted示例Python代码在这里:https : //twistedmatrix.com/documents/11.1.0/core/howto/listings/process/process.py

A good but “heavyweight” solution is to use Twisted – see the bottom.

If you’re willing to live with only stdout something along those lines should work:

import subprocess
import sys
popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE)
while not popenobj.poll():
   stdoutdata = popenobj.stdout.readline()
   if stdoutdata:
      sys.stdout.write(stdoutdata)
   else:
      break
print "Return code", popenobj.returncode

(If you use read() it tries to read the entire “file” which isn’t useful, what we really could use here is something that reads all the data that’s in the pipe right now)

One might also try to approach this with threading, e.g.:

import subprocess
import sys
import threading

popenobj = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True)

def stdoutprocess(o):
   while True:
      stdoutdata = o.stdout.readline()
      if stdoutdata:
         sys.stdout.write(stdoutdata)
      else:
         break

t = threading.Thread(target=stdoutprocess, args=(popenobj,))
t.start()
popenobj.wait()
t.join()
print "Return code", popenobj.returncode

Now we could potentially add stderr as well by having two threads.

Note however the subprocess docs discourage using these files directly and recommends to use communicate() (mostly concerned with deadlocks which I think isn’t an issue above) and the solutions are a little klunky so it really seems like the subprocess module isn’t quite up to the job (also see: http://www.python.org/dev/peps/pep-3145/ ) and we need to look at something else.

A more involved solution is to use Twisted as shown here: https://twistedmatrix.com/documents/11.1.0/core/howto/process.html

The way you do this with Twisted is to create your process using reactor.spawnprocess() and providing a ProcessProtocol that then processes output asynchronously. The Twisted sample Python code is here: https://twistedmatrix.com/documents/11.1.0/core/howto/listings/process/process.py


回答 6

除了所有这些答案之外,一种简单的方法还可以如下:

process = subprocess.Popen(your_command, stdout=subprocess.PIPE)

while process.stdout.readable():
    line = process.stdout.readline()

    if not line:
        break

    print(line.strip())

只要可读流就循环遍历可读流,如果结果为空,则将其停止。

这里的关键是,只要有输出,就readline()返回一行(\n末尾带有),如果确实是末尾,则返回空。

希望这对某人有帮助。

In addition to all these answer, one simple approach could also be as follows:

process = subprocess.Popen(your_command, stdout=subprocess.PIPE)

while process.stdout.readable():
    line = process.stdout.readline()

    if not line:
        break

    print(line.strip())

Loop through the readable stream as long as it’s readable and if it gets an empty result, stop.

The key here is that readline() returns a line (with \n at the end) as long as there’s an output and empty if it’s really at the end.

Hope this helps someone.


回答 7

基于以上所有内容,我建议您对版本进行略微修改(python3):

  • while循环调用readline(建议的iter解决方案似乎对我而言永远受阻-Python 3,Windows 7)
  • 经过结构化处理,因此在轮询返回后,不需要重复处理读数据-None
  • 将stderr传递到stdout,以便读取两个输出
  • 添加了代码以获取cmd的退出值。

码:

import subprocess
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT, universal_newlines=True)
while True:
    rd = proc.stdout.readline()
    print(rd, end='')  # and whatever you want to do...
    if not rd:  # EOF
        returncode = proc.poll()
        if returncode is not None:
            break
        time.sleep(0.1)  # cmd closed stdout, but not exited yet

# You may want to check on ReturnCode here

Based on all the above I suggest a slightly modified version (python3):

  • while loop calling readline (The iter solution suggested seemed to block forever for me – Python 3, Windows 7)
  • structered so handling of read data does not need to be duplicated after poll returns not-None
  • stderr piped into stdout so both output outputs are read
  • Added code to get exit value of cmd.

Code:

import subprocess
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT, universal_newlines=True)
while True:
    rd = proc.stdout.readline()
    print(rd, end='')  # and whatever you want to do...
    if not rd:  # EOF
        returncode = proc.poll()
        if returncode is not None:
            break
        time.sleep(0.1)  # cmd closed stdout, but not exited yet

# You may want to check on ReturnCode here

回答 8

看起来行缓冲输出将为您工作,在这种情况下,可能适合以下情况。(注意:未经测试。)这只会实时提供子进程的标准输出。如果您想同时拥有stderr和stdout,则必须使用进行更复杂的操作select

proc = subprocess.Popen(run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
while proc.poll() is None:
    line = proc.stdout.readline()
    print line
    log_file.write(line + '\n')
# Might still be data on stdout at this point.  Grab any
# remainder.
for line in proc.stdout.read().split('\n'):
    print line
    log_file.write(line + '\n')
# Do whatever you want with proc.stderr here...

It looks like line-buffered output will work for you, in which case something like the following might suit. (Caveat: it’s untested.) This will only give the subprocess’s stdout in real time. If you want to have both stderr and stdout in real time, you’ll have to do something more complex with select.

proc = subprocess.Popen(run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
while proc.poll() is None:
    line = proc.stdout.readline()
    print line
    log_file.write(line + '\n')
# Might still be data on stdout at this point.  Grab any
# remainder.
for line in proc.stdout.read().split('\n'):
    print line
    log_file.write(line + '\n')
# Do whatever you want with proc.stderr here...

回答 9

为什么不stdout直接设置为sys.stdout?而且,如果还需要输出到日志,则可以简单地覆盖f的write方法。

import sys
import subprocess

class SuperFile(open.__class__):

    def write(self, data):
        sys.stdout.write(data)
        super(SuperFile, self).write(data)

f = SuperFile("log.txt","w+")       
process = subprocess.Popen(command, stdout=f, stderr=f)

Why not set stdout directly to sys.stdout? And if you need to output to a log as well, then you can simply override the write method of f.

import sys
import subprocess

class SuperFile(open.__class__):

    def write(self, data):
        sys.stdout.write(data)
        super(SuperFile, self).write(data)

f = SuperFile("log.txt","w+")       
process = subprocess.Popen(command, stdout=f, stderr=f)

回答 10

我尝试过的所有上述解决方案都无法将stderr和stdout输出分开(多个管道),或者在OS管道缓冲区已满时永远阻塞,这在运行命令的命令输出速度太快时会发生(在python上有此警告poll()子流程手册)。我发现的唯一可靠方法是通过select,但这是仅posix的解决方案:

import subprocess
import sys
import os
import select
# returns command exit status, stdout text, stderr text
# rtoutput: show realtime output while running
def run_script(cmd,rtoutput=0):
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    poller = select.poll()
    poller.register(p.stdout, select.POLLIN)
    poller.register(p.stderr, select.POLLIN)

    coutput=''
    cerror=''
    fdhup={}
    fdhup[p.stdout.fileno()]=0
    fdhup[p.stderr.fileno()]=0
    while sum(fdhup.values()) < len(fdhup):
        try:
            r = poller.poll(1)
        except select.error, err:
            if err.args[0] != EINTR:
                raise
            r=[]
        for fd, flags in r:
            if flags & (select.POLLIN | select.POLLPRI):
                c = os.read(fd, 1024)
                if rtoutput:
                    sys.stdout.write(c)
                    sys.stdout.flush()
                if fd == p.stderr.fileno():
                    cerror+=c
                else:
                    coutput+=c
            else:
                fdhup[fd]=1
    return p.poll(), coutput.strip(), cerror.strip()

All of the above solutions I tried failed either to separate stderr and stdout output, (multiple pipes) or blocked forever when the OS pipe buffer was full which happens when the command you are running outputs too fast (there is a warning for this on python poll() manual of subprocess). The only reliable way I found was through select, but this is a posix-only solution:

import subprocess
import sys
import os
import select
# returns command exit status, stdout text, stderr text
# rtoutput: show realtime output while running
def run_script(cmd,rtoutput=0):
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    poller = select.poll()
    poller.register(p.stdout, select.POLLIN)
    poller.register(p.stderr, select.POLLIN)

    coutput=''
    cerror=''
    fdhup={}
    fdhup[p.stdout.fileno()]=0
    fdhup[p.stderr.fileno()]=0
    while sum(fdhup.values()) < len(fdhup):
        try:
            r = poller.poll(1)
        except select.error, err:
            if err.args[0] != EINTR:
                raise
            r=[]
        for fd, flags in r:
            if flags & (select.POLLIN | select.POLLPRI):
                c = os.read(fd, 1024)
                if rtoutput:
                    sys.stdout.write(c)
                    sys.stdout.flush()
                if fd == p.stderr.fileno():
                    cerror+=c
                else:
                    coutput+=c
            else:
                fdhup[fd]=1
    return p.poll(), coutput.strip(), cerror.strip()

回答 11

与先前的答案类似,但是以下解决方案为我在使用Python3的Windows上提供了一种通用的实时打印和登录方法(get-realtime-output-using-python):

def print_and_log(command, logFile):
    with open(logFile, 'wb') as f:
        command = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)

        while True:
            output = command.stdout.readline()
            if not output and command.poll() is not None:
                f.close()
                break
            if output:
                f.write(output)
                print(str(output.strip(), 'utf-8'), flush=True)
        return command.poll()

Similar to previous answers but the following solution worked for for me on windows using Python3 to provide a common method to print and log in realtime (getting-realtime-output-using-python):

def print_and_log(command, logFile):
    with open(logFile, 'wb') as f:
        command = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)

        while True:
            output = command.stdout.readline()
            if not output and command.poll() is not None:
                f.close()
                break
            if output:
                f.write(output)
                print(str(output.strip(), 'utf-8'), flush=True)
        return command.poll()

回答 12

我认为该subprocess.communicate方法有点误导:它实际上填充了您在中指定的stdoutstderrsubprocess.Popen

但是,从subprocess.PIPE您可以提供给subprocess.Popenstdoutstderr参数中读取信息,最终将填满OS管道缓冲区并死锁您的应用程序(特别是如果您有多个必须使用的进程/线程)subprocess)。

我提出的解决方案是为stdoutstderr提供文件-并读取文件的内容,而不是从死锁中读取PIPE。这些文件可以是tempfile.NamedTemporaryFile()-在将它们写入时也可以访问以进行读取subprocess.communicate

以下是示例用法:

        try:
            with ProcessRunner(('python', 'task.py'), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        catch ProcessError as e:
            print(e.error_message)
            raise

这是准备使用的源代码与我可以用来解释其作用的注释:

如果您使用的是python 2,请确保首先从pypi 安装最新版本的subprocess32软件包。


import os
import sys
import threading
import time
import tempfile
import logging

if os.name == 'posix' and sys.version_info[0] < 3:
    # Support python 2
    import subprocess32 as subprocess
else:
    # Get latest and greatest from python 3
    import subprocess

logger = logging.getLogger(__name__)


class ProcessError(Exception):
    """Base exception for errors related to running the process"""


class ProcessTimeout(ProcessError):
    """Error that will be raised when the process execution will exceed a timeout"""


class ProcessRunner(object):
    def __init__(self, args, env=None, timeout=None, bufsize=-1, seconds_to_wait=0.25, **kwargs):
        """
        Constructor facade to subprocess.Popen that receives parameters which are more specifically required for the
        Process Runner. This is a class that should be used as a context manager - and that provides an iterator
        for reading captured output from subprocess.communicate in near realtime.

        Example usage:


        try:
            with ProcessRunner(('python', task_file_path), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        catch ProcessError as e:
            print(e.error_message)
            raise

        :param args: same as subprocess.Popen
        :param env: same as subprocess.Popen
        :param timeout: same as subprocess.communicate
        :param bufsize: same as subprocess.Popen
        :param seconds_to_wait: time to wait between each readline from the temporary file
        :param kwargs: same as subprocess.Popen
        """
        self._seconds_to_wait = seconds_to_wait
        self._process_has_timed_out = False
        self._timeout = timeout
        self._process_done = False
        self._std_file_handle = tempfile.NamedTemporaryFile()
        self._process = subprocess.Popen(args, env=env, bufsize=bufsize,
                                         stdout=self._std_file_handle, stderr=self._std_file_handle, **kwargs)
        self._thread = threading.Thread(target=self._run_process)
        self._thread.daemon = True

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._thread.join()
        self._std_file_handle.close()

    def __iter__(self):
        # read all output from stdout file that subprocess.communicate fills
        with open(self._std_file_handle.name, 'r') as stdout:
            # while process is alive, keep reading data
            while not self._process_done:
                out = stdout.readline()
                out_without_trailing_whitespaces = out.rstrip()
                if out_without_trailing_whitespaces:
                    # yield stdout data without trailing \n
                    yield out_without_trailing_whitespaces
                else:
                    # if there is nothing to read, then please wait a tiny little bit
                    time.sleep(self._seconds_to_wait)

            # this is a hack: terraform seems to write to buffer after process has finished
            out = stdout.read()
            if out:
                yield out

        if self._process_has_timed_out:
            raise ProcessTimeout('Process has timed out')

        if self._process.returncode != 0:
            raise ProcessError('Process has failed')

    def _run_process(self):
        try:
            # Start gathering information (stdout and stderr) from the opened process
            self._process.communicate(timeout=self._timeout)
            # Graceful termination of the opened process
            self._process.terminate()
        except subprocess.TimeoutExpired:
            self._process_has_timed_out = True
            # Force termination of the opened process
            self._process.kill()

        self._process_done = True

    @property
    def return_code(self):
        return self._process.returncode


I think that the subprocess.communicate method is a bit misleading: it actually fills the stdout and stderr that you specify in the subprocess.Popen.

Yet, reading from the subprocess.PIPE that you can provide to the subprocess.Popen‘s stdout and stderr parameters will eventually fill up OS pipe buffers and deadlock your app (especially if you’ve multiple processes/threads that must use subprocess).

My proposed solution is to provide the stdout and stderr with files – and read the files’ content instead of reading from the deadlocking PIPE. These files can be tempfile.NamedTemporaryFile() – which can also be accessed for reading while they’re being written into by subprocess.communicate.

Below is a sample usage:

        try:
            with ProcessRunner(('python', 'task.py'), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        catch ProcessError as e:
            print(e.error_message)
            raise

And this is the source code which is ready to be used with as many comments as I could provide to explain what it does:

If you’re using python 2, please make sure to first install the latest version of the subprocess32 package from pypi.


import os
import sys
import threading
import time
import tempfile
import logging

if os.name == 'posix' and sys.version_info[0] < 3:
    # Support python 2
    import subprocess32 as subprocess
else:
    # Get latest and greatest from python 3
    import subprocess

logger = logging.getLogger(__name__)


class ProcessError(Exception):
    """Base exception for errors related to running the process"""


class ProcessTimeout(ProcessError):
    """Error that will be raised when the process execution will exceed a timeout"""


class ProcessRunner(object):
    def __init__(self, args, env=None, timeout=None, bufsize=-1, seconds_to_wait=0.25, **kwargs):
        """
        Constructor facade to subprocess.Popen that receives parameters which are more specifically required for the
        Process Runner. This is a class that should be used as a context manager - and that provides an iterator
        for reading captured output from subprocess.communicate in near realtime.

        Example usage:


        try:
            with ProcessRunner(('python', task_file_path), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        catch ProcessError as e:
            print(e.error_message)
            raise

        :param args: same as subprocess.Popen
        :param env: same as subprocess.Popen
        :param timeout: same as subprocess.communicate
        :param bufsize: same as subprocess.Popen
        :param seconds_to_wait: time to wait between each readline from the temporary file
        :param kwargs: same as subprocess.Popen
        """
        self._seconds_to_wait = seconds_to_wait
        self._process_has_timed_out = False
        self._timeout = timeout
        self._process_done = False
        self._std_file_handle = tempfile.NamedTemporaryFile()
        self._process = subprocess.Popen(args, env=env, bufsize=bufsize,
                                         stdout=self._std_file_handle, stderr=self._std_file_handle, **kwargs)
        self._thread = threading.Thread(target=self._run_process)
        self._thread.daemon = True

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._thread.join()
        self._std_file_handle.close()

    def __iter__(self):
        # read all output from stdout file that subprocess.communicate fills
        with open(self._std_file_handle.name, 'r') as stdout:
            # while process is alive, keep reading data
            while not self._process_done:
                out = stdout.readline()
                out_without_trailing_whitespaces = out.rstrip()
                if out_without_trailing_whitespaces:
                    # yield stdout data without trailing \n
                    yield out_without_trailing_whitespaces
                else:
                    # if there is nothing to read, then please wait a tiny little bit
                    time.sleep(self._seconds_to_wait)

            # this is a hack: terraform seems to write to buffer after process has finished
            out = stdout.read()
            if out:
                yield out

        if self._process_has_timed_out:
            raise ProcessTimeout('Process has timed out')

        if self._process.returncode != 0:
            raise ProcessError('Process has failed')

    def _run_process(self):
        try:
            # Start gathering information (stdout and stderr) from the opened process
            self._process.communicate(timeout=self._timeout)
            # Graceful termination of the opened process
            self._process.terminate()
        except subprocess.TimeoutExpired:
            self._process_has_timed_out = True
            # Force termination of the opened process
            self._process.kill()

        self._process_done = True

    @property
    def return_code(self):
        return self._process.returncode




回答 13

这是我在一个项目中使用的类。它将子流程的输出重定向到日志。刚开始,我尝试简单地重写写方法,但是由于子进程将永远不会调用它而无法工作(重定向发生在文件描述符级别)。因此,我使用自己的管道,类似于subprocess-module中的管道。这具有将所有日志记录/打印逻辑封装在适配器中的优点,并且您只需将记录器的实例传递给Popensubprocess.Popen("/path/to/binary", stderr = LogAdapter("foo"))

class LogAdapter(threading.Thread):

    def __init__(self, logname, level = logging.INFO):
        super().__init__()
        self.log = logging.getLogger(logname)
        self.readpipe, self.writepipe = os.pipe()

        logFunctions = {
            logging.DEBUG: self.log.debug,
            logging.INFO: self.log.info,
            logging.WARN: self.log.warn,
            logging.ERROR: self.log.warn,
        }

        try:
            self.logFunction = logFunctions[level]
        except KeyError:
            self.logFunction = self.log.info

    def fileno(self):
        #when fileno is called this indicates the subprocess is about to fork => start thread
        self.start()
        return self.writepipe

    def finished(self):
       """If the write-filedescriptor is not closed this thread will
       prevent the whole program from exiting. You can use this method
       to clean up after the subprocess has terminated."""
       os.close(self.writepipe)

    def run(self):
        inputFile = os.fdopen(self.readpipe)

        while True:
            line = inputFile.readline()

            if len(line) == 0:
                #no new data was added
                break

            self.logFunction(line.strip())

如果您不需要日志记录而只想使用print()它,则可以明显地删除大部分代码并使该类更短。您还可以通过__enter__and __exit__方法将其展开并调用finished__exit__以便可以轻松地将其用作上下文。

Here is a class which I’m using in one of my projects. It redirects output of a subprocess to the log. At first I tried simply overwriting the write-method but that doesn’t work as the subprocess will never call it (redirection happens on filedescriptor level). So I’m using my own pipe, similar to how it’s done in the subprocess-module. This has the advantage of encapsulating all logging/printing logic in the adapter and you can simply pass instances of the logger to Popen: subprocess.Popen("/path/to/binary", stderr = LogAdapter("foo"))

class LogAdapter(threading.Thread):

    def __init__(self, logname, level = logging.INFO):
        super().__init__()
        self.log = logging.getLogger(logname)
        self.readpipe, self.writepipe = os.pipe()

        logFunctions = {
            logging.DEBUG: self.log.debug,
            logging.INFO: self.log.info,
            logging.WARN: self.log.warn,
            logging.ERROR: self.log.warn,
        }

        try:
            self.logFunction = logFunctions[level]
        except KeyError:
            self.logFunction = self.log.info

    def fileno(self):
        #when fileno is called this indicates the subprocess is about to fork => start thread
        self.start()
        return self.writepipe

    def finished(self):
       """If the write-filedescriptor is not closed this thread will
       prevent the whole program from exiting. You can use this method
       to clean up after the subprocess has terminated."""
       os.close(self.writepipe)

    def run(self):
        inputFile = os.fdopen(self.readpipe)

        while True:
            line = inputFile.readline()

            if len(line) == 0:
                #no new data was added
                break

            self.logFunction(line.strip())

If you don’t need logging but simply want to use print() you can obviously remove large portions of the code and keep the class shorter. You could also expand it by an __enter__ and __exit__ method and call finished in __exit__ so that you could easily use it as context.


回答 14

没有Pythonic解决方案对我有用。事实证明,proc.stdout.read()类似的行为可能永远存在。

因此,我这样使用tee

subprocess.run('./my_long_running_binary 2>&1 | tee -a my_log_file.txt && exit ${PIPESTATUS}', shell=True, check=True, executable='/bin/bash')

如果您已经在使用此解决方案,将非常方便shell=True

${PIPESTATUS}捕获整个命令链的成功状态(仅在Bash中可用)。如果我省略&& exit ${PIPESTATUS},则它将始终返回零,因为tee从不失败。

unbuffer可能需要立即将每行打印到终端中,而不是等待太久直到“管道缓冲区”填满。但是,unbuffer吞没了assert(SIG Abort)的退出状态。

2>&1 还将stderror记录到文件中。

None of the Pythonic solutions worked for me. It turned out that proc.stdout.read() or similar may block forever.

Therefore, I use tee like this:

subprocess.run('./my_long_running_binary 2>&1 | tee -a my_log_file.txt && exit ${PIPESTATUS}', shell=True, check=True, executable='/bin/bash')

This solution is convenient if you are already using shell=True.

${PIPESTATUS} captures the success status of the entire command chain (only available in Bash). If I omitted the && exit ${PIPESTATUS}, then this would always return zero since tee never fails.

unbuffer might be necessary for printing each line immediately into the terminal, instead of waiting way too long until the “pipe buffer” gets filled. However, unbuffer swallows the exit status of assert (SIG Abort)…

2>&1 also logs stderror to the file.


子流程Popen和call有什么区别(我该如何使用它们)?

问题:子流程Popen和call有什么区别(我该如何使用它们)?

我想从Python调用一个外部程序。我已经使用过Popen()并且call()做到了。

两者有什么区别?

我的特定目标是从Python运行以下命令。我不确定重定向如何工作。

./my_script.sh > output

我阅读了文档,并说它call()是便利功能或快捷功能。我们使用call()代替会失去任何功能Popen()吗?

I want to call an external program from Python. I have used both Popen() and call() to do that.

What’s the difference between the two?

My specific goal is to run the following command from Python. I am not sure how redirects work.

./my_script.sh > output

I read the documentation and it says that call() is a convenience function or a shortcut function. Do we lose any power by using call() instead of Popen()?


回答 0

重定向有两种方法。两者都适用于subprocess.Popensubprocess.call

  1. 设置关键字参数shell = Trueexecutable = /path/to/the/shell并在那里指定命令。

  2. 由于您只是将输出重定向到文件,因此请设置关键字参数

    stdout = an_open_writeable_file_object

    对象指向output文件的位置。

subprocess.Popensubprocess.call

Popen不会阻塞,允许您在进程运行时与之进行交互,或者继续进行Python程序中的其他操作。调用Popen返回一个Popen对象。

call 阻止。它支持与Popen构造函数相同的所有参数,因此您仍可以设置进程的输出,环境变量等,脚本将等待程序完成,并call返回表示进程退出状态的代码。

returncode = call(*args, **kwargs) 

与通话基本相同

returncode = Popen(*args, **kwargs).wait()

call只是一种便利功能。它在CPython的实现是在subprocess.py

def call(*popenargs, timeout=None, **kwargs):
    """Run command with arguments.  Wait for command to complete or
    timeout, then return the returncode attribute.

    The arguments are the same as for the Popen constructor.  Example:

    retcode = call(["ls", "-l"])
    """
    with Popen(*popenargs, **kwargs) as p:
        try:
            return p.wait(timeout=timeout)
        except:
            p.kill()
            p.wait()
            raise

如您所见,它周围是薄薄的包装纸Popen

There are two ways to do the redirect. Both apply to either subprocess.Popen or subprocess.call.

  1. Set the keyword argument shell = True or executable = /path/to/the/shell and specify the command just as you have it there.

  2. Since you’re just redirecting the output to a file, set the keyword argument

    stdout = an_open_writeable_file_object
    

    where the object points to the output file.

subprocess.Popen is more general than subprocess.call.

Popen doesn’t block, allowing you to interact with the process while it’s running, or continue with other things in your Python program. The call to Popen returns a Popen object.

call does block. While it supports all the same arguments as the Popen constructor, so you can still set the process’ output, environmental variables, etc., your script waits for the program to complete, and call returns a code representing the process’ exit status.

returncode = call(*args, **kwargs) 

is basically the same as calling

returncode = Popen(*args, **kwargs).wait()

call is just a convenience function. It’s implementation in CPython is in subprocess.py:

def call(*popenargs, timeout=None, **kwargs):
    """Run command with arguments.  Wait for command to complete or
    timeout, then return the returncode attribute.

    The arguments are the same as for the Popen constructor.  Example:

    retcode = call(["ls", "-l"])
    """
    with Popen(*popenargs, **kwargs) as p:
        try:
            return p.wait(timeout=timeout)
        except:
            p.kill()
            p.wait()
            raise

As you can see, it’s a thin wrapper around Popen.


回答 1

另一个答案很完整,但这是一个经验法则:

  • call 正在阻止:

    call('notepad.exe')
    print('hello')  # only executed when notepad is closed
    
  • Popen 是非阻塞的:

    Popen('notepad.exe')
    print('hello')  # immediately executed
    

The other answer is very complete, but here is a rule of thumb:

  • call is blocking:

    call('notepad.exe')
    print('hello')  # only executed when notepad is closed
    
  • Popen is non-blocking:

    Popen('notepad.exe')
    print('hello')  # immediately executed
    

如何在管道上使用`subprocess`命令

问题:如何在管道上使用`subprocess`命令

我想搭配subprocess.check_output()使用ps -A | grep 'process_name'。我尝试了各种解决方案,但到目前为止没有任何效果。有人可以指导我怎么做吗?

I want to use subprocess.check_output() with ps -A | grep 'process_name'. I tried various solutions but so far nothing worked. Can someone guide me how to do it?


回答 0

要将管道与subprocess模块一起使用,必须通过shell=True

但是,出于种种原因,这并不是真正可取的选择,其中不仅包括安全性。相反,请分别创建psgrep流程,并将输出从一个管道传递到另一个管道,如下所示:

ps = subprocess.Popen(('ps', '-A'), stdout=subprocess.PIPE)
output = subprocess.check_output(('grep', 'process_name'), stdin=ps.stdout)
ps.wait()

但是,在您的特定情况下,简单的解决方案是先调用subprocess.check_output(('ps', '-A'))然后str.find在输出上。

To use a pipe with the subprocess module, you have to pass shell=True.

However, this isn’t really advisable for various reasons, not least of which is security. Instead, create the ps and grep processes separately, and pipe the output from one into the other, like so:

ps = subprocess.Popen(('ps', '-A'), stdout=subprocess.PIPE)
output = subprocess.check_output(('grep', 'process_name'), stdin=ps.stdout)
ps.wait()

In your particular case, however, the simple solution is to call subprocess.check_output(('ps', '-A')) and then str.find on the output.


回答 1

或者,您始终可以在子流程对象上使用communication方法。

cmd = "ps -A|grep 'process_name'"
ps = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT)
output = ps.communicate()[0]
print(output)

通信方法返回标准输出和标准错误的元组。

Or you can always use the communicate method on the subprocess objects.

cmd = "ps -A|grep 'process_name'"
ps = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT)
output = ps.communicate()[0]
print(output)

The communicate method returns a tuple of the standard output and the standard error.


回答 2

请参阅有关使用子流程设置管道的文档:http : //docs.python.org/2/library/subprocess.html#replacing-shell-pipeline

我没有测试下面的代码示例,但是它应该大致是您想要的:

query = "process_name"
ps_process = Popen(["ps", "-A"], stdout=PIPE)
grep_process = Popen(["grep", query], stdin=ps_process.stdout, stdout=PIPE)
ps_process.stdout.close()  # Allow ps_process to receive a SIGPIPE if grep_process exits.
output = grep_process.communicate()[0]

See the documentation on setting up a pipeline using subprocess: http://docs.python.org/2/library/subprocess.html#replacing-shell-pipeline

I haven’t tested the following code example but it should be roughly what you want:

query = "process_name"
ps_process = Popen(["ps", "-A"], stdout=PIPE)
grep_process = Popen(["grep", query], stdin=ps_process.stdout, stdout=PIPE)
ps_process.stdout.close()  # Allow ps_process to receive a SIGPIPE if grep_process exits.
output = grep_process.communicate()[0]

回答 3

JKALAVIS解决方案很好,但是我将使用shlex代替SHELL = TRUE进行改进。下面即时查询时间

#!/bin/python
import subprocess
import shlex

cmd = "dig @8.8.4.4 +notcp www.google.com|grep 'Query'"
ps = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT)
output = ps.communicate()[0]
print(output)

JKALAVIS solution is good, however I would add an improvement to use shlex instead of SHELL=TRUE. below im grepping out Query times

#!/bin/python
import subprocess
import shlex

cmd = "dig @8.8.4.4 +notcp www.google.com|grep 'Query'"
ps = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.STDOUT)
output = ps.communicate()[0]
print(output)

回答 4

另外,尝试使用'pgrep'命令代替'ps -A | grep 'process_name'

Also, try to use 'pgrep' command instead of 'ps -A | grep 'process_name'


回答 5

您可以在sh.py中尝试管道功能:

import sh
print sh.grep(sh.ps("-ax"), "process_name")

You can try the pipe functionality in sh.py:

import sh
print sh.grep(sh.ps("-ax"), "process_name")

回答 6

Python 3.5之后,您还可以使用:

    import subprocess

    f = open('test.txt', 'w')
    process = subprocess.run(['ls', '-la'], stdout=subprocess.PIPE, universal_newlines=True)
    f.write(process.stdout)
    f.close()

该命令的执行被阻止,输出将在process.stdout中

After Python 3.5 you can also use:

    import subprocess

    f = open('test.txt', 'w')
    process = subprocess.run(['ls', '-la'], stdout=subprocess.PIPE, universal_newlines=True)
    f.write(process.stdout)
    f.close()

The execution of the command is blocking and the output will be in process.stdout.


逐行读取子流程标准输出

问题:逐行读取子流程标准输出

我的python脚本使用子进程来调用非常嘈杂的linux实用程序。我想将所有输出存储到日志文件中,并向用户显示其中的一些内容。我以为下面的方法可以工作,但是直到实用程序产生大量输出后,输出才出现在我的应用程序中。

#fake_utility.py, just generates lots of output over time
import time
i = 0
while True:
   print hex(i)*512
   i += 1
   time.sleep(0.5)

#filters output
import subprocess
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
for line in proc.stdout:
   #the real code does filtering here
   print "test:", line.rstrip()

我真正想要的行为是过滤器脚本打印从子流程接收到的每一行。Sorta像是做什么tee,但带有python代码。

我想念什么?这有可能吗?


更新:

如果将a sys.stdout.flush()添加到fake_utility.py中,则代码在python 3.1中具有所需的行为。我正在使用python 2.6。您可能会认为使用proc.stdout.xreadlines()与py3k相同,但事实并非如此。


更新2:

这是最小的工作代码。

#fake_utility.py, just generates lots of output over time
import sys, time
for i in range(10):
   print i
   sys.stdout.flush()
   time.sleep(0.5)

#display out put line by line
import subprocess
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
#works in python 3.0+
#for line in proc.stdout:
for line in iter(proc.stdout.readline,''):
   print line.rstrip()

My python script uses subprocess to call a linux utility that is very noisy. I want to store all of the output to a log file and show some of it to the user. I thought the following would work, but the output doesn’t show up in my application until the utility has produced a significant amount of output.

#fake_utility.py, just generates lots of output over time
import time
i = 0
while True:
   print hex(i)*512
   i += 1
   time.sleep(0.5)

#filters output
import subprocess
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
for line in proc.stdout:
   #the real code does filtering here
   print "test:", line.rstrip()

The behavior I really want is for the filter script to print each line as it is received from the subprocess. Sorta like what tee does but with python code.

What am I missing? Is this even possible?


Update:

If a sys.stdout.flush() is added to fake_utility.py, the code has the desired behavior in python 3.1. I’m using python 2.6. You would think that using proc.stdout.xreadlines() would work the same as py3k, but it doesn’t.


Update 2:

Here is the minimal working code.

#fake_utility.py, just generates lots of output over time
import sys, time
for i in range(10):
   print i
   sys.stdout.flush()
   time.sleep(0.5)

#display out put line by line
import subprocess
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
#works in python 3.0+
#for line in proc.stdout:
for line in iter(proc.stdout.readline,''):
   print line.rstrip()

回答 0

自从我上一次使用Python以来已经很长时间了,但是我认为问题出在语句for line in proc.stdout,该语句在迭代之前读取整个输入。解决方案是改为使用readline()

#filters output
import subprocess
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
while True:
  line = proc.stdout.readline()
  if not line:
    break
  #the real code does filtering here
  print "test:", line.rstrip()

当然,您仍然必须处理子进程的缓冲。

注意:根据文档,使用迭代器的解决方案应该与使用等效readline(),除了预读缓冲区外,但是(或者正因为如此)建议的更改确实为我带来了不同的结果(Windows XP上为Python 2.5)。

It’s been a long time since I last worked with Python, but I think the problem is with the statement for line in proc.stdout, which reads the entire input before iterating over it. The solution is to use readline() instead:

#filters output
import subprocess
proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)
while True:
  line = proc.stdout.readline()
  if not line:
    break
  #the real code does filtering here
  print "test:", line.rstrip()

Of course you still have to deal with the subprocess’ buffering.

Note: according to the documentation the solution with an iterator should be equivalent to using readline(), except for the read-ahead buffer, but (or exactly because of this) the proposed change did produce different results for me (Python 2.5 on Windows XP).


回答 1

参加聚会有点晚,但是很惊讶没有看到我认为这是最简单的解决方案:

import io
import subprocess

proc = subprocess.Popen(["prog", "arg"], stdout=subprocess.PIPE)
for line in io.TextIOWrapper(proc.stdout, encoding="utf-8"):  # or another encoding
    # do something with line

(这需要Python3。)

Bit late to the party, but was surprised not to see what I think is the simplest solution here:

import io
import subprocess

proc = subprocess.Popen(["prog", "arg"], stdout=subprocess.PIPE)
for line in io.TextIOWrapper(proc.stdout, encoding="utf-8"):  # or another encoding
    # do something with line

(This requires Python 3.)


回答 2

确实,如果您整理出迭代器,那么缓冲现在可能是您的问题。您可以告诉子进程中的python不要缓冲其输出。

proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)

变成

proc = subprocess.Popen(['python','-u', 'fake_utility.py'],stdout=subprocess.PIPE)

从python内部调用python时,我需要此功能。

Indeed, if you sorted out the iterator then buffering could now be your problem. You could tell the python in the sub-process not to buffer its output.

proc = subprocess.Popen(['python','fake_utility.py'],stdout=subprocess.PIPE)

becomes

proc = subprocess.Popen(['python','-u', 'fake_utility.py'],stdout=subprocess.PIPE)

I have needed this when calling python from within python.


回答 3

您想将这些额外的参数传递给subprocess.Popen

bufsize=1, universal_newlines=True

然后,您可以像示例中那样进行迭代。(使用Python 3.5测试)

You want to pass these extra parameters to subprocess.Popen:

bufsize=1, universal_newlines=True

Then you can iterate as in your example. (Tested with Python 3.5)


回答 4

允许逐行实时地同时迭代stdout和迭代的功能stderr

万一您需要同时获取stdoutstderr在同一时间,你可以使用下面的函数。

该函数使用队列将两个Popen管道合并为一个迭代器。

在这里,我们创建函数read_popen_pipes()

from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()


def read_popen_pipes(p):

    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()

        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)

        while True:

            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break

            out_line = err_line = ''

            try:
                out_line = q_stdout.get_nowait()
            except Empty:
                pass
            try:
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            yield (out_line, err_line)

read_popen_pipes() 正在使用:

import subprocess as sp


with sp.Popen(my_cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    for out_line, err_line in read_popen_pipes(p):

        # Do stuff with each line, e.g.:
        print(out_line, end='')
        print(err_line, end='')

    return p.poll() # return status-code

A function that allows iterating over both stdout and stderr concurrently, in realtime, line by line

In case you need to get the output stream for both stdout and stderr at the same time, you can use the following function.

The function uses Queues to merge both Popen pipes into a single iterator.

Here we create the function read_popen_pipes():

from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()


def read_popen_pipes(p):

    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()

        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)

        while True:

            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break

            out_line = err_line = ''

            try:
                out_line = q_stdout.get_nowait()
            except Empty:
                pass
            try:
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            yield (out_line, err_line)

read_popen_pipes() in use:

import subprocess as sp


with sp.Popen(my_cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    for out_line, err_line in read_popen_pipes(p):

        # Do stuff with each line, e.g.:
        print(out_line, end='')
        print(err_line, end='')

    return p.poll() # return status-code

回答 5

您也可以读取不带循环的行。在python3.6中工作。

import os
import subprocess

process = subprocess.Popen(command, stdout=subprocess.PIPE)
list_of_byte_strings = process.stdout.readlines()

You can also read lines w/o loop. Works in python3.6.

import os
import subprocess

process = subprocess.Popen(command, stdout=subprocess.PIPE)
list_of_byte_strings = process.stdout.readlines()

回答 6

我试图与python3和它的工作,

def output_reader(proc):
    for line in iter(proc.stdout.readline, b''):
        print('got line: {0}'.format(line.decode('utf-8')), end='')


def main():
    proc = subprocess.Popen(['python', 'fake_utility.py'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

    t = threading.Thread(target=output_reader, args=(proc,))
    t.start()

    try:
        time.sleep(0.2)
        import time
        i = 0

        while True:
        print (hex(i)*512)
        i += 1
        time.sleep(0.5)
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=0.2)
            print('== subprocess exited with rc =', proc.returncode)
        except subprocess.TimeoutExpired:
            print('subprocess did not terminate in time')
    t.join()

I tried this with python3 and it worked, source

def output_reader(proc):
    for line in iter(proc.stdout.readline, b''):
        print('got line: {0}'.format(line.decode('utf-8')), end='')


def main():
    proc = subprocess.Popen(['python', 'fake_utility.py'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)

    t = threading.Thread(target=output_reader, args=(proc,))
    t.start()

    try:
        time.sleep(0.2)
        import time
        i = 0

        while True:
        print (hex(i)*512)
        i += 1
        time.sleep(0.5)
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=0.2)
            print('== subprocess exited with rc =', proc.returncode)
        except subprocess.TimeoutExpired:
            print('subprocess did not terminate in time')
    t.join()

回答 7

Rômulo答案的以下修改对我适用于Python 2和3(2.7.12和3.6.1):

import os
import subprocess

process = subprocess.Popen(command, stdout=subprocess.PIPE)
while True:
  line = process.stdout.readline()
  if line != '':
    os.write(1, line)
  else:
    break

The following modification of Rômulo’s answer works for me on Python 2 and 3 (2.7.12 and 3.6.1):

import os
import subprocess

process = subprocess.Popen(command, stdout=subprocess.PIPE)
while True:
  line = process.stdout.readline()
  if line != '':
    os.write(1, line)
  else:
    break

回答 8

Dunno(已将其添加到子流程模块中),但使用Python 3时,您可以使用proc.stdout.splitlines()

for line in proc.stdout.splitlines():
   print "stdout:", line

Dunno when this has been added to the subprocess module, but with Python 3 you should be fine with using proc.stdout.splitlines():

for line in proc.stdout.splitlines():
   print "stdout:", line

具有修改后环境的Python子进程/ Popen

问题:具有修改后环境的Python子进程/ Popen

我认为在环境稍有修改的情况下运行外部命令是很常见的情况。这就是我倾向于这样做的方式:

import subprocess, os
my_env = os.environ
my_env["PATH"] = "/usr/sbin:/sbin:" + my_env["PATH"]
subprocess.Popen(my_command, env=my_env)

我感觉到有更好的方法了。看起来还好吗?

I believe that running an external command with a slightly modified environment is a very common case. That’s how I tend to do it:

import subprocess, os
my_env = os.environ
my_env["PATH"] = "/usr/sbin:/sbin:" + my_env["PATH"]
subprocess.Popen(my_command, env=my_env)

I’ve got a gut feeling that there’s a better way; does it look alright?


回答 0

我认为os.environ.copy()如果您不打算为当前进程修改os.environ会更好:

import subprocess, os
my_env = os.environ.copy()
my_env["PATH"] = "/usr/sbin:/sbin:" + my_env["PATH"]
subprocess.Popen(my_command, env=my_env)

I think os.environ.copy() is better if you don’t intend to modify the os.environ for the current process:

import subprocess, os
my_env = os.environ.copy()
my_env["PATH"] = "/usr/sbin:/sbin:" + my_env["PATH"]
subprocess.Popen(my_command, env=my_env)

回答 1

这取决于问题所在。如果要克隆和修改环境,一种解决方案可能是:

subprocess.Popen(my_command, env=dict(os.environ, PATH="path"))

但这在某种程度上取决于被替换的变量是有效的python标识符,它们通常是有效的(您遇到频率不是字母数字+下划线的环境变量名称还是以数字开头的变量的频率是多少?)。

否则,您将可以编写如下内容:

subprocess.Popen(my_command, env=dict(os.environ, 
                                      **{"Not valid python name":"value"}))

在非常奇怪的情况下(您多久在环境变量名称中使用控制代码或非ASCII字符?),bytes您甚至(在python3上)都无法使用该结构的环境键。

正如您所看到的,此处使用的技术(尤其是第一种)对环境键的好处通常是有效的python标识符,并且也预先知道(在编码时),第二种方法存在问题。如果不是这种情况,您可能应该寻找另一种方法

That depends on what the issue is. If it’s to clone and modify the environment one solution could be:

subprocess.Popen(my_command, env=dict(os.environ, PATH="path"))

But that somewhat depends on that the replaced variables are valid python identifiers, which they most often are (how often do you run into environment variable names that are not alphanumeric+underscore or variables that starts with a number?).

Otherwise you’ll could write something like:

subprocess.Popen(my_command, env=dict(os.environ, 
                                      **{"Not valid python name":"value"}))

In the very odd case (how often do you use control codes or non-ascii characters in environment variable names?) that the keys of the environment are bytes you can’t (on python3) even use that construct.

As you can see the techniques (especially the first) used here benefits on the keys of the environment normally is valid python identifiers, and also known in advance (at coding time), the second approach has issues. In cases where that isn’t the case you should probably look for another approach.


回答 2

您可以使用它,my_env.get("PATH", '')而不是my_env["PATH"]PATH某种方式在原始环境中未定义,但除此之外,它看起来还不错。

you might use my_env.get("PATH", '') instead of my_env["PATH"] in case PATH somehow not defined in the original environment, but other than that it looks fine.


回答 3

使用Python 3.5,您可以通过以下方式实现:

import os
import subprocess

my_env = {**os.environ, 'PATH': '/usr/sbin:/sbin:' + os.environ['PATH']}

subprocess.Popen(my_command, env=my_env)

在这里,我们得到的副本os.environ并覆盖了PATH值。

PEP 448(其他拆包概述)使之成为可能。

另一个例子。如果您具有默认环境(例如os.environ),并且想要使用默认值覆盖字典,则可以这样表示:

my_env = {**os.environ, **dict_with_env_variables}

With Python 3.5 you could do it this way:

import os
import subprocess

my_env = {**os.environ, 'PATH': '/usr/sbin:/sbin:' + os.environ['PATH']}

subprocess.Popen(my_command, env=my_env)

Here we end up with a copy of os.environ and overridden PATH value.

It was made possible by PEP 448 (Additional Unpacking Generalizations).

Another example. If you have a default environment (i.e. os.environ), and a dict you want to override defaults with, you can express it like this:

my_env = {**os.environ, **dict_with_env_variables}

回答 4

要临时设置环境变量而不必复制os.envrion对象等,我可以这样做:

process = subprocess.Popen(['env', 'RSYNC_PASSWORD=foobar', 'rsync', \
'rsync://username@foobar.com::'], stdout=subprocess.PIPE)

To temporarily set an environment variable without having to copy the os.envrion object etc, I do this:

process = subprocess.Popen(['env', 'RSYNC_PASSWORD=foobar', 'rsync', \
'rsync://username@foobar.com::'], stdout=subprocess.PIPE)

回答 5

env参数接受字典。您可以简单地使用os.environ,在其中添加键(如果需要,可以将其添加到dict的副本中)(用作键)Popen

The env parameter accepts a dictionary. You can simply take os.environ, add a key (your desired variable) (to a copy of the dict if you must) to that and use it as a parameter to Popen.


回答 6

我知道已经回答了一段时间,但是有些人可能想知道一些有关在环境变量中使用PYTHONPATH而不是PATH的知识。我已经概述了使用cronjobs运行python脚本的说明,该说明以另一种方式(在此处找到)处理修改后的环境。认为这对像我这样需要的人仅比此答案所提供的要多。

I know this has been answered for some time, but there are some points that some may want to know about using PYTHONPATH instead of PATH in their environment variable. I have outlined an explanation of running python scripts with cronjobs that deals with the modified environment in a different way (found here). Thought it would be of some good for those who, like me, needed just a bit more than this answer provided.


回答 7

在某些情况下,您可能只希望传递子流程所需的环境变量,但是我认为您通常具有正确的想法(这也是我的做法)。

In certain circumstances you may want to only pass down the environment variables your subprocess needs, but I think you’ve got the right idea in general (that’s how I do it too).


如何在Python 2.7中隐藏子进程的输出

问题:如何在Python 2.7中隐藏子进程的输出

我在Ubuntu上使用eSpeak,并且有一个Python 2.7脚本可以打印和显示一条消息:

import subprocess
text = 'Hello World.'
print text
subprocess.call(['espeak', text])

eSpeak产生所需的声音,但由于一些错误(ALSA lib …,没有套接字连接)而使外壳混乱,因此我无法轻松读取之前打印的内容。退出代码为0。

不幸的是,没有记录的选项可以关闭其详细信息,因此我正在寻找一种方法,仅在视觉上使其静音并保持打开的外壳干净以进行进一步的交互。

我怎样才能做到这一点?

I’m using eSpeak on Ubuntu and have a Python 2.7 script that prints and speaks a message:

import subprocess
text = 'Hello World.'
print text
subprocess.call(['espeak', text])

eSpeak produces the desired sounds, but clutters the shell with some errors (ALSA lib…, no socket connect) so i cannot easily read what was printed earlier. Exit code is 0.

Unfortunately there is no documented option to turn off its verbosity, so I’m looking for a way to only visually silence it and keep the open shell clean for further interaction.

How can I do this?


回答 0

将输出重定向到DEVNULL:

import os
import subprocess

FNULL = open(os.devnull, 'w')
retcode = subprocess.call(['echo', 'foo'], stdout=FNULL, stderr=subprocess.STDOUT)

它实际上与运行此shell命令相同:

retcode = os.system("echo 'foo' &> /dev/null")

Redirect the output to DEVNULL:

import os
import subprocess

FNULL = open(os.devnull, 'w')
retcode = subprocess.call(['echo', 'foo'], 
    stdout=FNULL, 
    stderr=subprocess.STDOUT)

It is effectively the same as running this shell command:

retcode = os.system("echo 'foo' &> /dev/null")

Update: This answer applies to the original question relating to python 2.7. As of python >= 3.3 an official subprocess.DEVNULL symbol was added.

retcode = subprocess.call(['echo', 'foo'], 
    stdout=subprocess.DEVNULL, 
    stderr=subprocess.STDOUT)

回答 1

这是一个更便携的版本(只是为了好玩,在您的情况下没有必要):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from subprocess import Popen, PIPE, STDOUT

try:
    from subprocess import DEVNULL # py3k
except ImportError:
    import os
    DEVNULL = open(os.devnull, 'wb')

text = u"René Descartes"
p = Popen(['espeak', '-b', '1'], stdin=PIPE, stdout=DEVNULL, stderr=STDOUT)
p.communicate(text.encode('utf-8'))
assert p.returncode == 0 # use appropriate for your program error handling here

Here’s a more portable version (just for fun, it is not necessary in your case):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from subprocess import Popen, PIPE, STDOUT

try:
    from subprocess import DEVNULL # py3k
except ImportError:
    import os
    DEVNULL = open(os.devnull, 'wb')

text = u"René Descartes"
p = Popen(['espeak', '-b', '1'], stdin=PIPE, stdout=DEVNULL, stderr=STDOUT)
p.communicate(text.encode('utf-8'))
assert p.returncode == 0 # use appropriate for your program error handling here

回答 2

使用subprocess.check_output(python 2.7中的新功能)。如果命令失败,它将抑制stdout并引发异常。(它实际上返回stdout的内容,因此您可以在以后的程序中使用它。)例如:

import subprocess
try:
    subprocess.check_output(['espeak', text])
except subprocess.CalledProcessError:
    # Do something

您还可以使用以下命令抑制stderr:

    subprocess.check_output(["espeak", text], stderr=subprocess.STDOUT)

对于2.7之前的版本,请使用

import os
import subprocess
with open(os.devnull, 'w')  as FNULL:
    try:
        subprocess._check_call(['espeak', text], stdout=FNULL)
    except subprocess.CalledProcessError:
        # Do something

在这里,您可以使用

        subprocess._check_call(['espeak', text], stdout=FNULL, stderr=FNULL)

Use subprocess.check_output (new in python 2.7). It will suppress stdout and raise an exception if the command fails. (It actually returns the contents of stdout, so you can use that later in your program if you want.) Example:

import subprocess
try:
    subprocess.check_output(['espeak', text])
except subprocess.CalledProcessError:
    # Do something

You can also suppress stderr with:

    subprocess.check_output(["espeak", text], stderr=subprocess.STDOUT)

For earlier than 2.7, use

import os
import subprocess
with open(os.devnull, 'w')  as FNULL:
    try:
        subprocess._check_call(['espeak', text], stdout=FNULL)
    except subprocess.CalledProcessError:
        # Do something

Here, you can suppress stderr with

        subprocess._check_call(['espeak', text], stdout=FNULL, stderr=FNULL)

回答 3

由于Python3你不再需要打开devnull并且可以调用subprocess.DEVNULL

您的代码将这样更新:

import subprocess
text = 'Hello World.'
print(text)
subprocess.call(['espeak', text], stderr=subprocess.DEVNULL)

As of Python3 you no longer need to open devnull and can call subprocess.DEVNULL.

Your code would be updated as such:

import subprocess
text = 'Hello World.'
print(text)
subprocess.call(['espeak', text], stderr=subprocess.DEVNULL)

回答 4

为什么不使用commands.getoutput()代替呢?

import commands

text = "Mario Balotelli" 
output = 'espeak "%s"' % text
print text
a = commands.getoutput(output)

Why not use commands.getoutput() instead?

import commands

text = "Mario Balotelli" 
output = 'espeak "%s"' % text
print text
a = commands.getoutput(output)

如何将字符串传递到subprocess.Popen(使用stdin参数)?

问题:如何将字符串传递到subprocess.Popen(使用stdin参数)?

如果我执行以下操作:

import subprocess
from cStringIO import StringIO
subprocess.Popen(['grep','f'],stdout=subprocess.PIPE,stdin=StringIO('one\ntwo\nthree\nfour\nfive\nsix\n')).communicate()[0]

我得到:

Traceback (most recent call last):
  File "<stdin>", line 1, in ?
  File "/build/toolchain/mac32/python-2.4.3/lib/python2.4/subprocess.py", line 533, in __init__
    (p2cread, p2cwrite,
  File "/build/toolchain/mac32/python-2.4.3/lib/python2.4/subprocess.py", line 830, in _get_handles
    p2cread = stdin.fileno()
AttributeError: 'cStringIO.StringI' object has no attribute 'fileno'

显然,cStringIO.StringIO对象没有足够接近库中的子程序来适应subprocess.Popen。我该如何解决?

If I do the following:

import subprocess
from cStringIO import StringIO
subprocess.Popen(['grep','f'],stdout=subprocess.PIPE,stdin=StringIO('one\ntwo\nthree\nfour\nfive\nsix\n')).communicate()[0]

I get:

Traceback (most recent call last):
  File "<stdin>", line 1, in ?
  File "/build/toolchain/mac32/python-2.4.3/lib/python2.4/subprocess.py", line 533, in __init__
    (p2cread, p2cwrite,
  File "/build/toolchain/mac32/python-2.4.3/lib/python2.4/subprocess.py", line 830, in _get_handles
    p2cread = stdin.fileno()
AttributeError: 'cStringIO.StringI' object has no attribute 'fileno'

Apparently a cStringIO.StringIO object doesn’t quack close enough to a file duck to suit subprocess.Popen. How do I work around this?


回答 0

Popen.communicate() 说明文件:

请注意,如果要将数据发送到进程的stdin,则需要使用stdin = PIPE创建Popen对象。同样,要在结果元组中获得除None以外的任何内容,您还需要提供stdout = PIPE和/或stderr = PIPE。

替换os.popen *

    pipe = os.popen(cmd, 'w', bufsize)
    # ==>
    pipe = Popen(cmd, shell=True, bufsize=bufsize, stdin=PIPE).stdin

警告使用communication()而不是stdin.write(),stdout.read()或stderr.read()以避免死锁,因为其他任何OS管道缓冲区都填满并阻塞了子进程。

因此,您的示例可以编写如下:

from subprocess import Popen, PIPE, STDOUT

p = Popen(['grep', 'f'], stdout=PIPE, stdin=PIPE, stderr=STDOUT)    
grep_stdout = p.communicate(input=b'one\ntwo\nthree\nfour\nfive\nsix\n')[0]
print(grep_stdout.decode())
# -> four
# -> five
# ->

在当前的Python 3版本中,您可以使用subprocess.run,将输入作为字符串传递给外部命令并获取其退出状态,并在一次调用中将输出作为字符串返回:

#!/usr/bin/env python3
from subprocess import run, PIPE

p = run(['grep', 'f'], stdout=PIPE,
        input='one\ntwo\nthree\nfour\nfive\nsix\n', encoding='ascii')
print(p.returncode)
# -> 0
print(p.stdout)
# -> four
# -> five
# -> 

Popen.communicate() documentation:

Note that if you want to send data to the process’s stdin, you need to create the Popen object with stdin=PIPE. Similarly, to get anything other than None in the result tuple, you need to give stdout=PIPE and/or stderr=PIPE too.

Replacing os.popen*

    pipe = os.popen(cmd, 'w', bufsize)
    # ==>
    pipe = Popen(cmd, shell=True, bufsize=bufsize, stdin=PIPE).stdin

Warning Use communicate() rather than stdin.write(), stdout.read() or stderr.read() to avoid deadlocks due to any of the other OS pipe buffers filling up and blocking the child process.

So your example could be written as follows:

from subprocess import Popen, PIPE, STDOUT

p = Popen(['grep', 'f'], stdout=PIPE, stdin=PIPE, stderr=STDOUT)    
grep_stdout = p.communicate(input=b'one\ntwo\nthree\nfour\nfive\nsix\n')[0]
print(grep_stdout.decode())
# -> four
# -> five
# ->

On Python 3.5+ (3.6+ for encoding), you could use subprocess.run, to pass input as a string to an external command and get its exit status, and its output as a string back in one call:

#!/usr/bin/env python3
from subprocess import run, PIPE

p = run(['grep', 'f'], stdout=PIPE,
        input='one\ntwo\nthree\nfour\nfive\nsix\n', encoding='ascii')
print(p.returncode)
# -> 0
print(p.stdout)
# -> four
# -> five
# -> 

回答 1

我想出了解决方法:

>>> p = subprocess.Popen(['grep','f'],stdout=subprocess.PIPE,stdin=subprocess.PIPE)
>>> p.stdin.write(b'one\ntwo\nthree\nfour\nfive\nsix\n') #expects a bytes type object
>>> p.communicate()[0]
'four\nfive\n'
>>> p.stdin.close()

有更好的吗?

I figured out this workaround:

>>> p = subprocess.Popen(['grep','f'],stdout=subprocess.PIPE,stdin=subprocess.PIPE)
>>> p.stdin.write(b'one\ntwo\nthree\nfour\nfive\nsix\n') #expects a bytes type object
>>> p.communicate()[0]
'four\nfive\n'
>>> p.stdin.close()

Is there a better one?


回答 2

令我惊讶的是,没有人建议创建管道,这是将字符串传递给子流程的stdin的最简单方法:

read, write = os.pipe()
os.write(write, "stdin input here")
os.close(write)

subprocess.check_call(['your-command'], stdin=read)

I’m a bit surprised nobody suggested creating a pipe, which is in my opinion the far simplest way to pass a string to stdin of a subprocess:

read, write = os.pipe()
os.write(write, "stdin input here")
os.close(write)

subprocess.check_call(['your-command'], stdin=read)

回答 3

如果您使用的是Python 3.4或更高版本,则有一个不错的解决方案。使用input参数代替stdin参数,该参数接受一个字节参数:

output = subprocess.check_output(
    ["sed", "s/foo/bar/"],
    input=b"foo",
)

这适用于check_outputrun,但不call还是check_call出于某种原因。

There’s a beautiful solution if you’re using Python 3.4 or better. Use the input argument instead of the stdin argument, which accepts a bytes argument:

output = subprocess.check_output(
    ["sed", "s/foo/bar/"],
    input=b"foo",
)

This works for check_output and run, but not call or check_call for some reason.


回答 4

我正在使用python3,发现您需要先对字符串进行编码,然后才能将其传递到stdin中:

p = Popen(['grep', 'f'], stdout=PIPE, stdin=PIPE, stderr=PIPE)
out, err = p.communicate(input='one\ntwo\nthree\nfour\nfive\nsix\n'.encode())
print(out)

I am using python3 and found out that you need to encode your string before you can pass it into stdin:

p = Popen(['grep', 'f'], stdout=PIPE, stdin=PIPE, stderr=PIPE)
out, err = p.communicate(input='one\ntwo\nthree\nfour\nfive\nsix\n'.encode())
print(out)

回答 5

显然,cStringIO.StringIO对象没有足够接近库中的子文件来适应子进程。

恐怕不是。管道是低级OS概念,因此绝对需要由OS级文件描述符表示的文件对象。您的解决方法是正确的。

Apparently a cStringIO.StringIO object doesn’t quack close enough to a file duck to suit subprocess.Popen

I’m afraid not. The pipe is a low-level OS concept, so it absolutely requires a file object that is represented by an OS-level file descriptor. Your workaround is the right one.


回答 6

from subprocess import Popen, PIPE
from tempfile import SpooledTemporaryFile as tempfile
f = tempfile()
f.write('one\ntwo\nthree\nfour\nfive\nsix\n')
f.seek(0)
print Popen(['/bin/grep','f'],stdout=PIPE,stdin=f).stdout.read()
f.close()
from subprocess import Popen, PIPE
from tempfile import SpooledTemporaryFile as tempfile
f = tempfile()
f.write('one\ntwo\nthree\nfour\nfive\nsix\n')
f.seek(0)
print Popen(['/bin/grep','f'],stdout=PIPE,stdin=f).stdout.read()
f.close()

回答 7

请注意,Popen.communicate(input=s)如果s太大,可能会给您带来麻烦,因为显然父进程会派生子进程之前对其进行缓冲,这意味着此时它需要“两倍多”的已用内存(至少根据“幕后”的解释)以及在此处找到的链接文档)。在我的特定情况下,s是一个生成器,它首先被完全扩展,然后才被写入,stdin因此在生成子代之前,父进程非常庞大,并且没有内存可以分叉它:

File "/opt/local/stow/python-2.7.2/lib/python2.7/subprocess.py", line 1130, in _execute_child self.pid = os.fork() OSError: [Errno 12] Cannot allocate memory

Beware that Popen.communicate(input=s)may give you trouble ifsis too big, because apparently the parent process will buffer it before forking the child subprocess, meaning it needs “twice as much” used memory at that point (at least according to the “under the hood” explanation and linked documentation found here). In my particular case,swas a generator that was first fully expanded and only then written tostdin so the parent process was huge right before the child was spawned, and no memory was left to fork it:

File "/opt/local/stow/python-2.7.2/lib/python2.7/subprocess.py", line 1130, in _execute_child self.pid = os.fork() OSError: [Errno 12] Cannot allocate memory


回答 8

"""
Ex: Dialog (2-way) with a Popen()
"""

p = subprocess.Popen('Your Command Here',
                 stdout=subprocess.PIPE,
                 stderr=subprocess.STDOUT,
                 stdin=PIPE,
                 shell=True,
                 bufsize=0)
p.stdin.write('START\n')
out = p.stdout.readline()
while out:
  line = out
  line = line.rstrip("\n")

  if "WHATEVER1" in line:
      pr = 1
      p.stdin.write('DO 1\n')
      out = p.stdout.readline()
      continue

  if "WHATEVER2" in line:
      pr = 2
      p.stdin.write('DO 2\n')
      out = p.stdout.readline()
      continue
"""
..........
"""

out = p.stdout.readline()

p.wait()
"""
Ex: Dialog (2-way) with a Popen()
"""

p = subprocess.Popen('Your Command Here',
                 stdout=subprocess.PIPE,
                 stderr=subprocess.STDOUT,
                 stdin=PIPE,
                 shell=True,
                 bufsize=0)
p.stdin.write('START\n')
out = p.stdout.readline()
while out:
  line = out
  line = line.rstrip("\n")

  if "WHATEVER1" in line:
      pr = 1
      p.stdin.write('DO 1\n')
      out = p.stdout.readline()
      continue

  if "WHATEVER2" in line:
      pr = 2
      p.stdin.write('DO 2\n')
      out = p.stdout.readline()
      continue
"""
..........
"""

out = p.stdout.readline()

p.wait()

回答 9

p = Popen(['grep', 'f'], stdout=PIPE, stdin=PIPE, stderr=STDOUT)    
p.stdin.write('one\n')
time.sleep(0.5)
p.stdin.write('two\n')
time.sleep(0.5)
p.stdin.write('three\n')
time.sleep(0.5)
testresult = p.communicate()[0]
time.sleep(0.5)
print(testresult)
p = Popen(['grep', 'f'], stdout=PIPE, stdin=PIPE, stderr=STDOUT)    
p.stdin.write('one\n')
time.sleep(0.5)
p.stdin.write('two\n')
time.sleep(0.5)
p.stdin.write('three\n')
time.sleep(0.5)
testresult = p.communicate()[0]
time.sleep(0.5)
print(testresult)

回答 10

在Python 3.7+上执行以下操作:

my_data = "whatever you want\nshould match this f"
subprocess.run(["grep", "f"], text=True, input=my_data)

并且您可能想要添加capture_output=True以获取以字符串形式运行命令的输出。

在旧版本的Python上,替换text=Trueuniversal_newlines=True

subprocess.run(["grep", "f"], universal_newlines=True, input=my_data)

On Python 3.7+ do this:

my_data = "whatever you want\nshould match this f"
subprocess.run(["grep", "f"], text=True, input=my_data)

and you’ll probably want to add capture_output=True to get the output of running the command as a string.

On older versions of Python, replace text=True with universal_newlines=True:

subprocess.run(["grep", "f"], universal_newlines=True, input=my_data)

子过程中“ shell = True”的实际含义

问题:子过程中“ shell = True”的实际含义

我正在使用该subprocess模块调用不同的进程。但是,我有一个问题。

在以下代码中:

callProcess = subprocess.Popen(['ls', '-l'], shell=True)

callProcess = subprocess.Popen(['ls', '-l']) # without shell

两者都可以。阅读文档后,我知道这shell=True意味着通过外壳执行代码。因此,这意味着在不存在的情况下,该过程将直接启动。

因此,对于我的情况,我更喜欢什么-我需要运行一个流程并获取其输出。从外壳内部或外部调用它有什么好处。

I am calling different processes with the subprocess module. However, I have a question.

In the following codes:

callProcess = subprocess.Popen(['ls', '-l'], shell=True)

and

callProcess = subprocess.Popen(['ls', '-l']) # without shell

Both work. After reading the docs, I came to know that shell=True means executing the code through the shell. So that means in absence, the process is directly started.

So what should I prefer for my case – I need to run a process and get its output. What benefit do I have from calling it from within the shell or outside of it.


回答 0

不通过外壳调用的好处是您没有在调用“神秘程序”。在POSIX上,环境变量SHELL控制哪个二进制文件作为“外壳”被调用。在Windows上,没有bourne shell后代,只有cmd.exe。

因此,调用外壳程序将调用用户选择的程序,并且该程序与平台有关。一般来说,避免通过外壳调用。

通过shell调用确实允许您根据shell的通常机制扩展环境变量和文件glob。在POSIX系统上,外壳程序将文件全局扩展为文件列表。在Windows上,无论如何,shell都不会扩展文件glob(例如“ *。*”)(但是cmd.exe 扩展命令行上的环境变量)。

如果您认为需要环境变量扩展和文件文件,请研究ILS1992-ish对网络服务的攻击,这些攻击通过外壳执行子程序调用。例子包括sendmail涉及各种后门ILS

总之,使用shell=False

The benefit of not calling via the shell is that you are not invoking a ‘mystery program.’ On POSIX, the environment variable SHELL controls which binary is invoked as the “shell.” On Windows, there is no bourne shell descendent, only cmd.exe.

So invoking the shell invokes a program of the user’s choosing and is platform-dependent. Generally speaking, avoid invocations via the shell.

Invoking via the shell does allow you to expand environment variables and file globs according to the shell’s usual mechanism. On POSIX systems, the shell expands file globs to a list of files. On Windows, a file glob (e.g., “*.*”) is not expanded by the shell, anyway (but environment variables on a command line are expanded by cmd.exe).

If you think you want environment variable expansions and file globs, research the ILS attacks of 1992-ish on network services which performed subprogram invocations via the shell. Examples include the various sendmail backdoors involving ILS.

In summary, use shell=False.


回答 1

>>> import subprocess
>>> subprocess.call('echo $HOME')
Traceback (most recent call last):
...
OSError: [Errno 2] No such file or directory
>>>
>>> subprocess.call('echo $HOME', shell=True)
/user/khong
0

将shell参数设置为true值会导致子进程产生一个中间shell进程,并告诉它运行命令。换句话说,使用中间外壳程序意味着在运行命令之前先处理命令​​字符串中的变量,全局模式和其他特殊外壳程序功能。在此示例中,$ HOME是在echo命令之前处理的。实际上,这是带有shell扩展的命令的情况,而命令ls -l被视为简单命令。

来源:子流程模块

>>> import subprocess
>>> subprocess.call('echo $HOME')
Traceback (most recent call last):
...
OSError: [Errno 2] No such file or directory
>>>
>>> subprocess.call('echo $HOME', shell=True)
/user/khong
0

Setting the shell argument to a true value causes subprocess to spawn an intermediate shell process, and tell it to run the command. In other words, using an intermediate shell means that variables, glob patterns, and other special shell features in the command string are processed before the command is run. Here, in the example, $HOME was processed before the echo command. Actually, this is the case of command with shell expansion while the command ls -l considered as a simple command.

source: Subprocess Module


回答 2

此处显示了Shell = True可能出错的示例

>>> from subprocess import call
>>> filename = input("What file would you like to display?\n")
What file would you like to display?
non_existent; rm -rf / # THIS WILL DELETE EVERYTHING IN ROOT PARTITION!!!
>>> call("cat " + filename, shell=True) # Uh-oh. This will end badly...

在此处检查文档:subprocess.call()

An example where things could go wrong with Shell=True is shown here

>>> from subprocess import call
>>> filename = input("What file would you like to display?\n")
What file would you like to display?
non_existent; rm -rf / # THIS WILL DELETE EVERYTHING IN ROOT PARTITION!!!
>>> call("cat " + filename, shell=True) # Uh-oh. This will end badly...

Check the doc here: subprocess.call()


回答 3

通过外壳执行程序意味着将根据调用的外壳的语法和语义规则来解释传递给程序的所有用户输入。充其量,这只会给用户带来不便,因为用户必须遵守这些规则。例如,必须转义包含特殊外壳字符(如引号或空格)的路径。最糟糕的是,这会导致安全漏洞,因为用户可以执行任意程序。

shell=True有时可以方便地利用特定的Shell功能(例如单词拆分或参数扩展)。但是,如果需要此功能,则可以利用其他模块(例如,os.path.expandvars()用于参数扩展或shlex字分割)。这意味着更多的工作,但避免了其他问题。

简而言之:务必避免shell=True

Executing programs through the shell means that all user input passed to the program is interpreted according to the syntax and semantic rules of the invoked shell. At best, this only causes inconvenience to the user, because the user has to obey these rules. For instance, paths containing special shell characters like quotation marks or blanks must be escaped. At worst, it causes security leaks, because the user can execute arbitrary programs.

shell=True is sometimes convenient to make use of specific shell features like word splitting or parameter expansion. However, if such a feature is required, make use of other modules are given to you (e.g. os.path.expandvars() for parameter expansion or shlex for word splitting). This means more work, but avoids other problems.

In short: Avoid shell=True by all means.


回答 4

这里的其他答案充分说明了安全警告,subprocess文档中也提到了这些警告。但是除此之外,启动外壳程序以启动要运行的程序的开销通常是不必要的,对于您实际上没有使用任何外壳程序功能的情况而言,这绝对是愚蠢的。而且,额外的隐藏复杂性会让您感到恐惧,特别是如果您对外壳程序或它提供的服务不是很熟悉的话。

在与shell的交互非常简单的地方,您现在需要Python脚本的阅读者和维护者(可能是您将来的自己,也可能不是您将来的自己)来理解Python和shell脚本。记住Python的座右铭“明确胜于隐含”。即使Python代码比等效的(并且通常非常简洁)shell脚本要复杂一些,您最好还是删除外壳并用本机Python构造替换功能。尽量减少在外部过程中完成的工作并尽可能地将控制保持在自己的代码中通常是一个好主意,这仅仅是因为它可以提高可见性并减少副作用(有害的或有害的)的风险。

通配符扩展,变量插值和重定向都很容易用本机Python构造替换。复杂的Shell管道(其中的部分或全部无法用Python合理地重写)可能是您可以考虑使用Shell的一种情况。您仍然应该确保您了解性能和安全性含义。

在平凡的情况下,要避免shell=True,只需替换

subprocess.Popen("command -with -options 'like this' and\\ an\\ argument", shell=True)

subprocess.Popen(['command', '-with','-options', 'like this', 'and an argument'])

请注意,第一个参数如何是要传递给的字符串列表execvp(),以及通常如何不需要引用字符串和使用反斜杠的shell元字符(或有用或正确)。也许还会看到何时将引号括在shell变量周围?

顺便说一句,您经常会想避免Popen包装中的一个较简单的包装subprocess程序满足您的要求。如果您的Python版本足够新,则应该使用subprocess.run

  • 随着check=True如果命令你运行失败就会失败。
  • 有了stdout=subprocess.PIPE这将捕获命令的输出。
  • 有点晦涩难懂,universal_newlines=True它将输出解码为正确的Unicode字符串(bytes否则,在Python 3中只是在系统编码中)。

如果不是这样,则对于许多任务,您希望check_output从命令获取输出,同时检查命令是否成功,或者check_call是否没有要收集的输出。

最后,我引用大卫·科恩(David Korn)的话说:“编写可移植的shell比移植可移植的shell脚本容易。” 甚至subprocess.run('echo "$HOME"', shell=True)不能移植到Windows。

The other answers here adequately explain the security caveats which are also mentioned in the subprocess documentation. But in addition to that, the overhead of starting a shell to start the program you want to run is often unnecessary and definitely silly for situations where you don’t actually use any of the shell’s functionality. Moreover, the additional hidden complexity should scare you, especially if you are not very familiar with the shell or the services it provides.

Where the interactions with the shell are nontrivial, you now require the reader and maintainer of the Python script (which may or may not be your future self) to understand both Python and shell script. Remember the Python motto “explicit is better than implicit”; even when the Python code is going to be somewhat more complex than the equivalent (and often very terse) shell script, you might be better off removing the shell and replacing the functionality with native Python constructs. Minimizing the work done in an external process and keeping control within your own code as far as possible is often a good idea simply because it improves visibility and reduces the risks of — wanted or unwanted — side effects.

Wildcard expansion, variable interpolation, and redirection are all simple to replace with native Python constructs. A complex shell pipeline where parts or all cannot be reasonably rewritten in Python would be the one situation where perhaps you could consider using the shell. You should still make sure you understand the performance and security implications.

In the trivial case, to avoid shell=True, simply replace

subprocess.Popen("command -with -options 'like this' and\\ an\\ argument", shell=True)

with

subprocess.Popen(['command', '-with','-options', 'like this', 'and an argument'])

Notice how the first argument is a list of strings to pass to execvp(), and how quoting strings and backslash-escaping shell metacharacters is generally not necessary (or useful, or correct). Maybe see also When to wrap quotes around a shell variable?

If you don’t want to figure this out yourself, the shlex.split() function can do this for you. It’s part of the Python standard library, but of course, if your shell command string is static, you can just run it once, during development, and paste the result into your script.

As an aside, you very often want to avoid Popen if one of the simpler wrappers in the subprocess package does what you want. If you have a recent enough Python, you should probably use subprocess.run.

  • With check=True it will fail if the command you ran failed.
  • With stdout=subprocess.PIPE it will capture the command’s output.
  • With text=True (or somewhat obscurely, with the synonym universal_newlines=True) it will decode output into a proper Unicode string (it’s just bytes in the system encoding otherwise, on Python 3).

If not, for many tasks, you want check_output to obtain the output from a command, whilst checking that it succeeded, or check_call if there is no output to collect.

I’ll close with a quote from David Korn: “It’s easier to write a portable shell than a portable shell script.” Even subprocess.run('echo "$HOME"', shell=True) is not portable to Windows.


如何终止以shell = True启动的python子进程

问题:如何终止以shell = True启动的python子进程

我正在使用以下命令启动子流程:

p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)

但是,当我尝试杀死使用:

p.terminate()

要么

p.kill()

该命令一直在后台运行,所以我想知道如何才能真正终止该过程。

请注意,当我使用以下命令运行命令时:

p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)

发出时,它确实成功终止p.terminate()

I’m launching a subprocess with the following command:

p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)

However, when I try to kill using:

p.terminate()

or

p.kill()

The command keeps running in the background, so I was wondering how can I actually terminate the process.

Note that when I run the command with:

p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)

It does terminate successfully when issuing the p.terminate().


回答 0

使用进程组,以便能够向中的所有进程发送信号。为此,您应该将会话ID附加到生成的子进程的父进程中,在您的情况下,这是一个外壳程序。这将使其成为流程的小组负责人。因此,现在,当信号发送到流程组负责人时,它便被传输到该组的所有子流程。

这是代码:

import os
import signal
import subprocess

# The os.setsid() is passed in the argument preexec_fn so
# it's run after the fork() and before  exec() to run the shell.
pro = subprocess.Popen(cmd, stdout=subprocess.PIPE, 
                       shell=True, preexec_fn=os.setsid) 

os.killpg(os.getpgid(pro.pid), signal.SIGTERM)  # Send the signal to all the process groups

Use a process group so as to enable sending a signal to all the process in the groups. For that, you should attach a session id to the parent process of the spawned/child processes, which is a shell in your case. This will make it the group leader of the processes. So now, when a signal is sent to the process group leader, it’s transmitted to all of the child processes of this group.

Here’s the code:

import os
import signal
import subprocess

# The os.setsid() is passed in the argument preexec_fn so
# it's run after the fork() and before  exec() to run the shell.
pro = subprocess.Popen(cmd, stdout=subprocess.PIPE, 
                       shell=True, preexec_fn=os.setsid) 

os.killpg(os.getpgid(pro.pid), signal.SIGTERM)  # Send the signal to all the process groups

回答 1

p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
p.kill()

p.kill()最终终止了shell进程,cmd并且仍在运行。

我通过以下方法找到了一个方便的解决方法:

p = subprocess.Popen("exec " + cmd, stdout=subprocess.PIPE, shell=True)

这将导致cmd继承shell进程,而不是让shell启动不会被杀死的子进程。 p.pid然后将是您的cmd进程的ID。

p.kill() 应该管用。

我不知道这会对您的管道产生什么影响。

p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
p.kill()

p.kill() ends up killing the shell process and cmd is still running.

I found a convenient fix this by:

p = subprocess.Popen("exec " + cmd, stdout=subprocess.PIPE, shell=True)

This will cause cmd to inherit the shell process, instead of having the shell launch a child process, which does not get killed. p.pid will be the id of your cmd process then.

p.kill() should work.

I don’t know what effect this will have on your pipe though.


回答 2

如果可以使用psutil,则可以完美地工作:

import subprocess

import psutil


def kill(proc_pid):
    process = psutil.Process(proc_pid)
    for proc in process.children(recursive=True):
        proc.kill()
    process.kill()


proc = subprocess.Popen(["infinite_app", "param"], shell=True)
try:
    proc.wait(timeout=3)
except subprocess.TimeoutExpired:
    kill(proc.pid)

If you can use psutil, then this works perfectly:

import subprocess

import psutil


def kill(proc_pid):
    process = psutil.Process(proc_pid)
    for proc in process.children(recursive=True):
        proc.kill()
    process.kill()


proc = subprocess.Popen(["infinite_app", "param"], shell=True)
try:
    proc.wait(timeout=3)
except subprocess.TimeoutExpired:
    kill(proc.pid)

回答 3

我可以用

from subprocess import Popen

process = Popen(command, shell=True)
Popen("TASKKILL /F /PID {pid} /T".format(pid=process.pid))

它杀死了cmd.exe我给命令的程序。

(在Windows上)

I could do it using

from subprocess import Popen

process = Popen(command, shell=True)
Popen("TASKKILL /F /PID {pid} /T".format(pid=process.pid))

it killed the cmd.exe and the program that i gave the command for.

(On Windows)


回答 4

shell=Trueshell是子进程时,命令就是它的子进程。因此,任何SIGTERMSIGKILL将杀死外壳程序但不会杀死它的子进程的过程,我不记得有什么好方法。我能想到的最好方法是使用shell=False,否则当您杀死父shell进程时,它将留下一个已失效的shell进程。

When shell=True the shell is the child process, and the commands are its children. So any SIGTERM or SIGKILL will kill the shell but not its child processes, and I don’t remember a good way to do it. The best way I can think of is to use shell=False, otherwise when you kill the parent shell process, it will leave a defunct shell process.


回答 5

这些答案均不适合我,因此我留下了有效的代码。就我而言,即使在终止进程.kill()并获得.poll()返回代码后,进程也没有终止。

按照subprocess.Popen 文档

“ …为了正确清理行为良好的应用程序,应终止子进程并完成通信…”

proc = subprocess.Popen(...)
try:
    outs, errs = proc.communicate(timeout=15)
except TimeoutExpired:
    proc.kill()
    outs, errs = proc.communicate()

就我而言,我proc.communicate()在打电话后就错过了proc.kill()。这将清理进程stdin,stdout …,并终止进程。

None of this answers worked for me so Im leaving the code that did work. In my case even after killing the process with .kill() and getting a .poll() return code the process didn’t terminate.

Following the subprocess.Popen documentation:

“…in order to cleanup properly a well-behaved application should kill the child process and finish communication…”

proc = subprocess.Popen(...)
try:
    outs, errs = proc.communicate(timeout=15)
except TimeoutExpired:
    proc.kill()
    outs, errs = proc.communicate()

In my case I was missing the proc.communicate() after calling proc.kill(). This cleans the process stdin, stdout … and does terminate the process.


回答 6

正如Sai所说,shell是孩子,因此信号被它拦截了-我发现的最佳方法是使用shell = False并使用shlex拆分命令行:

if isinstance(command, unicode):
    cmd = command.encode('utf8')
args = shlex.split(cmd)

p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

然后p.kill()和p.terminate()应该可以按照您的期望工作。

As Sai said, the shell is the child, so signals are intercepted by it — best way I’ve found is to use shell=False and use shlex to split the command line:

if isinstance(command, unicode):
    cmd = command.encode('utf8')
args = shlex.split(cmd)

p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

Then p.kill() and p.terminate() should work how you expect.


回答 7

我觉得我们可以使用:

import os
import signal
import subprocess
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)

os.killpg(os.getpgid(pro.pid), signal.SIGINT)

这不会杀死您的所有任务,但会杀死p.pid进程

what i feel like we could use:

import os
import signal
import subprocess
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)

os.killpg(os.getpgid(pro.pid), signal.SIGINT)

this will not kill all your task but the process with the p.pid


回答 8

我知道这是一个古老的问题,但这可能会对寻求其他方法的人有所帮助。这就是我在Windows上用来杀死已调用进程的方法。

si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
subprocess.call(["taskkill", "/IM", "robocopy.exe", "/T", "/F"], startupinfo=si)

/ IM是图像名称,如果需要,您也可以执行/ PID。/ T杀死进程以及子进程。/ F强制终止它。如我所设置的,si是如何执行此操作而不显示CMD窗口。此代码在python 3中使用。

I know this is an old question but this may help someone looking for a different method. This is what I use on windows to kill processes that I’ve called.

si = subprocess.STARTUPINFO()
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW
subprocess.call(["taskkill", "/IM", "robocopy.exe", "/T", "/F"], startupinfo=si)

/IM is the image name, you can also do /PID if you want. /T kills the process as well as the child processes. /F force terminates it. si, as I have it set, is how you do this without showing a CMD window. This code is used in python 3.


回答 9

将信号发送到组中的所有进程

    self.proc = Popen(commands, 
            stdout=PIPE, 
            stderr=STDOUT, 
            universal_newlines=True, 
            preexec_fn=os.setsid)

    os.killpg(os.getpgid(self.proc.pid), signal.SIGHUP)
    os.killpg(os.getpgid(self.proc.pid), signal.SIGTERM)

Send the signal to all the processes in group

    self.proc = Popen(commands, 
            stdout=PIPE, 
            stderr=STDOUT, 
            universal_newlines=True, 
            preexec_fn=os.setsid)

    os.killpg(os.getpgid(self.proc.pid), signal.SIGHUP)
    os.killpg(os.getpgid(self.proc.pid), signal.SIGTERM)

回答 10

我在这里没有看到此内容,因此我将其放在此处,以防有人需要。如果您要做的只是确保子流程成功终止,则可以将其放在上下文管理器中。例如,我希望我的标准打印机打印出图像,并使用上下文管理器确保子进程终止。

import subprocess

with open(filename,'rb') as f:
    img=f.read()
with subprocess.Popen("/usr/bin/lpr", stdin=subprocess.PIPE) as lpr:
    lpr.stdin.write(img)
print('Printed image...')

我相信这种方法也是跨平台的。

I have not seen this mentioned here, so I am putting it out there in case someone needs it. If all you want to do is to make sure that your subprocess terminates successfully, you could put it in a context manager. For example, I wanted my standard printer to print an out image and using the context manager ensured that the subprocess terminated.

import subprocess

with open(filename,'rb') as f:
    img=f.read()
with subprocess.Popen("/usr/bin/lpr", stdin=subprocess.PIPE) as lpr:
    lpr.stdin.write(img)
print('Printed image...')

I believe this method is also cross-platform.


将subprocess.Popen调用的输出存储在字符串中

问题:将subprocess.Popen调用的输出存储在字符串中

我正在尝试在Python中进行系统调用,并将输出存储到我可以在Python程序中操作的字符串中。

#!/usr/bin/python
import subprocess
p2 = subprocess.Popen("ntpq -p")

我尝试了一些事情,包括此处的一些建议:

检索subprocess.call()的输出

但没有任何运气。

I’m trying to make a system call in Python and store the output to a string that I can manipulate in the Python program.

#!/usr/bin/python
import subprocess
p2 = subprocess.Popen("ntpq -p")

I’ve tried a few things including some of the suggestions here:

Retrieving the output of subprocess.call()

but without any luck.


回答 0

在Python 2.7或Python 3中

Popen您可以使用subprocess.check_output()函数将命令的输出存储在字符串中,而不是直接创建对象:

from subprocess import check_output
out = check_output(["ntpq", "-p"])

在Python 2.4-2.6中

使用communicate方法。

import subprocess
p = subprocess.Popen(["ntpq", "-p"], stdout=subprocess.PIPE)
out, err = p.communicate()

out 是你想要的。

有关其他答案的重要说明

请注意我如何传递命令。该"ntpq -p"示例提出了另一回事。由于Popen不调用外壳程序,因此您将使用命令和选项列表["ntpq", "-p"]

In Python 2.7 or Python 3

Instead of making a Popen object directly, you can use the subprocess.check_output() function to store output of a command in a string:

from subprocess import check_output
out = check_output(["ntpq", "-p"])

In Python 2.4-2.6

Use the communicate method.

import subprocess
p = subprocess.Popen(["ntpq", "-p"], stdout=subprocess.PIPE)
out, err = p.communicate()

out is what you want.

Important note about the other answers

Note how I passed in the command. The "ntpq -p" example brings up another matter. Since Popen does not invoke the shell, you would use a list of the command and options—["ntpq", "-p"].


回答 1

这为我重定向标准输出工作(stderr可以类似地处理):

from subprocess import Popen, PIPE
pipe = Popen(path, stdout=PIPE)
text = pipe.communicate()[0]

如果对您不起作用,请确切说明您遇到的问题。

This worked for me for redirecting stdout (stderr can be handled similarly):

from subprocess import Popen, PIPE
pipe = Popen(path, stdout=PIPE)
text = pipe.communicate()[0]

If it doesn’t work for you, please specify exactly the problem you’re having.


回答 2

假设这pwd只是一个示例,可以通过以下方法进行:

import subprocess

p = subprocess.Popen("pwd", stdout=subprocess.PIPE)
result = p.communicate()[0]
print result

看到子文档另一个例子和更多信息。

Assuming that pwd is just an example, this is how you can do it:

import subprocess

p = subprocess.Popen("pwd", stdout=subprocess.PIPE)
result = p.communicate()[0]
print result

See the subprocess documentation for another example and more information.


回答 3

subprocess.Popen:http : //docs.python.org/2/library/subprocess.html#subprocess.Popen

import subprocess

command = "ntpq -p"  # the shell command
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=None, shell=True)

#Launch the shell command:
output = process.communicate()

print output[0]

在Popen构造函数中,如果shellTrue,则应以字符串而不是序列的形式传递命令。否则,只需将命令拆分为一个列表:

command = ["ntpq", "-p"]  # the shell command
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=None)

如果您还需要将标准错误读取到Popen初始化中,则可以将stderr设置为subprocess.PIPEsubprocess.STDOUT

import subprocess

command = "ntpq -p"  # the shell command
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)

#Launch the shell command:
output, error = process.communicate()

subprocess.Popen: http://docs.python.org/2/library/subprocess.html#subprocess.Popen

import subprocess

command = "ntpq -p"  # the shell command
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=None, shell=True)

#Launch the shell command:
output = process.communicate()

print output[0]

In the Popen constructor, if shell is True, you should pass the command as a string rather than as a sequence. Otherwise, just split the command into a list:

command = ["ntpq", "-p"]  # the shell command
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=None)

If you need to read also the standard error, into the Popen initialization, you can set stderr to subprocess.PIPE or to subprocess.STDOUT:

import subprocess

command = "ntpq -p"  # the shell command
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)

#Launch the shell command:
output, error = process.communicate()

回答 4

对于Python 2.7+,惯用的答案是使用 subprocess.check_output()

您还应该在调用子流程时注意对参数的处理,因为这可能会造成一些混乱。

如果args只是单个命令而没有自己的args(或您已shell=True设置),则它可以是字符串。否则,它必须是一个列表。

例如…调用ls命令,这很好:

from subprocess import check_call
check_call('ls')

是这样的:

from subprocess import check_call
check_call(['ls',])

但是,如果要将一些args传递给shell命令,则不能这样做:

from subprocess import check_call
check_call('ls -al')

相反,您必须将其作为列表传递:

from subprocess import check_call
check_call(['ls', '-al'])

shlex.split()函数有时在创建子进程之前将字符串拆分成类似shell的语法很有用…就像这样:

from subprocess import check_call
import shlex
check_call(shlex.split('ls -al'))

for Python 2.7+ the idiomatic answer is to use subprocess.check_output()

You should also note the handling of arguments when invoking a subprocess, as it can be a little confusing….

If args is just single command with no args of its own (or you have shell=True set), it can be a string. Otherwise it must be a list.

for example… to invoke the ls command, this is fine:

from subprocess import check_call
check_call('ls')

so is this:

from subprocess import check_call
check_call(['ls',])

however, if you want to pass some args to the shell command, you can’t do this:

from subprocess import check_call
check_call('ls -al')

instead, you must pass it as a list:

from subprocess import check_call
check_call(['ls', '-al'])

the shlex.split() function can sometimes be useful to split a string into shell-like syntax before creating a subprocesses… like this:

from subprocess import check_call
import shlex
check_call(shlex.split('ls -al'))

回答 5

这完全适合我:

import subprocess
try:
    #prints results and merges stdout and std
    result = subprocess.check_output("echo %USERNAME%", stderr=subprocess.STDOUT, shell=True)
    print result
    #causes error and merges stdout and stderr
    result = subprocess.check_output("copy testfds", stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError, ex: # error code <> 0 
    print "--------error------"
    print ex.cmd
    print ex.message
    print ex.returncode
    print ex.output # contains stdout and stderr together 

This works perfectly for me:

import subprocess
try:
    #prints results and merges stdout and std
    result = subprocess.check_output("echo %USERNAME%", stderr=subprocess.STDOUT, shell=True)
    print result
    #causes error and merges stdout and stderr
    result = subprocess.check_output("copy testfds", stderr=subprocess.STDOUT, shell=True)
except subprocess.CalledProcessError, ex: # error code <> 0 
    print "--------error------"
    print ex.cmd
    print ex.message
    print ex.returncode
    print ex.output # contains stdout and stderr together 

回答 6

这对我来说是完美的。您将在元组中获得返回码,stdout和stderr。

from subprocess import Popen, PIPE

def console(cmd):
    p = Popen(cmd, shell=True, stdout=PIPE)
    out, err = p.communicate()
    return (p.returncode, out, err)

例如:

result = console('ls -l')
print 'returncode: %s' % result[0]
print 'output: %s' % result[1]
print 'error: %s' % result[2]

This was perfect for me. You will get the return code, stdout and stderr in a tuple.

from subprocess import Popen, PIPE

def console(cmd):
    p = Popen(cmd, shell=True, stdout=PIPE)
    out, err = p.communicate()
    return (p.returncode, out, err)

For Example:

result = console('ls -l')
print 'returncode: %s' % result[0]
print 'output: %s' % result[1]
print 'error: %s' % result[2]

回答 7

接受的答案仍然是好的,只是对新功能的一些评论。从python 3.6开始,您可以直接在中处理编码check_output,请参阅文档。现在返回一个字符串对象:

import subprocess 
out = subprocess.check_output(["ls", "-l"], encoding="utf-8")

在python 3.7中,capture_output向subprocess.run()添加了一个参数,该参数为我们执行了一些Popen / PIPE处理,请参见python docs

import subprocess 
p2 = subprocess.run(["ls", "-l"], capture_output=True, encoding="utf-8")
p2.stdout

The accepted answer is still good, just a few remarks on newer features. Since python 3.6, you can handle encoding directly in check_output, see documentation. This returns a string object now:

import subprocess 
out = subprocess.check_output(["ls", "-l"], encoding="utf-8")

In python 3.7, a parameter capture_output was added to subprocess.run(), which does some of the Popen/PIPE handling for us, see the python docs :

import subprocess 
p2 = subprocess.run(["ls", "-l"], capture_output=True, encoding="utf-8")
p2.stdout

回答 8

 import os   
 list = os.popen('pwd').read()

在这种情况下,列表中将只有一个元素。

 import os   
 list = os.popen('pwd').read()

In this case you will only have one element in the list.


回答 9

我根据此处的其他答案编写了一个小函数:

def pexec(*args):
    return subprocess.Popen(args, stdout=subprocess.PIPE).communicate()[0].rstrip()

用法:

changeset = pexec('hg','id','--id')
branch = pexec('hg','id','--branch')
revnum = pexec('hg','id','--num')
print('%s : %s (%s)' % (revnum, changeset, branch))

I wrote a little function based on the other answers here:

def pexec(*args):
    return subprocess.Popen(args, stdout=subprocess.PIPE).communicate()[0].rstrip()

Usage:

changeset = pexec('hg','id','--id')
branch = pexec('hg','id','--branch')
revnum = pexec('hg','id','--num')
print('%s : %s (%s)' % (revnum, changeset, branch))

回答 10

在Python 3.7中,capture_output为引入了新的关键字参数subprocess.run。启用简短的说明:

import subprocess

p = subprocess.run("echo 'hello world!'", capture_output=True, shell=True, encoding="utf8")
assert p.stdout == 'hello world!\n'

In Python 3.7 a new keyword argument capture_output was introduced for subprocess.run. Enabling the short and simple:

import subprocess

p = subprocess.run("echo 'hello world!'", capture_output=True, shell=True, encoding="utf8")
assert p.stdout == 'hello world!\n'

回答 11

import subprocess
output = str(subprocess.Popen("ntpq -p",shell = True,stdout = subprocess.PIPE, 
stderr = subprocess.STDOUT).communicate()[0])

这是一线解决方案

import subprocess
output = str(subprocess.Popen("ntpq -p",shell = True,stdout = subprocess.PIPE, 
stderr = subprocess.STDOUT).communicate()[0])

This is one line solution


回答 12

以下内容在单个变量中捕获了进程的stdout和stderr。它与Python 2和3兼容:

from subprocess import check_output, CalledProcessError, STDOUT

command = ["ls", "-l"]
try:
    output = check_output(command, stderr=STDOUT).decode()
    success = True 
except CalledProcessError as e:
    output = e.output.decode()
    success = False

如果您的命令是字符串而不是数组,请在此前缀:

import shlex
command = shlex.split(command)

The following captures stdout and stderr of the process in a single variable. It is Python 2 and 3 compatible:

from subprocess import check_output, CalledProcessError, STDOUT

command = ["ls", "-l"]
try:
    output = check_output(command, stderr=STDOUT).decode()
    success = True 
except CalledProcessError as e:
    output = e.output.decode()
    success = False

If your command is a string rather than an array, prefix this with:

import shlex
command = shlex.split(command)

回答 13

对于python 3.5,我根据先前的答案提出了功能。日志可能会被删除,以为拥有它很高兴

import shlex
from subprocess import check_output, CalledProcessError, STDOUT


def cmdline(command):
    log("cmdline:{}".format(command))
    cmdArr = shlex.split(command)
    try:
        output = check_output(cmdArr,  stderr=STDOUT).decode()
        log("Success:{}".format(output))
    except (CalledProcessError) as e:
        output = e.output.decode()
        log("Fail:{}".format(output))
    except (Exception) as e:
        output = str(e);
        log("Fail:{}".format(e))
    return str(output)


def log(msg):
    msg = str(msg)
    d_date = datetime.datetime.now()
    now = str(d_date.strftime("%Y-%m-%d %H:%M:%S"))
    print(now + " " + msg)
    if ("LOG_FILE" in globals()):
        with open(LOG_FILE, "a") as myfile:
            myfile.write(now + " " + msg + "\n")

For python 3.5 I put up function based on previous answer. Log may be removed, thought it’s nice to have

import shlex
from subprocess import check_output, CalledProcessError, STDOUT


def cmdline(command):
    log("cmdline:{}".format(command))
    cmdArr = shlex.split(command)
    try:
        output = check_output(cmdArr,  stderr=STDOUT).decode()
        log("Success:{}".format(output))
    except (CalledProcessError) as e:
        output = e.output.decode()
        log("Fail:{}".format(output))
    except (Exception) as e:
        output = str(e);
        log("Fail:{}".format(e))
    return str(output)


def log(msg):
    msg = str(msg)
    d_date = datetime.datetime.now()
    now = str(d_date.strftime("%Y-%m-%d %H:%M:%S"))
    print(now + " " + msg)
    if ("LOG_FILE" in globals()):
        with open(LOG_FILE, "a") as myfile:
            myfile.write(now + " " + msg + "\n")

回答 14

使用ckeck_output方法subprocess

import subprocess
address = 192.168.x.x
res = subprocess.check_output(['ping', address, '-c', '3'])

最后解析字符串

for line in res.splitlines():

希望对您有所帮助,编码愉快

Use ckeck_output method of subprocess

import subprocess
address = 192.168.x.x
res = subprocess.check_output(['ping', address, '-c', '3'])

Finally parse the string

for line in res.splitlines():

Hope it helps, happy coding