标签归档:subprocess

将模块“子进程”与超时一起使用

问题:将模块“子进程”与超时一起使用

这是运行任意命令以返回其stdout数据或在非零退出代码上引发异常的Python代码:

proc = subprocess.Popen(
    cmd,
    stderr=subprocess.STDOUT,  # Merge stdout and stderr
    stdout=subprocess.PIPE,
    shell=True)

communicate 用于等待进程退出:

stdoutdata, stderrdata = proc.communicate()

subprocess模块不支持超时-可以杀死运行时间超过X秒的进程-因此,communicate可能需要永远运行。

在打算在Windows和Linux上运行的Python程序中实现超时的最简单方法是什么?

Here’s the Python code to run an arbitrary command returning its stdout data, or raise an exception on non-zero exit codes:

proc = subprocess.Popen(
    cmd,
    stderr=subprocess.STDOUT,  # Merge stdout and stderr
    stdout=subprocess.PIPE,
    shell=True)

communicate is used to wait for the process to exit:

stdoutdata, stderrdata = proc.communicate()

The subprocess module does not support timeout–ability to kill a process running for more than X number of seconds–therefore, communicate may take forever to run.

What is the simplest way to implement timeouts in a Python program meant to run on Windows and Linux?


回答 0

在Python 3.3+中:

from subprocess import STDOUT, check_output

output = check_output(cmd, stderr=STDOUT, timeout=seconds)

output 是一个字节字符串,其中包含命令的合并标准输出,标准错误数据。

check_output提出CalledProcessError问题文本中指定的非零退出状态,这与proc.communicate()的方法。

我已删除,shell=True因为它经常不必要地使用。如果cmd确实需要,您可以随时将其添加回去。如果添加,shell=True即子进程是否产生了自己的后代;check_output()可以比超时指示晚得多返回,请参阅子进程超时失败

超时功能可在Python 2.x上通过subprocess323.2+子进程模块的反向端口使用。

In Python 3.3+:

from subprocess import STDOUT, check_output

output = check_output(cmd, stderr=STDOUT, timeout=seconds)

output is a byte string that contains command’s merged stdout, stderr data.

check_output raises CalledProcessError on non-zero exit status as specified in the question’s text unlike proc.communicate() method.

I’ve removed shell=True because it is often used unnecessarily. You can always add it back if cmd indeed requires it. If you add shell=True i.e., if the child process spawns its own descendants; check_output() can return much later than the timeout indicates, see Subprocess timeout failure.

The timeout feature is available on Python 2.x via the subprocess32 backport of the 3.2+ subprocess module.


回答 1

我对底层细节了解不多;但是,鉴于python 2.6中的API提供了等待线程并终止进程的能力,那么如何在单独的线程中运行进程呢?

import subprocess, threading

class Command(object):
    def __init__(self, cmd):
        self.cmd = cmd
        self.process = None

    def run(self, timeout):
        def target():
            print 'Thread started'
            self.process = subprocess.Popen(self.cmd, shell=True)
            self.process.communicate()
            print 'Thread finished'

        thread = threading.Thread(target=target)
        thread.start()

        thread.join(timeout)
        if thread.is_alive():
            print 'Terminating process'
            self.process.terminate()
            thread.join()
        print self.process.returncode

command = Command("echo 'Process started'; sleep 2; echo 'Process finished'")
command.run(timeout=3)
command.run(timeout=1)

我的计算机中此代码段的输出为:

Thread started
Process started
Process finished
Thread finished
0
Thread started
Process started
Terminating process
Thread finished
-15

从中可以看出,在第一次执行中,进程正确完成了(返回代码0),而在第二次执行中,进程终止了(返回代码-15)。

我没有在Windows中进行测试;但是,除了更新示例命令外,我认为它应该可以工作,因为我在文档中没有发现任何不支持thread.join或process.terminate的内容。

I don’t know much about the low level details; but, given that in python 2.6 the API offers the ability to wait for threads and terminate processes, what about running the process in a separate thread?

import subprocess, threading

class Command(object):
    def __init__(self, cmd):
        self.cmd = cmd
        self.process = None

    def run(self, timeout):
        def target():
            print 'Thread started'
            self.process = subprocess.Popen(self.cmd, shell=True)
            self.process.communicate()
            print 'Thread finished'

        thread = threading.Thread(target=target)
        thread.start()

        thread.join(timeout)
        if thread.is_alive():
            print 'Terminating process'
            self.process.terminate()
            thread.join()
        print self.process.returncode

command = Command("echo 'Process started'; sleep 2; echo 'Process finished'")
command.run(timeout=3)
command.run(timeout=1)

The output of this snippet in my machine is:

Thread started
Process started
Process finished
Thread finished
0
Thread started
Process started
Terminating process
Thread finished
-15

where it can be seen that, in the first execution, the process finished correctly (return code 0), while the in the second one the process was terminated (return code -15).

I haven’t tested in windows; but, aside from updating the example command, I think it should work since I haven’t found in the documentation anything that says that thread.join or process.terminate is not supported.


回答 2

可以使用threading.Timer类简化jcollado的答案:

import shlex
from subprocess import Popen, PIPE
from threading import Timer

def run(cmd, timeout_sec):
    proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
    timer = Timer(timeout_sec, proc.kill)
    try:
        timer.start()
        stdout, stderr = proc.communicate()
    finally:
        timer.cancel()

# Examples: both take 1 second
run("sleep 1", 5)  # process ends normally at 1 second
run("sleep 5", 1)  # timeout happens at 1 second

jcollado’s answer can be simplified using the threading.Timer class:

import shlex
from subprocess import Popen, PIPE
from threading import Timer

def run(cmd, timeout_sec):
    proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
    timer = Timer(timeout_sec, proc.kill)
    try:
        timer.start()
        stdout, stderr = proc.communicate()
    finally:
        timer.cancel()

# Examples: both take 1 second
run("sleep 1", 5)  # process ends normally at 1 second
run("sleep 5", 1)  # timeout happens at 1 second

回答 3

如果您使用的是Unix,

import signal
  ...
class Alarm(Exception):
    pass

def alarm_handler(signum, frame):
    raise Alarm

signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(5*60)  # 5 minutes
try:
    stdoutdata, stderrdata = proc.communicate()
    signal.alarm(0)  # reset the alarm
except Alarm:
    print "Oops, taking too long!"
    # whatever else

If you’re on Unix,

import signal
  ...
class Alarm(Exception):
    pass

def alarm_handler(signum, frame):
    raise Alarm

signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(5*60)  # 5 minutes
try:
    stdoutdata, stderrdata = proc.communicate()
    signal.alarm(0)  # reset the alarm
except Alarm:
    print "Oops, taking too long!"
    # whatever else

回答 4

这是Alex Martelli作为具有适当过程终止功能的模块的解决方案。其他方法不起作用,因为它们不使用proc.communicate()。因此,如果您有一个产生大量输出的进程,它将填充其输出缓冲区,然后阻塞直到您从中读取内容。

from os import kill
from signal import alarm, signal, SIGALRM, SIGKILL
from subprocess import PIPE, Popen

def run(args, cwd = None, shell = False, kill_tree = True, timeout = -1, env = None):
    '''
    Run a command with a timeout after which it will be forcibly
    killed.
    '''
    class Alarm(Exception):
        pass
    def alarm_handler(signum, frame):
        raise Alarm
    p = Popen(args, shell = shell, cwd = cwd, stdout = PIPE, stderr = PIPE, env = env)
    if timeout != -1:
        signal(SIGALRM, alarm_handler)
        alarm(timeout)
    try:
        stdout, stderr = p.communicate()
        if timeout != -1:
            alarm(0)
    except Alarm:
        pids = [p.pid]
        if kill_tree:
            pids.extend(get_process_children(p.pid))
        for pid in pids:
            # process might have died before getting to this line
            # so wrap to avoid OSError: no such process
            try: 
                kill(pid, SIGKILL)
            except OSError:
                pass
        return -9, '', ''
    return p.returncode, stdout, stderr

def get_process_children(pid):
    p = Popen('ps --no-headers -o pid --ppid %d' % pid, shell = True,
              stdout = PIPE, stderr = PIPE)
    stdout, stderr = p.communicate()
    return [int(p) for p in stdout.split()]

if __name__ == '__main__':
    print run('find /', shell = True, timeout = 3)
    print run('find', shell = True)

Here is Alex Martelli’s solution as a module with proper process killing. The other approaches do not work because they do not use proc.communicate(). So if you have a process that produces lots of output, it will fill its output buffer and then block until you read something from it.

from os import kill
from signal import alarm, signal, SIGALRM, SIGKILL
from subprocess import PIPE, Popen

def run(args, cwd = None, shell = False, kill_tree = True, timeout = -1, env = None):
    '''
    Run a command with a timeout after which it will be forcibly
    killed.
    '''
    class Alarm(Exception):
        pass
    def alarm_handler(signum, frame):
        raise Alarm
    p = Popen(args, shell = shell, cwd = cwd, stdout = PIPE, stderr = PIPE, env = env)
    if timeout != -1:
        signal(SIGALRM, alarm_handler)
        alarm(timeout)
    try:
        stdout, stderr = p.communicate()
        if timeout != -1:
            alarm(0)
    except Alarm:
        pids = [p.pid]
        if kill_tree:
            pids.extend(get_process_children(p.pid))
        for pid in pids:
            # process might have died before getting to this line
            # so wrap to avoid OSError: no such process
            try: 
                kill(pid, SIGKILL)
            except OSError:
                pass
        return -9, '', ''
    return p.returncode, stdout, stderr

def get_process_children(pid):
    p = Popen('ps --no-headers -o pid --ppid %d' % pid, shell = True,
              stdout = PIPE, stderr = PIPE)
    stdout, stderr = p.communicate()
    return [int(p) for p in stdout.split()]

if __name__ == '__main__':
    print run('find /', shell = True, timeout = 3)
    print run('find', shell = True)

回答 5

我修改了sussudio答案。现在函数返回:( ,returncodestdoutstderrtimeoutstdoutstderr被解码为UTF-8字符串

def kill_proc(proc, timeout):
  timeout["value"] = True
  proc.kill()

def run(cmd, timeout_sec):
  proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  timeout = {"value": False}
  timer = Timer(timeout_sec, kill_proc, [proc, timeout])
  timer.start()
  stdout, stderr = proc.communicate()
  timer.cancel()
  return proc.returncode, stdout.decode("utf-8"), stderr.decode("utf-8"), timeout["value"]

I’ve modified sussudio answer. Now function returns: (returncode, stdout, stderr, timeout) – stdout and stderr is decoded to utf-8 string

def kill_proc(proc, timeout):
  timeout["value"] = True
  proc.kill()

def run(cmd, timeout_sec):
  proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  timeout = {"value": False}
  timer = Timer(timeout_sec, kill_proc, [proc, timeout])
  timer.start()
  stdout, stderr = proc.communicate()
  timer.cancel()
  return proc.returncode, stdout.decode("utf-8"), stderr.decode("utf-8"), timeout["value"]

回答 6

惊讶没人提到使用 timeout

timeout 5 ping -c 3 somehost

显然,这不适用于每个用例,但是如果您处理的是简单脚本,那么这是很难克服的。

homebrew对于Mac用户,也可以通过coreutils中的gtimeout 使用。

surprised nobody mentioned using timeout

timeout 5 ping -c 3 somehost

This won’t for work for every use case obviously, but if your dealing with a simple script, this is hard to beat.

Also available as gtimeout in coreutils via homebrew for mac users.


回答 7

timeout现在由subprocess模块支持call()communicate()在其中(在Python3.3中):

import subprocess

subprocess.call("command", timeout=20, shell=True)

这将调用命令并引发异常

subprocess.TimeoutExpired

如果20秒后命令仍未完成。

然后,您可以处理异常以继续执行代码,例如:

try:
    subprocess.call("command", timeout=20, shell=True)
except subprocess.TimeoutExpired:
    # insert code here

希望这可以帮助。

timeout is now supported by call() and communicate() in the subprocess module (as of Python3.3):

import subprocess

subprocess.call("command", timeout=20, shell=True)

This will call the command and raise the exception

subprocess.TimeoutExpired

if the command doesn’t finish after 20 seconds.

You can then handle the exception to continue your code, something like:

try:
    subprocess.call("command", timeout=20, shell=True)
except subprocess.TimeoutExpired:
    # insert code here

Hope this helps.


回答 8

另一种选择是写入临时文件以防止stdout阻塞,而不是需要使用communication()进行轮询。这对我有用,而其他答案却没有。例如在Windows上。

    outFile =  tempfile.SpooledTemporaryFile() 
    errFile =   tempfile.SpooledTemporaryFile() 
    proc = subprocess.Popen(args, stderr=errFile, stdout=outFile, universal_newlines=False)
    wait_remaining_sec = timeout

    while proc.poll() is None and wait_remaining_sec > 0:
        time.sleep(1)
        wait_remaining_sec -= 1

    if wait_remaining_sec <= 0:
        killProc(proc.pid)
        raise ProcessIncompleteError(proc, timeout)

    # read temp streams from start
    outFile.seek(0);
    errFile.seek(0);
    out = outFile.read()
    err = errFile.read()
    outFile.close()
    errFile.close()

Another option is to write to a temporary file to prevent the stdout blocking instead of needing to poll with communicate(). This worked for me where the other answers did not; for example on windows.

    outFile =  tempfile.SpooledTemporaryFile() 
    errFile =   tempfile.SpooledTemporaryFile() 
    proc = subprocess.Popen(args, stderr=errFile, stdout=outFile, universal_newlines=False)
    wait_remaining_sec = timeout

    while proc.poll() is None and wait_remaining_sec > 0:
        time.sleep(1)
        wait_remaining_sec -= 1

    if wait_remaining_sec <= 0:
        killProc(proc.pid)
        raise ProcessIncompleteError(proc, timeout)

    # read temp streams from start
    outFile.seek(0);
    errFile.seek(0);
    out = outFile.read()
    err = errFile.read()
    outFile.close()
    errFile.close()

回答 9

我不知道为什么它不mentionned但是因为Python 3.5,有一个新的subprocess.run通用指令(即意味着取代check_callcheck_output……),并且其具有timeout参数也是如此。

subprocess.run(args,*,stdin = None,input = None,stdout = None,stderr = None,shell = False,cwd = None,timeout = None,check = False,encoding = None,errors = None)

Run the command described by args. Wait for command to complete, then return a CompletedProcess instance.

subprocess.TimeoutExpired超时到期时会引发异常。

I don’t know why it isn’t mentionned but since Python 3.5, there’s a new subprocess.run universal command (that is meant to replace check_call, check_output …) and which has the timeout parameter as well.

subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None)

Run the command described by args. Wait for command to complete, then return a CompletedProcess instance.

It raises a subprocess.TimeoutExpired exception when the timeout is expired.


回答 10

这是我的解决方案,我正在使用线程和事件:

import subprocess
from threading import Thread, Event

def kill_on_timeout(done, timeout, proc):
    if not done.wait(timeout):
        proc.kill()

def exec_command(command, timeout):

    done = Event()
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    watcher = Thread(target=kill_on_timeout, args=(done, timeout, proc))
    watcher.daemon = True
    watcher.start()

    data, stderr = proc.communicate()
    done.set()

    return data, stderr, proc.returncode

实际上:

In [2]: exec_command(['sleep', '10'], 5)
Out[2]: ('', '', -9)

In [3]: exec_command(['sleep', '10'], 11)
Out[3]: ('', '', 0)

Here is my solution, I was using Thread and Event:

import subprocess
from threading import Thread, Event

def kill_on_timeout(done, timeout, proc):
    if not done.wait(timeout):
        proc.kill()

def exec_command(command, timeout):

    done = Event()
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    watcher = Thread(target=kill_on_timeout, args=(done, timeout, proc))
    watcher.daemon = True
    watcher.start()

    data, stderr = proc.communicate()
    done.set()

    return data, stderr, proc.returncode

In action:

In [2]: exec_command(['sleep', '10'], 5)
Out[2]: ('', '', -9)

In [3]: exec_command(['sleep', '10'], 11)
Out[3]: ('', '', 0)

回答 11

我使用的解决方案是给shell命令加上时间限制。如果命令花费的时间太长,则时间限制将停止它,并且Popen将有一个由时间限制设置的返回码。如果大于128,则表示时间限制终止了该进程。

另请参阅具有超时和大输出(> 64K)的python子进程

The solution I use is to prefix the shell command with timelimit. If the comand takes too long, timelimit will stop it and Popen will have a returncode set by timelimit. If it is > 128, it means timelimit killed the process.

See also python subprocess with timeout and large output (>64K)


回答 12

我将带有线程自的解决方案添加jcollado到了我的Python模块easyprocess中

安装:

pip install easyprocess

例:

from easyprocess import Proc

# shell is not supported!
stdout=Proc('ping localhost').call(timeout=1.5).stdout
print stdout

I added the solution with threading from jcollado to my Python module easyprocess.

Install:

pip install easyprocess

Example:

from easyprocess import Proc

# shell is not supported!
stdout=Proc('ping localhost').call(timeout=1.5).stdout
print stdout

回答 13

如果您使用的是python 2,请尝试一下

import subprocess32

try:
    output = subprocess32.check_output(command, shell=True, timeout=3)
except subprocess32.TimeoutExpired as e:
    print e

if you are using python 2, give it a try

import subprocess32

try:
    output = subprocess32.check_output(command, shell=True, timeout=3)
except subprocess32.TimeoutExpired as e:
    print e

回答 14

前置Linux命令timeout不是一个坏的解决方法,它对我有用。

cmd = "timeout 20 "+ cmd
subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(output, err) = p.communicate()

Prepending the Linux command timeout isn’t a bad workaround and it worked for me.

cmd = "timeout 20 "+ cmd
subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(output, err) = p.communicate()

回答 15

我已经实现了我可以从其中一些中学到的东西。这在Windows中有效,并且由于这是社区Wiki,因此我想我也将共享我的代码:

class Command(threading.Thread):
    def __init__(self, cmd, outFile, errFile, timeout):
        threading.Thread.__init__(self)
        self.cmd = cmd
        self.process = None
        self.outFile = outFile
        self.errFile = errFile
        self.timed_out = False
        self.timeout = timeout

    def run(self):
        self.process = subprocess.Popen(self.cmd, stdout = self.outFile, \
            stderr = self.errFile)

        while (self.process.poll() is None and self.timeout > 0):
            time.sleep(1)
            self.timeout -= 1

        if not self.timeout > 0:
            self.process.terminate()
            self.timed_out = True
        else:
            self.timed_out = False

然后从另一个类或文件:

        outFile =  tempfile.SpooledTemporaryFile()
        errFile =   tempfile.SpooledTemporaryFile()

        executor = command.Command(c, outFile, errFile, timeout)
        executor.daemon = True
        executor.start()

        executor.join()
        if executor.timed_out:
            out = 'timed out'
        else:
            outFile.seek(0)
            errFile.seek(0)
            out = outFile.read()
            err = errFile.read()

        outFile.close()
        errFile.close()

I’ve implemented what I could gather from a few of these. This works in Windows, and since this is a community wiki, I figure I would share my code as well:

class Command(threading.Thread):
    def __init__(self, cmd, outFile, errFile, timeout):
        threading.Thread.__init__(self)
        self.cmd = cmd
        self.process = None
        self.outFile = outFile
        self.errFile = errFile
        self.timed_out = False
        self.timeout = timeout

    def run(self):
        self.process = subprocess.Popen(self.cmd, stdout = self.outFile, \
            stderr = self.errFile)

        while (self.process.poll() is None and self.timeout > 0):
            time.sleep(1)
            self.timeout -= 1

        if not self.timeout > 0:
            self.process.terminate()
            self.timed_out = True
        else:
            self.timed_out = False

Then from another class or file:

        outFile =  tempfile.SpooledTemporaryFile()
        errFile =   tempfile.SpooledTemporaryFile()

        executor = command.Command(c, outFile, errFile, timeout)
        executor.daemon = True
        executor.start()

        executor.join()
        if executor.timed_out:
            out = 'timed out'
        else:
            outFile.seek(0)
            errFile.seek(0)
            out = outFile.read()
            err = errFile.read()

        outFile.close()
        errFile.close()

回答 16

一旦您了解了* unix中运行全过程的机器,您将轻松找到更简单的解决方案:

考虑这个简单的示例,如何使用select.select()使超时的communication()方法(现在几乎在* nix上几乎所有可用)。这也可以用epoll / poll / kqueue编写,但是select.select()变体可能是一个很好的例子。而且select.select()的主要限制(速度和最大1024 fds)不适用于您的任务。

这可以在* nix下工作,不创建线程,不使用信号,可以从任何线程(不仅是主线程)启动,并且速度足够快,可以从我的计算机上的stdout(i5 2.3ghz)读取250mb / s的数据。

在通信结束时加入stdout / stderr存在问题。如果您的程序输出很大,可能会导致占用大量内存。但是您可以在较小的超时时间内多次调用communication()。

class Popen(subprocess.Popen):
    def communicate(self, input=None, timeout=None):
        if timeout is None:
            return subprocess.Popen.communicate(self, input)

        if self.stdin:
            # Flush stdio buffer, this might block if user
            # has been writing to .stdin in an uncontrolled
            # fashion.
            self.stdin.flush()
            if not input:
                self.stdin.close()

        read_set, write_set = [], []
        stdout = stderr = None

        if self.stdin and input:
            write_set.append(self.stdin)
        if self.stdout:
            read_set.append(self.stdout)
            stdout = []
        if self.stderr:
            read_set.append(self.stderr)
            stderr = []

        input_offset = 0
        deadline = time.time() + timeout

        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(read_set, write_set, [], max(0, deadline - time.time()))
            except select.error as ex:
                if ex.args[0] == errno.EINTR:
                    continue
                raise

            if not (rlist or wlist):
                # Just break if timeout
                # Since we do not close stdout/stderr/stdin, we can call
                # communicate() several times reading data by smaller pieces.
                break

            if self.stdin in wlist:
                chunk = input[input_offset:input_offset + subprocess._PIPE_BUF]
                try:
                    bytes_written = os.write(self.stdin.fileno(), chunk)
                except OSError as ex:
                    if ex.errno == errno.EPIPE:
                        self.stdin.close()
                        write_set.remove(self.stdin)
                    else:
                        raise
                else:
                    input_offset += bytes_written
                    if input_offset >= len(input):
                        self.stdin.close()
                        write_set.remove(self.stdin)

            # Read stdout / stderr by 1024 bytes
            for fn, tgt in (
                (self.stdout, stdout),
                (self.stderr, stderr),
            ):
                if fn in rlist:
                    data = os.read(fn.fileno(), 1024)
                    if data == '':
                        fn.close()
                        read_set.remove(fn)
                    tgt.append(data)

        if stdout is not None:
            stdout = ''.join(stdout)
        if stderr is not None:
            stderr = ''.join(stderr)

        return (stdout, stderr)

Once you understand full process running machinery in *unix, you will easily find simplier solution:

Consider this simple example how to make timeoutable communicate() meth using select.select() (available alsmost everythere on *nix nowadays). This also can be written with epoll/poll/kqueue, but select.select() variant could be a good example for you. And major limitations of select.select() (speed and 1024 max fds) are not applicapable for your task.

This works under *nix, does not create threads, does not uses signals, can be lauched from any thread (not only main), and fast enought to read 250mb/s of data from stdout on my machine (i5 2.3ghz).

There is a problem in join’ing stdout/stderr at the end of communicate. If you have huge program output this could lead to big memory usage. But you can call communicate() several times with smaller timeouts.

class Popen(subprocess.Popen):
    def communicate(self, input=None, timeout=None):
        if timeout is None:
            return subprocess.Popen.communicate(self, input)

        if self.stdin:
            # Flush stdio buffer, this might block if user
            # has been writing to .stdin in an uncontrolled
            # fashion.
            self.stdin.flush()
            if not input:
                self.stdin.close()

        read_set, write_set = [], []
        stdout = stderr = None

        if self.stdin and input:
            write_set.append(self.stdin)
        if self.stdout:
            read_set.append(self.stdout)
            stdout = []
        if self.stderr:
            read_set.append(self.stderr)
            stderr = []

        input_offset = 0
        deadline = time.time() + timeout

        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(read_set, write_set, [], max(0, deadline - time.time()))
            except select.error as ex:
                if ex.args[0] == errno.EINTR:
                    continue
                raise

            if not (rlist or wlist):
                # Just break if timeout
                # Since we do not close stdout/stderr/stdin, we can call
                # communicate() several times reading data by smaller pieces.
                break

            if self.stdin in wlist:
                chunk = input[input_offset:input_offset + subprocess._PIPE_BUF]
                try:
                    bytes_written = os.write(self.stdin.fileno(), chunk)
                except OSError as ex:
                    if ex.errno == errno.EPIPE:
                        self.stdin.close()
                        write_set.remove(self.stdin)
                    else:
                        raise
                else:
                    input_offset += bytes_written
                    if input_offset >= len(input):
                        self.stdin.close()
                        write_set.remove(self.stdin)

            # Read stdout / stderr by 1024 bytes
            for fn, tgt in (
                (self.stdout, stdout),
                (self.stderr, stderr),
            ):
                if fn in rlist:
                    data = os.read(fn.fileno(), 1024)
                    if data == '':
                        fn.close()
                        read_set.remove(fn)
                    tgt.append(data)

        if stdout is not None:
            stdout = ''.join(stdout)
        if stderr is not None:
            stderr = ''.join(stderr)

        return (stdout, stderr)

回答 17

您可以使用 select

import subprocess
from datetime import datetime
from select import select

def call_with_timeout(cmd, timeout):
    started = datetime.now()
    sp = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    while True:
        p = select([sp.stdout], [], [], timeout)
        if p[0]:
            p[0][0].read()
        ret = sp.poll()
        if ret is not None:
            return ret
        if (datetime.now()-started).total_seconds() > timeout:
            sp.kill()
            return None

You can do this using select

import subprocess
from datetime import datetime
from select import select

def call_with_timeout(cmd, timeout):
    started = datetime.now()
    sp = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    while True:
        p = select([sp.stdout], [], [], timeout)
        if p[0]:
            p[0][0].read()
        ret = sp.poll()
        if ret is not None:
            return ret
        if (datetime.now()-started).total_seconds() > timeout:
            sp.kill()
            return None

回答 18

我已经在Windows,Linux和Mac上成功使用killableprocess。如果您使用的是Cygwin Python,则需要OSAF的killableprocess版本,因为否则本机Windows进程将不会被杀死。

I’ve used killableprocess successfully on Windows, Linux and Mac. If you are using Cygwin Python, you’ll need OSAF’s version of killableprocess because otherwise native Windows processes won’t get killed.


回答 19

尽管我没有广泛研究它,但我在ActiveState上发现的这种装饰器似乎对这种事情很有用。与一起subprocess.Popen(..., close_fds=True),至少我已经准备好使用Python编写shell脚本了。

Although I haven’t looked at it extensively, this decorator I found at ActiveState seems to be quite useful for this sort of thing. Along with subprocess.Popen(..., close_fds=True), at least I’m ready for shell-scripting in Python.


回答 20

如果shell = True,此解决方案将杀死进程树,将参数传递给进程(或不传递参数),具有超时并获取回调的stdout,stderr和进程输出(它将psutil用于kill_proc_tree)。这是基于SO中发布的几种解决方案,包括jcollado的解决方案。在jcollado的回答中张贴对Anson和jradice的评论的回应。已在Windows Srvr 2012和Ubuntu 14.04中测试。请注意,对于Ubuntu,您需要将parent.children(…)调用更改为parent.get_children(…)。

def kill_proc_tree(pid, including_parent=True):
  parent = psutil.Process(pid)
  children = parent.children(recursive=True)
  for child in children:
    child.kill()
  psutil.wait_procs(children, timeout=5)
  if including_parent:
    parent.kill()
    parent.wait(5)

def run_with_timeout(cmd, current_dir, cmd_parms, timeout):
  def target():
    process = subprocess.Popen(cmd, cwd=current_dir, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)

    # wait for the process to terminate
    if (cmd_parms == ""):
      out, err = process.communicate()
    else:
      out, err = process.communicate(cmd_parms)
    errcode = process.returncode

  thread = Thread(target=target)
  thread.start()

  thread.join(timeout)
  if thread.is_alive():
    me = os.getpid()
    kill_proc_tree(me, including_parent=False)
    thread.join()

This solution kills the process tree in case of shell=True, passes parameters to the process (or not), has a timeout and gets the stdout, stderr and process output of the call back (it uses psutil for the kill_proc_tree). This was based on several solutions posted in SO including jcollado’s. Posting in response to comments by Anson and jradice in jcollado’s answer. Tested in Windows Srvr 2012 and Ubuntu 14.04. Please note that for Ubuntu you need to change the parent.children(…) call to parent.get_children(…).

def kill_proc_tree(pid, including_parent=True):
  parent = psutil.Process(pid)
  children = parent.children(recursive=True)
  for child in children:
    child.kill()
  psutil.wait_procs(children, timeout=5)
  if including_parent:
    parent.kill()
    parent.wait(5)

def run_with_timeout(cmd, current_dir, cmd_parms, timeout):
  def target():
    process = subprocess.Popen(cmd, cwd=current_dir, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)

    # wait for the process to terminate
    if (cmd_parms == ""):
      out, err = process.communicate()
    else:
      out, err = process.communicate(cmd_parms)
    errcode = process.returncode

  thread = Thread(target=target)
  thread.start()

  thread.join(timeout)
  if thread.is_alive():
    me = os.getpid()
    kill_proc_tree(me, including_parent=False)
    thread.join()

回答 21

有一个想法可以继承Popen类并使用一些简单的方法装饰器对其进行扩展。我们称之为ExpirablePopen。

from logging import error
from subprocess import Popen
from threading import Event
from threading import Thread


class ExpirablePopen(Popen):

    def __init__(self, *args, **kwargs):
        self.timeout = kwargs.pop('timeout', 0)
        self.timer = None
        self.done = Event()

        Popen.__init__(self, *args, **kwargs)

    def __tkill(self):
        timeout = self.timeout
        if not self.done.wait(timeout):
            error('Terminating process {} by timeout of {} secs.'.format(self.pid, timeout))
            self.kill()

    def expirable(func):
        def wrapper(self, *args, **kwargs):
            # zero timeout means call of parent method
            if self.timeout == 0:
                return func(self, *args, **kwargs)

            # if timer is None, need to start it
            if self.timer is None:
                self.timer = thr = Thread(target=self.__tkill)
                thr.daemon = True
                thr.start()

            result = func(self, *args, **kwargs)
            self.done.set()

            return result
        return wrapper

    wait = expirable(Popen.wait)
    communicate = expirable(Popen.communicate)


if __name__ == '__main__':
    from subprocess import PIPE

    print ExpirablePopen('ssh -T git@bitbucket.org', stdout=PIPE, timeout=1).communicate()

There’s an idea to subclass the Popen class and extend it with some simple method decorators. Let’s call it ExpirablePopen.

from logging import error
from subprocess import Popen
from threading import Event
from threading import Thread


class ExpirablePopen(Popen):

    def __init__(self, *args, **kwargs):
        self.timeout = kwargs.pop('timeout', 0)
        self.timer = None
        self.done = Event()

        Popen.__init__(self, *args, **kwargs)

    def __tkill(self):
        timeout = self.timeout
        if not self.done.wait(timeout):
            error('Terminating process {} by timeout of {} secs.'.format(self.pid, timeout))
            self.kill()

    def expirable(func):
        def wrapper(self, *args, **kwargs):
            # zero timeout means call of parent method
            if self.timeout == 0:
                return func(self, *args, **kwargs)

            # if timer is None, need to start it
            if self.timer is None:
                self.timer = thr = Thread(target=self.__tkill)
                thr.daemon = True
                thr.start()

            result = func(self, *args, **kwargs)
            self.done.set()

            return result
        return wrapper

    wait = expirable(Popen.wait)
    communicate = expirable(Popen.communicate)


if __name__ == '__main__':
    from subprocess import PIPE

    print ExpirablePopen('ssh -T git@bitbucket.org', stdout=PIPE, timeout=1).communicate()

回答 22

我遇到的问题是,如果花费的时间比给定的超时时间长,我想终止多线程子进程。我想在中设置一个超时Popen(),但是没有用。然后,我意识到这Popen().wait()等于call(),因此我有了在该.wait(timeout=xxx)方法中设置超时的想法,该方法终于奏效了。因此,我通过以下方式解决了问题:

import os
import sys
import signal
import subprocess
from multiprocessing import Pool

cores_for_parallelization = 4
timeout_time = 15  # seconds

def main():
    jobs = [...YOUR_JOB_LIST...]
    with Pool(cores_for_parallelization) as p:
        p.map(run_parallel_jobs, jobs)

def run_parallel_jobs(args):
    # Define the arguments including the paths
    initial_terminal_command = 'C:\\Python34\\python.exe'  # Python executable
    function_to_start = 'C:\\temp\\xyz.py'  # The multithreading script
    final_list = [initial_terminal_command, function_to_start]
    final_list.extend(args)

    # Start the subprocess and determine the process PID
    subp = subprocess.Popen(final_list)  # starts the process
    pid = subp.pid

    # Wait until the return code returns from the function by considering the timeout. 
    # If not, terminate the process.
    try:
        returncode = subp.wait(timeout=timeout_time)  # should be zero if accomplished
    except subprocess.TimeoutExpired:
        # Distinguish between Linux and Windows and terminate the process if 
        # the timeout has been expired
        if sys.platform == 'linux2':
            os.kill(pid, signal.SIGTERM)
        elif sys.platform == 'win32':
            subp.terminate()

if __name__ == '__main__':
    main()

I had the problem that I wanted to terminate a multithreading subprocess if it took longer than a given timeout length. I wanted to set a timeout in Popen(), but it did not work. Then, I realized that Popen().wait() is equal to call() and so I had the idea to set a timeout within the .wait(timeout=xxx) method, which finally worked. Thus, I solved it this way:

import os
import sys
import signal
import subprocess
from multiprocessing import Pool

cores_for_parallelization = 4
timeout_time = 15  # seconds

def main():
    jobs = [...YOUR_JOB_LIST...]
    with Pool(cores_for_parallelization) as p:
        p.map(run_parallel_jobs, jobs)

def run_parallel_jobs(args):
    # Define the arguments including the paths
    initial_terminal_command = 'C:\\Python34\\python.exe'  # Python executable
    function_to_start = 'C:\\temp\\xyz.py'  # The multithreading script
    final_list = [initial_terminal_command, function_to_start]
    final_list.extend(args)

    # Start the subprocess and determine the process PID
    subp = subprocess.Popen(final_list)  # starts the process
    pid = subp.pid

    # Wait until the return code returns from the function by considering the timeout. 
    # If not, terminate the process.
    try:
        returncode = subp.wait(timeout=timeout_time)  # should be zero if accomplished
    except subprocess.TimeoutExpired:
        # Distinguish between Linux and Windows and terminate the process if 
        # the timeout has been expired
        if sys.platform == 'linux2':
            os.kill(pid, signal.SIGTERM)
        elif sys.platform == 'win32':
            subp.terminate()

if __name__ == '__main__':
    main()

回答 23

不幸的是,我受雇主披露源代码的非常严格的政策约束,因此我无法提供实际的代码。但按我的喜好,最好的解决方案是创建一个重写的子类Popen.wait()以轮询而不是无限期地等待,并Popen.__init__接受超时参数。完成后,所有其他Popen方法(调用wait)将按预期工作,包括communicate

Unfortunately, I’m bound by very strict policies on the disclosure of source code by my employer, so I can’t provide actual code. But for my taste the best solution is to create a subclass overriding Popen.wait() to poll instead of wait indefinitely, and Popen.__init__ to accept a timeout parameter. Once you do that, all the other Popen methods (which call wait) will work as expected, including communicate.


回答 24

https://pypi.python.org/pypi/python-subprocess2提供了子流程模块的扩展,使您可以等待一段时间,否则终止。

因此,要等待10秒钟才能终止进程,否则请终止:

pipe  = subprocess.Popen('...')

timeout =  10

results = pipe.waitOrTerminate(timeout)

这与Windows和UNIX兼容。“结果”是一个字典,它包含“ returnCode”和“ actionTaken”,returnCode是应用程序的返回值(如果必须终止,则为None)。如果该过程正常完成,则显示为“ SUBPROCESS2_PROCESS_COMPLETED”,或者根据执行的操作显示“ SUBPROCESS2_PROCESS_TERMINATED”和SUBPROCESS2_PROCESS_KILLED的掩码(有关详细信息,请参阅文档)

https://pypi.python.org/pypi/python-subprocess2 provides extensions to the subprocess module which allow you to wait up to a certain period of time, otherwise terminate.

So, to wait up to 10 seconds for the process to terminate, otherwise kill:

pipe  = subprocess.Popen('...')

timeout =  10

results = pipe.waitOrTerminate(timeout)

This is compatible with both windows and unix. “results” is a dictionary, it contains “returnCode” which is the return of the app (or None if it had to be killed), as well as “actionTaken”. which will be “SUBPROCESS2_PROCESS_COMPLETED” if the process completed normally, or a mask of “SUBPROCESS2_PROCESS_TERMINATED” and SUBPROCESS2_PROCESS_KILLED depending on action taken (see documentation for full details)


回答 25

对于python 2.6+,请使用gevent

 from gevent.subprocess import Popen, PIPE, STDOUT

 def call_sys(cmd, timeout):
      p= Popen(cmd, shell=True, stdout=PIPE)
      output, _ = p.communicate(timeout=timeout)
      assert p.returncode == 0, p. returncode
      return output

 call_sys('./t.sh', 2)

 # t.sh example
 sleep 5
 echo done
 exit 1

for python 2.6+, use gevent

 from gevent.subprocess import Popen, PIPE, STDOUT

 def call_sys(cmd, timeout):
      p= Popen(cmd, shell=True, stdout=PIPE)
      output, _ = p.communicate(timeout=timeout)
      assert p.returncode == 0, p. returncode
      return output

 call_sys('./t.sh', 2)

 # t.sh example
 sleep 5
 echo done
 exit 1

回答 26

python 2.7

import time
import subprocess

def run_command(cmd, timeout=0):
    start_time = time.time()
    df = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    while timeout and df.poll() == None:
        if time.time()-start_time >= timeout:
            df.kill()
            return -1, ""
    output = '\n'.join(df.communicate()).strip()
    return df.returncode, output

python 2.7

import time
import subprocess

def run_command(cmd, timeout=0):
    start_time = time.time()
    df = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    while timeout and df.poll() == None:
        if time.time()-start_time >= timeout:
            df.kill()
            return -1, ""
    output = '\n'.join(df.communicate()).strip()
    return df.returncode, output

回答 27

import subprocess, optparse, os, sys, re, datetime, threading, time, glob, shutil, xml.dom.minidom, traceback

class OutputManager:
    def __init__(self, filename, mode, console, logonly):
        self.con = console
        self.logtoconsole = True
        self.logtofile = False

        if filename:
            try:
                self.f = open(filename, mode)
                self.logtofile = True
                if logonly == True:
                    self.logtoconsole = False
            except IOError:
                print (sys.exc_value)
                print ("Switching to console only output...\n")
                self.logtofile = False
                self.logtoconsole = True

    def write(self, data):
        if self.logtoconsole == True:
            self.con.write(data)
        if self.logtofile == True:
            self.f.write(data)
        sys.stdout.flush()

def getTimeString():
        return time.strftime("%Y-%m-%d", time.gmtime())

def runCommand(command):
    '''
    Execute a command in new thread and return the
    stdout and stderr content of it.
    '''
    try:
        Output = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True).communicate()[0]
    except Exception as e:
        print ("runCommand failed :%s" % (command))
        print (str(e))
        sys.stdout.flush()
        return None
    return Output

def GetOs():
    Os = ""
    if sys.platform.startswith('win32'):
        Os = "win"
    elif sys.platform.startswith('linux'):
        Os = "linux"
    elif sys.platform.startswith('darwin'):
        Os = "mac"
    return Os


def check_output(*popenargs, **kwargs):
    try:
        if 'stdout' in kwargs: 
            raise ValueError('stdout argument not allowed, it will be overridden.') 

        # Get start time.
        startTime = datetime.datetime.now()
        timeoutValue=3600

        cmd = popenargs[0]

        if sys.platform.startswith('win32'):
            process = subprocess.Popen( cmd, stdout=subprocess.PIPE, shell=True) 
        elif sys.platform.startswith('linux'):
            process = subprocess.Popen( cmd , stdout=subprocess.PIPE, shell=True ) 
        elif sys.platform.startswith('darwin'):
            process = subprocess.Popen( cmd , stdout=subprocess.PIPE, shell=True ) 

        stdoutdata, stderrdata = process.communicate( timeout = timeoutValue )
        retcode = process.poll()

        ####################################
        # Catch crash error and log it.
        ####################################
        OutputHandle = None
        try:
            if retcode >= 1:
                OutputHandle = OutputManager( 'CrashJob_' + getTimeString() + '.txt', 'a+', sys.stdout, False)
                OutputHandle.write( cmd )
                print (stdoutdata)
                print (stderrdata)
                sys.stdout.flush()
        except Exception as e:
            print (str(e))

    except subprocess.TimeoutExpired:
            ####################################
            # Catch time out error and log it.
            ####################################
            Os = GetOs()
            if Os == 'win':
                killCmd = "taskkill /FI \"IMAGENAME eq {0}\" /T /F"
            elif Os == 'linux':
                killCmd = "pkill {0)"
            elif Os == 'mac':
                # Linux, Mac OS
                killCmd = "killall -KILL {0}"

            runCommand(killCmd.format("java"))
            runCommand(killCmd.format("YouApp"))

            OutputHandle = None
            try:
                OutputHandle = OutputManager( 'KillJob_' + getTimeString() + '.txt', 'a+', sys.stdout, False)
                OutputHandle.write( cmd )
            except Exception as e:
                print (str(e))
    except Exception as e:
            for frame in traceback.extract_tb(sys.exc_info()[2]):
                        fname,lineno,fn,text = frame
                        print "Error in %s on line %d" % (fname, lineno)
import subprocess, optparse, os, sys, re, datetime, threading, time, glob, shutil, xml.dom.minidom, traceback

class OutputManager:
    def __init__(self, filename, mode, console, logonly):
        self.con = console
        self.logtoconsole = True
        self.logtofile = False

        if filename:
            try:
                self.f = open(filename, mode)
                self.logtofile = True
                if logonly == True:
                    self.logtoconsole = False
            except IOError:
                print (sys.exc_value)
                print ("Switching to console only output...\n")
                self.logtofile = False
                self.logtoconsole = True

    def write(self, data):
        if self.logtoconsole == True:
            self.con.write(data)
        if self.logtofile == True:
            self.f.write(data)
        sys.stdout.flush()

def getTimeString():
        return time.strftime("%Y-%m-%d", time.gmtime())

def runCommand(command):
    '''
    Execute a command in new thread and return the
    stdout and stderr content of it.
    '''
    try:
        Output = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True).communicate()[0]
    except Exception as e:
        print ("runCommand failed :%s" % (command))
        print (str(e))
        sys.stdout.flush()
        return None
    return Output

def GetOs():
    Os = ""
    if sys.platform.startswith('win32'):
        Os = "win"
    elif sys.platform.startswith('linux'):
        Os = "linux"
    elif sys.platform.startswith('darwin'):
        Os = "mac"
    return Os


def check_output(*popenargs, **kwargs):
    try:
        if 'stdout' in kwargs: 
            raise ValueError('stdout argument not allowed, it will be overridden.') 

        # Get start time.
        startTime = datetime.datetime.now()
        timeoutValue=3600

        cmd = popenargs[0]

        if sys.platform.startswith('win32'):
            process = subprocess.Popen( cmd, stdout=subprocess.PIPE, shell=True) 
        elif sys.platform.startswith('linux'):
            process = subprocess.Popen( cmd , stdout=subprocess.PIPE, shell=True ) 
        elif sys.platform.startswith('darwin'):
            process = subprocess.Popen( cmd , stdout=subprocess.PIPE, shell=True ) 

        stdoutdata, stderrdata = process.communicate( timeout = timeoutValue )
        retcode = process.poll()

        ####################################
        # Catch crash error and log it.
        ####################################
        OutputHandle = None
        try:
            if retcode >= 1:
                OutputHandle = OutputManager( 'CrashJob_' + getTimeString() + '.txt', 'a+', sys.stdout, False)
                OutputHandle.write( cmd )
                print (stdoutdata)
                print (stderrdata)
                sys.stdout.flush()
        except Exception as e:
            print (str(e))

    except subprocess.TimeoutExpired:
            ####################################
            # Catch time out error and log it.
            ####################################
            Os = GetOs()
            if Os == 'win':
                killCmd = "taskkill /FI \"IMAGENAME eq {0}\" /T /F"
            elif Os == 'linux':
                killCmd = "pkill {0)"
            elif Os == 'mac':
                # Linux, Mac OS
                killCmd = "killall -KILL {0}"

            runCommand(killCmd.format("java"))
            runCommand(killCmd.format("YouApp"))

            OutputHandle = None
            try:
                OutputHandle = OutputManager( 'KillJob_' + getTimeString() + '.txt', 'a+', sys.stdout, False)
                OutputHandle.write( cmd )
            except Exception as e:
                print (str(e))
    except Exception as e:
            for frame in traceback.extract_tb(sys.exc_info()[2]):
                        fname,lineno,fn,text = frame
                        print "Error in %s on line %d" % (fname, lineno)

回答 28

只是想写一些简单的东西。

#!/usr/bin/python

from subprocess import Popen, PIPE
import datetime
import time 

popen = Popen(["/bin/sleep", "10"]);
pid = popen.pid
sttime = time.time();
waittime =  3

print "Start time %s"%(sttime)

while True:
    popen.poll();
    time.sleep(1)
    rcode = popen.returncode
    now = time.time();
    if [ rcode is None ]  and  [ now > (sttime + waittime) ] :
        print "Killing it now"
        popen.kill()

Was just trying to write something simpler.

#!/usr/bin/python

from subprocess import Popen, PIPE
import datetime
import time 

popen = Popen(["/bin/sleep", "10"]);
pid = popen.pid
sttime = time.time();
waittime =  3

print "Start time %s"%(sttime)

while True:
    popen.poll();
    time.sleep(1)
    rcode = popen.returncode
    now = time.time();
    if [ rcode is None ]  and  [ now > (sttime + waittime) ] :
        print "Killing it now"
        popen.kill()

在Python中对子进程进行非阻塞读取

问题:在Python中对子进程进行非阻塞读取

我正在使用子流程模块来启动子流程并连接到其输出流(stdout)。我希望能够在其stdout上执行非阻塞读取。有没有一种方法可以使.readline无阻塞或在调用之前检查流中是否有数据.readline?我希望这是可移植的,或者至少要在Windows和Linux下工作。

这是我目前的操作方式(.readline如果没有可用数据,则会阻塞):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

I’m using the subprocess module to start a subprocess and connect to it’s output stream (stdout). I want to be able to execute non-blocking reads on its stdout. Is there a way to make .readline non-blocking or to check if there is data on the stream before I invoke .readline? I’d like this to be portable or at least work under Windows and Linux.

here is how I do it for now (It’s blocking on the .readline if no data is avaible):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

回答 0

fcntlselectasyncproc不会在这种情况下帮助。

不管使用什么操作系统,一种可靠地读取流而不阻塞的可靠方法是使用Queue.get_nowait()

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

fcntl, select, asyncproc won’t help in this case.

A reliable way to read a stream without blocking regardless of operating system is to use Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

回答 1

我经常遇到类似的问题。我经常编写的Python程序需要具有执行一些主要功能的能力,同时还要从命令行(stdin)接受用户输入。仅将用户输入处理功能放在另一个线程中并不能解决问题,因为会readline()阻塞并且没有超时。如果主要功能已经完成,并且不再需要等待进一步的用户输入,我通常希望我的程序退出,但是不能,因为readline()仍然在另一个线程中等待一行。我发现此问题的解决方案是使用fcntl模块使stdin成为非阻塞文件:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

在我看来,这比使用选择或信号模块来解决此问题要干净一些,但是再说一次,它仅适用于UNIX …

I have often had a similar problem; Python programs I write frequently need to have the ability to execute some primary functionality while simultaneously accepting user input from the command line (stdin). Simply putting the user input handling functionality in another thread doesn’t solve the problem because readline() blocks and has no timeout. If the primary functionality is complete and there is no longer any need to wait for further user input I typically want my program to exit, but it can’t because readline() is still blocking in the other thread waiting for a line. A solution I have found to this problem is to make stdin a non-blocking file using the fcntl module:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

In my opinion this is a bit cleaner than using the select or signal modules to solve this problem but then again it only works on UNIX…


回答 2

Python 3.4引入了用于异步IO 模块的临时APIasyncio

该方法类似于twisted@Bryan Ward的基于答案 -定义协议,并在数据准备好后立即调用其方法:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

请参阅docs中的“子流程”

有一个高级接口asyncio.create_subprocess_exec(),该接口返回允许使用协程 (使用/ Python 3.5+语法)异步读取行的Process对象StreamReader.readline()asyncawait

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() 执行以下任务:

  • 启动子进程,将其标准输出重定向到管道
  • 从子进程的stdout异步读取一行
  • 杀死子进程
  • 等待它退出

如有必要,每个步骤可能会受到超时秒数的限制。

Python 3.4 introduces new provisional API for asynchronous IO — asyncio module.

The approach is similar to twisted-based answer by @Bryan Ward — define a protocol and its methods are called as soon as data is ready:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

See “Subprocess” in the docs.

There is a high-level interface asyncio.create_subprocess_exec() that returns Process objects that allows to read a line asynchroniosly using StreamReader.readline() coroutine (with async/await Python 3.5+ syntax):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() performs the following tasks:

  • start subprocess, redirect its stdout to a pipe
  • read a line from subprocess’ stdout asynchronously
  • kill subprocess
  • wait for it to exit

Each step could be limited by timeout seconds if necessary.


回答 3

尝试使用asyncproc模块。例如:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

该模块负责S.Lott建议的所有线程。

Try the asyncproc module. For example:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

The module takes care of all the threading as suggested by S.Lott.


回答 4

您可以在Twisted中非常轻松地执行此操作。根据您现有的代码库,这可能不是那么容易使用,但是如果您要构建一个扭曲的应用程序,那么类似这样的事情将变得微不足道。您创建一个ProcessProtocol类,并重写该outReceived()方法。扭曲(取决于所使用的反应堆)通常只是一个大select()循环,其中安装了用于处理来自不同文件描述符(通常是网络套接字)的数据的回调。因此,该outReceived()方法只是安装用于处理来自的数据的回调STDOUT。演示此行为的简单示例如下:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

扭曲的文档对此有一些有用的信息。

如果您围绕Twisted构建整个应用程序,它将与本地或远程的其他进程进行异步通信,就像这样非常优雅。另一方面,如果您的程序不是基于Twisted构建的,那么它实际上并没有那么大的帮助。希望这对其他读者有帮助,即使它不适用于您的特定应用程序。

You can do this really easily in Twisted. Depending upon your existing code base, this might not be that easy to use, but if you are building a twisted application, then things like this become almost trivial. You create a ProcessProtocol class, and override the outReceived() method. Twisted (depending upon the reactor used) is usually just a big select() loop with callbacks installed to handle data from different file descriptors (often network sockets). So the outReceived() method is simply installing a callback for handling data coming from STDOUT. A simple example demonstrating this behavior is as follows:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

The Twisted documentation has some good information on this.

If you build your entire application around Twisted, it makes asynchronous communication with other processes, local or remote, really elegant like this. On the other hand, if your program isn’t built on top of Twisted, this isn’t really going to be that helpful. Hopefully this can be helpful to other readers, even if it isn’t applicable for your particular application.


回答 5

使用select&read(1)。

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

对于类似readline()的:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a

Use select & read(1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

For readline()-like:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a

回答 6

一种解决方案是使另一个进程执行该进程的读取,或者使该进程的线程超时。

这是超时功能的线程版本:

http://code.activestate.com/recipes/473878/

但是,您需要在输入stdout时对其进行阅读吗?另一个解决方案可能是将输出转储到文件中,并等待使用p.wait()完成该过程。

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

One solution is to make another process to perform your read of the process, or make a thread of the process with a timeout.

Here’s the threaded version of a timeout function:

http://code.activestate.com/recipes/473878/

However, do you need to read the stdout as it’s coming in? Another solution may be to dump the output to a file and wait for the process to finish using p.wait().

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

回答 7

免责声明:这仅适用于龙卷风

您可以通过将fd设置为非阻塞,然后使用ioloop来注册回调来实现。我将其包装在一个名为tornado_subprocess的鸡蛋中,您可以通过PyPI安装它:

easy_install tornado_subprocess

现在您可以执行以下操作:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

您也可以将其与RequestHandler一起使用

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

Disclaimer: this works only for tornado

You can do this by setting the fd to be nonblocking and then use ioloop to register callbacks. I have packaged this in an egg called tornado_subprocess and you can install it via PyPI:

easy_install tornado_subprocess

now you can do something like this:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

you can also use it with a RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

回答 8

现有解决方案对我不起作用(详细信息如下)。最终有效的方法是使用read(1)实现readline(基于此答案)。后者不会阻止:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

为什么现有解决方案不起作用:

  1. 需要readline的解决方案(包括基于Queue的解决方案)始终会阻塞。很难(不可能?)杀死执行readline的线程。它仅在创建它的过程完成时被杀死,而在产生输出的过程被杀死时则不被杀死。
  2. 正如anonnn指出的那样,将低级fcntl与高级别readline调用混合可能无法正常工作。
  3. 使用select.poll()很简单,但是根据python文档,它在Windows上不起作用。
  4. 使用第三方库似乎无法胜任此任务,并增加了其他依赖性。

Existing solutions did not work for me (details below). What finally worked was to implement readline using read(1) (based on this answer). The latter does not block:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Why existing solutions did not work:

  1. Solutions that require readline (including the Queue based ones) always block. It is difficult (impossible?) to kill the thread that executes readline. It only gets killed when the process that created it finishes, but not when the output-producing process is killed.
  2. Mixing low-level fcntl with high-level readline calls may not work properly as anonnn has pointed out.
  3. Using select.poll() is neat, but doesn’t work on Windows according to python docs.
  4. Using third-party libraries seems overkill for this task and adds additional dependencies.

回答 9

这是我的代码,用于捕获子流程ASAP的每个输出,包括部分行。它同时抽水,并且以几乎正确的顺序抽出stdout和stderr。

经过测试并在Python 2.7 linux&Windows上正确工作。

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

Here is my code, used to catch every output from subprocess ASAP, including partial lines. It pumps at same time and stdout and stderr in almost correct order.

Tested and correctly worked on Python 2.7 linux & windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # line-buffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # line-buffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

回答 10

我添加此问题以读取一些subprocess.Popen stdout。这是我的非阻塞读取解决方案:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

I add this problem to read some subprocess.Popen stdout. Here is my non blocking read solution:

import fcntl

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'

回答 11

这无阻塞读的版本并不需要特殊的模块,并在大多数Linux发行版的工作外的开箱。

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

This version of non-blocking read doesn’t require special modules and will work out-of-the-box on majority of Linux distros.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break                                                                                                                                                              
            sys.stdout.write(ch)
        except OSError:
            # waiting for data be available on fd
            pass

def shell(args, async=True):
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

回答 12

这是一个基于线程的简单解决方案,其中:

  • 在Linux和Windows上均可使用(不依赖select)。
  • 同时读取stdoutstderrasynchronouly。
  • 不依赖于具有任意等待时间的主动轮询(CPU友好)。
  • 不使用asyncio(可能与其他库冲突)。
  • 运行直到子进程终止。

打印机

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()

Here is a simple solution based on threads which:

  • works on both Linux and Windows (not relying on select).
  • reads both stdout and stderr asynchronouly.
  • doesn’t rely on active polling with arbitrary waiting time (CPU friendly).
  • doesn’t use asyncio (which may conflict with other libraries).
  • runs until the child process terminates.

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


p = subprocess.Popen('python printer.py', stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()

回答 13

在此添加答案,因为它提供了在Windows和Unix上设置非阻塞管道的功能。

所有ctypes细节都感谢@techtonik的回答

在Unix和Windows系统上都可以使用经过稍微修改的版本。

  • 与Python3兼容(仅需要很小的更改)
  • 包括posix版本,并定义要使用的异常。

这样,您可以对Unix和Windows代码使用相同的功能和异常。

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: /programming/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

为了避免读取不完整的数据,我最终编写了自己的readline生成器(该生成器返回每行的字节字符串)。

它是一个生成器,因此您可以例如…

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

Adding this answer here since it provides ability to set non-blocking pipes on Windows and Unix.

All the ctypes details are thanks to @techtonik’s answer.

There is a slightly modified version to be used both on Unix and Windows systems.

  • Python3 compatible (only minor change needed).
  • Includes posix version, and defines exception to use for either.

This way you can use the same function and exception for Unix and Windows code.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # Constant could define globally but avoid polluting the name-space
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

To avoid reading incomplete data, I ended up writing my own readline generator (which returns the byte string for each line).

Its a generator so you can for example…

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothings left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case were reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

回答 14

我有原始发问者的问题,但不希望调用线程。我将Jesse的解决方案与管道中的直接read()以及我自己的用于行读取的缓冲区处理程序混合在一起(但是,我的子进程ping总是写完整的行<系统页面大小)。通过仅阅读通过gobject注册的io手表,可以避免繁忙的等待。这些天,我通常在gobject MainLoop中运行代码以避免线程。

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

观察者是

def watch(f, *other):
print 'reading',f.read()
return True

并且主程序设置ping,然后调用gobject邮件循环。

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

其他任何工作都附加到gobject中的回调中。

I have the original questioner’s problem, but did not wish to invoke threads. I mixed Jesse’s solution with a direct read() from the pipe, and my own buffer-handler for line reads (however, my sub-process – ping – always wrote full lines < a system page size). I avoid busy-waiting by only reading in a gobject-registered io watch. These days I usually run code within a gobject MainLoop to avoid threads.

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

The watcher is

def watch(f, *other):
print 'reading',f.read()
return True

And the main program sets up a ping and then calls gobject mail loop.

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

Any other work is attached to callbacks in gobject.


回答 15

在现代Python中,情况要好得多。

这是一个简单的子程序“ hello.py”:

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

和一个与之交互的程序:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

打印出来:

b'hello bob\n'
b'hello alice\n'

请注意,实际模式(在此处以及在相关问题中,几乎所有先前的答案也是如此)是将子级的stdout文件描述符设置为非阻塞,然后在某种选择循环中对其进行轮询。这些天,当然,该循环是由asyncio提供的。

Things are a lot better in modern Python.

Here’s a simple child program, “hello.py”:

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

And a program to interact with it:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

That prints out:

b'hello bob\n'
b'hello alice\n'

Note that the actual pattern, which is also by almost all of the previous answers, both here and in related questions, is to set the child’s stdout file descriptor to non-blocking and then poll it in some sort of select loop. These days, of course, that loop is provided by asyncio.


回答 16

选择模块可以帮助您确定下一个有用的输入。

但是,您几乎总是对单独的线程感到满意。一个阻止读取标准输入,另一种则在您不希望阻止的位置进行。

The select module helps you determine where the next useful input is.

However, you’re almost always happier with separate threads. One does a blocking read the stdin, another does wherever it is you don’t want blocked.


回答 17

为什么要打扰线程和队列?与readline()不同,BufferedReader.read1()不会阻塞等待\ r \ n,如果有任何输出进入,它将返回ASAP。

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

why bothering thread&queue? unlike readline(), BufferedReader.read1() wont block waiting for \r\n, it returns ASAP if there is any output coming in.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print buf,

if __name__ == '__main__':
    __main__()

回答 18

以我为例,我需要一个日志记录模块,该模块可以捕获后台应用程序的输出并对其进行扩充(添加时间戳,颜色等)。

我最后得到一个执行实际I / O的后台线程。以下代码仅适用于POSIX平台。我剥去了不必要的部分。

如果有人打算长期使用此野兽,请考虑管理开放描述符。就我而言,这不是一个大问题。

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

In my case I needed a logging module that catches the output from the background applications and augments it(adding time-stamps, colors, etc.).

I ended up with a background thread that does the actual I/O. Following code is only for POSIX platforms. I stripped non-essential parts.

If someone is going to use this beast for long runs consider managing open descriptors. In my case it was not a big problem.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print 'epoll is not available'
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception, e:
                    print e

    def add(self, fd, log):
        assert not self.__fds.has_key(fd)
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError, exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

回答 19

我的问题有点不同,因为我想从正在运行的进程中同时收集stdout和stderr,但最终还是一样,因为我想在小部件中生成其生成的输出。

我不想诉诸使用Queues或其他线程的许多建议的解决方法,因为执行诸如运行另一个脚本并收集其输出之类的常见任务不需要它们。

阅读建议的解决方案和python文档后,我通过以下实现解决了我的问题。是的,它仅适用于POSIX,因为我正在使用select函数调用。

我同意这些文档令人困惑,并且对于这种常见的脚本编写任务而言,实现很尴尬。我认为python的旧版本具有不同的默认值Popen和不同的解释,因此造成了很多混乱。这对于Python 2.7.12和3.5.2似乎都很好。

关键是设置bufsize=1行缓冲,然后universal_newlines=True处理为文本文件,而不是二进制文件,而二进制文件似乎在设置时成为默认文件bufsize=1

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR,DEBUG和VERBOSE只是将输出打印到终端的宏。

该解决方案的IMHO 99.99%有效,因为它仍使用阻塞readline功能,因此我们认为子过程很好并且输出了完整的行。

我欢迎反馈以改进解决方案,因为我还是Python的新手。

My problem is a bit different as I wanted to collect both stdout and stderr from a running process, but ultimately the same since I wanted to render the output in a widget as its generated.

I did not want to resort to many of the proposed workarounds using Queues or additional Threads as they should not be necessary to perform such a common task as running another script and collecting its output.

After reading the proposed solutions and python docs I resolved my issue with the implementation below. Yes it only works for POSIX as I’m using the select function call.

I agree that the docs are confusing and the implementation is awkward for such a common scripting task. I believe that older versions of python have different defaults for Popen and different explanations so that created a lot of confusion. This seems to work well for both Python 2.7.12 and 3.5.2.

The key was to set bufsize=1 for line buffering and then universal_newlines=True to process as a text file instead of a binary which seems to become the default when setting bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG and VERBOSE are simply macros that print output to the terminal.

This solution is IMHO 99.99% effective as it still uses the blocking readline function, so we assume the sub process is nice and outputs complete lines.

I welcome feedback to improve the solution as I am still new to Python.


回答 20

我已经基于JF Sebastian的解决方案创建了一个库。您可以使用它。

https://github.com/cenkalti/什么


回答 21

从JF Sebastian的答案以及其他几个方面的工作出发,我组成了一个简单的子流程管理器。它提供对请求的非阻塞读取,以及并行运行多个进程。它不使用任何操作系统特定的调用(据我所知),因此应该可以在任何地方使用。

可以从pypi获得,所以pip install shelljob。有关示例和完整文档,请参考项目页面

Working from J.F. Sebastian’s answer, and several other sources, I’ve put together a simple subprocess manager. It provides the request non-blocking reading, as well as running several processes in parallel. It doesn’t use any OS-specific call (that I’m aware) and thus should work anywhere.

It’s available from pypi, so just pip install shelljob. Refer to the project page for examples and full docs.


回答 22

编辑:此实现仍会阻止。请改用JFSebastian的答案

我尝试了最佳答案,但是线程代码的额外风险和维护令人担忧。

通过io模块(仅限于2.6),我发现BufferedReader。这是我的无线程,非阻塞解决方案。

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line

EDIT: This implementation still blocks. Use J.F.Sebastian’s answer instead.

I tried the top answer, but the additional risk and maintenance of thread code was worrisome.

Looking through the io module (and being limited to 2.6), I found BufferedReader. This is my threadless, non-blocking solution.

import io
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() == None:
      time.sleep(SLEEP_DELAY)
      while '\n' in bufferedStdout.peek(bufferedStdout.buffer_size):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line

回答 23

我最近偶然发现了同一个问题,我需要在非阻塞模式下一次从流中读取一行(在子进程中运行尾部),我想避免下一个问题:不要刻录cpu,不要按一个字节读取流(就像readline一样),等等

这是我的实现 https://gist.github.com/grubberr/5501e1a9760c3eab5e0a 它不支持Windows(民意测验),不处理EOF,但是对我来说很好

I recently stumbled upon on the same problem I need to read one line at time from stream ( tail run in subprocess ) in non-blocking mode I wanted to avoid next problems: not to burn cpu, don’t read stream by one byte (like readline did ), etc

Here is my implementation https://gist.github.com/grubberr/5501e1a9760c3eab5e0a it don’t support windows (poll), don’t handle EOF, but it works for me well


回答 24

这是在子进程中运行交互式命令的示例,并且stdout是使用伪终端进行交互的。您可以参考:https : //stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

This is a example to run interactive command in subprocess, and the stdout is interactive by using pseudo terminal. You can refer to: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() make it run in a new process group, or bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

while p.poll() is None:
    r, w, e = select.select([sys.stdin, master_fd], [], [])
    if sys.stdin in r:
        d = os.read(sys.stdin.fileno(), 10240)
        os.write(master_fd, d)
    elif master_fd in r:
        o = os.read(master_fd, 10240)
        if o:
            os.write(sys.stdout.fileno(), o)

# restore tty settings back
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

回答 25

此解决方案使用 select模块从IO流“读取任何可用数据”。此功能最初会阻塞,直到有可用数据为止,然后才读取可用数据,并且不会进一步阻塞。

考虑到它使用select模块的事实,这仅适用于Unix。

该代码完全符合PEP8。

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer

This solution uses the select module to “read any available data” from an IO stream. This function blocks initially until data is available, but then reads only the data that is available and doesn’t block further.

Given the fact that it uses the select module, this only works on Unix.

The code is fully PEP8-compliant.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer

回答 26

我也遇到了Jesse所描述的问题,并像BradleyAndy和其他人一样使用“选择”来解决该问题,但是以阻塞模式进行以避免繁忙的循环。它使用虚拟管道作为伪造的标准输入。选择阻塞并等待标准输入或管道准备就绪。按下键时,stdin会取消选择的阻塞,并且可以使用read(1)检索键值。当不同的线程写入管道时,管道将解除对选择的阻塞,并且可以将其视为对stdin的需求已结束的指示。这是一些参考代码:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()

I also faced the problem described by Jesse and solved it by using “select” as Bradley, Andy and others did but in a blocking mode to avoid a busy loop. It uses a dummy Pipe as a fake stdin. The select blocks and wait for either stdin or the pipe to be ready. When a key is pressed stdin unblocks the select and the key value can be retrieved with read(1). When a different thread writes to the pipe then the pipe unblocks the select and it can be taken as an indication that the need for stdin is over. Here is some reference code:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value

# Faked keystroke
else:
    # ... use of stdin finished

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()

回答 27

尝试wexpect,它是pexpect的Windows替代产品

import wexpect

p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.')               // regex pattern of any character
output_str = p.after()

Try wexpect, which is the windows alternative of pexpect.

import wexpect

p = wexpect.spawn('myprogram.exe')
p.stdout.readline('.')               // regex pattern of any character
output_str = p.after()

回答 28

在类似Unix的系统和Python 3.5+上os.set_blocking,它的功能完全符合其要求。

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

输出:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

os.set_blocking评论的是:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''

On Unix-like systems and Python 3.5+ there’s os.set_blocking which does exactly what it says.

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

This outputs:

1 b''
2 b'0\n'
1 b''
2 b'1\n'
1 b''
2 b'2\n'
1 b''
2 b'3\n'
1 b''
2 b'4\n'

With os.set_blocking commented it’s:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''

回答 29

这是一个支持python中的非阻塞读取和后台写入的模块:

https://pypi.python.org/pypi/python-nonblock

提供功能

nonblock_read将从流中读取数据(如果有),否则返回一个空字符串(如果流的另一端关闭并且已读取所有可能的数据,则返回None)

您还可以考虑使用python-subprocess2模块,

https://pypi.python.org/pypi/python-subprocess2

这将添加到子流程模块。因此,在从“ subprocess.Popen”返回的对象上添加了附加方法runInBackground。这将启动一个线程并返回一个对象,该对象将在将内容写入stdout / stderr时自动填充,而不会阻塞您的主线程。

请享用!

Here is a module that supports non-blocking reads and background writes in python:

https://pypi.python.org/pypi/python-nonblock

Provides a function,

nonblock_read which will read data from the stream, if available, otherwise return an empty string (or None if the stream is closed on the other side and all possible data has been read)

You may also consider the python-subprocess2 module,

https://pypi.python.org/pypi/python-subprocess2

which adds to the subprocess module. So on the object returned from “subprocess.Popen” is added an additional method, runInBackground. This starts a thread and returns an object which will automatically be populated as stuff is written to stdout/stderr, without blocking your main thread.

Enjoy!


运行shell命令并捕获输出

问题:运行shell命令并捕获输出

我想编写一个函数,它将执行shell命令并以字符串形式返回其输出,无论它是错误还是成功消息。我只想获得与命令行相同的结果。

能做到这一点的代码示例是什么?

例如:

def run_command(cmd):
    # ??????

print run_command('mysqladmin create test -uroot -pmysqladmin12')
# Should output something like:
# mysqladmin: CREATE DATABASE failed; error: 'Can't create database 'test'; database exists'

I want to write a function that will execute a shell command and return its output as a string, no matter, is it an error or success message. I just want to get the same result that I would have gotten with the command line.

What would be a code example that would do such a thing?

For example:

def run_command(cmd):
    # ??????

print run_command('mysqladmin create test -uroot -pmysqladmin12')
# Should output something like:
# mysqladmin: CREATE DATABASE failed; error: 'Can't create database 'test'; database exists'

回答 0

这个问题的答案取决于您使用的Python版本。最简单的方法是使用以下subprocess.check_output功能:

>>> subprocess.check_output(['ls', '-l'])
b'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

check_output运行一个仅接受参数作为输入的程序。1它完全返回打印到的结果stdout。如果您需要将输入内容写入stdin,请跳至runPopen部分。如果要执行复杂的Shell命令,请参阅shell=True此答案末尾的注释。

check_output功能适用于仍在广泛使用的几乎所有版本的Python(2.7+)。2但对于较新的版本,不再推荐使用此方法。

现代版本的Python(3.5或更高版本): run

如果您使用的是Python 3.5或更高版本,并且不需要向后兼容,则建议使用run功能。它为该subprocess模块提供了一个非常通用的高级API 。要捕获程序的输出,请将subprocess.PIPE标志传递给stdout关键字参数。然后访问stdout返回CompletedProcess对象的属性:

>>> import subprocess
>>> result = subprocess.run(['ls', '-l'], stdout=subprocess.PIPE)
>>> result.stdout
b'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

返回值是一个bytes对象,因此,如果需要正确的字符串,则需要decode它。假设被调用的进程返回一个UTF-8编码的字符串:

>>> result.stdout.decode('utf-8')
'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

所有这些都可以压缩为单线:

>>> subprocess.run(['ls', '-l'], stdout=subprocess.PIPE).stdout.decode('utf-8')
'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

如果要将输入传递给流程的stdinbytes请将一个对象传递给input关键字参数:

>>> cmd = ['awk', 'length($0) > 5']
>>> input = 'foo\nfoofoo\n'.encode('utf-8')
>>> result = subprocess.run(cmd, stdout=subprocess.PIPE, input=input)
>>> result.stdout.decode('utf-8')
'foofoo\n'

您可以通过传递stderr=subprocess.PIPE(捕获到result.stderr)或stderr=subprocess.STDOUT(捕获到result.stdout常规输出)来捕获错误。如果不关心安全性,您还可以shell=True按照下面的说明通过传递来运行更复杂的Shell命令。

与旧的处理方式相比,这仅增加了一点复杂性。但是我认为值得这样做:现在,您仅需使用该run功能就可以完成几乎所有需要做的事情。

旧版本的Python(2.7-3.4): check_output

如果您使用的是旧版本的Python,或者需要适度的向后兼容性,则可以使用check_output上面简要介绍的函数。自python 2.7开始提供。

subprocess.check_output(*popenargs, **kwargs)  

它采用与Popen(请参见下文)相同的参数,并返回一个包含程序输出的字符串。该答案的开头有一个更详细的用法示例。在Python 3.5及更高版本中,check_output等效于run使用check=Truestdout=PIPE,仅返回stdout属性。

您可以通过stderr=subprocess.STDOUT确保错误信息包含在返回的输出-但在Python中通过一些版本stderr=subprocess.PIPE,以check_output可引起死锁。如果不关心安全性,您还可以shell=True按照下面的说明通过传递来运行更复杂的Shell命令。

如果您需要通过管道stderr传递输入或将输入传递给流程,check_output则将无法完成任务。Popen在这种情况下,请参见下面的示例。

复杂的应用程序和Python的旧版(2.6及以下版本): Popen

如果需要深入的向后兼容性,或者需要比check_output提供的功能更复杂的功能,则必须直接使用Popen对象,这些对象封装了用于子流程的低级API。

所述Popen构造器接受单个命令没有参数,或列表包含指令作为其第一项,其次是任意数量的参数,每个作为列表一个单独的项目。shlex.split可以帮助将字符串解析为格式正确的列表。Popen对象还接受用于进程IO管理和低级配置的许多不同参数

发送输入和捕获输出communicate几乎总是首选方法。如:

output = subprocess.Popen(["mycmd", "myarg"], 
                          stdout=subprocess.PIPE).communicate()[0]

要么

>>> import subprocess
>>> p = subprocess.Popen(['ls', '-a'], stdout=subprocess.PIPE, 
...                                    stderr=subprocess.PIPE)
>>> out, err = p.communicate()
>>> print out
.
..
foo

如果设置stdin=PIPEcommunicate还允许您通过以下方式将数据传递到流程stdin

>>> cmd = ['awk', 'length($0) > 5']
>>> p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
...                           stderr=subprocess.PIPE,
...                           stdin=subprocess.PIPE)
>>> out, err = p.communicate('foo\nfoofoo\n')
>>> print out
foofoo

艾伦·霍尔的回答,这表明在某些系统上,你可能需要设置stdoutstderr以及stdin所有PIPE(或DEVNULL)得到communicate工作的。

在极少数情况下,您可能需要复杂的实时输出捕获。Vartec的答案提出了一条前进的道路,但是communicate如果不谨慎使用,则其他方法都容易出现死锁。

与上述所有功能一样,当不考虑安全性时,可以通过传递运行更复杂的Shell命令shell=True

笔记

1.运行shell命令:shell=True参数

通常,对runcheck_outputPopen构造函数的每次调用都会执行一个程序。这意味着没有花哨的bash风格的管道。如果要运行复杂的Shell命令,则可以传递shell=True,这三个功能都支持。

但是,这样做会引起安全问题。如果您要做的不仅仅是轻脚本编写,那么最好单独调用每个进程,并将每个进程的输出作为输入通过以下方式传递给下一个进程:

run(cmd, [stdout=etc...], input=other_output)

要么

Popen(cmd, [stdout=etc...]).communicate(other_output)

直接连接管道的诱惑力很强;抵抗它。否则,您很可能会遇到僵局,或者不得不执行类似此类的骇人行为。

2. Unicode注意事项

check_output在Python 2中返回一个字符串,但bytes在Python 3中返回一个对象。如果您还没有花时间学习unicode,那么值得花一点时间。

The answer to this question depends on the version of Python you’re using. The simplest approach is to use the subprocess.check_output function:

>>> subprocess.check_output(['ls', '-l'])
b'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

check_output runs a single program that takes only arguments as input.1 It returns the result exactly as printed to stdout. If you need to write input to stdin, skip ahead to the run or Popen sections. If you want to execute complex shell commands, see the note on shell=True at the end of this answer.

The check_output function works on almost all versions of Python still in wide use (2.7+).2 But for more recent versions, it is no longer the recommended approach.

Modern versions of Python (3.5 or higher): run

If you’re using Python 3.5 or higher, and do not need backwards compatibility, the new run function is recommended. It provides a very general, high-level API for the subprocess module. To capture the output of a program, pass the subprocess.PIPE flag to the stdout keyword argument. Then access the stdout attribute of the returned CompletedProcess object:

>>> import subprocess
>>> result = subprocess.run(['ls', '-l'], stdout=subprocess.PIPE)
>>> result.stdout
b'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

The return value is a bytes object, so if you want a proper string, you’ll need to decode it. Assuming the called process returns a UTF-8-encoded string:

>>> result.stdout.decode('utf-8')
'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

This can all be compressed to a one-liner:

>>> subprocess.run(['ls', '-l'], stdout=subprocess.PIPE).stdout.decode('utf-8')
'total 0\n-rw-r--r--  1 memyself  staff  0 Mar 14 11:04 files\n'

If you want to pass input to the process’s stdin, pass a bytes object to the input keyword argument:

>>> cmd = ['awk', 'length($0) > 5']
>>> input = 'foo\nfoofoo\n'.encode('utf-8')
>>> result = subprocess.run(cmd, stdout=subprocess.PIPE, input=input)
>>> result.stdout.decode('utf-8')
'foofoo\n'

You can capture errors by passing stderr=subprocess.PIPE (capture to result.stderr) or stderr=subprocess.STDOUT (capture to result.stdout along with regular output). When security is not a concern, you can also run more complex shell commands by passing shell=True as described in the notes below.

This adds just a bit of complexity, compared to the old way of doing things. But I think it’s worth the payoff: now you can do almost anything you need to do with the run function alone.

Older versions of Python (2.7-3.4): check_output

If you are using an older version of Python, or need modest backwards compatibility, you can probably use the check_output function as briefly described above. It has been available since Python 2.7.

subprocess.check_output(*popenargs, **kwargs)  

It takes takes the same arguments as Popen (see below), and returns a string containing the program’s output. The beginning of this answer has a more detailed usage example. In Python 3.5 and greater, check_output is equivalent to executing run with check=True and stdout=PIPE, and returning just the stdout attribute.

You can pass stderr=subprocess.STDOUT to ensure that error messages are included in the returned output — but in some versions of Python passing stderr=subprocess.PIPE to check_output can cause deadlocks. When security is not a concern, you can also run more complex shell commands by passing shell=True as described in the notes below.

If you need to pipe from stderr or pass input to the process, check_output won’t be up to the task. See the Popen examples below in that case.

Complex applications & legacy versions of Python (2.6 and below): Popen

If you need deep backwards compatibility, or if you need more sophisticated functionality than check_output provides, you’ll have to work directly with Popen objects, which encapsulate the low-level API for subprocesses.

The Popen constructor accepts either a single command without arguments, or a list containing a command as its first item, followed by any number of arguments, each as a separate item in the list. shlex.split can help parse strings into appropriately formatted lists. Popen objects also accept a host of different arguments for process IO management and low-level configuration.

To send input and capture output, communicate is almost always the preferred method. As in:

output = subprocess.Popen(["mycmd", "myarg"], 
                          stdout=subprocess.PIPE).communicate()[0]

Or

>>> import subprocess
>>> p = subprocess.Popen(['ls', '-a'], stdout=subprocess.PIPE, 
...                                    stderr=subprocess.PIPE)
>>> out, err = p.communicate()
>>> print out
.
..
foo

If you set stdin=PIPE, communicate also allows you to pass data to the process via stdin:

>>> cmd = ['awk', 'length($0) > 5']
>>> p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
...                           stderr=subprocess.PIPE,
...                           stdin=subprocess.PIPE)
>>> out, err = p.communicate('foo\nfoofoo\n')
>>> print out
foofoo

Note Aaron Hall’s answer, which indicates that on some systems, you may need to set stdout, stderr, and stdin all to PIPE (or DEVNULL) to get communicate to work at all.

In some rare cases, you may need complex, real-time output capturing. Vartec‘s answer suggests a way forward, but methods other than communicate are prone to deadlocks if not used carefully.

As with all the above functions, when security is not a concern, you can run more complex shell commands by passing shell=True.

Notes

1. Running shell commands: the shell=True argument

Normally, each call to run, check_output, or the Popen constructor executes a single program. That means no fancy bash-style pipes. If you want to run complex shell commands, you can pass shell=True, which all three functions support.

However, doing so raises security concerns. If you’re doing anything more than light scripting, you might be better off calling each process separately, and passing the output from each as an input to the next, via

run(cmd, [stdout=etc...], input=other_output)

Or

Popen(cmd, [stdout=etc...]).communicate(other_output)

The temptation to directly connect pipes is strong; resist it. Otherwise, you’ll likely see deadlocks or have to do hacky things like this.

2. Unicode considerations

check_output returns a string in Python 2, but a bytes object in Python 3. It’s worth taking a moment to learn about unicode if you haven’t already.


回答 1

这很容易,但仅适用于Unix(包括Cygwin)和Python2.7。

import commands
print commands.getstatusoutput('wc -l file')

它返回带有(return_value,output)的元组。

对于适用于Python2和Python3的解决方案,请改用subprocess模块:

from subprocess import Popen, PIPE
output = Popen(["date"],stdout=PIPE)
response = output.communicate()
print response

This is way easier, but only works on Unix (including Cygwin) and Python2.7.

import commands
print commands.getstatusoutput('wc -l file')

It returns a tuple with the (return_value, output).

For a solution that works in both Python2 and Python3, use the subprocess module instead:

from subprocess import Popen, PIPE
output = Popen(["date"],stdout=PIPE)
response = output.communicate()
print response

回答 2

像这样:

def runProcess(exe):    
    p = subprocess.Popen(exe, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    while(True):
        # returns None while subprocess is running
        retcode = p.poll() 
        line = p.stdout.readline()
        yield line
        if retcode is not None:
            break

请注意,我正在将stderr重定向到stdout,它可能并非您想要的,但我也想要错误消息。

此函数逐行产生(通常,您必须等待子进程完成才能获得整体输出)。

对于您的情况,用法是:

for line in runProcess('mysqladmin create test -uroot -pmysqladmin12'.split()):
    print line,

Something like that:

def runProcess(exe):    
    p = subprocess.Popen(exe, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    while(True):
        # returns None while subprocess is running
        retcode = p.poll() 
        line = p.stdout.readline()
        yield line
        if retcode is not None:
            break

Note, that I’m redirecting stderr to stdout, it might not be exactly what you want, but I want error messages also.

This function yields line by line as they come (normally you’d have to wait for subprocess to finish to get the output as a whole).

For your case the usage would be:

for line in runProcess('mysqladmin create test -uroot -pmysqladmin12'.split()):
    print line,

回答 3

Vartec的答案无法读取所有行,因此我制作了一个可以读取的版本:

def run_command(command):
    p = subprocess.Popen(command,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    return iter(p.stdout.readline, b'')

用法与接受的答案相同:

command = 'mysqladmin create test -uroot -pmysqladmin12'.split()
for line in run_command(command):
    print(line)

Vartec’s answer doesn’t read all lines, so I made a version that did:

def run_command(command):
    p = subprocess.Popen(command,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT)
    return iter(p.stdout.readline, b'')

Usage is the same as the accepted answer:

command = 'mysqladmin create test -uroot -pmysqladmin12'.split()
for line in run_command(command):
    print(line)

回答 4

这是一个棘手超级简单的解决方案,可在许多情况下使用:

import os
os.system('sample_cmd > tmp')
print open('tmp', 'r').read()

使用命令的输出创建一个临时文件(这里是tmp),您可以从中读取所需的输出。

注释中的额外说明:如果是一次性作业,则可以删除tmp文件。如果您需要多次执行此操作,则无需删除tmp。

os.remove('tmp')

This is a tricky but super simple solution which works in many situations:

import os
os.system('sample_cmd > tmp')
print open('tmp', 'r').read()

A temporary file(here is tmp) is created with the output of the command and you can read from it your desired output.

Extra note from the comments: You can remove the tmp file in the case of one-time job. If you need to do this several times, there is no need to delete the tmp.

os.remove('tmp')

回答 5

我遇到了同样的问题,但是想出了一种非常简单的方法:

import subprocess
output = subprocess.getoutput("ls -l")
print(output)

希望能帮上忙

注意:此解决方案特定subprocess.getoutput()于Python3,因为在Python2中不起作用

I had the same problem but figured out a very simple way of doing this:

import subprocess
output = subprocess.getoutput("ls -l")
print(output)

Hope it helps out

Note: This solution is Python3 specific as subprocess.getoutput() doesn’t work in Python2


回答 6

您可以使用以下命令来运行任何shell命令。我在ubuntu上使用过它们。

import os
os.popen('your command here').read()

注意:自python 2.6起不推荐使用。现在,您必须使用subprocess.Popen。以下是示例

import subprocess

p = subprocess.Popen("Your command", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
print p.split("\n")

You can use following commands to run any shell command. I have used them on ubuntu.

import os
os.popen('your command here').read()

Note: This is deprecated since python 2.6. Now you must use subprocess.Popen. Below is the example

import subprocess

p = subprocess.Popen("Your command", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[0]
print p.split("\n")

回答 7

您的里程可能会有所不同,我尝试使用@senderle在Windows 2.6.5上的Windows中使用Vartec的解决方案,但我遇到了错误,并且没有其他解决方案起作用。我的错误是:WindowsError: [Error 6] The handle is invalid

我发现必须将PIPE分配给每个句柄才能使其返回我期望的输出-以下内容对我有用。

import subprocess

def run_command(cmd):
    """given shell command, returns communication tuple of stdout and stderr"""
    return subprocess.Popen(cmd, 
                            stdout=subprocess.PIPE, 
                            stderr=subprocess.PIPE, 
                            stdin=subprocess.PIPE).communicate()

并像这样调用([0]获取元组的第一个元素stdout):

run_command('tracert 11.1.0.1')[0]

学习更多之后,我相信我需要这些管道参数,因为我正在使用不同句柄的自定义系统上工作,因此必须直接控制所有std。

要停止控制台弹出窗口(在Windows中),请执行以下操作:

def run_command(cmd):
    """given shell command, returns communication tuple of stdout and stderr"""
    # instantiate a startupinfo obj:
    startupinfo = subprocess.STARTUPINFO()
    # set the use show window flag, might make conditional on being in Windows:
    startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
    # pass as the startupinfo keyword argument:
    return subprocess.Popen(cmd,
                            stdout=subprocess.PIPE, 
                            stderr=subprocess.PIPE, 
                            stdin=subprocess.PIPE, 
                            startupinfo=startupinfo).communicate()

run_command('tracert 11.1.0.1')

Your Mileage May Vary, I attempted @senderle’s spin on Vartec’s solution in Windows on Python 2.6.5, but I was getting errors, and no other solutions worked. My error was: WindowsError: [Error 6] The handle is invalid.

I found that I had to assign PIPE to every handle to get it to return the output I expected – the following worked for me.

import subprocess

def run_command(cmd):
    """given shell command, returns communication tuple of stdout and stderr"""
    return subprocess.Popen(cmd, 
                            stdout=subprocess.PIPE, 
                            stderr=subprocess.PIPE, 
                            stdin=subprocess.PIPE).communicate()

and call like this, ([0] gets the first element of the tuple, stdout):

run_command('tracert 11.1.0.1')[0]

After learning more, I believe I need these pipe arguments because I’m working on a custom system that uses different handles, so I had to directly control all the std’s.

To stop console popups (with Windows), do this:

def run_command(cmd):
    """given shell command, returns communication tuple of stdout and stderr"""
    # instantiate a startupinfo obj:
    startupinfo = subprocess.STARTUPINFO()
    # set the use show window flag, might make conditional on being in Windows:
    startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
    # pass as the startupinfo keyword argument:
    return subprocess.Popen(cmd,
                            stdout=subprocess.PIPE, 
                            stderr=subprocess.PIPE, 
                            stdin=subprocess.PIPE, 
                            startupinfo=startupinfo).communicate()

run_command('tracert 11.1.0.1')

回答 8

对于以下问题,我对同一问题的口味略有不同:

  1. 当STDOUT消息在STDOUT缓冲区中累积时(即实时)捕获并返回它们。
    • @vartec通过使用生成器和
      上面的’yield’ 关键字以Python方式解决了这个问题
  2. 打印所有STDOUT行(即使在可以完全读取STDOUT缓冲区之前退出进程
  3. 不要浪费CPU周期以高频率轮询进程
  4. 检查子流程的返回码
  5. 如果得到非零错误返回码,则打印STDERR(与STDOUT分开)。

我结合并调整了先前的答案,以得出以下结论:

import subprocess
from time import sleep

def run_command(command):
    p = subprocess.Popen(command,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         shell=True)
    # Read stdout from subprocess until the buffer is empty !
    for line in iter(p.stdout.readline, b''):
        if line: # Don't print blank lines
            yield line
    # This ensures the process has completed, AND sets the 'returncode' attr
    while p.poll() is None:                                                                                                                                        
        sleep(.1) #Don't waste CPU-cycles
    # Empty STDERR buffer
    err = p.stderr.read()
    if p.returncode != 0:
       # The run_command() function is responsible for logging STDERR 
       print("Error: " + str(err))

此代码将与以前的答案相同地执行:

for line in run_command(cmd):
    print(line)

I had a slightly different flavor of the same problem with the following requirements:

  1. Capture and return STDOUT messages as they accumulate in the STDOUT buffer (i.e. in realtime).
    • @vartec solved this Pythonically with his use of generators and the ‘yield’
      keyword above
  2. Print all STDOUT lines (even if process exits before STDOUT buffer can be fully read)
  3. Don’t waste CPU cycles polling the process at high-frequency
  4. Check the return code of the subprocess
  5. Print STDERR (separate from STDOUT) if we get a non-zero error return code.

I’ve combined and tweaked previous answers to come up with the following:

import subprocess
from time import sleep

def run_command(command):
    p = subprocess.Popen(command,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE,
                         shell=True)
    # Read stdout from subprocess until the buffer is empty !
    for line in iter(p.stdout.readline, b''):
        if line: # Don't print blank lines
            yield line
    # This ensures the process has completed, AND sets the 'returncode' attr
    while p.poll() is None:                                                                                                                                        
        sleep(.1) #Don't waste CPU-cycles
    # Empty STDERR buffer
    err = p.stderr.read()
    if p.returncode != 0:
       # The run_command() function is responsible for logging STDERR 
       print("Error: " + str(err))

This code would be executed the same as previous answers:

for line in run_command(cmd):
    print(line)

回答 9

拆分初始命令 subprocess可能会很棘手且麻烦。

采用 shlex.split()帮助自己。

样例命令

git log -n 5 --since "5 years ago" --until "2 year ago"

编码

from subprocess import check_output
from shlex import split

res = check_output(split('git log -n 5 --since "5 years ago" --until "2 year ago"'))
print(res)
>>> b'commit 7696ab087a163e084d6870bb4e5e4d4198bdc61a\nAuthor: Artur Barseghyan...'

没有shlex.split()代码的话看起来如下

res = check_output([
    'git', 
    'log', 
    '-n', 
    '5', 
    '--since', 
    '5 years ago', 
    '--until', 
    '2 year ago'
])
print(res)
>>> b'commit 7696ab087a163e084d6870bb4e5e4d4198bdc61a\nAuthor: Artur Barseghyan...'

Splitting the initial command for the subprocess might be tricky and cumbersome.

Use shlex.split() to help yourself out.

Sample command

git log -n 5 --since "5 years ago" --until "2 year ago"

The code

from subprocess import check_output
from shlex import split

res = check_output(split('git log -n 5 --since "5 years ago" --until "2 year ago"'))
print(res)
>>> b'commit 7696ab087a163e084d6870bb4e5e4d4198bdc61a\nAuthor: Artur Barseghyan...'

Without shlex.split() the code would look as follows

res = check_output([
    'git', 
    'log', 
    '-n', 
    '5', 
    '--since', 
    '5 years ago', 
    '--until', 
    '2 year ago'
])
print(res)
>>> b'commit 7696ab087a163e084d6870bb4e5e4d4198bdc61a\nAuthor: Artur Barseghyan...'

回答 10

如果您需要在多个文件上运行一个shell命令,那么这对我就成功了。

import os
import subprocess

# Define a function for running commands and capturing stdout line by line
# (Modified from Vartec's solution because it wasn't printing all lines)
def runProcess(exe):    
    p = subprocess.Popen(exe, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return iter(p.stdout.readline, b'')

# Get all filenames in working directory
for filename in os.listdir('./'):
    # This command will be run on each file
    cmd = 'nm ' + filename

    # Run the command and capture the output line by line.
    for line in runProcess(cmd.split()):
        # Eliminate leading and trailing whitespace
        line.strip()
        # Split the output 
        output = line.split()

        # Filter the output and print relevant lines
        if len(output) > 2:
            if ((output[2] == 'set_program_name')):
                print filename
                print line

编辑:刚刚看到了JF Sebastian的建议的Max Persson的解决方案。继续前进,并纳入。

If you need to run a shell command on multiple files, this did the trick for me.

import os
import subprocess

# Define a function for running commands and capturing stdout line by line
# (Modified from Vartec's solution because it wasn't printing all lines)
def runProcess(exe):    
    p = subprocess.Popen(exe, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    return iter(p.stdout.readline, b'')

# Get all filenames in working directory
for filename in os.listdir('./'):
    # This command will be run on each file
    cmd = 'nm ' + filename

    # Run the command and capture the output line by line.
    for line in runProcess(cmd.split()):
        # Eliminate leading and trailing whitespace
        line.strip()
        # Split the output 
        output = line.split()

        # Filter the output and print relevant lines
        if len(output) > 2:
            if ((output[2] == 'set_program_name')):
                print filename
                print line

Edit: Just saw Max Persson’s solution with J.F. Sebastian’s suggestion. Went ahead and incorporated that.


回答 11

根据@senderle,如果您像我一样使用python3.6:

def sh(cmd, input=""):
    rst = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, input=input.encode("utf-8"))
    assert rst.returncode == 0, rst.stderr.decode("utf-8")
    return rst.stdout.decode("utf-8")
sh("ls -a")

就像您在bash中运行命令一样

According to @senderle, if you use python3.6 like me:

def sh(cmd, input=""):
    rst = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, input=input.encode("utf-8"))
    assert rst.returncode == 0, rst.stderr.decode("utf-8")
    return rst.stdout.decode("utf-8")
sh("ls -a")

Will act exactly like you run the command in bash


回答 12

如果您使用 subprocess python模块,则可以分别处理STDOUT,STDERR和命令的返回代码。您可以看到完整的命令调用程序实现的示例。当然,您可以根据需要扩展它try..except

下面的函数返回STDOUT,STDERR和Return代码,因此您可以在其他脚本中处理它们。

import subprocess

def command_caller(command=None)
    sp = subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=False)
    out, err = sp.communicate()
    if sp.returncode:
        print(
            "Return code: %(ret_code)s Error message: %(err_msg)s"
            % {"ret_code": sp.returncode, "err_msg": err}
            )
    return sp.returncode, out, err

If you use the subprocess python module, you are able to handle the STDOUT, STDERR and return code of command separately. You can see an example for the complete command caller implementation. Of course you can extend it with try..except if you want.

The below function returns the STDOUT, STDERR and Return code so you can handle them in the other script.

import subprocess

def command_caller(command=None)
    sp = subprocess.Popen(command, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=False)
    out, err = sp.communicate()
    if sp.returncode:
        print(
            "Return code: %(ret_code)s Error message: %(err_msg)s"
            % {"ret_code": sp.returncode, "err_msg": err}
            )
    return sp.returncode, out, err

回答 13

例如,execute(’ls -ahl’)区分了三种/四种可能的收益和OS平台:

  1. 无输出,但运行成功
  2. 输出空行,运行成功
  3. 运行失败
  4. 输出一些东西,成功运行

功能如下

def execute(cmd, output=True, DEBUG_MODE=False):
"""Executes a bash command.
(cmd, output=True)
output: whether print shell output to screen, only affects screen display, does not affect returned values
return: ...regardless of output=True/False...
        returns shell output as a list with each elment is a line of string (whitespace stripped both sides) from output
        could be 
        [], ie, len()=0 --> no output;    
        [''] --> output empty line;     
        None --> error occured, see below

        if error ocurs, returns None (ie, is None), print out the error message to screen
"""
if not DEBUG_MODE:
    print "Command: " + cmd

    # https://stackoverflow.com/a/40139101/2292993
    def _execute_cmd(cmd):
        if os.name == 'nt' or platform.system() == 'Windows':
            # set stdin, out, err all to PIPE to get results (other than None) after run the Popen() instance
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        else:
            # Use bash; the default is sh
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, executable="/bin/bash")

        # the Popen() instance starts running once instantiated (??)
        # additionally, communicate(), or poll() and wait process to terminate
        # communicate() accepts optional input as stdin to the pipe (requires setting stdin=subprocess.PIPE above), return out, err as tuple
        # if communicate(), the results are buffered in memory

        # Read stdout from subprocess until the buffer is empty !
        # if error occurs, the stdout is '', which means the below loop is essentially skipped
        # A prefix of 'b' or 'B' is ignored in Python 2; 
        # it indicates that the literal should become a bytes literal in Python 3 
        # (e.g. when code is automatically converted with 2to3).
        # return iter(p.stdout.readline, b'')
        for line in iter(p.stdout.readline, b''):
            # # Windows has \r\n, Unix has \n, Old mac has \r
            # if line not in ['','\n','\r','\r\n']: # Don't print blank lines
                yield line
        while p.poll() is None:                                                                                                                                        
            sleep(.1) #Don't waste CPU-cycles
        # Empty STDERR buffer
        err = p.stderr.read()
        if p.returncode != 0:
            # responsible for logging STDERR 
            print("Error: " + str(err))
            yield None

    out = []
    for line in _execute_cmd(cmd):
        # error did not occur earlier
        if line is not None:
            # trailing comma to avoid a newline (by print itself) being printed
            if output: print line,
            out.append(line.strip())
        else:
            # error occured earlier
            out = None
    return out
else:
    print "Simulation! The command is " + cmd
    print ""

eg, execute(‘ls -ahl’) differentiated three/four possible returns and OS platforms:

  1. no output, but run successfully
  2. output empty line, run successfully
  3. run failed
  4. output something, run successfully

function below

def execute(cmd, output=True, DEBUG_MODE=False):
"""Executes a bash command.
(cmd, output=True)
output: whether print shell output to screen, only affects screen display, does not affect returned values
return: ...regardless of output=True/False...
        returns shell output as a list with each elment is a line of string (whitespace stripped both sides) from output
        could be 
        [], ie, len()=0 --> no output;    
        [''] --> output empty line;     
        None --> error occured, see below

        if error ocurs, returns None (ie, is None), print out the error message to screen
"""
if not DEBUG_MODE:
    print "Command: " + cmd

    # https://stackoverflow.com/a/40139101/2292993
    def _execute_cmd(cmd):
        if os.name == 'nt' or platform.system() == 'Windows':
            # set stdin, out, err all to PIPE to get results (other than None) after run the Popen() instance
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        else:
            # Use bash; the default is sh
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, executable="/bin/bash")

        # the Popen() instance starts running once instantiated (??)
        # additionally, communicate(), or poll() and wait process to terminate
        # communicate() accepts optional input as stdin to the pipe (requires setting stdin=subprocess.PIPE above), return out, err as tuple
        # if communicate(), the results are buffered in memory

        # Read stdout from subprocess until the buffer is empty !
        # if error occurs, the stdout is '', which means the below loop is essentially skipped
        # A prefix of 'b' or 'B' is ignored in Python 2; 
        # it indicates that the literal should become a bytes literal in Python 3 
        # (e.g. when code is automatically converted with 2to3).
        # return iter(p.stdout.readline, b'')
        for line in iter(p.stdout.readline, b''):
            # # Windows has \r\n, Unix has \n, Old mac has \r
            # if line not in ['','\n','\r','\r\n']: # Don't print blank lines
                yield line
        while p.poll() is None:                                                                                                                                        
            sleep(.1) #Don't waste CPU-cycles
        # Empty STDERR buffer
        err = p.stderr.read()
        if p.returncode != 0:
            # responsible for logging STDERR 
            print("Error: " + str(err))
            yield None

    out = []
    for line in _execute_cmd(cmd):
        # error did not occur earlier
        if line is not None:
            # trailing comma to avoid a newline (by print itself) being printed
            if output: print line,
            out.append(line.strip())
        else:
            # error occured earlier
            out = None
    return out
else:
    print "Simulation! The command is " + cmd
    print ""

回答 14

可以将输出重定向到文本文件,然后将其读回。

import subprocess
import os
import tempfile

def execute_to_file(command):
    """
    This function execute the command
    and pass its output to a tempfile then read it back
    It is usefull for process that deploy child process
    """
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file.close()
    path = temp_file.name
    command = command + " > " + path
    proc = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
    if proc.stderr:
        # if command failed return
        os.unlink(path)
        return
    with open(path, 'r') as f:
        data = f.read()
    os.unlink(path)
    return data

if __name__ == "__main__":
    path = "Somepath"
    command = 'ecls.exe /files ' + path
    print(execute(command))

The output can be redirected to a text file and then read it back.

import subprocess
import os
import tempfile

def execute_to_file(command):
    """
    This function execute the command
    and pass its output to a tempfile then read it back
    It is usefull for process that deploy child process
    """
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file.close()
    path = temp_file.name
    command = command + " > " + path
    proc = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
    if proc.stderr:
        # if command failed return
        os.unlink(path)
        return
    with open(path, 'r') as f:
        data = f.read()
    os.unlink(path)
    return data

if __name__ == "__main__":
    path = "Somepath"
    command = 'ecls.exe /files ' + path
    print(execute(command))

回答 15

刚刚写了一个小的bash脚本来使用curl做到这一点

https://gist.github.com/harish2704/bfb8abece94893c53ce344548ead8ba5

#!/usr/bin/env bash

# Usage: gdrive_dl.sh <url>

urlBase='https://drive.google.com'
fCookie=tmpcookies

curl="curl -L -b $fCookie -c $fCookie"
confirm(){
    $curl "$1" | grep jfk-button-action | sed -e 's/.*jfk-button-action" href="\(\S*\)".*/\1/' -e 's/\&amp;/\&/g'
}

$curl -O -J "${urlBase}$(confirm $1)"

just wrote a small bash script to do this using curl

https://gist.github.com/harish2704/bfb8abece94893c53ce344548ead8ba5

#!/usr/bin/env bash

# Usage: gdrive_dl.sh <url>

urlBase='https://drive.google.com'
fCookie=tmpcookies

curl="curl -L -b $fCookie -c $fCookie"
confirm(){
    $curl "$1" | grep jfk-button-action | sed -e 's/.*jfk-button-action" href="\(\S*\)".*/\1/' -e 's/\&amp;/\&/g'
}

$curl -O -J "${urlBase}$(confirm $1)"

从Python调用外部命令

问题:从Python调用外部命令

您如何在Python脚本中调用外部命令(就像我在Unix Shell或Windows命令提示符下键入的一样)?

How do you call an external command (as if I’d typed it at the Unix shell or Windows command prompt) from within a Python script?


回答 0

查看标准库中的子流程模块:

import subprocess
subprocess.run(["ls", "-l"])

的优势subprocess主场迎战system的是,它是更灵活(你可以得到的stdoutstderr,“真正”的状态代码,更好的错误处理,等等)。

官方文件建议subprocess在替代模块os.system()

subprocess模块提供了更强大的功能来生成新流程并检索其结果。使用该模块优于使用此功能[ os.system()]。

与子模块更换旧的功能中部分subprocess文件可能有一些有益的食谱。

对于3.5之前的Python版本,请使用call

import subprocess
subprocess.call(["ls", "-l"])

Look at the subprocess module in the standard library:

import subprocess
subprocess.run(["ls", "-l"])

The advantage of subprocess vs. system is that it is more flexible (you can get the stdout, stderr, the “real” status code, better error handling, etc…).

The official documentation recommends the subprocess module over the alternative os.system():

The subprocess module provides more powerful facilities for spawning new processes and retrieving their results; using that module is preferable to using this function [os.system()].

The Replacing Older Functions with the subprocess Module section in the subprocess documentation may have some helpful recipes.

For versions of Python before 3.5, use call:

import subprocess
subprocess.call(["ls", "-l"])

回答 1

下面总结了调用外部程序的方法以及每种方法的优缺点:

  1. os.system("some_command with args")将命令和参数传递到系统的外壳程序。很好,因为您实际上可以以这种方式一次运行多个命令,并设置管道和输入/输出重定向。例如:

    os.system("some_command < input_file | another_command > output_file")  

但是,尽管这样做很方便,但您必须手动处理转义字符(例如空格等)的外壳字符。另一方面,这也使您可以运行仅是外壳程序命令而非实际上是外部程序的命令。请参阅文档

  1. stream = os.popen("some_command with args")os.system除了会为您提供类似于文件的对象之外,您可以使用该对象来访问该过程的标准输入/输出,它的作用与之相同。Popen还有其他3种变体,它们对I / O的处理略有不同。如果您将所有内容都作为字符串传递,那么您的命令将传递到外壳程序;如果将它们作为列表传递,则无需担心转义任何内容。请参阅文档

  2. 模块的Popensubprocess。它旨在替代它,os.popen但缺点是由于太全面而使它稍微复杂一些。例如,您会说:

    print subprocess.Popen("echo Hello World", shell=True, stdout=subprocess.PIPE).stdout.read()

    代替:

    print os.popen("echo Hello World").read()

    但是将所有选项都放在一个统一的类中而不是4个不同的popen函数是一件好事。请参阅文档

  3. call来自subprocess模块的功能。基本上就像Popen类一样,并接受所有相同的参数,但是它只是等待命令完成并提供返回代码。例如:

    return_code = subprocess.call("echo Hello World", shell=True)  

    请参阅文档

  4. 如果您使用的是Python 3.5或更高版本,则可以使用新subprocess.run函数,该函数与上面的代码非常相似,但是更加灵活,并CompletedProcess在命令完成执行后返回一个对象。

  5. os模块还具有您在C程序中拥有的所有fork / exec / spawn函数,但是我不建议直接使用它们。

subprocess模块可能是您所使用的模块。

最后,请注意,对于所有方法,在这些方法中,您将要由外壳执行的最终命令作为字符串传递给您,并且您有责任对其进行转义。如果您传递的字符串的任何部分不能被完全信任,则将带来严重的安全隐患。例如,如果用户正在输入字符串的某些/任何部分。如果不确定,请仅将这些方法与常量一起使用。为了给您暗示的含义,请考虑以下代码:

print subprocess.Popen("echo %s " % user_input, stdout=PIPE).stdout.read()

并想象用户输入了“我的妈妈不爱我&& rm -rf /”这可能会擦除整个文件系统的信息。

Here’s a summary of the ways to call external programs and the advantages and disadvantages of each:

  1. os.system("some_command with args") passes the command and arguments to your system’s shell. This is nice because you can actually run multiple commands at once in this manner and set up pipes and input/output redirection. For example:

    os.system("some_command < input_file | another_command > output_file")  
    

However, while this is convenient, you have to manually handle the escaping of shell characters such as spaces, etc. On the other hand, this also lets you run commands which are simply shell commands and not actually external programs. See the documentation.

  1. stream = os.popen("some_command with args") will do the same thing as os.system except that it gives you a file-like object that you can use to access standard input/output for that process. There are 3 other variants of popen that all handle the i/o slightly differently. If you pass everything as a string, then your command is passed to the shell; if you pass them as a list then you don’t need to worry about escaping anything. See the documentation.

  2. The Popen class of the subprocess module. This is intended as a replacement for os.popen but has the downside of being slightly more complicated by virtue of being so comprehensive. For example, you’d say:

    print subprocess.Popen("echo Hello World", shell=True, stdout=subprocess.PIPE).stdout.read()
    

    instead of:

    print os.popen("echo Hello World").read()
    

    but it is nice to have all of the options there in one unified class instead of 4 different popen functions. See the documentation.

  3. The call function from the subprocess module. This is basically just like the Popen class and takes all of the same arguments, but it simply waits until the command completes and gives you the return code. For example:

    return_code = subprocess.call("echo Hello World", shell=True)  
    

    See the documentation.

  4. If you’re on Python 3.5 or later, you can use the new subprocess.run function, which is a lot like the above but even more flexible and returns a CompletedProcess object when the command finishes executing.

  5. The os module also has all of the fork/exec/spawn functions that you’d have in a C program, but I don’t recommend using them directly.

The subprocess module should probably be what you use.

Finally please be aware that for all methods where you pass the final command to be executed by the shell as a string and you are responsible for escaping it. There are serious security implications if any part of the string that you pass can not be fully trusted. For example, if a user is entering some/any part of the string. If you are unsure, only use these methods with constants. To give you a hint of the implications consider this code:

print subprocess.Popen("echo %s " % user_input, stdout=PIPE).stdout.read()

and imagine that the user enters something “my mama didnt love me && rm -rf /” which could erase the whole filesystem.


回答 2

典型的实现:

import subprocess

p = subprocess.Popen('ls', shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for line in p.stdout.readlines():
    print line,
retval = p.wait()

您可以随意使用stdout管道中的数据进行所需的操作。实际上,您可以简单地省略这些参数(stdout=stderr=),其行为类似于os.system()

Typical implementation:

import subprocess

p = subprocess.Popen('ls', shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for line in p.stdout.readlines():
    print line,
retval = p.wait()

You are free to do what you want with the stdout data in the pipe. In fact, you can simply omit those parameters (stdout= and stderr=) and it’ll behave like os.system().


回答 3

关于从调用者中分离子进程的一些提示(在后台启动子进程)。

假设您要从CGI脚本开始一个长任务。也就是说,子进程的生存期应比CGI脚本执行进程的生存期长。

子流程模块文档中的经典示例是:

import subprocess
import sys

# Some code here

pid = subprocess.Popen([sys.executable, "longtask.py"]) # Call subprocess

# Some more code here

这里的想法是,您不想在long任务.py完成之前在“调用子进程”行中等待。但是尚不清楚示例中“这里有更多代码”行之后会发生什么。

我的目标平台是FreeBSD,但是开发是在Windows上进行的,因此我首先在Windows上遇到了问题。

在Windows(Windows XP)上,直到longtask.py完成工作后,父进程才会完成。这不是CGI脚本中想要的。这个问题不是特定于Python的。在PHP社区中,问题是相同的。

解决方案是将DETACHED_PROCESS 进程创建标志传递给Windows API中的基础CreateProcess函数。如果碰巧安装了pywin32,则可以从win32process模块​​中导入该标志,否则您应该自己定义它:

DETACHED_PROCESS = 0x00000008

pid = subprocess.Popen([sys.executable, "longtask.py"],
                       creationflags=DETACHED_PROCESS).pid

/ * UPD 2015.10.27 @eryksun在下面的注释中指出,语义正确的标志是CREATE_NEW_CONSOLE(0x00000010)* /

在FreeBSD上,我们还有另一个问题:父进程完成后,它也会完成子进程。那也不是您在CGI脚本中想要的。一些实验表明,问题似乎出在共享sys.stdout。可行的解决方案如下:

pid = subprocess.Popen([sys.executable, "longtask.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)

我没有检查其他平台上的代码,也不知道FreeBSD上该行为的原因。如果有人知道,请分享您的想法。谷歌搜索在Python中启动后台进程尚未阐明。

Some hints on detaching the child process from the calling one (starting the child process in background).

Suppose you want to start a long task from a CGI script. That is, the child process should live longer than the CGI script execution process.

The classical example from the subprocess module documentation is:

import subprocess
import sys

# Some code here

pid = subprocess.Popen([sys.executable, "longtask.py"]) # Call subprocess

# Some more code here

The idea here is that you do not want to wait in the line ‘call subprocess’ until the longtask.py is finished. But it is not clear what happens after the line ‘some more code here’ from the example.

My target platform was FreeBSD, but the development was on Windows, so I faced the problem on Windows first.

On Windows (Windows XP), the parent process will not finish until the longtask.py has finished its work. It is not what you want in a CGI script. The problem is not specific to Python; in the PHP community the problems are the same.

The solution is to pass DETACHED_PROCESS Process Creation Flag to the underlying CreateProcess function in Windows API. If you happen to have installed pywin32, you can import the flag from the win32process module, otherwise you should define it yourself:

DETACHED_PROCESS = 0x00000008

pid = subprocess.Popen([sys.executable, "longtask.py"],
                       creationflags=DETACHED_PROCESS).pid

/* UPD 2015.10.27 @eryksun in a comment below notes, that the semantically correct flag is CREATE_NEW_CONSOLE (0x00000010) */

On FreeBSD we have another problem: when the parent process is finished, it finishes the child processes as well. And that is not what you want in a CGI script either. Some experiments showed that the problem seemed to be in sharing sys.stdout. And the working solution was the following:

pid = subprocess.Popen([sys.executable, "longtask.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)

I have not checked the code on other platforms and do not know the reasons of the behaviour on FreeBSD. If anyone knows, please share your ideas. Googling on starting background processes in Python does not shed any light yet.


回答 4

import os
os.system("your command")

请注意,这很危险,因为未清除命令。我留给你去谷歌搜索有关“操作系统”和“系统”模块的相关文档。有很多函数(exec *和spawn *)将执行类似的操作。

import os
os.system("your command")

Note that this is dangerous, since the command isn’t cleaned. I leave it up to you to google for the relevant documentation on the ‘os’ and ‘sys’ modules. There are a bunch of functions (exec* and spawn*) that will do similar things.


回答 5

我建议你使用模块,而不是使用os.system因为它没有外壳逃避你,因此更加安全。

subprocess.call(['ping', 'localhost'])

I’d recommend using the subprocess module instead of os.system because it does shell escaping for you and is therefore much safer.

subprocess.call(['ping', 'localhost'])

回答 6

import os
cmd = 'ls -al'
os.system(cmd)

如果要返回命令的结果,可以使用os.popen。但是,自2.6版以来,不推荐使用此方法,而支持subprocess模块,其他答案已经很好地涵盖了这一点。

import os
cmd = 'ls -al'
os.system(cmd)

If you want to return the results of the command, you can use os.popen. However, this is deprecated since version 2.6 in favor of the subprocess module, which other answers have covered well.


回答 7

有很多不同的库,可让您使用Python调用外部命令。对于每个库,我都进行了描述并显示了调用外部命令的示例。我用作示例的命令是ls -l(列出所有文件)。如果您想了解有关任何库的更多信息,我已列出并链接了每个库的文档。

资料来源:

这些都是库:

希望这可以帮助您决定使用哪个库:)

子过程

子进程允许您调用外部命令,并将其连接到它们的输入/输出/错误管道(stdin,stdout和stderr)。子进程是运行命令的默认选择,但有时其他模块更好。

subprocess.run(["ls", "-l"]) # Run command
subprocess.run(["ls", "-l"], stdout=subprocess.PIPE) # This will run the command and return any output
subprocess.run(shlex.split("ls -l")) # You can also use the shlex library to split the command

操作系统

os用于“取决于操作系统的功能”。它也可以用于通过os.system和调用外部命令os.popen(注意:还有一个subprocess.popen)。os将始终运行该shell,对于不需要或不知道如何使用的人来说,它是一个简单的选择subprocess.run

os.system("ls -l") # run command
os.popen("ls -l").read() # This will run the command and return any output

SH

sh是一个子进程接口,可让您像调用程序一样调用程序。如果要多次运行命令,这很有用。

sh.ls("-l") # Run command normally
ls_cmd = sh.Command("ls") # Save command as a variable
ls_cmd() # Run command as if it were a function

plumbum是用于“类似脚本”的Python程序的库。您可以像中的函数那样调用程序sh。如果您要在没有外壳的情况下运行管道,则Plumbum很有用。

ls_cmd = plumbum.local("ls -l") # get command
ls_cmd() # run command

期待

pexpect使您可以生成子应用程序,对其进行控制并在其输出中找到模式。对于在Unix上需要tty的命令,这是子过程的更好选择。

pexpect.run("ls -l") # Run command as normal
child = pexpect.spawn('scp foo user@example.com:.') # Spawns child application
child.expect('Password:') # When this is the output
child.sendline('mypassword')

fabric是一个Python 2.5和2.7库。它允许您执行本地和远程Shell命令。Fabric是在安全Shell(SSH)中运行命令的简单替代方案

fabric.operations.local('ls -l') # Run command as normal
fabric.operations.local('ls -l', capture = True) # Run command and receive output

使者

特使被称为“人类子过程”。它用作subprocess模块周围的便利包装。

r = envoy.run("ls -l") # Run command
r.std_out # get output

命令

commands包含的包装函数os.popen,但由于subprocess是更好的选择,它已从Python 3中删除。

该编辑基于JF Sebastian的评论。

There are lots of different libraries which allow you to call external commands with Python. For each library I’ve given a description and shown an example of calling an external command. The command I used as the example is ls -l (list all files). If you want to find out more about any of the libraries I’ve listed and linked the documentation for each of them.

Sources:

These are all the libraries:

Hopefully this will help you make a decision on which library to use :)

subprocess

Subprocess allows you to call external commands and connect them to their input/output/error pipes (stdin, stdout, and stderr). Subprocess is the default choice for running commands, but sometimes other modules are better.

subprocess.run(["ls", "-l"]) # Run command
subprocess.run(["ls", "-l"], stdout=subprocess.PIPE) # This will run the command and return any output
subprocess.run(shlex.split("ls -l")) # You can also use the shlex library to split the command

os

os is used for “operating system dependent functionality”. It can also be used to call external commands with os.system and os.popen (Note: There is also a subprocess.popen). os will always run the shell and is a simple alternative for people who don’t need to, or don’t know how to use subprocess.run.

os.system("ls -l") # run command
os.popen("ls -l").read() # This will run the command and return any output

sh

sh is a subprocess interface which lets you call programs as if they were functions. This is useful if you want to run a command multiple times.

sh.ls("-l") # Run command normally
ls_cmd = sh.Command("ls") # Save command as a variable
ls_cmd() # Run command as if it were a function

plumbum

plumbum is a library for “script-like” Python programs. You can call programs like functions as in sh. Plumbum is useful if you want to run a pipeline without the shell.

ls_cmd = plumbum.local("ls -l") # get command
ls_cmd() # run command

pexpect

pexpect lets you spawn child applications, control them and find patterns in their output. This is a better alternative to subprocess for commands that expect a tty on Unix.

pexpect.run("ls -l") # Run command as normal
child = pexpect.spawn('scp foo user@example.com:.') # Spawns child application
child.expect('Password:') # When this is the output
child.sendline('mypassword')

fabric

fabric is a Python 2.5 and 2.7 library. It allows you to execute local and remote shell commands. Fabric is simple alternative for running commands in a secure shell (SSH)

fabric.operations.local('ls -l') # Run command as normal
fabric.operations.local('ls -l', capture = True) # Run command and receive output

envoy

envoy is known as “subprocess for humans”. It is used as a convenience wrapper around the subprocess module.

r = envoy.run("ls -l") # Run command
r.std_out # get output

commands

commands contains wrapper functions for os.popen, but it has been removed from Python 3 since subprocess is a better alternative.

The edit was based on J.F. Sebastian’s comment.


回答 8

我总是用fabric这样的东西:

from fabric.operations import local
result = local('ls', capture=True)
print "Content:/n%s" % (result, )

但这似乎是一个很好的工具:sh(Python子进程接口)

看一个例子:

from sh import vgdisplay
print vgdisplay()
print vgdisplay('-v')
print vgdisplay(v=True)

I always use fabric for this things like:

from fabric.operations import local
result = local('ls', capture=True)
print "Content:/n%s" % (result, )

But this seem to be a good tool: sh (Python subprocess interface).

Look at an example:

from sh import vgdisplay
print vgdisplay()
print vgdisplay('-v')
print vgdisplay(v=True)

回答 9

还要检查“ pexpect” Python库。

它允许交互式控制外部程序/命令,甚至包括ssh,ftp,telnet等。您可以键入以下内容:

child = pexpect.spawn('ftp 192.168.0.24')

child.expect('(?i)name .*: ')

child.sendline('anonymous')

child.expect('(?i)password')

Check the “pexpect” Python library, too.

It allows for interactive controlling of external programs/commands, even ssh, ftp, telnet, etc. You can just type something like:

child = pexpect.spawn('ftp 192.168.0.24')

child.expect('(?i)name .*: ')

child.sendline('anonymous')

child.expect('(?i)password')

回答 10

与标准库

使用子流程模块(Python 3):

import subprocess
subprocess.run(['ls', '-l'])

这是推荐的标准方式。但是,构造和编写更复杂的任务(管道,输出,输入等)可能很繁琐。

关于Python版本的注意事项:如果您仍在使用Python 2,subprocess.call的工作方式与此类似。

ProTip:shlex.split可以帮助您解析,和其他功能的命令run,以防您不想(或您不能!)以列表形式提供它们:callsubprocess

import shlex
import subprocess
subprocess.run(shlex.split('ls -l'))

具有外部依赖性

如果您不介意外部依赖性,请使用plumbum

from plumbum.cmd import ifconfig
print(ifconfig['wlan0']())

这是最好的subprocess包装纸。它是跨平台的,即可以在Windows和类似Unix的系统上运行。由安装pip install plumbum

另一个受欢迎的图书馆是sh

from sh import ifconfig
print(ifconfig('wlan0'))

但是,已sh放弃Windows支持,因此它不像以前那样出色。由安装pip install sh

With the standard library

Use the subprocess module (Python 3):

import subprocess
subprocess.run(['ls', '-l'])

It is the recommended standard way. However, more complicated tasks (pipes, output, input, etc.) can be tedious to construct and write.

Note on Python version: If you are still using Python 2, subprocess.call works in a similar way.

ProTip: shlex.split can help you to parse the command for run, call, and other subprocess functions in case you don’t want (or you can’t!) provide them in form of lists:

import shlex
import subprocess
subprocess.run(shlex.split('ls -l'))

With external dependencies

If you do not mind external dependencies, use plumbum:

from plumbum.cmd import ifconfig
print(ifconfig['wlan0']())

It is the best subprocess wrapper. It’s cross-platform, i.e. it works on both Windows and Unix-like systems. Install by pip install plumbum.

Another popular library is sh:

from sh import ifconfig
print(ifconfig('wlan0'))

However, sh dropped Windows support, so it’s not as awesome as it used to be. Install by pip install sh.


回答 11

如果您需要从您所呼叫的命令的输出,那么你可以使用subprocess.check_output(Python的2.7+)。

>>> subprocess.check_output(["ls", "-l", "/dev/null"])
'crw-rw-rw- 1 root root 1, 3 Oct 18  2007 /dev/null\n'

还要注意shell参数。

如果shell是True,则将通过Shell执行指定的命令。如果您主要将Python用于大多数系统外壳程序提供的增强控制流,并且仍希望方便地访问其他外壳程序功能(例如外壳程序管道,文件名通配符,环境变量扩展以及〜扩展到用户的家),则这可能很有用。目录。但是请注意,Python本身提供的实现多壳状的功能(尤其是globfnmatchos.walk()os.path.expandvars()os.path.expanduser(),和shutil)。

If you need the output from the command you are calling, then you can use subprocess.check_output (Python 2.7+).

>>> subprocess.check_output(["ls", "-l", "/dev/null"])
'crw-rw-rw- 1 root root 1, 3 Oct 18  2007 /dev/null\n'

Also note the shell parameter.

If shell is True, the specified command will be executed through the shell. This can be useful if you are using Python primarily for the enhanced control flow it offers over most system shells and still want convenient access to other shell features such as shell pipes, filename wildcards, environment variable expansion, and expansion of ~ to a user’s home directory. However, note that Python itself offers implementations of many shell-like features (in particular, glob, fnmatch, os.walk(), os.path.expandvars(), os.path.expanduser(), and shutil).


回答 12

这就是我运行命令的方式。这段代码包含了您非常需要的所有内容

from subprocess import Popen, PIPE
cmd = "ls -l ~/"
p = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
out, err = p.communicate()
print "Return code: ", p.returncode
print out.rstrip(), err.rstrip()

This is how I run my commands. This code has everything you need pretty much

from subprocess import Popen, PIPE
cmd = "ls -l ~/"
p = Popen(cmd , shell=True, stdout=PIPE, stderr=PIPE)
out, err = p.communicate()
print "Return code: ", p.returncode
print out.rstrip(), err.rstrip()

回答 13

更新:

subprocess.run如果您的代码不需要保持与早期Python版本的兼容性,则从python 3.5开始,建议使用此方法。它更加一致,并且提供与Envoy类似的易用性。(管道并不是那么简单。有关如何查看此问题的信息。)

以下是文档中的一些示例。

运行一个过程:

>>> subprocess.run(["ls", "-l"])  # Doesn't capture output
CompletedProcess(args=['ls', '-l'], returncode=0)

在失败的跑步上加薪:

>>> subprocess.run("exit 1", shell=True, check=True)
Traceback (most recent call last):
  ...
subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1

捕获输出:

>>> subprocess.run(["ls", "-l", "/dev/null"], stdout=subprocess.PIPE)
CompletedProcess(args=['ls', '-l', '/dev/null'], returncode=0,
stdout=b'crw-rw-rw- 1 root root 1, 3 Jan 23 16:23 /dev/null\n')

原始答案:

我建议尝试Envoy。它是子流程的包装,后者旨在替换较旧的模块和功能。特使是人类的子过程。

自述文件中的示例用法:

>>> r = envoy.run('git config', data='data to pipe in', timeout=2)

>>> r.status_code
129
>>> r.std_out
'usage: git config [options]'
>>> r.std_err
''

管道周围的东西:

>>> r = envoy.run('uptime | pbcopy')

>>> r.command
'pbcopy'
>>> r.status_code
0

>>> r.history
[<Response 'uptime'>]

Update:

subprocess.run is the recommended approach as of Python 3.5 if your code does not need to maintain compatibility with earlier Python versions. It’s more consistent and offers similar ease-of-use as Envoy. (Piping isn’t as straightforward though. See this question for how.)

Here’s some examples from the documentation.

Run a process:

>>> subprocess.run(["ls", "-l"])  # Doesn't capture output
CompletedProcess(args=['ls', '-l'], returncode=0)

Raise on failed run:

>>> subprocess.run("exit 1", shell=True, check=True)
Traceback (most recent call last):
  ...
subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1

Capture output:

>>> subprocess.run(["ls", "-l", "/dev/null"], stdout=subprocess.PIPE)
CompletedProcess(args=['ls', '-l', '/dev/null'], returncode=0,
stdout=b'crw-rw-rw- 1 root root 1, 3 Jan 23 16:23 /dev/null\n')

Original answer:

I recommend trying Envoy. It’s a wrapper for subprocess, which in turn aims to replace the older modules and functions. Envoy is subprocess for humans.

Example usage from the README:

>>> r = envoy.run('git config', data='data to pipe in', timeout=2)

>>> r.status_code
129
>>> r.std_out
'usage: git config [options]'
>>> r.std_err
''

Pipe stuff around too:

>>> r = envoy.run('uptime | pbcopy')

>>> r.command
'pbcopy'
>>> r.status_code
0

>>> r.history
[<Response 'uptime'>]

回答 14

使用子过程

…或一个非常简单的命令:

import os
os.system('cat testfile')

Use subprocess.

…or for a very simple command:

import os
os.system('cat testfile')

回答 15

在Python中调用外部命令

简单,使用subprocess.run,它返回一个CompletedProcess对象:

>>> import subprocess
>>> completed_process = subprocess.run('python --version')
Python 3.6.1 :: Anaconda 4.4.0 (64-bit)
>>> completed_process
CompletedProcess(args='python --version', returncode=0)

为什么?

从Python 3.5开始,文档建议使用subprocess.run

推荐的调用子流程的方法是将run()函数用于它可以处理的所有用例。对于更高级的用例,可以直接使用基础Popen接口。

这是最简单的用法示例-完全按照要求进行:

>>> import subprocess
>>> completed_process = subprocess.run('python --version')
Python 3.6.1 :: Anaconda 4.4.0 (64-bit)
>>> completed_process
CompletedProcess(args='python --version', returncode=0)

run等待命令成功完成,然后返回一个CompletedProcess对象。相反,它可以引发TimeoutExpired(如果您给它提供一个timeout=参数)或CalledProcessError(如果失败并通过check=True)。

从上面的示例可以推断,默认情况下,stdout和stderr都通过管道传递到您自己的stdout和stderr。

我们可以检查返回的对象,并查看给出的命令和返回码:

>>> completed_process.args
'python --version'
>>> completed_process.returncode
0

捕获输出

如果要捕获输出,则可以传递subprocess.PIPE给相应的stderrstdout

>>> cp = subprocess.run('python --version', 
                        stderr=subprocess.PIPE, 
                        stdout=subprocess.PIPE)
>>> cp.stderr
b'Python 3.6.1 :: Anaconda 4.4.0 (64-bit)\r\n'
>>> cp.stdout
b''

(我发现将版本信息放入stderr而不是stdout很有意思,并且有点违反直觉。)

传递命令清单

可以很容易地从手动提供命令字符串(如问题所提示)转变为以编程方式构建的字符串。不要以编程方式构建字符串。这是一个潜在的安全问题。最好假设您不信任输入。

>>> import textwrap
>>> args = ['python', textwrap.__file__]
>>> cp = subprocess.run(args, stdout=subprocess.PIPE)
>>> cp.stdout
b'Hello there.\r\n  This is indented.\r\n'

注意,仅args应按位置传递。

完整签名

这是源中的实际签名,如下所示help(run)

def run(*popenargs, input=None, timeout=None, check=False, **kwargs):

popenargskwargs被给予Popen的构造。input可以是universal_newlines=True将通过管道传输到子进程的stdin 的字节字符串(或unicode,如果指定encoding或)。

该文档描述了timeout=check=True比我更可以:

超时参数传递给Popen.communicate()。如果超时到期,子进程将被终止并等待。子进程终止后,将重新引发TimeoutExpired异常。

如果check为true,并且进程以非零退出代码退出,则将引发CalledProcessError异常。该异常的属性包含参数,退出代码以及stdout和stderr(如果已捕获)。

这个示例check=True比我能想到的更好:

>>> subprocess.run("exit 1", shell=True, check=True)
Traceback (most recent call last):
  ...
subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1

扩展签名

这是文档中提供的扩展签名:

subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, 
shell=False, cwd=None, timeout=None, check=False, encoding=None, 
errors=None)

请注意,这表明只应在位置传递args列表。因此,将其余参数作为关键字参数传递。

开普

何时使用Popen代替?我将很难仅根据参数来找到用例。Popen但是,直接使用pollwill 将使您能够访问其方法,包括,“ send_signal”,“ terminate”和“ wait”。

这是来源中Popen给出的签名。我认为这是信息的最精确封装(与相对):help(Popen)

def __init__(self, args, bufsize=-1, executable=None,
             stdin=None, stdout=None, stderr=None,
             preexec_fn=None, close_fds=_PLATFORM_DEFAULT_CLOSE_FDS,
             shell=False, cwd=None, env=None, universal_newlines=False,
             startupinfo=None, creationflags=0,
             restore_signals=True, start_new_session=False,
             pass_fds=(), *, encoding=None, errors=None):

但更多的是信息Popen文档

subprocess.Popen(args, bufsize=-1, executable=None, stdin=None,
                 stdout=None, stderr=None, preexec_fn=None, close_fds=True,
                 shell=False, cwd=None, env=None, universal_newlines=False,
                 startupinfo=None, creationflags=0, restore_signals=True,
                 start_new_session=False, pass_fds=(), *, encoding=None, errors=None)

在新进程中执行子程序。在POSIX上,该类使用类似于os.execvp()的行为来执行子程序。在Windows上,该类使用Windows CreateProcess()函数。Popen的参数如下。

剩下的内容Popen将作为读者的练习。

Calling an external command in Python

Simple, use subprocess.run, which returns a CompletedProcess object:

>>> import subprocess
>>> completed_process = subprocess.run('python --version')
Python 3.6.1 :: Anaconda 4.4.0 (64-bit)
>>> completed_process
CompletedProcess(args='python --version', returncode=0)

Why?

As of Python 3.5, the documentation recommends subprocess.run:

The recommended approach to invoking subprocesses is to use the run() function for all use cases it can handle. For more advanced use cases, the underlying Popen interface can be used directly.

Here’s an example of the simplest possible usage – and it does exactly as asked:

>>> import subprocess
>>> completed_process = subprocess.run('python --version')
Python 3.6.1 :: Anaconda 4.4.0 (64-bit)
>>> completed_process
CompletedProcess(args='python --version', returncode=0)

run waits for the command to successfully finish, then returns a CompletedProcess object. It may instead raise TimeoutExpired (if you give it a timeout= argument) or CalledProcessError (if it fails and you pass check=True).

As you might infer from the above example, stdout and stderr both get piped to your own stdout and stderr by default.

We can inspect the returned object and see the command that was given and the returncode:

>>> completed_process.args
'python --version'
>>> completed_process.returncode
0

Capturing output

If you want to capture the output, you can pass subprocess.PIPE to the appropriate stderr or stdout:

>>> cp = subprocess.run('python --version', 
                        stderr=subprocess.PIPE, 
                        stdout=subprocess.PIPE)
>>> cp.stderr
b'Python 3.6.1 :: Anaconda 4.4.0 (64-bit)\r\n'
>>> cp.stdout
b''

(I find it interesting and slightly counterintuitive that the version info gets put to stderr instead of stdout.)

Pass a command list

One might easily move from manually providing a command string (like the question suggests) to providing a string built programmatically. Don’t build strings programmatically. This is a potential security issue. It’s better to assume you don’t trust the input.

>>> import textwrap
>>> args = ['python', textwrap.__file__]
>>> cp = subprocess.run(args, stdout=subprocess.PIPE)
>>> cp.stdout
b'Hello there.\r\n  This is indented.\r\n'

Note, only args should be passed positionally.

Full Signature

Here’s the actual signature in the source and as shown by help(run):

def run(*popenargs, input=None, timeout=None, check=False, **kwargs):

The popenargs and kwargs are given to the Popen constructor. input can be a string of bytes (or unicode, if specify encoding or universal_newlines=True) that will be piped to the subprocess’s stdin.

The documentation describes timeout= and check=True better than I could:

The timeout argument is passed to Popen.communicate(). If the timeout expires, the child process will be killed and waited for. The TimeoutExpired exception will be re-raised after the child process has terminated.

If check is true, and the process exits with a non-zero exit code, a CalledProcessError exception will be raised. Attributes of that exception hold the arguments, the exit code, and stdout and stderr if they were captured.

and this example for check=True is better than one I could come up with:

>>> subprocess.run("exit 1", shell=True, check=True)
Traceback (most recent call last):
  ...
subprocess.CalledProcessError: Command 'exit 1' returned non-zero exit status 1

Expanded Signature

Here’s an expanded signature, as given in the documentation:

subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, 
shell=False, cwd=None, timeout=None, check=False, encoding=None, 
errors=None)

Note that this indicates that only the args list should be passed positionally. So pass the remaining arguments as keyword arguments.

Popen

When use Popen instead? I would struggle to find use-case based on the arguments alone. Direct usage of Popen would, however, give you access to its methods, including poll, ‘send_signal’, ‘terminate’, and ‘wait’.

Here’s the Popen signature as given in the source. I think this is the most precise encapsulation of the information (as opposed to help(Popen)):

def __init__(self, args, bufsize=-1, executable=None,
             stdin=None, stdout=None, stderr=None,
             preexec_fn=None, close_fds=_PLATFORM_DEFAULT_CLOSE_FDS,
             shell=False, cwd=None, env=None, universal_newlines=False,
             startupinfo=None, creationflags=0,
             restore_signals=True, start_new_session=False,
             pass_fds=(), *, encoding=None, errors=None):

But more informative is the Popen documentation:

subprocess.Popen(args, bufsize=-1, executable=None, stdin=None,
                 stdout=None, stderr=None, preexec_fn=None, close_fds=True,
                 shell=False, cwd=None, env=None, universal_newlines=False,
                 startupinfo=None, creationflags=0, restore_signals=True,
                 start_new_session=False, pass_fds=(), *, encoding=None, errors=None)

Execute a child program in a new process. On POSIX, the class uses os.execvp()-like behavior to execute the child program. On Windows, the class uses the Windows CreateProcess() function. The arguments to Popen are as follows.

Understanding the remaining documentation on Popen will be left as an exercise for the reader.


回答 16

os.system可以,但是有点过时。这也不是很安全。相反,请尝试subprocesssubprocess不会直接调用sh,因此比os.system

在此处获取更多信息。

os.system is OK, but kind of dated. It’s also not very secure. Instead, try subprocess. subprocess does not call sh directly and is therefore more secure than os.system.

Get more information here.


回答 17

还有

>>> from plumbum import local
>>> ls = local["ls"]
>>> ls
LocalCommand(<LocalPath /bin/ls>)
>>> ls()
u'build.py\ndist\ndocs\nLICENSE\nplumbum\nREADME.rst\nsetup.py\ntests\ntodo.txt\n'
>>> notepad = local["c:\\windows\\notepad.exe"]
>>> notepad()                                   # Notepad window pops up
u''                                             # Notepad window is closed by user, command returns

There is also Plumbum

>>> from plumbum import local
>>> ls = local["ls"]
>>> ls
LocalCommand(<LocalPath /bin/ls>)
>>> ls()
u'build.py\ndist\ndocs\nLICENSE\nplumbum\nREADME.rst\nsetup.py\ntests\ntodo.txt\n'
>>> notepad = local["c:\\windows\\notepad.exe"]
>>> notepad()                                   # Notepad window pops up
u''                                             # Notepad window is closed by user, command returns

回答 18

采用:

import os

cmd = 'ls -al'

os.system(cmd)

os-此模块提供了使用依赖于操作系统的功能的可移植方式。

对于更多的os功能,这里是文档。

Use:

import os

cmd = 'ls -al'

os.system(cmd)

os – This module provides a portable way of using operating system-dependent functionality.

For the more os functions, here is the documentation.


回答 19

可能很简单:

import os
cmd = "your command"
os.system(cmd)

It can be this simple:

import os
cmd = "your command"
os.system(cmd)

回答 20

我非常喜欢shell_command的简单性。它建立在子流程模块的顶部。

这是文档中的示例:

>>> from shell_command import shell_call
>>> shell_call("ls *.py")
setup.py  shell_command.py  test_shell_command.py
0
>>> shell_call("ls -l *.py")
-rw-r--r-- 1 ncoghlan ncoghlan  391 2011-12-11 12:07 setup.py
-rw-r--r-- 1 ncoghlan ncoghlan 7855 2011-12-11 16:16 shell_command.py
-rwxr-xr-x 1 ncoghlan ncoghlan 8463 2011-12-11 16:17 test_shell_command.py
0

I quite like shell_command for its simplicity. It’s built on top of the subprocess module.

Here’s an example from the documentation:

>>> from shell_command import shell_call
>>> shell_call("ls *.py")
setup.py  shell_command.py  test_shell_command.py
0
>>> shell_call("ls -l *.py")
-rw-r--r-- 1 ncoghlan ncoghlan  391 2011-12-11 12:07 setup.py
-rw-r--r-- 1 ncoghlan ncoghlan 7855 2011-12-11 16:16 shell_command.py
-rwxr-xr-x 1 ncoghlan ncoghlan 8463 2011-12-11 16:17 test_shell_command.py
0

回答 21

这里还有另一个以前没有提到的区别。

subprocess.Popen将<command>作为子进程执行。就我而言,我需要执行文件<a>,该文件需要与另一个程序<b>通信。

我尝试了子流程,执行成功。但是<b>无法与<a>通信。当我从终端运行时,一切都正常。

还有一个:(注意:kwrite的行为与其他应用程序不同。如果在Firefox上尝试以下操作,结果将有所不同。)

如果尝试这样做os.system("kwrite"),程序流将冻结,直到用户关闭kwrite。为了克服这个问题,我改为尝试os.system(konsole -e kwrite)。这个时间程序继续进行,但是kwrite成为了控制台的子进程。

任何人都运行kwrite而不是子进程(即,它必须在系统监视器中显示在树的最左侧)。

There is another difference here which is not mentioned previously.

subprocess.Popen executes the <command> as a subprocess. In my case, I need to execute file <a> which needs to communicate with another program, <b>.

I tried subprocess, and execution was successful. However <b> could not communicate with <a>. Everything is normal when I run both from the terminal.

One more: (NOTE: kwrite behaves different from other applications. If you try the below with Firefox, the results will not be the same.)

If you try os.system("kwrite"), program flow freezes until the user closes kwrite. To overcome that I tried instead os.system(konsole -e kwrite). This time program continued to flow, but kwrite became the subprocess of the console.

Anyone runs the kwrite not being a subprocess (i.e. in the system monitor it must appear at the leftmost edge of the tree).


回答 22

os.system不允许您存储结果,因此,如果要将结果存储在某个列表或其他内容中,则subprocess.call可以使用。

os.system does not allow you to store results, so if you want to store results in some list or something, a subprocess.call works.


回答 23

subprocess.check_call如果您不想测试返回值,则非常方便。任何错误都会引发异常。

subprocess.check_call is convenient if you don’t want to test return values. It throws an exception on any error.


回答 24

我倾向于将进程shlex一起使用(以处理带引号的字符串的转义):

>>> import subprocess, shlex
>>> command = 'ls -l "/your/path/with spaces/"'
>>> call_params = shlex.split(command)
>>> print call_params
["ls", "-l", "/your/path/with spaces/"]
>>> subprocess.call(call_params)

I tend to use subprocess together with shlex (to handle escaping of quoted strings):

>>> import subprocess, shlex
>>> command = 'ls -l "/your/path/with spaces/"'
>>> call_params = shlex.split(command)
>>> print call_params
["ls", "-l", "/your/path/with spaces/"]
>>> subprocess.call(call_params)

回答 25

无耻的插件,我为此写了一个库:P https://github.com/houqp/shell.py

目前,它基本上是popen和shlex的包装。它还支持管道命令,因此您可以在Python中更轻松地链接命令。因此,您可以执行以下操作:

ex('echo hello shell.py') | "awk '{print $2}'"

Shameless plug, I wrote a library for this :P https://github.com/houqp/shell.py

It’s basically a wrapper for popen and shlex for now. It also supports piping commands so you can chain commands easier in Python. So you can do things like:

ex('echo hello shell.py') | "awk '{print $2}'"

回答 26

在Windows中你可以导入subprocess模块,并通过调用运行外部命令subprocess.Popen()subprocess.Popen().communicate()subprocess.Popen().wait()如下:

# Python script to run a command line
import subprocess

def execute(cmd):
    """
        Purpose  : To execute a command and return exit status
        Argument : cmd - command to execute
        Return   : exit_code
    """
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (result, error) = process.communicate()

    rc = process.wait()

    if rc != 0:
        print "Error: failed to execute command:", cmd
        print error
    return result
# def

command = "tasklist | grep python"
print "This process detail: \n", execute(command)

输出:

This process detail:
python.exe                     604 RDP-Tcp#0                  4      5,660 K

In Windows you can just import the subprocess module and run external commands by calling subprocess.Popen(), subprocess.Popen().communicate() and subprocess.Popen().wait() as below:

# Python script to run a command line
import subprocess

def execute(cmd):
    """
        Purpose  : To execute a command and return exit status
        Argument : cmd - command to execute
        Return   : exit_code
    """
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (result, error) = process.communicate()

    rc = process.wait()

    if rc != 0:
        print "Error: failed to execute command:", cmd
        print error
    return result
# def

command = "tasklist | grep python"
print "This process detail: \n", execute(command)

Output:

This process detail:
python.exe                     604 RDP-Tcp#0                  4      5,660 K

回答 27

在Linux下,如果您想调用将独立执行的外部命令(在python脚本终止后将继续运行),则可以使用简单的队列作为任务假脱机程序at命令

任务假脱机程序的示例:

import os
os.system('ts <your-command>')

有关任务后台处理程序(ts)的注意事项:

  1. 您可以使用以下命令设置要运行的并发进程数(“插槽”):

    ts -S <number-of-slots>

  2. 安装ts不需要管理员权限。您可以使用简单的源代码下载并编译它make,然后将其添加到路径中,即可完成操作。

Under Linux, in case you would like to call an external command that will execute independently (will keep running after the python script terminates), you can use a simple queue as task spooler or the at command

An example with task spooler:

import os
os.system('ts <your-command>')

Notes about task spooler (ts):

  1. You could set the number of concurrent processes to be run (“slots”) with:

    ts -S <number-of-slots>

  2. Installing ts doesn’t requires admin privileges. You can download and compile it from source with a simple make, add it to your path and you’re done.


回答 28

您可以使用Popen,然后可以检查过程的状态:

from subprocess import Popen

proc = Popen(['ls', '-l'])
if proc.poll() is None:
    proc.kill()

签出subprocess.Popen

You can use Popen, and then you can check the procedure’s status:

from subprocess import Popen

proc = Popen(['ls', '-l'])
if proc.poll() is None:
    proc.kill()

Check out subprocess.Popen.


回答 29

要从OpenStack Neutron获取网络ID :

#!/usr/bin/python
import os
netid = "nova net-list | awk '/ External / { print $2 }'"
temp = os.popen(netid).read()  /* Here temp also contains new line (\n) */
networkId = temp.rstrip()
print(networkId)

nova net-list的输出

+--------------------------------------+------------+------+
| ID                                   | Label      | CIDR |
+--------------------------------------+------------+------+
| 431c9014-5b5d-4b51-a357-66020ffbb123 | test1      | None |
| 27a74fcd-37c0-4789-9414-9531b7e3f126 | External   | None |
| 5a2712e9-70dc-4b0e-9281-17e02f4684c9 | management | None |
| 7aa697f5-0e60-4c15-b4cc-9cb659698512 | Internal   | None |
+--------------------------------------+------------+------+

打印输出(networkId)

27a74fcd-37c0-4789-9414-9531b7e3f126

To fetch the network id from the OpenStack Neutron:

#!/usr/bin/python
import os
netid = "nova net-list | awk '/ External / { print $2 }'"
temp = os.popen(netid).read()  /* Here temp also contains new line (\n) */
networkId = temp.rstrip()
print(networkId)

Output of nova net-list

+--------------------------------------+------------+------+
| ID                                   | Label      | CIDR |
+--------------------------------------+------------+------+
| 431c9014-5b5d-4b51-a357-66020ffbb123 | test1      | None |
| 27a74fcd-37c0-4789-9414-9531b7e3f126 | External   | None |
| 5a2712e9-70dc-4b0e-9281-17e02f4684c9 | management | None |
| 7aa697f5-0e60-4c15-b4cc-9cb659698512 | Internal   | None |
+--------------------------------------+------------+------+

Output of print(networkId)

27a74fcd-37c0-4789-9414-9531b7e3f126