问题:线程和多处理模块之间有什么区别?
我正在学习如何在Python中使用threading
和multiprocessing
模块并行运行某些操作并加快代码速度。
我发现很难理解一个threading.Thread()
对象与一个对象之间的区别(也许是因为我没有任何理论背景)multiprocessing.Process()
。
另外,对我来说,如何实例化一个作业队列并使其只有4个(例如)并行运行,而另一个则等待资源释放后再执行,对我来说也不是很清楚。
我发现文档中的示例很清楚,但并不十分详尽。一旦尝试使事情复杂化,我就会收到很多奇怪的错误(例如无法腌制的方法,等等)。
那么,什么时候应该使用threading
and multiprocessing
模块?
您能否将我链接到一些资源,以解释这两个模块的概念以及如何在复杂的任务中正确使用它们?
回答 0
什么朱利奥·佛朗哥说,对于多线程多处理与真一般。
但是,Python *还有一个问题:有一个全局解释器锁,可以防止同一进程中的两个线程同时运行Python代码。这意味着,如果您有8个核心,并且将代码更改为使用8个线程,则它将无法使用800%的CPU并无法以8倍的速度运行;它会使用相同的100%CPU,并以相同的速度运行。(实际上,它的运行速度会稍慢一些,因为即使您没有任何共享数据,线程处理也会带来额外的开销,但是现在暂时忽略它。)
也有exceptions。如果您的代码繁重的计算实际上不是在Python中发生的,而是在某些具有自定义C代码的库中执行的,这些代码可以正确地进行GIL处理,例如numpy应用程序,则您将从线程中获得预期的性能收益。如果繁重的计算是由运行并等待的某些子进程完成的,则情况也是如此。
更重要的是,在某些情况下,这无关紧要。例如,网络服务器花费大部分时间来读取网络中的数据包,而GUI应用花费大部分时间来等待用户事件。在网络服务器或GUI应用程序中使用线程的原因之一是允许您执行长时间运行的“后台任务”,而不会阻止主线程继续为网络数据包或GUI事件提供服务。这在Python线程中工作得很好。(从技术上讲,这意味着Python线程为您提供了并发性,即使它们没有为您提供核心并行性。)
但是,如果您使用纯Python编写受CPU约束的程序,则使用更多线程通常无济于事。
对于GIL,使用单独的进程没有这种问题,因为每个进程都有自己的单独的GIL。当然,线程和进程之间仍然具有与其他任何语言相同的权衡取舍–在进程之间共享数据比在线程之间共享更加困难,而且成本更高,运行大量进程或创建和销毁这些开销可能会很高等等。但是GIL在处理方面的平衡上权衡沉重,这对于C或Java而言并非如此。因此,您会发现自己在Python中比在C或Java中使用多处理的频率更高。
同时,Python的“含电池”理念带来了一些好消息:编写代码很容易,只需进行一次更改即可在线程和进程之间来回切换。
如果您根据独立的“作业”来设计代码,除了输入和输出,这些作业不与其他作业(或主程序)共享任何内容,则可以使用该concurrent.futures
库在线程池周围编写代码,如下所示:
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
executor.submit(job, argument)
executor.map(some_function, collection_of_independent_things)
# ...
您甚至可以获取这些作业的结果,并将其传递给其他作业,按执行顺序或完成顺序等待;等等。阅读有关Future
对象的部分以获取详细信息。
现在,如果事实证明您的程序一直在使用100%CPU,并且添加更多线程只会使其速度变慢,那么您就遇到了GIL问题,因此您需要切换到进程。您要做的就是更改第一行:
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
唯一真正的警告是,作业的自变量和返回值必须可腌制(而不需要花费太多时间或内存来腌制)才能使用跨进程。通常这不是问题,但有时是问题。
但是,如果您的工作不能自给自足怎么办?如果您可以根据将消息从一个传递到另一个的工作来设计代码,那仍然很容易。您可能必须使用threading.Thread
或multiprocessing.Process
代替依赖池。并且您将必须显式创建queue.Queue
或multiprocessing.Queue
对象。(还有很多其他选择,例如管道,套接字,带有斑点的文件等等。但是,要点是,如果执行器的自动魔力不足,则必须手动执行某些操作。)
但是,如果您甚至不能依靠消息传递怎么办?如果您需要两项工作来同时改变同一个结构并看到彼此的变化,该怎么办?在这种情况下,您将需要进行手动同步(锁定,信号量,条件等),并且,如果要使用进程,则需要显式的共享内存对象进行引导。这是当多线程(或多处理)变得困难时。如果可以避免,那就太好了;如果不能,那么您将需要阅读的内容超过某人可以提供的答案。
通过评论,您想了解Python中的线程和进程之间的区别。的确,如果您阅读了朱利奥·佛朗哥(Giulio Franco)的答案和我的知识以及我们所有的链接,那应该涵盖了所有内容……但是总结肯定会很有用,所以这里是:
- 线程默认共享数据;流程没有。
- 作为(1)的结果,在进程之间发送数据通常需要对其进行酸洗和酸洗。**
- (1)的另一个结果是,在进程之间直接共享数据通常需要将其放入低级格式,如Value,Array和
ctypes
Types。 - 流程不受GIL约束。
- 在某些平台(主要是Windows)上,创建和销毁进程的成本要高得多。
- 对流程有一些额外的限制,其中某些限制在不同平台上有所不同。有关详细信息,请参见编程指南。
- 该
threading
模块不具有该模块的某些功能multiprocessing
。(您可以使用multiprocessing.dummy
大多数缺少的API放在线程之上,也可以使用更高级别的模块,例如concurrent.futures
,不必担心。)
*出现此问题的实际上不是Python语言,而是该语言的“标准”实现CPython。其他一些实现没有JIL,例如Jython。
**如果您正在使用fork start方法进行多处理(在大多数非Windows平台上可以使用),则每个子进程都将获得启动子级时父级拥有的任何资源,这可能是将数据传递给子级的另一种方式。
回答 1
一个进程中可以存在多个线程。属于同一进程的线程共享同一内存区域(可以读取和写入相同的变量,并且可以互相干扰)。相反,不同的进程驻留在不同的内存区域中,并且每个进程都有自己的变量。为了进行通信,进程必须使用其他通道(文件,管道或套接字)。
如果要并行化计算,则可能需要多线程处理,因为您可能希望线程在同一内存上进行协作。
说到性能,线程的创建和管理速度比进程要快(因为操作系统不需要分配整个新的虚拟内存区域),并且线程间通信通常比进程间通信快。但是线程很难编程。线程可以互相干扰,并且可以互相写入内存,但是这种情况并不总是很明显(由于多种因素,主要是指令重新排序和内存缓存),因此您将需要同步原语来控制访问您的变量。
回答 2
我相信此链接可以优雅地回答您的问题。
简而言之,如果您的一个子问题不得不等待另一个子问题完成,那么多线程就可以了(例如,在I / O繁重的操作中);相反,如果您的子问题确实可能同时发生,则建议进行多处理。但是,创建的进程数不会超过核心数。
回答 3
Python文档引号
我在以下位置突出了有关Process vs Threads和GIL的重要Python文档报价:CPython中的全局解释器锁(GIL)是什么?
进程与线程实验
我做了一些基准测试,以便更具体地显示差异。
在基准测试中,我为8个超线程上的各种线程的CPU和IO绑定时间计时 CPU。每个线程提供的功总是相同的,因此,更多线程意味着提供更多的总功。
结果是:
绘制数据。
结论:
对于CPU限制的工作,多处理总是更快,大概是由于GIL
用于IO绑定工作。两者的速度完全一样
由于我在8个超线程计算机上,因此线程最多只能扩展到大约4倍,而不是预期的8倍。
与C POSIX绑定的CPU工作达到预期的8倍加速相比,它是什么:time(1)输出中的“ real”,“ user”和“ sys”是什么意思?
TODO:我不知道是什么原因,一定还有其他Python低效率正在发挥作用。
测试代码:
#!/usr/bin/env python3
import multiprocessing
import threading
import time
import sys
def cpu_func(result, niters):
'''
A useless CPU bound function.
'''
for i in range(niters):
result = (result * result * i + 2 * result * i * i + 3) % 10000000
return result
class CpuThread(threading.Thread):
def __init__(self, niters):
super().__init__()
self.niters = niters
self.result = 1
def run(self):
self.result = cpu_func(self.result, self.niters)
class CpuProcess(multiprocessing.Process):
def __init__(self, niters):
super().__init__()
self.niters = niters
self.result = 1
def run(self):
self.result = cpu_func(self.result, self.niters)
class IoThread(threading.Thread):
def __init__(self, sleep):
super().__init__()
self.sleep = sleep
self.result = self.sleep
def run(self):
time.sleep(self.sleep)
class IoProcess(multiprocessing.Process):
def __init__(self, sleep):
super().__init__()
self.sleep = sleep
self.result = self.sleep
def run(self):
time.sleep(self.sleep)
if __name__ == '__main__':
cpu_n_iters = int(sys.argv[1])
sleep = 1
cpu_count = multiprocessing.cpu_count()
input_params = [
(CpuThread, cpu_n_iters),
(CpuProcess, cpu_n_iters),
(IoThread, sleep),
(IoProcess, sleep),
]
header = ['nthreads']
for thread_class, _ in input_params:
header.append(thread_class.__name__)
print(' '.join(header))
for nthreads in range(1, 2 * cpu_count):
results = [nthreads]
for thread_class, work_size in input_params:
start_time = time.time()
threads = []
for i in range(nthreads):
thread = thread_class(work_size)
threads.append(thread)
thread.start()
for i, thread in enumerate(threads):
thread.join()
results.append(time.time() - start_time)
print(' '.join('{:.6e}'.format(result) for result in results))
在带有CPU的Lenovo ThinkPad P51笔记本电脑上的Ubuntu 18.10,Python 3.6.7上进行了测试:Intel Core i7-7820HQ CPU(4核/ 8线程),RAM:2x三星M471A2K43BB1-CRC(2x 16GiB),SSD:Samsung MZVLB512HAJQ- 000L7(3,000 MB / s)。
可视化在给定时间正在运行的线程
这篇帖子https://rohanvarma.me/GIL/告诉我,只要调度线程的target=
参数threading.Thread
与相同,就可以运行回调multiprocessing.Process
。
这使我们可以精确查看每次运行哪个线程。完成此操作后,我们将看到类似的内容(我制作了此特定图形):
+--------------------------------------+
+ Active threads / processes +
+-----------+--------------------------------------+
|Thread 1 |******** ************ |
| 2 | ***** *************|
+-----------+--------------------------------------+
|Process 1 |*** ************** ****** **** |
| 2 |** **** ****** ** ********* **********|
+-----------+--------------------------------------+
+ Time --> +
+--------------------------------------+
这将表明:
- 线程由GIL完全序列化
- 进程可以并行运行
回答 4
这是python 2.6.x的一些性能数据,这些数据使人们质疑在IO绑定的情况下线程比多处理的性能更高。这些结果来自40个处理器的IBM System x3650 M4 BD。
IO绑定处理:进程池的性能优于线程池
>>> do_work(50, 300, 'thread','fileio')
do_work function took 455.752 ms
>>> do_work(50, 300, 'process','fileio')
do_work function took 319.279 ms
CPU限制处理:进程池的性能优于线程池
>>> do_work(50, 2000, 'thread','square')
do_work function took 338.309 ms
>>> do_work(50, 2000, 'process','square')
do_work function took 287.488 ms
这些不是严格的测试,但是它们告诉我,与线程处理相比,多处理并非完全没有表现。
交互式python控制台中用于上述测试的代码
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
import time
import sys
import os
from glob import glob
text_for_test = str(range(1,100000))
def fileio(i):
try :
os.remove(glob('./test/test-*'))
except :
pass
f=open('./test/test-'+str(i),'a')
f.write(text_for_test)
f.close()
f=open('./test/test-'+str(i),'r')
text = f.read()
f.close()
def square(i):
return i*i
def timing(f):
def wrap(*args):
time1 = time.time()
ret = f(*args)
time2 = time.time()
print '%s function took %0.3f ms' % (f.func_name, (time2-time1)*1000.0)
return ret
return wrap
result = None
@timing
def do_work(process_count, items, process_type, method) :
pool = None
if process_type == 'process' :
pool = Pool(processes=process_count)
else :
pool = ThreadPool(processes=process_count)
if method == 'square' :
multiple_results = [pool.apply_async(square,(a,)) for a in range(1,items)]
result = [res.get() for res in multiple_results]
else :
multiple_results = [pool.apply_async(fileio,(a,)) for a in range(1,items)]
result = [res.get() for res in multiple_results]
do_work(50, 300, 'thread','fileio')
do_work(50, 300, 'process','fileio')
do_work(50, 2000, 'thread','square')
do_work(50, 2000, 'process','square')
回答 5
好吧,朱利奥·佛朗哥(Giulio Franco)回答了大多数问题。我将进一步阐述消费者-生产者问题,我想这将使您走上使用多线程应用程序的解决方案的正确轨道。
fill_count = Semaphore(0) # items produced
empty_count = Semaphore(BUFFER_SIZE) # remaining space
buffer = Buffer()
def producer(fill_count, empty_count, buffer):
while True:
item = produceItem()
empty_count.down();
buffer.push(item)
fill_count.up()
def consumer(fill_count, empty_count, buffer):
while True:
fill_count.down()
item = buffer.pop()
empty_count.up()
consume_item(item)
您可以从以下网站阅读有关同步原语的更多信息:
http://linux.die.net/man/7/sem_overview
http://docs.python.org/2/library/threading.html
伪代码在上面。我想您应该搜索生产者-消费者问题以获取更多参考。