问题:用Python发送100,000个HTTP请求的最快方法是什么?
我正在打开一个具有100,000个URL的文件。我需要向每个URL发送一个HTTP请求并打印状态代码。我正在使用Python 2.6,到目前为止,我们研究了Python实现线程/并发性的许多令人困惑的方式。我什至看过python 并发库,但无法弄清楚如何正确编写此程序。有没有人遇到过类似的问题?我想通常我需要知道如何尽快地在Python中执行数千个任务-我想这意味着“同时”。
I am opening a file which has 100,000 URL’s. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far looked at the many confusing ways Python implements threading/concurrency. I have even looked at the python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible – I suppose that means ‘concurrently’.
回答 0
无捻解决方案:
from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue
concurrent = 200
def doWork():
while True:
url = q.get()
status, url = getStatus(url)
doSomethingWithResult(status, url)
q.task_done()
def getStatus(ourl):
try:
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status, ourl
except:
return "error", ourl
def doSomethingWithResult(status, url):
print status, url
q = Queue(concurrent * 2)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
try:
for url in open('urllist.txt'):
q.put(url.strip())
q.join()
except KeyboardInterrupt:
sys.exit(1)
这比扭曲的解决方案要快一点,并且使用的CPU更少。
Twistedless solution:
from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue
concurrent = 200
def doWork():
while True:
url = q.get()
status, url = getStatus(url)
doSomethingWithResult(status, url)
q.task_done()
def getStatus(ourl):
try:
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status, ourl
except:
return "error", ourl
def doSomethingWithResult(status, url):
print status, url
q = Queue(concurrent * 2)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
try:
for url in open('urllist.txt'):
q.put(url.strip())
q.join()
except KeyboardInterrupt:
sys.exit(1)
This one is slighty faster than the twisted solution and uses less CPU.
回答 1
使用龙卷风异步联网库的解决方案
from tornado import ioloop, httpclient
i = 0
def handle_request(response):
print(response.code)
global i
i -= 1
if i == 0:
ioloop.IOLoop.instance().stop()
http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
i += 1
http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()
A solution using tornado asynchronous networking library
from tornado import ioloop, httpclient
i = 0
def handle_request(response):
print(response.code)
global i
i -= 1
if i == 0:
ioloop.IOLoop.instance().stop()
http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
i += 1
http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()
回答 2
自2010年发布以来,情况发生了很大变化,我还没有尝试所有其他答案,但是尝试了一些答案,我发现使用python3.6对我来说最有效。
我能够每秒获取约150个在AWS上运行的唯一域。
import pandas as pd
import concurrent.futures
import requests
import time
out = []
CONNECTIONS = 100
TIMEOUT = 5
tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]
def load_url(url, timeout):
ans = requests.head(url, timeout=timeout)
return ans.status_code
with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
time1 = time.time()
for future in concurrent.futures.as_completed(future_to_url):
try:
data = future.result()
except Exception as exc:
data = str(type(exc))
finally:
out.append(data)
print(str(len(out)),end="\r")
time2 = time.time()
print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())
Things have changed quite a bit since 2010 when this was posted and I haven’t tried all the other answers but I have tried a few, and I found this to work the best for me using python3.6.
I was able to fetch about ~150 unique domains per second running on AWS.
import pandas as pd
import concurrent.futures
import requests
import time
out = []
CONNECTIONS = 100
TIMEOUT = 5
tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]
def load_url(url, timeout):
ans = requests.head(url, timeout=timeout)
return ans.status_code
with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
time1 = time.time()
for future in concurrent.futures.as_completed(future_to_url):
try:
data = future.result()
except Exception as exc:
data = str(type(exc))
finally:
out.append(data)
print(str(len(out)),end="\r")
time2 = time.time()
print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())
回答 3
线程绝对不是这里的答案。如果总体目标是“最快的方法”,它们将提供进程和内核瓶颈,以及吞吐量限制。
一点点的twisted
及其异步HTTP
客户端将为您带来更好的结果。
Threads are absolutely not the answer here. They will provide both process and kernel bottlenecks, as well as throughput limits that are not acceptable if the overall goal is “the fastest way”.
A little bit of twisted
and its asynchronous HTTP
client would give you much better results.
回答 4
我知道这是一个古老的问题,但是在Python 3.7中,您可以使用asyncio
和来实现aiohttp
。
import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError
async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
try:
resp = await session.request(method="GET", url=url, **kwargs)
except ClientConnectorError:
return (url, 404)
return (url, resp.status)
async def make_requests(urls: set, **kwargs) -> None:
async with ClientSession() as session:
tasks = []
for url in urls:
tasks.append(
fetch_html(url=url, session=session, **kwargs)
)
results = await asyncio.gather(*tasks)
for result in results:
print(f'{result[1]} - {str(result[0])}')
if __name__ == "__main__":
import pathlib
import sys
assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
here = pathlib.Path(__file__).parent
with open(here.joinpath("urls.txt")) as infile:
urls = set(map(str.strip, infile))
asyncio.run(make_requests(urls=urls))
您可以阅读有关它的更多信息,并在此处查看示例。
I know this is an old question, but in Python 3.7 you can do this using asyncio
and aiohttp
.
import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError
async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
try:
resp = await session.request(method="GET", url=url, **kwargs)
except ClientConnectorError:
return (url, 404)
return (url, resp.status)
async def make_requests(urls: set, **kwargs) -> None:
async with ClientSession() as session:
tasks = []
for url in urls:
tasks.append(
fetch_html(url=url, session=session, **kwargs)
)
results = await asyncio.gather(*tasks)
for result in results:
print(f'{result[1]} - {str(result[0])}')
if __name__ == "__main__":
import pathlib
import sys
assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
here = pathlib.Path(__file__).parent
with open(here.joinpath("urls.txt")) as infile:
urls = set(map(str.strip, infile))
asyncio.run(make_requests(urls=urls))
You can read more about it and see an example here.
回答 5
使用grequests,它是request + Gevent模块的组合。
GRequests允许您将Requests与Gevent一起使用,以轻松进行异步HTTP请求。
用法很简单:
import grequests
urls = [
'http://www.heroku.com',
'http://tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://kennethreitz.com'
]
创建一组未发送的请求:
>>> rs = (grequests.get(u) for u in urls)
同时发送它们:
>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
Use grequests , it’s a combination of requests + Gevent module .
GRequests allows you to use Requests with Gevent to make asyncronous HTTP Requests easily.
Usage is simple:
import grequests
urls = [
'http://www.heroku.com',
'http://tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://kennethreitz.com'
]
Create a set of unsent Requests:
>>> rs = (grequests.get(u) for u in urls)
Send them all at the same time:
>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
回答 6
解决此问题的一种好方法是首先编写获得一个结果所需的代码,然后合并线程代码以并行化应用程序。
在理想情况下,这仅意味着同时启动100,000个线程,这些线程会将其结果输出到字典或列表中以供以后处理,但是实际上,您可以通过这种方式发出多少个并行HTTP请求受到限制。在本地,您可以同时打开多少个套接字,Python解释器允许多少个执行线程受到限制。从远程来看,如果所有请求都针对一台或多台服务器,则同时连接的数量可能会受到限制。这些限制可能导致您必须以一种方式来编写脚本,以便在任何时候仅轮询一小部分网址(如另一位发帖人所述,100可能是一个不错的线程池大小,尽管您可能会发现自己可以成功部署更多)。
您可以按照以下设计模式来解决上述问题:
- 启动一个线程以启动新的请求线程,直到当前正在运行的线程(您可以通过threading.active_count()或通过将线程对象推入数据结构来跟踪它们)的数量大于等于您的同时请求的最大数量(例如100) ,然后短暂睡眠。没有更多URL可处理时,该线程应终止。因此,线程将不断唤醒,启动新线程并进入休眠状态,直到完成操作为止。
- 让请求线程将其结果存储在某种数据结构中,以供以后检索和输出。如果您要在其中存储结果的结构是C
list
或dict
CPython,则可以安全地从线程中添加或插入不带锁的唯一项,但是如果您写入文件或需要更复杂的跨线程数据交互,则应使用a互斥锁,以保护该状态免于腐败。
我建议您使用线程模块。您可以使用它来启动和跟踪正在运行的线程。Python的线程支持是裸露的,但是对问题的描述表明它完全可以满足您的需求。
最后,如果你想看到用Python编写的并行网络应用的一个非常简单的应用程序,请ssh.py。这是一个小型库,使用Python线程并行化许多SSH连接。该设计足够接近您的要求,您可能会发现它是很好的资源。
A good approach to solving this problem is to first write the code required to get one result, then incorporate threading code to parallelize the application.
In a perfect world this would simply mean simultaneously starting 100,000 threads which output their results into a dictionary or list for later processing, but in practice you are limited in how many parallel HTTP requests you can issue in this fashion. Locally, you have limits in how many sockets you can open concurrently, how many threads of execution your Python interpreter will allow. Remotely, you may be limited in the number of simultaneous connections if all the requests are against one server, or many. These limitations will probably necessitate that you write the script in such a way as to only poll a small fraction of the URLs at any one time (100, as another poster mentioned, is probably a decent thread pool size, although you may find that you can successfully deploy many more).
You can follow this design pattern to resolve the above issue:
- Start a thread which launches new request threads until the number of currently running threads (you can track them via threading.active_count() or by pushing the thread objects into a data structure) is >= your maximum number of simultaneous requests (say 100), then sleeps for a short timeout. This thread should terminate when there is are no more URLs to process. Thus, the thread will keep waking up, launching new threads, and sleeping until your are finished.
- Have the request threads store their results in some data structure for later retrieval and output. If the structure you are storing the results in is a
list
or dict
in CPython, you can safely append or insert unique items from your threads without locks, but if you write to a file or require in more complex cross-thread data interaction you should use a mutual exclusion lock to protect this state from corruption.
I would suggest you use the threading module. You can use it to launch and track running threads. Python’s threading support is bare, but the description of your problem suggests that it is completely sufficient for your needs.
Finally, if you’d like to see a pretty straightforward application of a parallel network application written in Python, check out ssh.py. It’s a small library which uses Python threading to parallelize many SSH connections. The design is close enough to your requirements that you may find it to be a good resource.
回答 7
If you’re looking to get the best performance possible, you might want to consider using Asynchronous I/O rather than threads. The overhead associated with thousands of OS threads is non-trivial and the context switching within the Python interpreter adds even more on top of it. Threading will certainly get the job done but I suspect that an asynchronous route will provide better overall performance.
Specifically, I’d suggest the async web client in the Twisted library (http://www.twistedmatrix.com). It has an admittedly steep learning curve but it quite easy to use once you get a good handle on Twisted’s style of asynchronous programming.
A HowTo on Twisted’s asynchronous web client API is available at:
http://twistedmatrix.com/documents/current/web/howto/client.html
回答 8
一个解法:
from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools
concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)
def getStatus(ourl):
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status
def processResponse(response,url):
print response, url
processedOne()
def processError(error,url):
print "error", url#, error
processedOne()
def processedOne():
if finished.next()==added:
reactor.stop()
def addTask(url):
req = threads.deferToThread(getStatus, url)
req.addCallback(processResponse, url)
req.addErrback(processError, url)
added=0
for url in open('urllist.txt'):
added+=1
addTask(url.strip())
try:
reactor.run()
except KeyboardInterrupt:
reactor.stop()
测试时间:
[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null
real 1m10.682s
user 0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
平时:
bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
A solution:
from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools
concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)
def getStatus(ourl):
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status
def processResponse(response,url):
print response, url
processedOne()
def processError(error,url):
print "error", url#, error
processedOne()
def processedOne():
if finished.next()==added:
reactor.stop()
def addTask(url):
req = threads.deferToThread(getStatus, url)
req.addCallback(processResponse, url)
req.addErrback(processError, url)
added=0
for url in open('urllist.txt'):
added+=1
addTask(url.strip())
try:
reactor.run()
except KeyboardInterrupt:
reactor.stop()
Testtime:
[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null
real 1m10.682s
user 0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
Pingtime:
bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
回答 9
使用线程池是一个不错的选择,这将使此操作相当容易。不幸的是,python没有使线程池变得异常简单的标准库。但是,这里有一个不错的库,它可以帮助您入门:http : //www.chrisarndt.de/projects/threadpool/
来自其站点的代码示例:
pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()
希望这可以帮助。
Using a thread pool is a good option, and will make this fairly easy. Unfortunately, python doesn’t have a standard library that makes thread pools ultra easy. But here is a decent library that should get you started: http://www.chrisarndt.de/projects/threadpool/
Code example from their site:
pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()
Hope this helps.
回答 10
创建epoll
对象,
开放许多客户端TCP套接字,
调整自己的发送缓冲区是有点超过请求头,
发送一个请求头-它应该是即时的,只是放置到一个缓冲区,在注册插座epoll
对象,
做.poll
对epoll
obect,
阅读前3来自每个套接字的字节.poll
,
将其写入sys.stdout
后跟\n
(不要刷新),关闭客户端套接字。
限制同时打开的套接字数-创建套接字时处理错误。仅当另一个插座关闭时,才创建一个新的插座。
调整操作系统限制。
尝试分叉到几个(不是很多)进程:这可能有助于更有效地使用CPU。
Create epoll
object,
open many client TCP sockets,
adjust their send buffers to be a bit more than request header,
send a request header — it should be immediate, just placing into a buffer,
register socket in epoll
object,
do .poll
on epoll
obect,
read first 3 bytes from each socket from .poll
,
write them to sys.stdout
followed by \n
(don’t flush),
close the client socket.
Limit number of sockets opened simultaneously — handle errors when sockets are created. Create a new socket only if another is closed.
Adjust OS limits.
Try forking into a few (not many) processes: this may help to use CPU a bit more effectively.
回答 11
对于您的情况,线程可能会成功,因为您可能会花费大量时间等待响应。标准库中有一些有用的模块(例如Queue)可能会有所帮助。
我之前并行下载文件时也做了类似的事情,这对我来说已经足够了,但是这并不是您正在谈论的范围。
如果您的任务更多地受CPU限制,则可能需要查看多处理模块,该模块将允许您利用更多的CPU /核心/线程(更多的进程不会互相阻塞,因为锁定是按进程进行的)
For your case, threading will probably do the trick as you’ll probably be spending most time waiting for a response. There are helpful modules like Queue in the standard library that might help.
I did a similar thing with parallel downloading of files before and it was good enough for me, but it wasn’t on the scale you are talking about.
If your task was more CPU-bound, you might want to look at the multiprocessing module, which will allow you to utilize more CPUs/cores/threads (more processes that won’t block each other since the locking is per process)
回答 12
考虑使用Windmill,尽管Windmill可能无法执行那么多线程。
您可以在5台计算机上使用手动滚动的Python脚本来完成此操作,每台计算机都使用端口40000-60000连接出站,从而打开100,000个端口连接。
同样,使用线程良好的QA应用程序(例如OpenSTA) 进行示例测试可能会有所帮助,以便了解每个服务器可以处理的数量。
另外,尝试研究仅将简单的Perl与LWP :: ConnCache类一起使用。这样,您可能会获得更多的性能(更多的连接)。
Consider using Windmill , although Windmill probably cant do that many threads.
You could do it with a hand rolled Python script on 5 machines, each one connecting outbound using ports 40000-60000, opening 100,000 port connections.
Also, it might help to do a sample test with a nicely threaded QA app such as OpenSTA in order to get an idea of how much each server can handle.
Also, try looking into just using simple Perl with the LWP::ConnCache class. You’ll probably get more performance (more connections) that way.
回答 13
这个扭曲的异步Web客户端运行得很快。
#!/usr/bin/python2.7
from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}
def getLock(url, simultaneous = 1):
return locks[urlparse(url).netloc, randrange(simultaneous)]
@inlineCallbacks
def getMapping(url):
# Limit ourselves to 4 simultaneous connections per host
# Tweak this number, but it should be no larger than pool.maxPersistentPerHost
lock = getLock(url,4)
yield lock.acquire()
try:
resp = yield agent.request('HEAD', url)
codes[url] = resp.code
except Exception as e:
codes[url] = str(e)
finally:
lock.release()
dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())
reactor.run()
pprint(codes)
This twisted async web client goes pretty fast.
#!/usr/bin/python2.7
from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}
def getLock(url, simultaneous = 1):
return locks[urlparse(url).netloc, randrange(simultaneous)]
@inlineCallbacks
def getMapping(url):
# Limit ourselves to 4 simultaneous connections per host
# Tweak this number, but it should be no larger than pool.maxPersistentPerHost
lock = getLock(url,4)
yield lock.acquire()
try:
resp = yield agent.request('HEAD', url)
codes[url] = resp.code
except Exception as e:
codes[url] = str(e)
finally:
lock.release()
dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())
reactor.run()
pprint(codes)
回答 14
最简单的方法是使用Python的内置线程库。它们不是“真实的” /内核线程,它们有问题(例如序列化),但是足够好。您需要一个队列和线程池。一种选择在这里,但是编写自己的东西很简单。您无法并行处理所有100,000个呼叫,但是可以同时触发100个(或大约)呼叫。
The easiest way would be to use Python’s built-in threading library. They’re not “real” / kernel threads They have issues (like serialization), but are good enough. You’d want a queue & thread pool. One option is here, but it’s trivial to write your own. You can’t parallelize all 100,000 calls, but you can fire off 100 (or so) of them at the same time.
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。