标签归档:concurrency

GreenletVS。线程数

问题:GreenletVS。线程数

我是gevents和greenlets的新手。我找到了一些有关如何使用它们的很好的文档,但是没有一个文档为我提供有关如何以及何时使用greenlets的理由!

  • 他们真正擅长的是什么?
  • 是否在代理服务器中使用它们是一个好主意吗?
  • 为什么不线程?

我不确定的是,如果它们基本上是例程,它们如何为我们提供并发性。

I am new to gevents and greenlets. I found some good documentation on how to work with them, but none gave me justification on how and when I should use greenlets!

  • What are they really good at?
  • Is it a good idea to use them in a proxy server or not?
  • Why not threads?

What I am not sure about is how they can provide us with concurrency if they’re basically co-routines.


回答 0

Greenlets提供并发性,但提供并行性。并发是指代码可以独立于其他代码运行的时间。并行是同时执行并发代码。当在用户空间中有很多工作要做时,并行性特别有用,而这通常是占用大量CPU的工作。并发对于解决问题非常有用,它可以更轻松地并行调度和管理不同的部分。

Greenlets确实在网络编程中大放异彩,其中与一个套接字的交互可以独立于与其他套接字的交互而发生。这是并发的经典示例。由于每个greenlet都在其自己的上下文中运行,因此您可以继续使用同步API,而无需使用线程。这很好,因为就虚拟内存和内核开销而言,线程非常昂贵,因此线程可以实现的并发性要少得多。此外,由于使用GIL,Python中的线程比平时更昂贵且更受限制。并发的替代方法通常是Twisted,libevent,libuv,node.js等项目,其中所有代码共享相同的执行上下文,并注册事件处理程序。

使用greenlet(具有适当的网络支持,例如通过gevent)来编写代理是一个好主意,因为对请求的处理可以独立执行,因此应这样编写。

出于我之前提到的原因,Greenlets提供了并发性。并发不是并行性。通过隐藏事件注册并为通常会阻塞当前线程的调用执行调度,gevent之类的项目无需更改异步API即可公开此并发性,而系统的成本却大大降低。

Greenlets provide concurrency but not parallelism. Concurrency is when code can run independently of other code. Parallelism is the execution of concurrent code simultaneously. Parallelism is particularly useful when there’s a lot of work to be done in userspace, and that’s typically CPU-heavy stuff. Concurrency is useful for breaking apart problems, enabling different parts to be scheduled and managed more easily in parallel.

Greenlets really shine in network programming where interactions with one socket can occur independently of interactions with other sockets. This is a classic example of concurrency. Because each greenlet runs in its own context, you can continue to use synchronous APIs without threading. This is good because threads are very expensive in terms of virtual memory and kernel overhead, so the concurrency you can achieve with threads is significantly less. Additionally, threading in Python is more expensive and more limited than usual due to the GIL. Alternatives to concurrency are usually projects like Twisted, libevent, libuv, node.js etc, where all your code shares the same execution context, and register event handlers.

It’s an excellent idea to use greenlets (with appropriate networking support such as through gevent) for writing a proxy, as your handling of requests are able to execute independently and should be written as such.

Greenlets provide concurrency for the reasons I gave earlier. Concurrency is not parallelism. By concealing event registration and performing scheduling for you on calls that would normally block the current thread, projects like gevent expose this concurrency without requiring change to an asynchronous API, and at significantly less cost to your system.


回答 1

拿@Max的答案并为其添加一些相关性以进行缩放,您可以看到区别。我是通过更改要填充的URL来实现的,如下所示:

URLS_base = ['www.google.com', 'www.example.com', 'www.python.org', 'www.yahoo.com', 'www.ubc.ca', 'www.wikipedia.org']
URLS = []
for _ in range(10000):
    for url in URLS_base:
        URLS.append(url)

在我有500个版本之前,我不得不放弃多进程版本。但经过10,000次迭代:

Using gevent it took: 3.756914
-----------
Using multi-threading it took: 15.797028

因此,您可以看到使用gevent的I / O有一些明显的不同

Taking @Max’s answer and adding some relevance to it for scaling, you can see the difference. I achieved this by changing the URLs to be filled as follows:

URLS_base = ['www.google.com', 'www.example.com', 'www.python.org', 'www.yahoo.com', 'www.ubc.ca', 'www.wikipedia.org']
URLS = []
for _ in range(10000):
    for url in URLS_base:
        URLS.append(url)

I had to drop out the multiprocess version as it fell before I had 500; but at 10,000 iterations:

Using gevent it took: 3.756914
-----------
Using multi-threading it took: 15.797028

So you can see there is some significant difference in I/O using gevent


回答 2

纠正上面的@TemporalBeing的答案,greenlets的速度并不比线程“快”,并且产生60000个线程来解决并发问题是不正确的编程技术,相反,较小的线程池是合适的。这是一个更合理的比较(根据我在reddit帖子中对有人引用此SO帖子的回应)。

import gevent
from gevent import socket as gsock
import socket as sock
import threading
from datetime import datetime


def timeit(fn, URLS):
    t1 = datetime.now()
    fn()
    t2 = datetime.now()
    print(
        "%s / %d hostnames, %s seconds" % (
            fn.__name__,
            len(URLS),
            (t2 - t1).total_seconds()
        )
    )


def run_gevent_without_a_timeout():
    ip_numbers = []

    def greenlet(domain_name):
        ip_numbers.append(gsock.gethostbyname(domain_name))

    jobs = [gevent.spawn(greenlet, domain_name) for domain_name in URLS]
    gevent.joinall(jobs)
    assert len(ip_numbers) == len(URLS)


def run_threads_correctly():
    ip_numbers = []

    def process():
        while queue:
            try:
                domain_name = queue.pop()
            except IndexError:
                pass
            else:
                ip_numbers.append(sock.gethostbyname(domain_name))

    threads = [threading.Thread(target=process) for i in range(50)]

    queue = list(URLS)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert len(ip_numbers) == len(URLS)

URLS_base = ['www.google.com', 'www.example.com', 'www.python.org',
             'www.yahoo.com', 'www.ubc.ca', 'www.wikipedia.org']

for NUM in (5, 50, 500, 5000, 10000):
    URLS = []

    for _ in range(NUM):
        for url in URLS_base:
            URLS.append(url)

    print("--------------------")
    timeit(run_gevent_without_a_timeout, URLS)
    timeit(run_threads_correctly, URLS)

结果如下:

--------------------
run_gevent_without_a_timeout / 30 hostnames, 0.044888 seconds
run_threads_correctly / 30 hostnames, 0.019389 seconds
--------------------
run_gevent_without_a_timeout / 300 hostnames, 0.186045 seconds
run_threads_correctly / 300 hostnames, 0.153808 seconds
--------------------
run_gevent_without_a_timeout / 3000 hostnames, 1.834089 seconds
run_threads_correctly / 3000 hostnames, 1.569523 seconds
--------------------
run_gevent_without_a_timeout / 30000 hostnames, 19.030259 seconds
run_threads_correctly / 30000 hostnames, 15.163603 seconds
--------------------
run_gevent_without_a_timeout / 60000 hostnames, 35.770358 seconds
run_threads_correctly / 60000 hostnames, 29.864083 seconds

每个人对使用Python进行非阻塞IO的误解都认为,Python解释器可以比网络连接本身返回IO的速度更快地完成从套接字检索结果的工作。尽管在某些情况下这确实是正确的,但事实并非如人们想象的那么频繁,因为Python解释器的确非常慢。在我的博客文章中,我说明了一些图形配置文件,这些图形配置文件显示即使对于非常简单的事情,如果您要处理对数据库或DNS服务器等事物的快速便捷的网络访问,这些服务的返回速度都将比Python代码快得多。可以参加成千上万的此类联系。

Correcting for @TemporalBeing ‘s answer above, greenlets are not “faster” than threads and it is an incorrect programming technique to spawn 60000 threads to solve a concurrency problem, a small pool of threads is instead appropriate. Here is a more reasonable comparison (from my reddit post in response to people citing this SO post).

import gevent
from gevent import socket as gsock
import socket as sock
import threading
from datetime import datetime


def timeit(fn, URLS):
    t1 = datetime.now()
    fn()
    t2 = datetime.now()
    print(
        "%s / %d hostnames, %s seconds" % (
            fn.__name__,
            len(URLS),
            (t2 - t1).total_seconds()
        )
    )


def run_gevent_without_a_timeout():
    ip_numbers = []

    def greenlet(domain_name):
        ip_numbers.append(gsock.gethostbyname(domain_name))

    jobs = [gevent.spawn(greenlet, domain_name) for domain_name in URLS]
    gevent.joinall(jobs)
    assert len(ip_numbers) == len(URLS)


def run_threads_correctly():
    ip_numbers = []

    def process():
        while queue:
            try:
                domain_name = queue.pop()
            except IndexError:
                pass
            else:
                ip_numbers.append(sock.gethostbyname(domain_name))

    threads = [threading.Thread(target=process) for i in range(50)]

    queue = list(URLS)
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    assert len(ip_numbers) == len(URLS)

URLS_base = ['www.google.com', 'www.example.com', 'www.python.org',
             'www.yahoo.com', 'www.ubc.ca', 'www.wikipedia.org']

for NUM in (5, 50, 500, 5000, 10000):
    URLS = []

    for _ in range(NUM):
        for url in URLS_base:
            URLS.append(url)

    print("--------------------")
    timeit(run_gevent_without_a_timeout, URLS)
    timeit(run_threads_correctly, URLS)

Here are some results:

--------------------
run_gevent_without_a_timeout / 30 hostnames, 0.044888 seconds
run_threads_correctly / 30 hostnames, 0.019389 seconds
--------------------
run_gevent_without_a_timeout / 300 hostnames, 0.186045 seconds
run_threads_correctly / 300 hostnames, 0.153808 seconds
--------------------
run_gevent_without_a_timeout / 3000 hostnames, 1.834089 seconds
run_threads_correctly / 3000 hostnames, 1.569523 seconds
--------------------
run_gevent_without_a_timeout / 30000 hostnames, 19.030259 seconds
run_threads_correctly / 30000 hostnames, 15.163603 seconds
--------------------
run_gevent_without_a_timeout / 60000 hostnames, 35.770358 seconds
run_threads_correctly / 60000 hostnames, 29.864083 seconds

the misunderstanding everyone has about non-blocking IO with Python is the belief that the Python interpreter can attend to the work of retrieving results from sockets at a large scale faster than the network connections themselves can return IO. While this is certainly true in some cases, it is not true nearly as often as people think, because the Python interpreter is really, really slow. In my blog post here, I illustrate some graphical profiles that show that for even very simple things, if you are dealing with crisp and fast network access to things like databases or DNS servers, those services can come back a lot faster than the Python code can attend to many thousands of those connections.


回答 3

这足以分析有趣。这是一个代码,用于比较greenlet与多处理池与多线程的性能:

import gevent
from gevent import socket as gsock
import socket as sock
from multiprocessing import Pool
from threading import Thread
from datetime import datetime

class IpGetter(Thread):
    def __init__(self, domain):
        Thread.__init__(self)
        self.domain = domain
    def run(self):
        self.ip = sock.gethostbyname(self.domain)

if __name__ == "__main__":
    URLS = ['www.google.com', 'www.example.com', 'www.python.org', 'www.yahoo.com', 'www.ubc.ca', 'www.wikipedia.org']
    t1 = datetime.now()
    jobs = [gevent.spawn(gsock.gethostbyname, url) for url in URLS]
    gevent.joinall(jobs, timeout=2)
    t2 = datetime.now()
    print "Using gevent it took: %s" % (t2-t1).total_seconds()
    print "-----------"
    t1 = datetime.now()
    pool = Pool(len(URLS))
    results = pool.map(sock.gethostbyname, URLS)
    t2 = datetime.now()
    pool.close()
    print "Using multiprocessing it took: %s" % (t2-t1).total_seconds()
    print "-----------"
    t1 = datetime.now()
    threads = []
    for url in URLS:
        t = IpGetter(url)
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    t2 = datetime.now()
    print "Using multi-threading it took: %s" % (t2-t1).total_seconds()

结果如下:

Using gevent it took: 0.083758
-----------
Using multiprocessing it took: 0.023633
-----------
Using multi-threading it took: 0.008327

我认为greenlet声称它不像多线程库那样不受GIL的约束。而且,Greenlet doc说它是用于网络操作的。对于网络密集型操作,线程切换很好,您可以看到多线程方法非常快。同样,使用python的官方库总是很可取的。我尝试在Windows上安装greenlet并遇到dll依赖关系问题,因此我在linux vm上运行了该测试。始终尝试编写代码,希望它可以在任何计算机上运行。

This is interesting enough to analyze. Here is a code to compare performance of greenlets versus multiprocessing pool versus multi-threading:

import gevent
from gevent import socket as gsock
import socket as sock
from multiprocessing import Pool
from threading import Thread
from datetime import datetime

class IpGetter(Thread):
    def __init__(self, domain):
        Thread.__init__(self)
        self.domain = domain
    def run(self):
        self.ip = sock.gethostbyname(self.domain)

if __name__ == "__main__":
    URLS = ['www.google.com', 'www.example.com', 'www.python.org', 'www.yahoo.com', 'www.ubc.ca', 'www.wikipedia.org']
    t1 = datetime.now()
    jobs = [gevent.spawn(gsock.gethostbyname, url) for url in URLS]
    gevent.joinall(jobs, timeout=2)
    t2 = datetime.now()
    print "Using gevent it took: %s" % (t2-t1).total_seconds()
    print "-----------"
    t1 = datetime.now()
    pool = Pool(len(URLS))
    results = pool.map(sock.gethostbyname, URLS)
    t2 = datetime.now()
    pool.close()
    print "Using multiprocessing it took: %s" % (t2-t1).total_seconds()
    print "-----------"
    t1 = datetime.now()
    threads = []
    for url in URLS:
        t = IpGetter(url)
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    t2 = datetime.now()
    print "Using multi-threading it took: %s" % (t2-t1).total_seconds()

here are the results:

Using gevent it took: 0.083758
-----------
Using multiprocessing it took: 0.023633
-----------
Using multi-threading it took: 0.008327

I think that greenlet claims that it is not bound by GIL unlike the multithreading library. Moreover, Greenlet doc says that it is meant for network operations. For a network intensive operation, thread-switching is fine and you can see that the multithreading approach is pretty fast. Also it’s always prefeerable to use python’s official libraries; I tried installing greenlet on windows and encountered a dll dependency problem so I ran this test on a linux vm. Alway try to write a code with the hope that it runs on any machine.


用Python发送100,000个HTTP请求的最快方法是什么?

问题:用Python发送100,000个HTTP请求的最快方法是什么?

我正在打开一个具有100,000个URL的文件。我需要向每个URL发送一个HTTP请求并打印状态代码。我正在使用Python 2.6,到目前为止,我们研究了Python实现线程/并发性的许多令人困惑的方式。我什至看过python 并发库,但无法弄清楚如何正确编写此程序。有没有人遇到过类似的问题?我想通常我需要知道如何尽快地在Python中执行数千个任务-我想这意味着“同时”。

I am opening a file which has 100,000 URL’s. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far looked at the many confusing ways Python implements threading/concurrency. I have even looked at the python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible – I suppose that means ‘concurrently’.


回答 0

无捻解决方案:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

这比扭曲的解决方案要快一点,并且使用的CPU更少。

Twistedless solution:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

This one is slighty faster than the twisted solution and uses less CPU.


回答 1

使用龙卷风异步联网库的解决方案

from tornado import ioloop, httpclient

i = 0

def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()

A solution using tornado asynchronous networking library

from tornado import ioloop, httpclient

i = 0

def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()

回答 2

自2010年发布以来,情况发生了很大变化,我还没有尝试所有其他答案,但是尝试了一些答案,我发现使用python3.6对我来说最有效。

我能够每秒获取约150个在AWS上运行的唯一域。

import pandas as pd
import concurrent.futures
import requests
import time

out = []
CONNECTIONS = 100
TIMEOUT = 5

tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)

            print(str(len(out)),end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())

Things have changed quite a bit since 2010 when this was posted and I haven’t tried all the other answers but I have tried a few, and I found this to work the best for me using python3.6.

I was able to fetch about ~150 unique domains per second running on AWS.

import pandas as pd
import concurrent.futures
import requests
import time

out = []
CONNECTIONS = 100
TIMEOUT = 5

tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)

            print(str(len(out)),end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())

回答 3

线程绝对不是这里的答案。如果总体目标是“最快的方法”,它们将提供进程和内核瓶颈,以及吞吐量限制。

一点点的twisted及其异步HTTP客户端将为您带来更好的结果。

Threads are absolutely not the answer here. They will provide both process and kernel bottlenecks, as well as throughput limits that are not acceptable if the overall goal is “the fastest way”.

A little bit of twisted and its asynchronous HTTP client would give you much better results.


回答 4

我知道这是一个古老的问题,但是在Python 3.7中,您可以使用asyncio和来实现aiohttp

import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    asyncio.run(make_requests(urls=urls))

您可以阅读有关它的更多信息,并在此处查看示例。

I know this is an old question, but in Python 3.7 you can do this using asyncio and aiohttp.

import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    asyncio.run(make_requests(urls=urls))

You can read more about it and see an example here.


回答 5

使用grequests,它是request + Gevent模块的组合。

GRequests允许您将Requests与Gevent一起使用,以轻松进行异步HTTP请求。

用法很简单:

import grequests

urls = [
   'http://www.heroku.com',
   'http://tablib.org',
   'http://httpbin.org',
   'http://python-requests.org',
   'http://kennethreitz.com'
]

创建一组未发送的请求:

>>> rs = (grequests.get(u) for u in urls)

同时发送它们:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

Use grequests , it’s a combination of requests + Gevent module .

GRequests allows you to use Requests with Gevent to make asyncronous HTTP Requests easily.

Usage is simple:

import grequests

urls = [
   'http://www.heroku.com',
   'http://tablib.org',
   'http://httpbin.org',
   'http://python-requests.org',
   'http://kennethreitz.com'
]

Create a set of unsent Requests:

>>> rs = (grequests.get(u) for u in urls)

Send them all at the same time:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

回答 6

解决此问题的一种好方法是首先编写获得一个结果所需的代码,然后合并线程代码以并行化应用程序。

在理想情况下,这仅意味着同时启动100,000个线程,这些线程会将其结果输出到字典或列表中以供以后处理,但是实际上,您可以通过这种方式发出多少个并行HTTP请求受到限制。在本地,您可以同时打开多少个套接字,Python解释器允许多少个执行线程受到限制。从远程来看,如果所有请求都针对一台或多台服务器,则同时连接的数量可能会受到限制。这些限制可能导致您必须以一种方式来编写脚本,以便在任何时候仅轮询一小部分网址(如另一位发帖人所述,100可能是一个不错的线程池大小,尽管您可能会发现自己可以成功部署更多)。

您可以按照以下设计模式来解决上述问题:

  1. 启动一个线程以启动新的请求线程,直到当前正在运行的线程(您可以通过threading.active_count()或通过将线程对象推入数据结构来跟踪它们)的数量大于等于您的同时请求的最大数量(例如100) ,然后短暂睡眠。没有更多URL可处理时,该线程应终止。因此,线程将不断唤醒,启动新线程并进入休眠状态,直到完成操作为止。
  2. 让请求线程将其结果存储在某种数据结构中,以供以后检索和输出。如果您要在其中存储结果的结构是C listdictCPython,则可以安全地从线程中添加或插入不带锁的唯一项,但是如果您写入文件或需要更复杂的跨线程数据交互,则应使用a互斥锁,以保护该状态免于腐败

我建议您使用线程模块。您可以使用它来启动和跟踪正在运行的线程。Python的线程支持是裸露的,但是对问题的描述表明它完全可以满足您的需求。

最后,如果你想看到用Python编写的并行网络应用的一个非常简单的应用程序,请ssh.py。这是一个小型库,使用Python线程并行化许多SSH连接。该设计足够接近您的要求,您可能会发现它是很好的资源。

A good approach to solving this problem is to first write the code required to get one result, then incorporate threading code to parallelize the application.

In a perfect world this would simply mean simultaneously starting 100,000 threads which output their results into a dictionary or list for later processing, but in practice you are limited in how many parallel HTTP requests you can issue in this fashion. Locally, you have limits in how many sockets you can open concurrently, how many threads of execution your Python interpreter will allow. Remotely, you may be limited in the number of simultaneous connections if all the requests are against one server, or many. These limitations will probably necessitate that you write the script in such a way as to only poll a small fraction of the URLs at any one time (100, as another poster mentioned, is probably a decent thread pool size, although you may find that you can successfully deploy many more).

You can follow this design pattern to resolve the above issue:

  1. Start a thread which launches new request threads until the number of currently running threads (you can track them via threading.active_count() or by pushing the thread objects into a data structure) is >= your maximum number of simultaneous requests (say 100), then sleeps for a short timeout. This thread should terminate when there is are no more URLs to process. Thus, the thread will keep waking up, launching new threads, and sleeping until your are finished.
  2. Have the request threads store their results in some data structure for later retrieval and output. If the structure you are storing the results in is a list or dict in CPython, you can safely append or insert unique items from your threads without locks, but if you write to a file or require in more complex cross-thread data interaction you should use a mutual exclusion lock to protect this state from corruption.

I would suggest you use the threading module. You can use it to launch and track running threads. Python’s threading support is bare, but the description of your problem suggests that it is completely sufficient for your needs.

Finally, if you’d like to see a pretty straightforward application of a parallel network application written in Python, check out ssh.py. It’s a small library which uses Python threading to parallelize many SSH connections. The design is close enough to your requirements that you may find it to be a good resource.


回答 7

如果希望获得最佳性能,则可能要考虑使用异步I / O而不是线程。与成千上万个OS线程相关的开销是不平凡的,并且Python解释器中的上下文切换在此之上增加了更多。线程化肯定会完成工作,但是我怀疑异步路由会提供更好的整体性能。

具体来说,我建议在Twisted库(http://www.twistedmatrix.com)中使用异步Web客户端。它具有公认的陡峭的学习曲线,但是一旦您掌握了Twisted的异步编程风格,就很容易使用。

Twisted的异步Web客户端API的HowTo可在以下位置找到:

http://twistedmatrix.com/documents/current/web/howto/client.html

If you’re looking to get the best performance possible, you might want to consider using Asynchronous I/O rather than threads. The overhead associated with thousands of OS threads is non-trivial and the context switching within the Python interpreter adds even more on top of it. Threading will certainly get the job done but I suspect that an asynchronous route will provide better overall performance.

Specifically, I’d suggest the async web client in the Twisted library (http://www.twistedmatrix.com). It has an admittedly steep learning curve but it quite easy to use once you get a good handle on Twisted’s style of asynchronous programming.

A HowTo on Twisted’s asynchronous web client API is available at:

http://twistedmatrix.com/documents/current/web/howto/client.html


回答 8

一个解法:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

测试时间:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

平时:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms

A solution:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

Testtime:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Pingtime:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms

回答 9

使用线程池是一个不错的选择,这将使此操作相当容易。不幸的是,python没有使线程池变得异常简单的标准库。但是,这里有一个不错的库,它可以帮助您入门:http : //www.chrisarndt.de/projects/threadpool/

来自其站点的代码示例:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

希望这可以帮助。

Using a thread pool is a good option, and will make this fairly easy. Unfortunately, python doesn’t have a standard library that makes thread pools ultra easy. But here is a decent library that should get you started: http://www.chrisarndt.de/projects/threadpool/

Code example from their site:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

Hope this helps.


回答 10

创建epoll对象,
开放许多客户端TCP套接字,
调整自己的发送缓冲区是有点超过请求头,
发送一个请求头-它应该是即时的,只是放置到一个缓冲区,在注册插座epoll对象,
.pollepollobect,
阅读前3来自每个套接字的字节.poll
将其写入sys.stdout后跟\n(不要刷新),关闭客户端套接字。

限制同时打开的套接字数-创建套接字时处理错误。仅当另一个插座关闭时,才创建一个新的插座。
调整操作系统限制。
尝试分叉到几个(不是很多)进程:这可能有助于更有效地使用CPU。

Create epoll object,
open many client TCP sockets,
adjust their send buffers to be a bit more than request header,
send a request header — it should be immediate, just placing into a buffer, register socket in epoll object,
do .poll on epoll obect,
read first 3 bytes from each socket from .poll,
write them to sys.stdout followed by \n (don’t flush), close the client socket.

Limit number of sockets opened simultaneously — handle errors when sockets are created. Create a new socket only if another is closed.
Adjust OS limits.
Try forking into a few (not many) processes: this may help to use CPU a bit more effectively.


回答 11

对于您的情况,线程可能会成功,因为您可能会花费大量时间等待响应。标准库中有一些有用的模块(例如Queue)可能会有所帮助。

我之前并行下载文件时也做了类似的事情,这对我来说已经足够了,但是这并不是您正在谈论的范围。

如果您的任务更多地受CPU限制,则可能需要查看多处理模块,该模块将允许您利用更多的CPU /核心/线程(更多的进程不会互相阻塞,因为锁定是按进程进行的)

For your case, threading will probably do the trick as you’ll probably be spending most time waiting for a response. There are helpful modules like Queue in the standard library that might help.

I did a similar thing with parallel downloading of files before and it was good enough for me, but it wasn’t on the scale you are talking about.

If your task was more CPU-bound, you might want to look at the multiprocessing module, which will allow you to utilize more CPUs/cores/threads (more processes that won’t block each other since the locking is per process)


回答 12

考虑使用Windmill,尽管Windmill可能无法执行那么多线程。

您可以在5台计算机上使用手动滚动的Python脚本来完成此操作,每台计算机都使用端口40000-60000连接出站,从而打开100,000个端口连接。

同样,使用线程良好的QA应用程序(例如OpenSTA) 进行示例测试可能会有所帮助,以便了解每个服务器可以处理的数量。

另外,尝试研究仅将简单的Perl与LWP :: ConnCache类一起使用。这样,您可能会获得更多的性能(更多的连接)。

Consider using Windmill , although Windmill probably cant do that many threads.

You could do it with a hand rolled Python script on 5 machines, each one connecting outbound using ports 40000-60000, opening 100,000 port connections.

Also, it might help to do a sample test with a nicely threaded QA app such as OpenSTA in order to get an idea of how much each server can handle.

Also, try looking into just using simple Perl with the LWP::ConnCache class. You’ll probably get more performance (more connections) that way.


回答 13

这个扭曲的异步Web客户端运行得很快。

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput

pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}

def getLock(url, simultaneous = 1):
    return locks[urlparse(url).netloc, randrange(simultaneous)]

@inlineCallbacks
def getMapping(url):
    # Limit ourselves to 4 simultaneous connections per host
    # Tweak this number, but it should be no larger than pool.maxPersistentPerHost 
    lock = getLock(url,4)
    yield lock.acquire()
    try:
        resp = yield agent.request('HEAD', url)
        codes[url] = resp.code
    except Exception as e:
        codes[url] = str(e)
    finally:
        lock.release()


dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())

reactor.run()
pprint(codes)

This twisted async web client goes pretty fast.

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput

pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}

def getLock(url, simultaneous = 1):
    return locks[urlparse(url).netloc, randrange(simultaneous)]

@inlineCallbacks
def getMapping(url):
    # Limit ourselves to 4 simultaneous connections per host
    # Tweak this number, but it should be no larger than pool.maxPersistentPerHost 
    lock = getLock(url,4)
    yield lock.acquire()
    try:
        resp = yield agent.request('HEAD', url)
        codes[url] = resp.code
    except Exception as e:
        codes[url] = str(e)
    finally:
        lock.release()


dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())

reactor.run()
pprint(codes)

回答 14

最简单的方法是使用Python的内置线程库。它们不是“真实的” /内核线程,它们有问题(例如序列化),但是足够好。您需要一个队列和线程池。一种选择在这里,但是编写自己的东西很简单。您无法并行处理所有100,000个呼叫,但是可以同时触发100个(或大约)呼叫。

The easiest way would be to use Python’s built-in threading library. They’re not “real” / kernel threads They have issues (like serialization), but are good enough. You’d want a queue & thread pool. One option is here, but it’s trivial to write your own. You can’t parallelize all 100,000 calls, but you can fire off 100 (or so) of them at the same time.


multiprocessing.Pool:何时使用apply,apply_async或map?

问题:multiprocessing.Pool:何时使用apply,apply_async或map?

我还没有看到关于Pool.applyPool.apply_asyncPool.map用例的清晰示例。我主要使用Pool.map; 别人的优势是什么?

I have not seen clear examples with use-cases for Pool.apply, Pool.apply_async and Pool.map. I am mainly using Pool.map; what are the advantages of others?


回答 0

在Python的早期,要使用任意参数调用函数,可以使用apply

apply(f,args,kwargs)

apply尽管在Python2.7中仍然存在,但在Python3中仍然存在,并且通常不再使用。如今,

f(*args,**kwargs)

是首选。这些multiprocessing.Pool模块尝试提供类似的接口。

Pool.apply就像Python一样apply,不同之处在于函数调用是在单独的进程中执行的。Pool.apply直到功能完成为止。

Pool.apply_async也类似于Python的内置函数apply,除了调用立即返回而不是等待结果而已。AsyncResult返回一个对象。您调用其get()方法以检索函数调用的结果。该get()方法将阻塞直到功能完成。因此,pool.apply(func, args, kwargs)等效于pool.apply_async(func, args, kwargs).get()

与相比Pool.apply,该Pool.apply_async方法还具有一个回调,如果提供该回调,则在函数完成时调用该回调。可以使用它来代替get()

例如:

import multiprocessing as mp
import time

def foo_pool(x):
    time.sleep(2)
    return x*x

result_list = []
def log_result(result):
    # This is called whenever foo_pool(i) returns a result.
    # result_list is modified only by the main process, not the pool workers.
    result_list.append(result)

def apply_async_with_callback():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(foo_pool, args = (i, ), callback = log_result)
    pool.close()
    pool.join()
    print(result_list)

if __name__ == '__main__':
    apply_async_with_callback()

可能会产生如下结果

[1, 0, 4, 9, 25, 16, 49, 36, 81, 64]

请注意,与不同pool.map,结果的顺序可能与pool.apply_async调用的顺序不同。


因此,如果您需要在一个单独的进程中运行一个函数,但是希望当前进程在该函数返回之前一直阻塞,请使用Pool.apply。像一样Pool.applyPool.map阻塞直到返回完整的结果。

如果希望工作进程池异步执行许多功能调用,请使用Pool.apply_async。结果的顺序不能保证与调用的顺序相同Pool.apply_async

还要注意,您可以使用调用许多不同的函数Pool.apply_async(并非所有调用都需要使用同一函数)。

相反,Pool.map将相同的函数应用于许多参数。但是,与不同Pool.apply_async,结果按与参数顺序相对应的顺序返回。

Back in the old days of Python, to call a function with arbitrary arguments, you would use apply:

apply(f,args,kwargs)

apply still exists in Python2.7 though not in Python3, and is generally not used anymore. Nowadays,

f(*args,**kwargs)

is preferred. The multiprocessing.Pool modules tries to provide a similar interface.

Pool.apply is like Python apply, except that the function call is performed in a separate process. Pool.apply blocks until the function is completed.

Pool.apply_async is also like Python’s built-in apply, except that the call returns immediately instead of waiting for the result. An AsyncResult object is returned. You call its get() method to retrieve the result of the function call. The get() method blocks until the function is completed. Thus, pool.apply(func, args, kwargs) is equivalent to pool.apply_async(func, args, kwargs).get().

In contrast to Pool.apply, the Pool.apply_async method also has a callback which, if supplied, is called when the function is complete. This can be used instead of calling get().

For example:

import multiprocessing as mp
import time

def foo_pool(x):
    time.sleep(2)
    return x*x

result_list = []
def log_result(result):
    # This is called whenever foo_pool(i) returns a result.
    # result_list is modified only by the main process, not the pool workers.
    result_list.append(result)

def apply_async_with_callback():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(foo_pool, args = (i, ), callback = log_result)
    pool.close()
    pool.join()
    print(result_list)

if __name__ == '__main__':
    apply_async_with_callback()

may yield a result such as

[1, 0, 4, 9, 25, 16, 49, 36, 81, 64]

Notice, unlike pool.map, the order of the results may not correspond to the order in which the pool.apply_async calls were made.


So, if you need to run a function in a separate process, but want the current process to block until that function returns, use Pool.apply. Like Pool.apply, Pool.map blocks until the complete result is returned.

If you want the Pool of worker processes to perform many function calls asynchronously, use Pool.apply_async. The order of the results is not guaranteed to be the same as the order of the calls to Pool.apply_async.

Notice also that you could call a number of different functions with Pool.apply_async (not all calls need to use the same function).

In contrast, Pool.map applies the same function to many arguments. However, unlike Pool.apply_async, the results are returned in an order corresponding to the order of the arguments.


回答 1

关于applyvs map

pool.apply(f, args)f仅在池中的一个工作线程中执行。因此,池中的一个进程将运行f(args)

pool.map(f, iterable):此方法将可迭代项分为多个块,将其作为单独的任务提交给流程池。因此,您可以利用池中的所有进程。

Regarding apply vs map:

pool.apply(f, args): f is only executed in ONE of the workers of the pool. So ONE of the processes in the pool will run f(args).

pool.map(f, iterable): This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. So you take advantage of all the processes in the pool.


回答 2

以下是在一个表的格式,以显示之间的差异的概述Pool.applyPool.apply_asyncPool.mapPool.map_async。选择一个时,必须考虑多个参数,并发性,阻塞和排序:

                  | Multi-args   Concurrence    Blocking     Ordered-results
---------------------------------------------------------------------
Pool.map          | no           yes            yes          yes
Pool.map_async    | no           yes            no           yes
Pool.apply        | yes          no             yes          no
Pool.apply_async  | yes          yes            no           no
Pool.starmap      | yes          yes            yes          yes
Pool.starmap_async| yes          yes            no           no

笔记:

  • Pool.imapPool.imap_async–地图和map_async的惰性版本。

  • Pool.starmap 方法,除了接受多个参数外,与map方法非常相似。

  • Async方法一次提交所有流程,并在完成后检索结果。使用get方法获取结果。

  • Pool.map(或Pool.apply)方法与Python内置map(或套用)非常相似。它们阻塞主流程,直到所有流程完成并返回结果。

例子:

地图

一次调用一份工作清单

results = pool.map(func, [1, 2, 3])

应用

只能被要求一份工作

for x, y in [[1, 1], [2, 2]]:
    results.append(pool.apply(func, (x, y)))

def collect_result(result):
    results.append(result)

map_async

一次调用一份工作清单

pool.map_async(func, jobs, callback=collect_result)

apply_async

只能调用一个作业并在后台并行执行一个作业

for x, y in [[1, 1], [2, 2]]:
    pool.apply_async(worker, (x, y), callback=collect_result)

星图

pool.map支持多个参数的变体

pool.starmap(func, [(1, 1), (2, 1), (3, 1)])

starmap_async

starmap()和map_async()的组合,它对可迭代的可迭代对象进行迭代,并在未包装可迭代对象的情况下调用func。返回结果对象。

pool.starmap_async(calculate_worker, [(1, 1), (2, 1), (3, 1)], callback=collect_result)

参考:

在此处找到完整的文档:https : //docs.python.org/3/library/multiprocessing.html

Here is an overview in a table format in order to show the differences between Pool.apply, Pool.apply_async, Pool.map and Pool.map_async. When choosing one, you have to take multi-args, concurrency, blocking, and ordering into account:

                  | Multi-args   Concurrence    Blocking     Ordered-results
---------------------------------------------------------------------
Pool.map          | no           yes            yes          yes
Pool.map_async    | no           yes            no           yes
Pool.apply        | yes          no             yes          no
Pool.apply_async  | yes          yes            no           no
Pool.starmap      | yes          yes            yes          yes
Pool.starmap_async| yes          yes            no           no

Notes:

  • Pool.imap and Pool.imap_async – lazier version of map and map_async.

  • Pool.starmap method, very much similar to map method besides it acceptance of multiple arguments.

  • Async methods submit all the processes at once and retrieve the results once they are finished. Use get method to obtain the results.

  • Pool.map(or Pool.apply)methods are very much similar to Python built-in map(or apply). They block the main process until all the processes complete and return the result.

Examples:

map

Is called for a list of jobs in one time

results = pool.map(func, [1, 2, 3])

apply

Can only be called for one job

for x, y in [[1, 1], [2, 2]]:
    results.append(pool.apply(func, (x, y)))

def collect_result(result):
    results.append(result)

map_async

Is called for a list of jobs in one time

pool.map_async(func, jobs, callback=collect_result)

apply_async

Can only be called for one job and executes a job in the background in parallel

for x, y in [[1, 1], [2, 2]]:
    pool.apply_async(worker, (x, y), callback=collect_result)

starmap

Is a variant of pool.map which support multiple arguments

pool.starmap(func, [(1, 1), (2, 1), (3, 1)])

starmap_async

A combination of starmap() and map_async() that iterates over iterable of iterables and calls func with the iterables unpacked. Returns a result object.

pool.starmap_async(calculate_worker, [(1, 1), (2, 1), (3, 1)], callback=collect_result)

Reference:

Find complete documentation here: https://docs.python.org/3/library/multiprocessing.html


如何在Python中使用线程?

问题:如何在Python中使用线程?

我试图了解Python中的线程。我看过文档和示例,但坦率地说,许多示例过于复杂,我难以理解它们。

您如何清楚地显示为多线程而划分的任务?

I am trying to understand threading in Python. I’ve looked at the documentation and examples, but quite frankly, many examples are overly sophisticated and I’m having trouble understanding them.

How do you clearly show tasks being divided for multi-threading?


回答 0

自2010年提出这个问题以来,如何使用带有mappool的 Python进行简单的多线程处理已经有了真正的简化。

下面的代码来自于一篇文章/博客文章,您绝对应该检出(没有从属关系)- 并行显示在一行中:更好的日常线程任务模型。我将在下面进行总结-最终仅是几行代码:

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

这是以下内容的多线程版本:

results = []
for item in my_array:
    results.append(my_function(item))

描述

Map是一个很棒的小功能,是轻松将并行性注入Python代码的关键。对于那些不熟悉的人来说,地图是从Lisp之类的功能语言中提炼出来的。它是将另一个功能映射到序列上的功能。

Map为我们处理序列上的迭代,应用函数,并将所有结果存储在最后的方便列表中。


实作

map函数的并行版本由以下两个库提供:multiprocessing,以及鲜为人知但同样出色的继子child:multiprocessing.dummy。

multiprocessing.dummy与多处理模块完全相同,但是使用线程代替一个重要的区别 -使用多个进程来执行CPU密集型任务;用于I / O的线程)。

multiprocessing.dummy复制了多处理的API,但仅不过是线程模块的包装器。

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org',
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

以及计时结果:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

传递多个参数仅在Python 3.3和更高版本中才这样):

要传递多个数组:

results = pool.starmap(function, zip(list_a, list_b))

或传递一个常量和一个数组:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

如果您使用的是Python的早期版本,则可以通过此变通方法()传递多个参数。

(感谢user136036的有用评论。)

Since this question was asked in 2010, there has been real simplification in how to do simple multithreading with Python with map and pool.

The code below comes from an article/blog post that you should definitely check out (no affiliation) – Parallelism in one line: A Better Model for Day to Day Threading Tasks. I’ll summarize below – it ends up being just a few lines of code:

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

Which is the multithreaded version of:

results = []
for item in my_array:
    results.append(my_function(item))

Description

Map is a cool little function, and the key to easily injecting parallelism into your Python code. For those unfamiliar, map is something lifted from functional languages like Lisp. It is a function which maps another function over a sequence.

Map handles the iteration over the sequence for us, applies the function, and stores all of the results in a handy list at the end.


Implementation

Parallel versions of the map function are provided by two libraries:multiprocessing, and also its little known, but equally fantastic step child:multiprocessing.dummy.

multiprocessing.dummy is exactly the same as multiprocessing module, but uses threads instead (an important distinction – use multiple processes for CPU-intensive tasks; threads for (and during) I/O):

multiprocessing.dummy replicates the API of multiprocessing, but is no more than a wrapper around the threading module.

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org',
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

And the timing results:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

Passing multiple arguments (works like this only in Python 3.3 and later):

To pass multiple arrays:

results = pool.starmap(function, zip(list_a, list_b))

Or to pass a constant and an array:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

If you are using an earlier version of Python, you can pass multiple arguments via this workaround).

(Thanks to user136036 for the helpful comment.)


回答 1

这是一个简单的示例:您需要尝试一些备用URL并返回第一个URL的内容以进行响应。

import Queue
import threading
import urllib2

# Called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

在这种情况下,线程被用作简单的优化:每个子线程都在等待URL解析和响应,以便将其内容放入队列中。每个线程都是一个守护进程(如果主线程结束,则不会使进程继续运行-这比不常见);主线程启动所有子线程,get在队列中执行a ,以等待直到其中一个完成a put,然后发出结果并终止(由于它们是守护线程,因此将取消可能仍在运行的所有子线程)。

正确使用Python中的线程总是会与I / O操作相关联(因为CPython无论如何都不会使用多个内核来运行受CPU约束的任务,因此,线程的唯一原因是在等待某些I / O时不会阻塞进程)。顺便说一句,队列几乎总是将工作分配到线程和/或收集工作结果的最佳方法,并且它们本质上是线程安全的,因此它们使您不必担心锁,条件,事件,信号量以及其他相互之间的关系。线程协调/通信概念。

Here’s a simple example: you need to try a few alternative URLs and return the contents of the first one to respond.

import Queue
import threading
import urllib2

# Called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

This is a case where threading is used as a simple optimization: each subthread is waiting for a URL to resolve and respond, in order to put its contents on the queue; each thread is a daemon (won’t keep the process up if main thread ends — that’s more common than not); the main thread starts all subthreads, does a get on the queue to wait until one of them has done a put, then emits the results and terminates (which takes down any subthreads that might still be running, since they’re daemon threads).

Proper use of threads in Python is invariably connected to I/O operations (since CPython doesn’t use multiple cores to run CPU-bound tasks anyway, the only reason for threading is not blocking the process while there’s a wait for some I/O). Queues are almost invariably the best way to farm out work to threads and/or collect the work’s results, by the way, and they’re intrinsically threadsafe, so they save you from worrying about locks, conditions, events, semaphores, and other inter-thread coordination/communication concepts.


回答 2

注意:对于Python中的实际并行化,您应该使用多处理模块来分叉多个并行执行的进程(由于全局解释器锁,Python线程提供了交织,但实际上它们是串行执行的,而不是并行执行的,并且仅仅是在交错I / O操作时很有用)。

但是,如果您只是在寻找交织(或者正在进行尽管可以使用全局解释器锁而可以并行化的I / O操作),那么就可以从线程模块开始。作为一个非常简单的示例,让我们考虑通过并行求和子范围来求和一个大范围的问题:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

请注意,以上示例是一个非常愚蠢的示例,因为它完全不执行任何I / O操作,并且由于全局解释器锁定,尽管在CPython中是交错执行的(带有上下文切换的额外开销),但仍将串行执行。

NOTE: For actual parallelization in Python, you should use the multiprocessing module to fork multiple processes that execute in parallel (due to the global interpreter lock, Python threads provide interleaving, but they are in fact executed serially, not in parallel, and are only useful when interleaving I/O operations).

However, if you are merely looking for interleaving (or are doing I/O operations that can be parallelized despite the global interpreter lock), then the threading module is the place to start. As a really simple example, let’s consider the problem of summing a large range by summing subranges in parallel:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

Note that the above is a very stupid example, as it does absolutely no I/O and will be executed serially albeit interleaved (with the added overhead of context switching) in CPython due to the global interpreter lock.


回答 3

像其他提到的一样,由于GIL,CPython只能将线程用于I / O等待。

如果您想从多个内核中受益于CPU绑定任务,请使用multiprocessing

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Like others mentioned, CPython can use threads only for I/O waits due to GIL.

If you want to benefit from multiple cores for CPU-bound tasks, use multiprocessing:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

回答 4

仅需注意:线程不需要队列。

这是我能想到的最简单的示例,其中显示了10个进程同时运行。

import threading
from random import randint
from time import sleep


def print_number(number):

    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):

    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"

Just a note: A queue is not required for threading.

This is the simplest example I could imagine that shows 10 processes running concurrently.

import threading
from random import randint
from time import sleep


def print_number(number):

    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):

    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"

回答 5

Alex Martelli的回答对我有所帮助。但是,这是我认为更有用的修改版本(至少对我而言)。

更新:在Python 2和Python 3中均可使用

try:
    # For Python 3
    import queue
    from urllib.request import urlopen
except:
    # For Python 2 
    import Queue as queue
    from urllib2 import urlopen

import threading

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

# Load up a queue with your data. This will handle locking
q = queue.Queue()
for url in worker_data:
    q.put(url)

# Define a worker function
def worker(url_queue):
    queue_full = True
    while queue_full:
        try:
            # Get your data off the queue, and do some work
            url = url_queue.get(False)
            data = urlopen(url).read()
            print(len(data))

        except queue.Empty:
            queue_full = False

# Create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()

The answer from Alex Martelli helped me. However, here is a modified version that I thought was more useful (at least to me).

Updated: works in both Python 2 and Python 3

try:
    # For Python 3
    import queue
    from urllib.request import urlopen
except:
    # For Python 2 
    import Queue as queue
    from urllib2 import urlopen

import threading

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

# Load up a queue with your data. This will handle locking
q = queue.Queue()
for url in worker_data:
    q.put(url)

# Define a worker function
def worker(url_queue):
    queue_full = True
    while queue_full:
        try:
            # Get your data off the queue, and do some work
            url = url_queue.get(False)
            data = urlopen(url).read()
            print(len(data))

        except queue.Empty:
            queue_full = False

# Create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()

回答 6

给定一个函数,将f其像这样进行线程化:

import threading
threading.Thread(target=f).start()

将参数传递给 f

threading.Thread(target=f, args=(a,b,c)).start()

Given a function, f, thread it like this:

import threading
threading.Thread(target=f).start()

To pass arguments to f

threading.Thread(target=f, args=(a,b,c)).start()

回答 7

我发现这非常有用:创建与内核一样多的线程,并让它们执行(大量)任务(在这种情况下,调用Shell程序):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): # Put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        # Execute a task: call a shell program and wait until it completes
        subprocess.call("echo " + str(item), shell=True)
        q.task_done()

cpus = multiprocessing.cpu_count() # Detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() # Block until all tasks are done

I found this very useful: create as many threads as cores and let them execute a (large) number of tasks (in this case, calling a shell program):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): # Put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        # Execute a task: call a shell program and wait until it completes
        subprocess.call("echo " + str(item), shell=True)
        q.task_done()

cpus = multiprocessing.cpu_count() # Detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() # Block until all tasks are done

回答 8

Python 3具有启动并行任务的功能。这使我们的工作更加轻松。

它具有线程池进程池

以下提供了一个见解:

ThreadPoolExecutor示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Python 3 has the facility of launching parallel tasks. This makes our work easier.

It has thread pooling and process pooling.

The following gives an insight:

ThreadPoolExecutor Example (source)

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor (source)

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

回答 9

使用新的并发模块

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

对于所有以前接触过Java的人来说,执行者方法似乎都很熟悉。

另外请注意:为了使Universe保持理智,如果您不使用with上下文,请不要忘记关闭池/执行器(它非常强大,它可以为您完成此工作)

Using the blazing new concurrent.futures module

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

The executor approach might seem familiar to all those who have gotten their hands dirty with Java before.

Also on a side note: To keep the universe sane, don’t forget to close your pools/executors if you don’t use with context (which is so awesome that it does it for you)


回答 10

对我而言,线程的完美示例是监视异步事件。看这段代码。

# thread_test.py
import threading
import time

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

您可以通过打开IPython会话并执行以下操作来处理此代码:

>>> from thread_test import Monitor
>>> a = [0]
>>> mon = Monitor(a)
>>> mon.start()
>>> a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

等一下

>>> a[0] = 2
Mon = 2

For me, the perfect example for threading is monitoring asynchronous events. Look at this code.

# thread_test.py
import threading
import time

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

You can play with this code by opening an IPython session and doing something like:

>>> from thread_test import Monitor
>>> a = [0]
>>> mon = Monitor(a)
>>> mon.start()
>>> a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

Wait a few minutes

>>> a[0] = 2
Mon = 2

回答 11

大多数文档和教程都使用Python ThreadingQueue模块,对于初学者来说,它们似乎不胜枚举。

也许考虑使用concurrent.futures.ThreadPoolExecutorPython 3 的模块。

结合with子句和列表理解,这可能是一个真正的魅力。

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_url(url):
    # Your actual program here. Using threading.Lock() if necessary
    return ""

# List of URLs to fetch
urls = ["url1", "url2"]

with ThreadPoolExecutor(max_workers = 5) as executor:

    # Create threads
    futures = {executor.submit(get_url, url) for url in urls}

    # as_completed() gives you the threads once finished
    for f in as_completed(futures):
        # Get the results
        rs = f.result()

Most documentation and tutorials use Python’s Threading and Queue module, and they could seem overwhelming for beginners.

Perhaps consider the concurrent.futures.ThreadPoolExecutor module of Python 3.

Combined with with clause and list comprehension it could be a real charm.

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_url(url):
    # Your actual program here. Using threading.Lock() if necessary
    return ""

# List of URLs to fetch
urls = ["url1", "url2"]

with ThreadPoolExecutor(max_workers = 5) as executor:

    # Create threads
    futures = {executor.submit(get_url, url) for url in urls}

    # as_completed() gives you the threads once finished
    for f in as_completed(futures):
        # Get the results
        rs = f.result()

回答 12

我在这里看到了很多没有执行任何实际工作的示例,这些示例主要是CPU约束的。这是一个CPU限制任务的示例,该任务计算1000万到10.05百万之间的所有素数。我在这里使用了所有四种方法:

import math
import timeit
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def time_stuff(fn):
    """
    Measure time of execution of a function
    """
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        fn(*args, **kwargs)
        t1 = timeit.default_timer()
        print("{} seconds".format(t1 - t0))
    return wrapper

def find_primes_in(nmin, nmax):
    """
    Compute a list of prime numbers between the given minimum and maximum arguments
    """
    primes = []

    # Loop from minimum to maximum
    for current in range(nmin, nmax + 1):

        # Take the square root of the current number
        sqrt_n = int(math.sqrt(current))
        found = False

        # Check if the any number from 2 to the square root + 1 divides the current numnber under consideration
        for number in range(2, sqrt_n + 1):

            # If divisible we have found a factor, hence this is not a prime number, lets move to the next one
            if current % number == 0:
                found = True
                break

        # If not divisible, add this number to the list of primes that we have found so far
        if not found:
            primes.append(current)

    # I am merely printing the length of the array containing all the primes, but feel free to do what you want
    print(len(primes))

@time_stuff
def sequential_prime_finder(nmin, nmax):
    """
    Use the main process and main thread to compute everything in this case
    """
    find_primes_in(nmin, nmax)

@time_stuff
def threading_prime_finder(nmin, nmax):
    """
    If the minimum is 1000 and the maximum is 2000 and we have four workers,
    1000 - 1250 to worker 1
    1250 - 1500 to worker 2
    1500 - 1750 to worker 3
    1750 - 2000 to worker 4
    so let’s split the minimum and maximum values according to the number of workers
    """
    nrange = nmax - nmin
    threads = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)

        # Start the thread with the minimum and maximum split up to compute
        # Parallel computation will not work here due to the GIL since this is a CPU-bound task
        t = threading.Thread(target = find_primes_in, args = (start, end))
        threads.append(t)
        t.start()

    # Don’t forget to wait for the threads to finish
    for t in threads:
        t.join()

@time_stuff
def processing_prime_finder(nmin, nmax):
    """
    Split the minimum, maximum interval similar to the threading method above, but use processes this time
    """
    nrange = nmax - nmin
    processes = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)
        p = multiprocessing.Process(target = find_primes_in, args = (start, end))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

@time_stuff
def thread_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use a thread pool executor this time.
    This method is slightly faster than using pure threading as the pools manage threads more efficiently.
    This method is still slow due to the GIL limitations since we are doing a CPU-bound task.
    """
    nrange = nmax - nmin
    with ThreadPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

@time_stuff
def process_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use the process pool executor.
    This is the fastest method recorded so far as it manages process efficiently + overcomes GIL limitations.
    RECOMMENDED METHOD FOR CPU-BOUND TASKS
    """
    nrange = nmax - nmin
    with ProcessPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

def main():
    nmin = int(1e7)
    nmax = int(1.05e7)
    print("Sequential Prime Finder Starting")
    sequential_prime_finder(nmin, nmax)
    print("Threading Prime Finder Starting")
    threading_prime_finder(nmin, nmax)
    print("Processing Prime Finder Starting")
    processing_prime_finder(nmin, nmax)
    print("Thread Executor Prime Finder Starting")
    thread_executor_prime_finder(nmin, nmax)
    print("Process Executor Finder Starting")
    process_executor_prime_finder(nmin, nmax)

main()

这是我的Mac OS X四核计算机上的结果

Sequential Prime Finder Starting
9.708213827005238 seconds
Threading Prime Finder Starting
9.81836523200036 seconds
Processing Prime Finder Starting
3.2467174359990167 seconds
Thread Executor Prime Finder Starting
10.228896902000997 seconds
Process Executor Finder Starting
2.656402041000547 seconds

I saw a lot of examples here where no real work was being performed, and they were mostly CPU-bound. Here is an example of a CPU-bound task that computes all prime numbers between 10 million and 10.05 million. I have used all four methods here:

import math
import timeit
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def time_stuff(fn):
    """
    Measure time of execution of a function
    """
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        fn(*args, **kwargs)
        t1 = timeit.default_timer()
        print("{} seconds".format(t1 - t0))
    return wrapper

def find_primes_in(nmin, nmax):
    """
    Compute a list of prime numbers between the given minimum and maximum arguments
    """
    primes = []

    # Loop from minimum to maximum
    for current in range(nmin, nmax + 1):

        # Take the square root of the current number
        sqrt_n = int(math.sqrt(current))
        found = False

        # Check if the any number from 2 to the square root + 1 divides the current numnber under consideration
        for number in range(2, sqrt_n + 1):

            # If divisible we have found a factor, hence this is not a prime number, lets move to the next one
            if current % number == 0:
                found = True
                break

        # If not divisible, add this number to the list of primes that we have found so far
        if not found:
            primes.append(current)

    # I am merely printing the length of the array containing all the primes, but feel free to do what you want
    print(len(primes))

@time_stuff
def sequential_prime_finder(nmin, nmax):
    """
    Use the main process and main thread to compute everything in this case
    """
    find_primes_in(nmin, nmax)

@time_stuff
def threading_prime_finder(nmin, nmax):
    """
    If the minimum is 1000 and the maximum is 2000 and we have four workers,
    1000 - 1250 to worker 1
    1250 - 1500 to worker 2
    1500 - 1750 to worker 3
    1750 - 2000 to worker 4
    so let’s split the minimum and maximum values according to the number of workers
    """
    nrange = nmax - nmin
    threads = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)

        # Start the thread with the minimum and maximum split up to compute
        # Parallel computation will not work here due to the GIL since this is a CPU-bound task
        t = threading.Thread(target = find_primes_in, args = (start, end))
        threads.append(t)
        t.start()

    # Don’t forget to wait for the threads to finish
    for t in threads:
        t.join()

@time_stuff
def processing_prime_finder(nmin, nmax):
    """
    Split the minimum, maximum interval similar to the threading method above, but use processes this time
    """
    nrange = nmax - nmin
    processes = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)
        p = multiprocessing.Process(target = find_primes_in, args = (start, end))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

@time_stuff
def thread_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use a thread pool executor this time.
    This method is slightly faster than using pure threading as the pools manage threads more efficiently.
    This method is still slow due to the GIL limitations since we are doing a CPU-bound task.
    """
    nrange = nmax - nmin
    with ThreadPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

@time_stuff
def process_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use the process pool executor.
    This is the fastest method recorded so far as it manages process efficiently + overcomes GIL limitations.
    RECOMMENDED METHOD FOR CPU-BOUND TASKS
    """
    nrange = nmax - nmin
    with ProcessPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

def main():
    nmin = int(1e7)
    nmax = int(1.05e7)
    print("Sequential Prime Finder Starting")
    sequential_prime_finder(nmin, nmax)
    print("Threading Prime Finder Starting")
    threading_prime_finder(nmin, nmax)
    print("Processing Prime Finder Starting")
    processing_prime_finder(nmin, nmax)
    print("Thread Executor Prime Finder Starting")
    thread_executor_prime_finder(nmin, nmax)
    print("Process Executor Finder Starting")
    process_executor_prime_finder(nmin, nmax)

main()

Here are the results on my Mac OS X four-core machine

Sequential Prime Finder Starting
9.708213827005238 seconds
Threading Prime Finder Starting
9.81836523200036 seconds
Processing Prime Finder Starting
3.2467174359990167 seconds
Thread Executor Prime Finder Starting
10.228896902000997 seconds
Process Executor Finder Starting
2.656402041000547 seconds

回答 13

这是使用线程导入CSV的非常简单的示例。(图书馆收录的目的可能有所不同。)

辅助功能:

from threading import Thread
from project import app
import csv


def import_handler(csv_file_name):
    thr = Thread(target=dump_async_csv_data, args=[csv_file_name])
    thr.start()

def dump_async_csv_data(csv_file_name):
    with app.app_context():
        with open(csv_file_name) as File:
            reader = csv.DictReader(File)
            for row in reader:
                # DB operation/query

驱动功能:

import_handler(csv_file_name)

Here is the very simple example of CSV import using threading. (Library inclusion may differ for different purpose.)

Helper Functions:

from threading import Thread
from project import app
import csv


def import_handler(csv_file_name):
    thr = Thread(target=dump_async_csv_data, args=[csv_file_name])
    thr.start()

def dump_async_csv_data(csv_file_name):
    with app.app_context():
        with open(csv_file_name) as File:
            reader = csv.DictReader(File)
            for row in reader:
                # DB operation/query

Driver Function:

import_handler(csv_file_name)

回答 14

我想举一个简单的例子,当我不得不自己解决这个问题时,我发现这些解释很有用。

在此答案中,您将找到有关Python的GIL(全局解释器锁)的一些信息,以及使用multiprocessing.dummy和一些简单的基准编写的简单的日常示例。

全局翻译锁定(GIL)

Python不允许真正意义上的多线程。它具有一个多线程程序包,但是如果您想使用多线程来加快代码速度,那么使用它通常不是一个好主意。

Python具有称为全局解释器锁(GIL)的构造。GIL确保您的“线程”只能在任何一次执行。线程获取GIL,做一些工作,然后将GIL传递到下一个线程。

这发生得非常快,以至于人眼似乎您的线程正在并行执行,但是实际上它们只是使用相同的CPU内核轮流执行。

所有这些GIL传递都会增加执行开销。这意味着,如果您想使代码运行更快,那么使用线程包通常不是一个好主意。

有理由使用Python的线程包。如果您想同时运行某些东西,而效率不是问题,那么它就很好而且很方便。或者,如果您正在运行的代码需要等待某些东西(例如某些I / O),那么这很有意义。但是线程库不允许您使用额外的CPU内核。

多线程可以外包给操作系统(通过执行多处理),某些外部应用程序可以调用Python代码(例如SparkHadoop),也可以外包给Python代码调用的某些代码(例如:让您的Python代码调用C函数来执行昂贵的多线程任务)。

为什么如此重要

因为很多人在学习GIL是什么之前,会花费大量时间试图在他们喜欢的Python多线程代码中找到瓶颈。

清除此信息后,这是我的代码:

#!/bin/python
from multiprocessing.dummy import Pool
from subprocess import PIPE,Popen
import time
import os

# In the variable pool_size we define the "parallelness".
# For CPU-bound tasks, it doesn't make sense to create more Pool processes
# than you have cores to run them on.
#
# On the other hand, if you are using I/O-bound tasks, it may make sense
# to create a quite a few more Pool processes than cores, since the processes
# will probably spend most their time blocked (waiting for I/O to complete).
pool_size = 8

def do_ping(ip):
    if os.name == 'nt':
        print ("Using Windows Ping to " + ip)
        proc = Popen(['ping', ip], stdout=PIPE)
        return proc.communicate()[0]
    else:
        print ("Using Linux / Unix Ping to " + ip)
        proc = Popen(['ping', ip, '-c', '4'], stdout=PIPE)
        return proc.communicate()[0]


os.system('cls' if os.name=='nt' else 'clear')
print ("Running using threads\n")
start_time = time.time()
pool = Pool(pool_size)
website_names = ["www.google.com","www.facebook.com","www.pinterest.com","www.microsoft.com"]
result = {}
for website_name in website_names:
    result[website_name] = pool.apply_async(do_ping, args=(website_name,))
pool.close()
pool.join()
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Now we do the same without threading, just to compare time
print ("\nRunning NOT using threads\n")
start_time = time.time()
for website_name in website_names:
    do_ping(website_name)
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Here's one way to print the final output from the threads
output = {}
for key, value in result.items():
    output[key] = value.get()
print ("\nOutput aggregated in a Dictionary:")
print (output)
print ("\n")

print ("\nPretty printed output: ")
for key, value in output.items():
    print (key + "\n")
    print (value)

I would like to contribute with a simple example and the explanations I’ve found useful when I had to tackle this problem myself.

In this answer you will find some information about Python’s GIL (global interpreter lock) and a simple day-to-day example written using multiprocessing.dummy plus some simple benchmarks.

Global Interpreter Lock (GIL)

Python doesn’t allow multi-threading in the truest sense of the word. It has a multi-threading package, but if you want to multi-thread to speed your code up, then it’s usually not a good idea to use it.

Python has a construct called the global interpreter lock (GIL). The GIL makes sure that only one of your ‘threads’ can execute at any one time. A thread acquires the GIL, does a little work, then passes the GIL onto the next thread.

This happens very quickly so to the human eye it may seem like your threads are executing in parallel, but they are really just taking turns using the same CPU core.

All this GIL passing adds overhead to execution. This means that if you want to make your code run faster then using the threading package often isn’t a good idea.

There are reasons to use Python’s threading package. If you want to run some things simultaneously, and efficiency is not a concern, then it’s totally fine and convenient. Or if you are running code that needs to wait for something (like some I/O) then it could make a lot of sense. But the threading library won’t let you use extra CPU cores.

Multi-threading can be outsourced to the operating system (by doing multi-processing), and some external application that calls your Python code (for example, Spark or Hadoop), or some code that your Python code calls (for example: you could have your Python code call a C function that does the expensive multi-threaded stuff).

Why This Matters

Because lots of people spend a lot of time trying to find bottlenecks in their fancy Python multi-threaded code before they learn what the GIL is.

Once this information is clear, here’s my code:

#!/bin/python
from multiprocessing.dummy import Pool
from subprocess import PIPE,Popen
import time
import os

# In the variable pool_size we define the "parallelness".
# For CPU-bound tasks, it doesn't make sense to create more Pool processes
# than you have cores to run them on.
#
# On the other hand, if you are using I/O-bound tasks, it may make sense
# to create a quite a few more Pool processes than cores, since the processes
# will probably spend most their time blocked (waiting for I/O to complete).
pool_size = 8

def do_ping(ip):
    if os.name == 'nt':
        print ("Using Windows Ping to " + ip)
        proc = Popen(['ping', ip], stdout=PIPE)
        return proc.communicate()[0]
    else:
        print ("Using Linux / Unix Ping to " + ip)
        proc = Popen(['ping', ip, '-c', '4'], stdout=PIPE)
        return proc.communicate()[0]


os.system('cls' if os.name=='nt' else 'clear')
print ("Running using threads\n")
start_time = time.time()
pool = Pool(pool_size)
website_names = ["www.google.com","www.facebook.com","www.pinterest.com","www.microsoft.com"]
result = {}
for website_name in website_names:
    result[website_name] = pool.apply_async(do_ping, args=(website_name,))
pool.close()
pool.join()
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Now we do the same without threading, just to compare time
print ("\nRunning NOT using threads\n")
start_time = time.time()
for website_name in website_names:
    do_ping(website_name)
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Here's one way to print the final output from the threads
output = {}
for key, value in result.items():
    output[key] = value.get()
print ("\nOutput aggregated in a Dictionary:")
print (output)
print ("\n")

print ("\nPretty printed output: ")
for key, value in output.items():
    print (key + "\n")
    print (value)

回答 15

这是带有一个简单示例的多线程,将很有帮助。您可以运行它并轻松了解Python中多线程的工作方式。在以前的线程完成其工作之前,我使用了一个锁来防止访问其他线程。通过使用这一行代码,

tLock = threading.BoundedSemaphore(值= 4)

您可以一次允许多个进程,并保留其余线程,这些线程将在以后的进程或之前的进程完成后运行。

import threading
import time

#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
    print  "\r\nTimer: ", name, " Started"
    tLock.acquire()
    print "\r\n", name, " has the acquired the lock"
    while repeat > 0:
        time.sleep(delay)
        print "\r\n", name, ": ", str(time.ctime(time.time()))
        repeat -= 1

    print "\r\n", name, " is releaseing the lock"
    tLock.release()
    print "\r\nTimer: ", name, " Completed"

def Main():
    t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
    t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
    t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
    t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
    t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))

    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    print "\r\nMain Complete"

if __name__ == "__main__":
    Main()

Here is multi threading with a simple example which will be helpful. You can run it and understand easily how multi threading is working in Python. I used a lock for preventing access to other threads until the previous threads finished their work. By the use of this line of code,

tLock = threading.BoundedSemaphore(value=4)

you can allow a number of processes at a time and keep hold to the rest of the threads which will run later or after finished previous processes.

import threading
import time

#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
    print  "\r\nTimer: ", name, " Started"
    tLock.acquire()
    print "\r\n", name, " has the acquired the lock"
    while repeat > 0:
        time.sleep(delay)
        print "\r\n", name, ": ", str(time.ctime(time.time()))
        repeat -= 1

    print "\r\n", name, " is releaseing the lock"
    tLock.release()
    print "\r\nTimer: ", name, " Completed"

def Main():
    t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
    t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
    t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
    t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
    t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))

    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    print "\r\nMain Complete"

if __name__ == "__main__":
    Main()

回答 16

通过从这篇文章中借用,我们知道在多线程,多处理和异步/ asyncio及其用法之间进行选择。

Python 3具有新的内置库以实现并发性和并行性:current.futures

因此,我将通过一个实验来演示如何通过以下方式运行四个任务(即.sleep()方法)Threading-Pool

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def concurrent(max_worker=1):
    futures = []

    tick = time()
    with ThreadPoolExecutor(max_workers=max_worker) as executor:
        futures.append(executor.submit(sleep, 2))  # Two seconds sleep
        futures.append(executor.submit(sleep, 1))
        futures.append(executor.submit(sleep, 7))
        futures.append(executor.submit(sleep, 3))

        for future in as_completed(futures):
            if future.result() is not None:
                print(future.result())

    print('Total elapsed time by {} workers:'.format(max_worker), time()-tick)

concurrent(5)
concurrent(4)
concurrent(3)
concurrent(2)
concurrent(1)

输出:

Total elapsed time by 5 workers: 7.007831811904907
Total elapsed time by 4 workers: 7.007944107055664
Total elapsed time by 3 workers: 7.003149509429932
Total elapsed time by 2 workers: 8.004627466201782
Total elapsed time by 1 workers: 13.013478994369507

[ 注意 ]:

  • 从以上结果中可以看出,最好的情况是这四个任务需要3个工作人员。
  • 如果您有流程任务而不是I / O绑定或阻止(multiprocessingthreading),则可以将更ThreadPoolExecutor改为ProcessPoolExecutor

With borrowing from this post we know about choosing between the multithreading, multiprocessing, and async/asyncio and their usage.

Python 3 has a new built-in library in order to concurrency and parallelism: concurrent.futures

So I’ll demonstrate through an experiment to run four tasks (i.e. .sleep() method) by Threading-Pool manner:

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def concurrent(max_worker=1):
    futures = []

    tick = time()
    with ThreadPoolExecutor(max_workers=max_worker) as executor:
        futures.append(executor.submit(sleep, 2))  # Two seconds sleep
        futures.append(executor.submit(sleep, 1))
        futures.append(executor.submit(sleep, 7))
        futures.append(executor.submit(sleep, 3))

        for future in as_completed(futures):
            if future.result() is not None:
                print(future.result())

    print('Total elapsed time by {} workers:'.format(max_worker), time()-tick)

concurrent(5)
concurrent(4)
concurrent(3)
concurrent(2)
concurrent(1)

Output:

Total elapsed time by 5 workers: 7.007831811904907
Total elapsed time by 4 workers: 7.007944107055664
Total elapsed time by 3 workers: 7.003149509429932
Total elapsed time by 2 workers: 8.004627466201782
Total elapsed time by 1 workers: 13.013478994369507

[NOTE]:

  • As you can see in the above results, the best case was 3 workers for those four tasks.
  • If you have a process task instead of I/O bound or blocking (multiprocessing vs threading) you could change the ThreadPoolExecutor to ProcessPoolExecutor.

回答 17

先前的解决方案均未在我的GNU / Linux服务器(我没有管理员权限)上实际使用多个内核。他们只是在一个核心上运行。

我使用了较低级别的os.fork界面来生成多个进程。这是对我有用的代码:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break

None of the previous solutions actually used multiple cores on my GNU/Linux server (where I don’t have administrator rights). They just ran on a single core.

I used the lower level os.fork interface to spawn multiple processes. This is the code that worked for me:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break

回答 18

import threading
import requests

def send():

  r = requests.get('https://www.stackoverlow.com')

thread = []
t = threading.Thread(target=send())
thread.append(t)
t.start()
import threading
import requests

def send():

  r = requests.get('https://www.stackoverlow.com')

thread = []
t = threading.Thread(target=send())
thread.append(t)
t.start()