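Question: Is there a thread pool similar to the multiprocessing Pool?
Is there a Pool class for worker threads, similar to the multiprocessing module's Pool class?
I like, for example, the easy way to parallelize a map function: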
import multiprocessing
def long_running_func(p):
    c_func_no_gil(p)  # the asker's C function, which releases the GIL
p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))
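However, I would like to do this without the overhead of creating new processes.
I know about the GIL. In my use case, however, the function will be an IO-bound C function, and the Python wrapper will release the GIL before the actual function call.
Do I have to write my own thread pool?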
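Answer 0
I just found out that there actually is a thread-based Pool interface in the multiprocessing module; however, it is somewhat hidden and not properly documented.
It can be imported via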
from multiprocessing.pool import ThreadPool
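It is implemented using a dummy Process class that wraps a Python thread. This thread-based Process class can be found in multiprocessing.dummy, which is mentioned briefly in the docs. This dummy module supposedly provides the whole multiprocessing interface based on threads.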
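As a minimal sketch, the thread-based pool can stand in for the process-based one from the question. Here `fake_io_call` and `run_with_threads` are hypothetical names, and `time.sleep` is only a placeholder for the GIL-releasing C function (it also releases the GIL while waiting).

```python
import time
from multiprocessing.pool import ThreadPool


def fake_io_call(p):
    # Hypothetical stand-in for the IO-bound C function from the question.
    time.sleep(0.01)
    return p * 2


def run_with_threads(values, workers=4):
    # Leaving the with-block terminates the pool; map() has already
    # collected every result by then.
    with ThreadPool(workers) as pool:
        return pool.map(fake_io_call, values)


if __name__ == "__main__":
    print(run_with_threads(range(10)))
```

Because the call mirrors `multiprocessing.Pool(...).map(...)`, switching between threads and processes should mostly come down to which class is instantiated.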
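Answer 1
In Python 3 you can use concurrent.futures.ThreadPoolExecutor, i.e.:
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)  # returns a Future; a.result() waits for the value
See the docs for more information and examples.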
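For the map-style use case from the question, a sketch along these lines may be closer to what is wanted. The names `square` and `squares` are hypothetical and exist only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    # Hypothetical task; any callable taking one argument would do.
    return x * x


def squares(values, max_workers=4):
    # The with-block waits for outstanding work and shuts the executor down.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in the order of the inputs.
        return list(executor.map(square, values))


if __name__ == "__main__":
    print(squares(range(5)))
```

`submit` suits individual calls whose results are picked up later through the returned Future, while `executor.map` is the nearer equivalent of `Pool.map`.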
Answer 2
Yes, and it seems to have (more or less) the same API.
import multiprocessing
import multiprocessing.pool  # makes sure multiprocessing.pool.ThreadPool is available
def worker(lnk):
    ....
def start_process():
    .....
....
if PROCESS:
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE,
                                           initializer=start_process)
pool.map(worker, inputs)
....
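To illustrate the `initializer` argument used above, here is a small sketch that gives each worker thread its own state through `threading.local`. The names `init_worker`, `label`, and `label_all` are hypothetical, not part of the answer.

```python
import threading
from multiprocessing.pool import ThreadPool

_local = threading.local()


def init_worker(prefix):
    # Runs once in each worker thread before that thread takes any task.
    _local.prefix = prefix


def label(item):
    # Reads the per-thread value set by the initializer.
    return f"{_local.prefix}-{item}"


def label_all(items, pool_size=3):
    with ThreadPool(processes=pool_size,
                    initializer=init_worker,
                    initargs=("job",)) as pool:
        return pool.map(label, items)


if __name__ == "__main__":
    print(label_all([1, 2, 3]))
```

With a process pool the initializer runs once per process instead, so per-worker setup can often be written once and shared by both variants.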
Answer 3
For something very simple and lightweight (slightly modified from here):
from queue import Queue  # Python 3 name; the module was called Queue in Python 2
from threading import Thread
class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()
    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                print(e)
            finally:
                self.tasks.task_done()
class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)
    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))
    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()
if __name__ == '__main__':
    from random import randrange
    from time import sleep
    delays = [randrange(1, 10) for i in range(100)]
    def wait_delay(d):
        print('sleeping for (%d)sec' % d)
        sleep(d)
    pool = ThreadPool(20)
    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)
    pool.wait_completion()
To support callbacks on task completion, you can simply add the callback to the task tuple.
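One way the callback idea might look, as a sketch rather than the answerer's own code: the task tuple gains a fourth element that the worker calls with the function's result. `CallbackPool` and `collect_squares` are hypothetical names, and making `callback` a keyword-only argument is an assumption of this sketch.

```python
from queue import Queue
from threading import Lock, Thread


class CallbackPool:
    """Hypothetical variant of the pool above: each task carries a callback."""

    def __init__(self, num_threads):
        self.tasks = Queue()
        for _ in range(num_threads):
            Thread(target=self._run, daemon=True).start()

    def _run(self):
        while True:
            func, args, kwargs, callback = self.tasks.get()
            try:
                result = func(*args, **kwargs)
                if callback is not None:
                    callback(result)  # runs in the worker thread
            except Exception as exc:
                print(exc)
            finally:
                self.tasks.task_done()

    def add_task(self, func, *args, callback=None, **kwargs):
        """Queue a task; the callback travels inside the task tuple."""
        self.tasks.put((func, args, kwargs, callback))

    def wait_completion(self):
        self.tasks.join()


def collect_squares(values):
    results, lock = [], Lock()

    def on_done(value):
        # Callbacks run concurrently, so guard the shared list.
        with lock:
            results.append(value)

    pool = CallbackPool(4)
    for v in values:
        pool.add_task(pow, v, 2, callback=on_done)
    pool.wait_completion()
    return sorted(results)


if __name__ == "__main__":
    print(collect_squares([1, 2, 3]))
```

Since the callback runs in the worker thread, it should be quick and thread-safe. A side effect of this design is that a task function cannot itself receive a keyword argument named `callback`.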
Answer 4
Hi, to use a thread pool in Python you can use this library:
from multiprocessing.dummy import Pool as ThreadPool
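and then it is used like this:
def run_tasks(service, tasks, threads):  # hypothetical wrapper so that `return` is valid
    pool = ThreadPool(threads)
    results = pool.map(service, tasks)
    pool.close()
    pool.join()
    return results
Here threads is the number of threads you want, and tasks is a list of tasks that are each mapped to service.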
Answer 5
Here is what I finally ended up using. It is a modified version of the classes by dgorissen above.
File: threadpool.py
from queue import Queue, Empty
import threading
from threading import Thread
class Worker(Thread):
    """Thread executing tasks from a given tasks queue. The thread can be
    signaled to exit.
    """
    _TIMEOUT = 2  # seconds to wait on the queue before re-checking the exit flag
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()
    def run(self):
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                    timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty:
                pass
    def signal_exit(self):
        """Signal to thread to exit"""
        self.done.set()
class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=()):  # tuple default avoids a shared mutable list
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)
    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))
    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))
    def _close_all_threads(self):
        """Signal all threads to exit and lose the references to them"""
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []
    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()
    def __del__(self):
        self._close_all_threads()
def create_task(func, *args, **kwargs):
    return (func, args, kwargs)
Using the pool
from random import randrange
from time import sleep
from threadpool import ThreadPool  # the threadpool.py file shown above
delays = [randrange(1, 10) for i in range(30)]
def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)
pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()
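Answer 6
The overhead of creating new processes is minimal, especially when there are only 4 of them. I doubt this is a performance hot spot of your application. Keep it simple, and optimize where you have to and where profiling results point.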
Answer 7
There is no built-in thread-based pool. However, it can be very quick to implement a producer/consumer queue with the Queue class.
From: https://docs.python.org/2/library/queue.html
from threading import Thread
from Queue import Queue  # Python 2 module name; in Python 3 use: from queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()
q = Queue()
for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()
for item in source():
    q.put(item)
q.join()  # block until all tasks are done
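A self-contained Python 3 rendering of the same producer/consumer pattern might look like the sketch below. `process_all` is a hypothetical wrapper, and collecting return values in a list is an addition that the documentation snippet does not have.

```python
from queue import Queue
from threading import Lock, Thread


def process_all(items, do_work, num_worker_threads=4):
    """Run do_work on every item using a fixed number of worker threads."""
    q = Queue()
    results = []
    lock = Lock()

    def worker():
        while True:
            item = q.get()
            try:
                value = do_work(item)
                with lock:
                    results.append(value)
            finally:
                # Always mark the item as handled so q.join() can return.
                q.task_done()

    for _ in range(num_worker_threads):
        Thread(target=worker, daemon=True).start()

    for item in items:
        q.put(item)
    q.join()  # block until all tasks are done
    return results


if __name__ == "__main__":
    print(sorted(process_all(range(5), lambda x: x + 1)))
```

Results arrive in completion order rather than input order, which is why the usage line sorts them. The daemon workers stay blocked on `q.get()` afterwards and end when the interpreter exits.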