python multithreading等到所有线程完成

问题:python multithreading等到所有线程完成

可能是在类似的情况下提出的,但是经过大约20分钟的搜索,我无法找到答案,所以我会提出。

我写了一个Python脚本(让我们说:scriptA.py)和一个脚本(让我们说scriptB.py)。

在scriptB中,我想使用不同的参数多次调用scriptA,每次运行大约需要一个小时(它是一个巨大的脚本,做了很多事情。。不用担心),并且我希望能够运行scriptA同时具有所有不同的参数,但是我需要等到所有参数都完成后才能继续;我的代码:

import subprocess

#setup
do_setup()

#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

#finish
do_finish()

我想同时运行所有程序subprocess.call(),然后等到它们全部完成后,该怎么办?

我试图像这里的例子一样使用线程:

from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

#run scriptA   
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()

但是我认为这是不对的。

我怎么知道他们都已经跑完了才去我家do_finish()

This may have been asked in a similar context but I was unable to find an answer after about 20 minutes of searching, so I will ask.

I have written a Python script (lets say: scriptA.py) and a script (lets say scriptB.py)

In scriptB I want to call scriptA multiple times with different arguments, each time takes about an hour to run, (its a huge script, does lots of stuff.. don’t worry about it) and I want to be able to run the scriptA with all the different arguments simultaneously, but I need to wait till ALL of them are done before continuing; my code:

import subprocess

#setup
do_setup()

#run scriptA
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

#finish
do_finish()

I want to do run all the subprocess.call() at the same time, and then wait till they are all done, how should I do this?

I tried to use threading like the example here:

from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

#run scriptA   
t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))
t1.start()
t2.start()
t3.start()

But I do not think this is right.

How do I know they have all finished running before going to my do_finish()?


回答 0

您需要在脚本末尾使用object的join方法Thread

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

因此,主线程将等到t1t2t3完成执行。

You need to use join method of Thread object in the end of the script.

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

Thus the main thread will wait till t1, t2 and t3 finish execution.


回答 1

将线程放入列表中,然后使用Join方法

 threads = []

 t = Thread(...)
 threads.append(t)

 ...repeat as often as necessary...

 # Start all threads
 for x in threads:
     x.start()

 # Wait for all of them to finish
 for x in threads:
     x.join()

Put the threads in a list and then use the Join method

 threads = []

 t = Thread(...)
 threads.append(t)

 ...repeat as often as necessary...

 # Start all threads
 for x in threads:
     x.start()

 # Wait for all of them to finish
 for x in threads:
     x.join()

回答 2

在Python3中,由于使用Python 3.2,因此有一种新方法可以达到相同的结果,我个人更喜欢传统的线程创建/启动/连接程序包concurrent.futureshttps : //docs.python.org/3/library/concurrent.futures .html

使用ThreadPoolExecutor代码将是:

from concurrent.futures.thread import ThreadPoolExecutor
import time

def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')

args = ['argumentsA', 'argumentsB', 'argumentsC']

with ThreadPoolExecutor(max_workers=2) as executor:
    ordinal = 1
    for arg in args:
        executor.submit(call_script, ordinal, arg)
        ordinal += 1
print('All tasks has been finished')

先前代码的输出类似于:

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks has been finished

优点之一是,您可以控制吞吐量,设置最大并行工作器数。

In Python3, since Python 3.2 there is a new approach to reach the same result, that I personally prefer to the traditional thread creation/start/join, package concurrent.futures: https://docs.python.org/3/library/concurrent.futures.html

Using a ThreadPoolExecutor the code would be:

from concurrent.futures.thread import ThreadPoolExecutor
import time

def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')

args = ['argumentsA', 'argumentsB', 'argumentsC']

with ThreadPoolExecutor(max_workers=2) as executor:
    ordinal = 1
    for arg in args:
        executor.submit(call_script, ordinal, arg)
        ordinal += 1
print('All tasks has been finished')

The output of the previous code is something like:

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks has been finished

One of the advantages is that you can control the throughput setting the max concurrent workers.


回答 3

我更喜欢根据输入列表使用列表理解:

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]

I prefer using list comprehension based on an input list:

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]

回答 4

您可以在下面使用类似的类,从中可以添加要并行执行的n个功能或console_scripts,并开始执行并等待所有作业完成。

from multiprocessing import Process

class ProcessParallel(object):
    """
    To Process the  functions parallely

    """    
    def __init__(self, *jobs):
        """
        """
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates the process objects for given function deligates
        """
        for job in self.jobs:
            proc  = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts the functions process all together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits untill all the functions executed.
        """
        for proc in self.processes:
            proc.join()


def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b


#How to run:
if __name__ == '__main__':
    #note: two_sum, multiply can be replace with any python console scripts which
    #you wanted to run parallel..
    procs =  ProcessParallel(two_sum, multiply)
    #Add all the process in list
    procs.fork_processes()
    #starts  process execution 
    procs.start_all()
    #wait until all the process got executed
    procs.join_all()

You can have class something like below from which you can add ‘n’ number of functions or console_scripts you want to execute in parallel passion and start the execution and wait for all jobs to complete..

from multiprocessing import Process

class ProcessParallel(object):
    """
    To Process the  functions parallely

    """    
    def __init__(self, *jobs):
        """
        """
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates the process objects for given function deligates
        """
        for job in self.jobs:
            proc  = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts the functions process all together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits untill all the functions executed.
        """
        for proc in self.processes:
            proc.join()


def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b


#How to run:
if __name__ == '__main__':
    #note: two_sum, multiply can be replace with any python console scripts which
    #you wanted to run parallel..
    procs =  ProcessParallel(two_sum, multiply)
    #Add all the process in list
    procs.fork_processes()
    #starts  process execution 
    procs.start_all()
    #wait until all the process got executed
    procs.join_all()

回答 5

threading 模块文档

有一个“主线程”对象;这对应于Python程序中的初始控制线程。它不是守护程序线程。

有可能会创建“虚拟线程对象”。这些是与“外来线程”相对应的线程对象,“外来线程”是在线程模块外部启动的控制线程,例如直接从C代码启动的线程。虚拟线程对象的功能有限。它们始终被认为是活动的和守护程序的,不能被join()编辑。它们永远不会被删除,因为不可能检测到外来线程的终止。

因此,要在不希望保留所创建线程的列表的情况下捕获这两种情况:

import threading as thrd


def alter_data(data, index):
    data[index] *= 2


data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catchs main thread
            continue
        else:
            raise

于是:

>>> print(data)
[0, 4, 12, 40]

From the threading module documentation

There is a “main thread” object; this corresponds to the initial thread of control in the Python program. It is not a daemon thread.

There is the possibility that “dummy thread objects” are created. These are thread objects corresponding to “alien threads”, which are threads of control started outside the threading module, such as directly from C code. Dummy thread objects have limited functionality; they are always considered alive and daemonic, and cannot be join()ed. They are never deleted, since it is impossible to detect the termination of alien threads.

So, to catch those two cases when you are not interested in keeping a list of the threads you create:

import threading as thrd


def alter_data(data, index):
    data[index] *= 2


data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catchs main thread
            continue
        else:
            raise

Whereupon:

>>> print(data)
[0, 4, 12, 40]

回答 6

也许像

for t in threading.enumerate():
    if t.daemon:
        t.join()

Maybe, something like

for t in threading.enumerate():
    if t.daemon:
        t.join()

回答 7

我遇到了同样的问题,我需要等待使用for循环创建的所有线程。我只是尝试了以下代码段,它可能不是完美的解决方案,但我认为这将是一个简单的解决方案去测试:

for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err:
            continue
        else:
            raise

I just came across the same problem where I needed to wait for all the threads which were created using the for loop.I just tried out the following piece of code.It may not be the perfect solution but I thought it would be a simple solution to test:

for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err:
            continue
        else:
            raise