Tag archive: http

Python requests - print entire HTTP request (raw)?

Question: Python requests - print entire HTTP request (raw)?

While using the requests module, is there any way to print the raw HTTP request?

I don’t want just the headers; I want the request line, headers, and content printout. Is it possible to see what is ultimately constructed as the HTTP request?


Answer 0

Since v1.2.3 Requests added the PreparedRequest object. As per the documentation “it contains the exact bytes that will be sent to the server”.

One can use this to pretty print a request, like so:

import requests

req = requests.Request('POST','http://stackoverflow.com',headers={'X-Custom':'Test'},data='a=1&b=2')
prepared = req.prepare()

def pretty_print_POST(req):
    """
    At this point it is completely built and ready
    to be fired; it is "prepared".

    However pay attention at the formatting used in 
    this function because it is programmed to be pretty 
    printed and may differ from the actual request.
    """
    print('{}\n{}\r\n{}\r\n\r\n{}'.format(
        '-----------START-----------',
        req.method + ' ' + req.url,
        '\r\n'.join('{}: {}'.format(k, v) for k, v in req.headers.items()),
        req.body,
    ))

pretty_print_POST(prepared)

which produces:

-----------START-----------
POST http://stackoverflow.com/
Content-Length: 7
X-Custom: Test

a=1&b=2

Then you can send the actual request with this:

s = requests.Session()
s.send(prepared)

These links are to the latest documentation available, so they might change in content: Advanced – Prepared requests and API – Lower level classes


Answer 1

import requests
response = requests.post('http://httpbin.org/post', data={'key1':'value1'})
print(response.request.body)
print(response.request.headers)

I am using requests version 2.18.4 and Python 3


Answer 2

Note: this answer is outdated. Newer versions of requests support getting the request content directly, as AntonioHerraizS’s answer documents.

It’s not possible to get the true raw content of the request out of requests, since it only deals with higher level objects, such as headers and method type. requests uses urllib3 to send requests, but urllib3 also doesn’t deal with raw data – it uses httplib. Here’s a representative stack trace of a request:

-> r= requests.get("http://google.com")
  /usr/local/lib/python2.7/dist-packages/requests/api.py(55)get()
-> return request('get', url, **kwargs)
  /usr/local/lib/python2.7/dist-packages/requests/api.py(44)request()
-> return session.request(method=method, url=url, **kwargs)
  /usr/local/lib/python2.7/dist-packages/requests/sessions.py(382)request()
-> resp = self.send(prep, **send_kwargs)
  /usr/local/lib/python2.7/dist-packages/requests/sessions.py(485)send()
-> r = adapter.send(request, **kwargs)
  /usr/local/lib/python2.7/dist-packages/requests/adapters.py(324)send()
-> timeout=timeout
  /usr/local/lib/python2.7/dist-packages/requests/packages/urllib3/connectionpool.py(478)urlopen()
-> body=body, headers=headers)
  /usr/local/lib/python2.7/dist-packages/requests/packages/urllib3/connectionpool.py(285)_make_request()
-> conn.request(method, url, **httplib_request_kw)
  /usr/lib/python2.7/httplib.py(958)request()
-> self._send_request(method, url, body, headers)

Inside the httplib machinery, we can see HTTPConnection._send_request indirectly uses HTTPConnection._send_output, which finally creates the raw request and body (if it exists), and uses HTTPConnection.send to send them separately. send finally reaches the socket.

Since there are no hooks for doing what you want, as a last resort you can monkey-patch httplib to get the content. It’s a fragile solution, and you may need to adapt it if httplib is changed. If you intend to distribute software using this solution, you may want to consider packaging httplib instead of using the system’s, which is easy, since it’s a pure Python module.

Alas, without further ado, the solution:

import requests
import httplib

def patch_send():
    old_send= httplib.HTTPConnection.send
    def new_send( self, data ):
        print data
        return old_send(self, data) #return is not necessary, but never hurts, in case the library is changed
    httplib.HTTPConnection.send= new_send

patch_send()
requests.get("http://www.python.org")

which yields the output:

GET / HTTP/1.1
Host: www.python.org
Accept-Encoding: gzip, deflate, compress
Accept: */*
User-Agent: python-requests/2.1.0 CPython/2.7.3 Linux/3.2.0-23-generic-pae
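
For reference, the same monkey-patching idea still works on Python 3, where httplib became http.client. The following is only a sketch along those lines, not part of the original answer:

import http.client
import requests

def patch_send():
    old_send = http.client.HTTPConnection.send
    def new_send(self, data):
        # For a typical request, the request line and headers arrive here as one
        # bytes blob; bodies may come through in separate calls.
        if isinstance(data, (bytes, bytearray)):
            print(data.decode('latin-1', 'replace'))
        else:
            print(data)
        return old_send(self, data)
    http.client.HTTPConnection.send = new_send

patch_send()
requests.get("http://www.python.org")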

Answer 3

An even better idea is to use the requests_toolbelt library, which can dump out both requests and responses as strings for you to print to the console. It handles all the tricky cases with files and encodings which the above solution does not handle well.

It’s as easy as this:

import requests
from requests_toolbelt.utils import dump

resp = requests.get('https://httpbin.org/redirect/5')
data = dump.dump_all(resp)
print(data.decode('utf-8'))

Source: https://toolbelt.readthedocs.org/en/latest/dumputils.html

You can simply install it by typing:

pip install requests_toolbelt

Answer 4

Here is code that does the same, but for the response headers (Python 2 only, since it patches socket._fileobject):

import socket
def patch_requests():
    old_readline = socket._fileobject.readline
    if not hasattr(old_readline, 'patched'):
        def new_readline(self, size=-1):
            res = old_readline(self, size)
            print res,
            return res
        new_readline.patched = True
        socket._fileobject.readline = new_readline
patch_requests()

I spent a lot of time searching for this, so I’m leaving it here in case someone needs it.


Answer 5

I use the following function to format requests. It’s like @AntonioHerraizS’s answer, except it also pretty-prints JSON objects in the body and labels all parts of the request.

import functools
import json
import textwrap

format_json = functools.partial(json.dumps, indent=2, sort_keys=True)
indent = functools.partial(textwrap.indent, prefix='  ')

def format_prepared_request(req):
    """Pretty-format 'requests.PreparedRequest'

    Example:
        res = requests.post(...)
        print(format_prepared_request(res.request))

        req = requests.Request(...)
        req = req.prepare()
        print(format_prepared_request(req))
    """
    headers = '\n'.join(f'{k}: {v}' for k, v in req.headers.items())
    content_type = req.headers.get('Content-Type', '')
    if 'application/json' in content_type:
        try:
            body = format_json(json.loads(req.body))
        except json.JSONDecodeError:
            body = req.body
    else:
        body = req.body
    s = textwrap.dedent("""
    REQUEST
    =======
    endpoint: {method} {url}
    headers:
    {headers}
    body:
    {body}
    =======
    """).strip()
    s = s.format(
        method=req.method,
        url=req.url,
        headers=indent(headers),
        body=indent(body),
    )
    return s

And I have a similar function to format the response:

def format_response(resp):
    """Pretty-format 'requests.Response'"""
    headers = '\n'.join(f'{k}: {v}' for k, v in resp.headers.items())
    content_type = resp.headers.get('Content-Type', '')
    if 'application/json' in content_type:
        try:
            body = format_json(resp.json())
        except json.JSONDecodeError:
            body = resp.text
    else:
        body = resp.text
    s = textwrap.dedent("""
    RESPONSE
    ========
    status_code: {status_code}
    headers:
    {headers}
    body:
    {body}
    ========
    """).strip()

    s = s.format(
        status_code=resp.status_code,
        headers=indent(headers),
        body=indent(body),
    )
    return s

Answer 6

requests supports so-called event hooks (as of 2.23 there is actually only the response hook). A hook can be used on a request to print the full request-response pair’s data, including the effective URL, headers, and bodies, like:

import textwrap
import requests

def print_roundtrip(response, *args, **kwargs):
    format_headers = lambda d: '\n'.join(f'{k}: {v}' for k, v in d.items())
    print(textwrap.dedent('''
        ---------------- request ----------------
        {req.method} {req.url}
        {reqhdrs}

        {req.body}
        ---------------- response ----------------
        {res.status_code} {res.reason} {res.url}
        {reshdrs}

        {res.text}
    ''').format(
        req=response.request, 
        res=response, 
        reqhdrs=format_headers(response.request.headers), 
        reshdrs=format_headers(response.headers), 
    ))

requests.get('https://httpbin.org/', hooks={'response': print_roundtrip})

Running it prints:

---------------- request ----------------
GET https://httpbin.org/
User-Agent: python-requests/2.23.0
Accept-Encoding: gzip, deflate
Accept: */*
Connection: keep-alive

None
---------------- response ----------------
200 OK https://httpbin.org/
Date: Thu, 14 May 2020 17:16:13 GMT
Content-Type: text/html; charset=utf-8
Content-Length: 9593
Connection: keep-alive
Server: gunicorn/19.9.0
Access-Control-Allow-Origin: *
Access-Control-Allow-Credentials: true

<!DOCTYPE html>
<html lang="en">
...
</html>

You may want to change res.text to res.content if the response is binary.


Http-prompt: an interactive command-line HTTP and API testing client built on top of HTTPie, with autocomplete, syntax highlighting, and more

Hug: makes API development as simple as possible


Hug aims to make developing Python-driven APIs as simple as possible, but no simpler. As a result, it drastically simplifies Python API development.

Hug's design goals:

  • Make developing a Python-driven API as succinct as a written definition
  • The framework should encourage code that self-documents
  • It should be fast. A developer should never feel the need to look elsewhere for performance reasons.
  • Writing tests for APIs written on top of Hug should be easy and intuitive
  • Doing something once in the API framework is better than pushing the problem set onto the users of the API framework
  • Be the basis for next-generation Python APIs, embracing the latest technology

As a result of these goals, Hug is Python 3+ only and is built on top of Falcon's high-performance HTTP library.

Supporting Hug development

Get professionally supported hug with the Tidelift Subscription

Professional support for Hug is available as part of the Tidelift Subscription.
Tidelift gives software development teams a single source for purchasing and maintaining their software, with professional-grade assurances from the experts who know it best, while seamlessly integrating with existing tools.

Installing Hug

Installing Hug is as simple as:

pip3 install hug --upgrade

Ideally, within a virtual environment.

Getting started

Build an example API with a simple endpoint in just a few lines of code.

# filename: happy_birthday.py
"""A basic (single function) API written using hug"""
import hug


@hug.get('/happy_birthday')
def happy_birthday(name, age:hug.types.number=1):
    """Says happy birthday to a user"""
    return "Happy {age} Birthday {name}!".format(**locals())

To run it, type on the command line:

hug -f happy_birthday.py

You can access the example in your browser at localhost:8000/happy_birthday?name=hug&age=1, and then check out the documentation for your API at localhost:8000/documentation.

Parameters can also be encoded in the URL (check out happy_birthday.py for the whole example).

@hug.get('/greet/{event}')
def greet(event: str):
    """Greets appropriately (from http://blog.ketchum.com/how-to-write-10-common-holiday-greetings/)  """
    greetings = "Happy"
    if event == "Christmas":
        greetings = "Merry"
    if event == "Kwanzaa":
        greetings = "Joyous"
    if event == "wishes":
        greetings = "Warm"

    return "{greetings} {event}!".format(**locals())

Once you run the server as above, you can use it as follows:

curl http://localhost:8000/greet/wishes
"Warm wishes!"

Versioning with Hug

# filename: versioning_example.py
"""A simple example of a hug API call with versioning"""
import hug

@hug.get('/echo', versions=1)
def echo(text):
    return text


@hug.get('/echo', versions=range(2, 5))
def echo(text):
    return "Echo: {text}".format(**locals())

To run the example:

hug -f versioning_example.py

You can then access the example from localhost:8000/v1/echo?text=Hi / localhost:8000/v2/echo?text=Hi, or access the documentation for your API from localhost:8000.

Note: versioning in Hug automatically supports both version headers and direct URL-based specification.

Testing Hug APIs

Hug's HTTP method decorators don't modify your original functions, which makes testing Hug APIs as simple as testing any other Python function. It also means that interacting with your API functions from other Python code is as straightforward as calling a plain Python function. Hug makes it easy to test the full Python stack of your API by using the hug.test module:

import hug
import happy_birthday

hug.test.get(happy_birthday, 'happy_birthday', {'name': 'Timothy', 'age': 25}) # Returns a Response object

You can use this Response object for test assertions (check out test_happy_birthday.py):

def tests_happy_birthday():
    response = hug.test.get(happy_birthday, 'happy_birthday', {'name': 'Timothy', 'age': 25})
    assert response.status == HTTP_200
    assert response.data is not None

Running Hug with other WSGI-based servers

Hug exposes a __hug_wsgi__ magic method on every API module automatically. Running a Hug-based API on any standard WSGI server should be as simple as pointing it to module_name: __hug_wsgi__.

For example:

uwsgi --http 0.0.0.0:8000 --wsgi-file examples/hello_world.py --callable __hug_wsgi__

runs the hello world Hug example API.

Building blocks of a Hug API

When building an API on top of the Hug framework you'll use the following concepts:

METHOD decorators (get, post, update, etc.): HTTP method decorators that expose your Python function as an API while keeping your Python method unchanged.

@hug.get() # <- Is the hug METHOD decorator
def hello_world():
    return "Hello"

Hug uses the structure of the function you decorate to automatically generate documentation for users of your API. Hug always passes a request, response, and api_version variable to your function if they are defined as parameters in your function definition.

Type annotations: functions optionally attached to your method's parameters to specify how a parameter is validated and converted into a Python type.

@hug.get()
def math(number_1:int, number_2:int): #The :int after both arguments is the Type Annotation
    return number_1 + number_2

Type annotations also feed into Hug's automatic documentation generation to let users of your API know what data to supply.

Directives: functions that get executed with the request/response data, based on being requested as a parameter in your api_function. These apply only as input parameters and cannot currently be used as output formats or transformations.

@hug.get()
def test_time(hug_timer):
    return {'time_taken': float(hug_timer)}

Directives can be accessed via an argument with a hug_ prefix, or by using Python 3 type annotations. The latter is the more modern approach and is recommended. Directives declared in a module can be accessed by using their fully qualified name as the type annotation (e.g. module.directive_name).

Apart from the obvious input-transformation use case, directives can also be used to pipe data into your API functions even if it is not present in the request query string, POST body, etc. For an example of how to use directives in this manner, see the authentication example in the examples folder.

Adding your own directives is straightforward:

@hug.directive()
def square(value=1, **kwargs):
    '''Returns passed in parameter multiplied by itself'''
    return value * value

@hug.get()
@hug.local()
def tester(value: square=10):
    return value

tester() == 100

For completeness, here is an example of accessing the directive via the magic-name approach:

@hug.directive()
def multiply(value=1, **kwargs):
    '''Returns passed in parameter multiplied by itself'''
    return value * value

@hug.get()
@hug.local()
def tester(hug_multiply=10):
    return hug_multiply

tester() == 100

Output formatters: functions that take the output of your API function and format it for transport to the user of the API.

@hug.default_output_format()
def my_output_formatter(data):
    return "STRING:{0}".format(data)

@hug.get(output=hug.output_format.json)
def hello():
    return {'hello': 'world'}

As shown, you can easily change the output format for both an entire API and an individual API call.

Input formatters: functions that take the body of data given by the user of your API and format it for handling.

@hug.default_input_format("application/json")
def my_input_formatter(data):
    return ('Results', hug.input_format.json(data))

Input formatters are mapped based on the content_type of the request data, and only perform basic parsing. More detailed parsing should be done by your api_function.

Middleware: functions that get called for every request a Hug API processes.

@hug.request_middleware()
def process_data(request, response):
    request.env['SERVER_NAME'] = 'changed'

@hug.response_middleware()
def process_data(request, response, resource):
    response.set_header('MyHeader', 'Value')

You can also easily add any Falcon-style middleware using:

__hug__.http.add_middleware(MiddlewareObject())

Parameter mapping can be used to override inferred parameter names, e.g. for reserved keywords:

import marshmallow.fields as fields
...

@hug.get('/foo', map_params={'from': 'from_date'})  # API call uses 'from'
def get_foo_by_date(from_date: fields.DateTime()):
    return find_foo(from_date)

Splitting an API over multiple files

Hug enables you to organize large projects in any manner you see fit. You can import any module that contains Hug-decorated functions (request handling, directives, type handlers, etc.) and extend your base API with that module.

For example:

something.py

import hug

@hug.get('/')
def say_hi():
    return 'hello from something'

can be imported into the main API file:

__init__.py

import hug
from . import something

@hug.get('/')
def say_hi():
    return "Hi from root"

@hug.extend_api('/something')
def something_api():
    return [something]

Or alternatively, for cases like this where only one module is mounted per URL route:

#alternatively
hug.API(__name__).extend(something, '/something')

Configuring Hug 404

By default, Hug returns an auto-generated API spec when a user tries to access an endpoint that isn't defined. If you would rather not return this spec, you can turn off 404 documentation.

From the command line application:

hug -nd -f {file} #nd flag tells hug not to generate documentation on 404

Additionally, you can easily create a custom 404 handler using the hug.not_found decorator:

@hug.not_found()
def not_found_handler():
    return "Not Found"

This decorator works in the same manner as the Hug HTTP method decorators, and is even version aware:

@hug.not_found(versions=1)
def not_found_handler():
    return ""

@hug.not_found(versions=2)
def not_found_handler():
    return "Not Found"

Asyncio support

When using the get or cli method decorators on coroutines, Hug will schedule the execution of the coroutine.

Using the asyncio coroutine decorator:

@hug.get()
@asyncio.coroutine
def hello_world():
    return "Hello"

Using the Python 3.5 async keyword:

@hug.get()
async def hello_world():
    return "Hello"

Note: Hug runs on top of Falcon, which is not an asynchronous server. Even if you use asyncio, requests will still be processed synchronously.

Using Docker

If you like to develop in Docker and keep your system clean, you can do that, but you'll need to first install Docker Compose.

Once you do that, you'll need to cd into the docker directory and run the web server (Gunicorn) specified in ./docker/gunicorn/Dockerfile, after which you can preview the output of your API in the browser on your host machine.

$ cd ./docker
# This will run Gunicorn on port 8000 of the Docker container.
$ docker-compose up gunicorn

# From the host machine, find your Dockers IP address.
# For Windows & Mac:
$ docker-machine ip default

# For Linux:
$ ifconfig docker0 | grep 'inet' | cut -d: -f2 | awk '{ print $1}' | head -n1

By default, the IP is 172.17.0.1. Assuming that is the IP you see as well, you would then go to http://172.17.0.1:8000/ in your browser to view your API.

You can also log into a Docker container that you can consider your workspace. This workspace has Python and pip installed, so you can use those tools within Docker. If you need to test the CLI interface, for example, you would use the following:

$ docker-compose run workspace bash

In your Docker workspace container, the ./docker/templates directory on your host machine is mounted to /src in the Docker container. This is specified under services > app in ./docker/docker-compose.yml.

bash-4.3# cd /src
bash-4.3# tree
.
├── __init__.py
└── handlers
    ├── birthday.py
    └── hello.py

1 directory, 3 files

Security contact information

Hug takes security and quality seriously. That is why we rely only on thoroughly tested components and use static analysis tools (such as bandit and safety) to verify the security of our code base. If you find or encounter any potential security issues, please let us know right away so we can resolve them.

To report a security vulnerability, please use the Tidelift security contact. Tidelift will coordinate the fix and disclosure.

Why Hug?

Hug simply stands for "Hopefully Useful Guide". This represents the project's goal of helping to guide developers into creating well-written and intuitive APIs.


Thanks, and I hope you find Hug helpful as you develop your next Python API!

~ Timothy Crosley

What is the fastest way to send 100,000 HTTP requests in Python?

Question: What is the fastest way to send 100,000 HTTP requests in Python?

I am opening a file which has 100,000 URL’s. I need to send an HTTP request to each URL and print the status code. I am using Python 2.6, and so far looked at the many confusing ways Python implements threading/concurrency. I have even looked at the python concurrence library, but cannot figure out how to write this program correctly. Has anyone come across a similar problem? I guess generally I need to know how to perform thousands of tasks in Python as fast as possible – I suppose that means ‘concurrently’.


Answer 0

Twistedless solution:

from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue

concurrent = 200

def doWork():
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        url = urlparse(ourl)
        conn = httplib.HTTPConnection(url.netloc)   
        conn.request("HEAD", url.path)
        res = conn.getresponse()
        return res.status, ourl
    except:
        return "error", ourl

def doSomethingWithResult(status, url):
    print status, url

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in open('urllist.txt'):
        q.put(url.strip())
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

This one is slightly faster than the twisted solution and uses less CPU.


Answer 1

A solution using the tornado asynchronous networking library:

from tornado import ioloop, httpclient

i = 0

def handle_request(response):
    print(response.code)
    global i
    i -= 1
    if i == 0:
        ioloop.IOLoop.instance().stop()

http_client = httpclient.AsyncHTTPClient()
for url in open('urls.txt'):
    i += 1
    http_client.fetch(url.strip(), handle_request, method='HEAD')
ioloop.IOLoop.instance().start()

Answer 2

Things have changed quite a bit since 2010 when this was posted. I haven’t tried all the other answers, but I have tried a few, and I found this to work the best for me using Python 3.6.

I was able to fetch about ~150 unique domains per second running on AWS.

import pandas as pd
import concurrent.futures
import requests
import time

out = []
CONNECTIONS = 100
TIMEOUT = 5

tlds = open('../data/sample_1k.txt').read().splitlines()
urls = ['http://{}'.format(x) for x in tlds[1:]]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    time1 = time.time()
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)

            print(str(len(out)),end="\r")

    time2 = time.time()

print(f'Took {time2-time1:.2f} s')
print(pd.Series(out).value_counts())

Answer 3

Threads are absolutely not the answer here. They will provide both process and kernel bottlenecks, as well as throughput limits that are not acceptable if the overall goal is “the fastest way”.

A little bit of twisted and its asynchronous HTTP client would give you much better results.


Answer 4

I know this is an old question, but in Python 3.7 you can do this using asyncio and aiohttp.

import asyncio
import aiohttp
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        return (url, 404)
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')

if __name__ == "__main__":
    import pathlib
    import sys

    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    here = pathlib.Path(__file__).parent

    with open(here.joinpath("urls.txt")) as infile:
        urls = set(map(str.strip, infile))

    asyncio.run(make_requests(urls=urls))

You can read more about it and see an example here.
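
One practical note not from the original answer: with 100,000 URLs, creating every request at once can exhaust sockets or file descriptors. A common variation (sketched here as an assumption, not something the answer specifies) bounds the concurrency with an asyncio.Semaphore:

import asyncio
from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, sem: asyncio.Semaphore, **kwargs) -> tuple:
    async with sem:  # at most `limit` requests are in flight at any moment
        try:
            resp = await session.request(method="GET", url=url, **kwargs)
        except ClientConnectorError:
            return (url, 404)
        return (url, resp.status)

async def make_requests(urls: set, limit: int = 100, **kwargs) -> None:
    sem = asyncio.Semaphore(limit)
    async with ClientSession() as session:
        results = await asyncio.gather(
            *(fetch_html(url, session, sem, **kwargs) for url in urls)
        )
    for url, status in results:
        print(f'{status} - {url}')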


Answer 5

Use grequests; it’s a combination of the requests and gevent modules.

GRequests allows you to use Requests with Gevent to make asynchronous HTTP requests easily.

Usage is simple:

import grequests

urls = [
   'http://www.heroku.com',
   'http://tablib.org',
   'http://httpbin.org',
   'http://python-requests.org',
   'http://kennethreitz.com'
]

Create a set of unsent Requests:

>>> rs = (grequests.get(u) for u in urls)

Send them all at the same time:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

Answer 6

A good approach to solving this problem is to first write the code required to get one result, then incorporate threading code to parallelize the application.

In a perfect world this would simply mean simultaneously starting 100,000 threads which output their results into a dictionary or list for later processing, but in practice you are limited in how many parallel HTTP requests you can issue in this fashion. Locally, you have limits in how many sockets you can open concurrently, how many threads of execution your Python interpreter will allow. Remotely, you may be limited in the number of simultaneous connections if all the requests are against one server, or many. These limitations will probably necessitate that you write the script in such a way as to only poll a small fraction of the URLs at any one time (100, as another poster mentioned, is probably a decent thread pool size, although you may find that you can successfully deploy many more).

You can follow this design pattern to resolve the above issue:

  1. Start a thread which launches new request threads until the number of currently running threads (you can track them via threading.active_count() or by pushing the thread objects into a data structure) is >= your maximum number of simultaneous requests (say 100), then sleeps for a short timeout. This thread should terminate when there are no more URLs to process. Thus, the thread will keep waking up, launching new threads, and sleeping until you are finished.
  2. Have the request threads store their results in some data structure for later retrieval and output. If the structure you are storing the results in is a list or dict in CPython, you can safely append or insert unique items from your threads without locks, but if you write to a file or require more complex cross-thread data interaction you should use a mutual exclusion lock to protect this state from corruption.

I would suggest you use the threading module. You can use it to launch and track running threads. Python’s threading support is bare, but the description of your problem suggests that it is completely sufficient for your needs.

Finally, if you’d like to see a pretty straightforward application of a parallel network application written in Python, check out ssh.py. It’s a small library which uses Python threading to parallelize many SSH connections. The design is close enough to your requirements that you may find it to be a good resource.
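
A rough sketch of the pattern described in this answer, assuming Python 3 and (purely for illustration) the requests library and a urllist.txt file of URLs; the launcher thread caps the number of workers via threading.active_count() and workers append results to a plain list:

import threading
import time
import requests  # illustrative choice of HTTP client

MAX_SIMULTANEOUS = 100
results = []  # list.append is safe to call from threads in CPython

def fetch(url):
    try:
        results.append((url, requests.head(url, timeout=5).status_code))
    except requests.RequestException as exc:
        results.append((url, type(exc).__name__))

def launcher(urls):
    baseline = threading.active_count()  # main thread + this launcher thread
    for url in urls:
        # Sleep briefly whenever the cap on running request threads is reached.
        while threading.active_count() - baseline >= MAX_SIMULTANEOUS:
            time.sleep(0.05)
        threading.Thread(target=fetch, args=(url,), daemon=True).start()

if __name__ == '__main__':
    with open('urllist.txt') as f:
        urls = [line.strip() for line in f if line.strip()]
    t = threading.Thread(target=launcher, args=(urls,))
    t.start()
    t.join()                              # every URL has been dispatched
    while threading.active_count() > 1:   # wait for in-flight workers to finish
        time.sleep(0.05)
    for url, status in results:
        print(status, url)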


Answer 7

If you’re looking to get the best performance possible, you might want to consider using Asynchronous I/O rather than threads. The overhead associated with thousands of OS threads is non-trivial and the context switching within the Python interpreter adds even more on top of it. Threading will certainly get the job done but I suspect that an asynchronous route will provide better overall performance.

Specifically, I’d suggest the async web client in the Twisted library (http://www.twistedmatrix.com). It has an admittedly steep learning curve, but it is quite easy to use once you get a good handle on Twisted’s style of asynchronous programming.

A HowTo on Twisted’s asynchronous web client API is available at:

http://twistedmatrix.com/documents/current/web/howto/client.html


Answer 8

A solution:

from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools


concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)

def getStatus(ourl):
    url = urlparse(ourl)
    conn = httplib.HTTPConnection(url.netloc)   
    conn.request("HEAD", url.path)
    res = conn.getresponse()
    return res.status

def processResponse(response,url):
    print response, url
    processedOne()

def processError(error,url):
    print "error", url#, error
    processedOne()

def processedOne():
    if finished.next()==added:
        reactor.stop()

def addTask(url):
    req = threads.deferToThread(getStatus, url)
    req.addCallback(processResponse, url)
    req.addErrback(processError, url)   

added=0
for url in open('urllist.txt'):
    added+=1
    addTask(url.strip())

try:
    reactor.run()
except KeyboardInterrupt:
    reactor.stop()

Testtime:

[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null 

real    1m10.682s
user    0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu

Pingtime:

bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms

Answer 9

Using a thread pool is a good option, and will make this fairly easy. Unfortunately, python doesn’t have a standard library that makes thread pools ultra easy. But here is a decent library that should get you started: http://www.chrisarndt.de/projects/threadpool/

Code example from their site:

pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()

Hope this helps.


Answer 10

Create epoll object,
open many client TCP sockets,
adjust their send buffers to be a bit more than request header,
send a request header — it should be immediate, just placing into a buffer, register socket in epoll object,
do .poll on the epoll object,
read first 3 bytes from each socket from .poll,
write them to sys.stdout followed by \n (don’t flush), close the client socket.

Limit number of sockets opened simultaneously — handle errors when sockets are created. Create a new socket only if another is closed.
Adjust OS limits.
Try forking into a few (not many) processes: this may help to use CPU a bit more effectively.
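
The steps above translate fairly directly into code. Below is a rough, Linux-only sketch of my own (not code from the answer), using non-blocking sockets and plain HTTP/1.0 HEAD requests; error handling is minimal, the "limit the number of open sockets" and OS-tuning steps are left out, and the urllist.txt input file is just an assumption:

import select
import socket
from urllib.parse import urlparse

def print_status_lines(urls):
    ep = select.epoll()
    conns = {}  # fileno -> (socket, url, request bytes)
    for url in urls:
        parts = urlparse(url)
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setblocking(False)
        try:
            s.connect((parts.hostname, parts.port or 80))  # DNS lookup still blocks
        except BlockingIOError:
            pass  # connect is in progress; epoll reports writability when done
        req = 'HEAD {} HTTP/1.0\r\nHost: {}\r\n\r\n'.format(
            parts.path or '/', parts.hostname).encode()
        conns[s.fileno()] = (s, url, req)
        ep.register(s.fileno(), select.EPOLLOUT)

    while conns:
        for fd, event in ep.poll(1):
            s, url, req = conns[fd]
            if event & select.EPOLLIN:
                try:
                    data = s.recv(256)          # e.g. b'HTTP/1.0 200 OK\r\n...'
                    print(data.split(b'\r\n', 1)[0].decode('latin-1'), url)
                except OSError:
                    print('error', url)
            elif event & select.EPOLLOUT:
                try:
                    s.send(req)                 # the header fits in the send buffer
                    ep.modify(fd, select.EPOLLIN)
                    continue                    # keep this socket registered
                except OSError:
                    print('error', url)         # the connect failed
            else:
                print('error', url)             # EPOLLERR / EPOLLHUP
            ep.unregister(fd)
            s.close()
            del conns[fd]
    ep.close()

with open('urllist.txt') as f:
    print_status_lines(line.strip() for line in f if line.strip())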


Answer 11

For your case, threading will probably do the trick as you’ll probably be spending most time waiting for a response. There are helpful modules like Queue in the standard library that might help.

I did a similar thing with parallel downloading of files before and it was good enough for me, but it wasn’t on the scale you are talking about.

If your task was more CPU-bound, you might want to look at the multiprocessing module, which will allow you to utilize more CPUs/cores/threads (more processes that won’t block each other since the locking is per process)
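
A minimal sketch of the multiprocessing route mentioned above; requests, the worker function, and the urllist.txt filename are my own illustrative assumptions. Each URL is handled in a separate process, which also helps once the per-URL work becomes CPU-bound:

from multiprocessing import Pool
import requests  # illustrative; any CPU-heavy per-URL work would go in the worker too

def check(url):
    try:
        return url, requests.head(url, timeout=5).status_code
    except requests.RequestException as exc:
        return url, type(exc).__name__

if __name__ == '__main__':
    with open('urllist.txt') as f:
        urls = [line.strip() for line in f if line.strip()]
    with Pool(processes=8) as pool:  # separate processes don't contend for the GIL
        for url, status in pool.imap_unordered(check, urls):
            print(status, url)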


Answer 12

Consider using Windmill, although Windmill probably can’t do that many threads.

You could do it with a hand rolled Python script on 5 machines, each one connecting outbound using ports 40000-60000, opening 100,000 port connections.

Also, it might help to do a sample test with a nicely threaded QA app such as OpenSTA in order to get an idea of how much each server can handle.

Also, try looking into just using simple Perl with the LWP::ConnCache class. You’ll probably get more performance (more connections) that way.


Answer 13

This twisted async web client goes pretty fast.

#!/usr/bin/python2.7

from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput

pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}

def getLock(url, simultaneous = 1):
    return locks[urlparse(url).netloc, randrange(simultaneous)]

@inlineCallbacks
def getMapping(url):
    # Limit ourselves to 4 simultaneous connections per host
    # Tweak this number, but it should be no larger than pool.maxPersistentPerHost 
    lock = getLock(url,4)
    yield lock.acquire()
    try:
        resp = yield agent.request('HEAD', url)
        codes[url] = resp.code
    except Exception as e:
        codes[url] = str(e)
    finally:
        lock.release()


dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())

reactor.run()
pprint(codes)

Answer 14

The easiest way would be to use Python’s built-in threading library. They’re not “real” / kernel threads. They have issues (like serialization), but are good enough. You’d want a queue & thread pool. One option is here, but it’s trivial to write your own. You can’t parallelize all 100,000 calls, but you can fire off 100 (or so) of them at the same time.


What is the quickest way to HTTP GET in Python?

Question: What is the quickest way to HTTP GET in Python?

What is the quickest way to HTTP GET in Python if I know the content will be a string? I am searching the documentation for a quick one-liner like:

contents = url.get("http://example.com/foo/bar")

But all I can find using Google are httplib and urllib – and I am unable to find a shortcut in those libraries.

Does standard Python 2.5 have a shortcut in some form as above, or should I write a function url_get?

  1. I would prefer not to capture the output of shelling out to wget or curl.

Answer 0

Python 3:

import urllib.request
contents = urllib.request.urlopen("http://example.com/foo/bar").read()

Python 2:

import urllib2
contents = urllib2.urlopen("http://example.com/foo/bar").read()

Documentation for urllib.request and read.
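
Note that in Python 3, read() returns bytes; since the question asks for a string, you would typically decode it (assuming here that the body is UTF-8 encoded):

import urllib.request

with urllib.request.urlopen("http://example.com/foo/bar") as response:
    contents = response.read().decode("utf-8")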


Answer 1

You could use a library called requests.

import requests
r = requests.get("http://example.com/foo/bar")

This is quite easy. Then you can do this:

>>> print(r.status_code)
>>> print(r.headers)
>>> print(r.content)

Answer 2

If you want the httplib2 solution to be a one-liner, consider instantiating an anonymous Http object:

import httplib2
resp, content = httplib2.Http().request("http://example.com/foo/bar")

Answer 3

Have a look at httplib2, which – next to a lot of very useful features – provides exactly what you want.

import httplib2

resp, content = httplib2.Http().request("http://example.com/foo/bar")

Where content would be the response body (as a string), and resp would contain the status and response headers.

It isn’t included with a standard Python install (though it only requires standard Python), but it’s definitely worth checking out.


Answer 4

It’s simple enough with the powerful urllib3 library.

Import it like this:

import urllib3

http = urllib3.PoolManager()

And make a request like this:

response = http.request('GET', 'https://example.com')

print(response.data) # Raw data.
print(response.data.decode('utf-8')) # Text.
print(response.status) # Status code.
print(response.headers['Content-Type']) # Content type.

You can add headers too:

response = http.request('GET', 'https://example.com', headers={
    'key1': 'value1',
    'key2': 'value2'
})

More info can be found on the urllib3 documentation.

urllib3 is much safer and easier to use than the builtin urllib.request or http modules and is stable.


Answer 5

theller’s solution for wget is really useful; however, I found that it does not print out the progress during the download process. It’s perfect if you add one line after the print statement in reporthook.

import sys, urllib

def reporthook(a, b, c):
    print "% 3.1f%% of %d bytes\r" % (min(100, float(a * b) / c * 100), c),
    sys.stdout.flush()
for url in sys.argv[1:]:
    i = url.rfind("/")
    file = url[i+1:]
    print url, "->", file
    urllib.urlretrieve(url, file, reporthook)
print

Answer 6

Here is a wget script in Python:

# From python cookbook, 2nd edition, page 487
import sys, urllib

def reporthook(a, b, c):
    print "% 3.1f%% of %d bytes\r" % (min(100, float(a * b) / c * 100), c),
for url in sys.argv[1:]:
    i = url.rfind("/")
    file = url[i+1:]
    print url, "->", file
    urllib.urlretrieve(url, file, reporthook)
print

Answer 7

Without further necessary imports this solution works (for me) – also with https:

try:
    import urllib2 as urlreq # Python 2.x
except:
    import urllib.request as urlreq # Python 3.x
req = urlreq.Request("http://example.com/foo/bar")
req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36')
urlreq.urlopen(req).read()

I often have difficulty grabbing the content when not specifying a “User-Agent” in the header information. Then usually the requests are cancelled with something like: urllib2.HTTPError: HTTP Error 403: Forbidden or urllib.error.HTTPError: HTTP Error 403: Forbidden.


Answer 8

How to also send headers

Python 3:

import urllib.request
contents = urllib.request.urlopen(urllib.request.Request(
    "https://api.github.com/repos/cirosantilli/linux-kernel-module-cheat/releases/latest",
    headers={"Accept" : 'application/vnd.github.full+json"text/html'}
)).read()
print(contents)

Python 2:

import urllib2
contents = urllib2.urlopen(urllib2.Request(
    "https://api.github.com",
    headers={"Accept" : 'application/vnd.github.full+json"text/html'}
)).read()
print(contents)

Answer 9

If you are working with HTTP APIs specifically, there are also more convenient choices such as Nap.

For example, here’s how to get gists from Github since May 1st 2014:

from nap.url import Url
api = Url('https://api.github.com')

gists = api.join('gists')
response = gists.get(params={'since': '2014-05-01T00:00:00Z'})
print(response.json())

More examples: https://github.com/kimmobrunfeldt/nap#examples


Answer 10

Excellent solutions Xuan, Theller.

For it to work with python 3 make the following changes

import sys, urllib.request

def reporthook(a, b, c):
    print ("% 3.1f%% of %d bytes\r" % (min(100, float(a * b) / c * 100), c))
    sys.stdout.flush()
for url in sys.argv[1:]:
    i = url.rfind("/")
    file = url[i+1:]
    print (url, "->", file)
    urllib.request.urlretrieve(url, file, reporthook)
print

Also, the URL you enter should be preceded by “http://”, otherwise it returns an unknown url type error.


Answer 11

For python >= 3.6, you can use dload:

import dload
t = dload.text(url)

For json:

j = dload.json(url)

Install:
pip install dload


Answer 12

Actually, in Python we can read from URLs just like from files; here is an example of reading JSON from an API.

import json
from urllib.request import urlopen

def read_some_key(url):
    # Wrapped in a function so the original snippet's 'return' is valid;
    # 'url' is assumed to be passed in by the caller.
    with urlopen(url) as f:
        resp = json.load(f)
    return resp['some_key']

Answer 13

If you want a lower level API:

import http.client

conn = http.client.HTTPSConnection('example.com')
conn.request('GET', '/')

resp = conn.getresponse()
content = resp.read()

conn.close()

text = content.decode('utf-8')

print(text)
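As a hedged extension of the same low-level API, http.client's request() also accepts a headers dict; the host and header values below are placeholders:

import http.client

conn = http.client.HTTPSConnection('example.com')
# request(method, url, body=None, headers={}) lets you set arbitrary request headers
conn.request('GET', '/', headers={'Accept': 'text/html', 'User-Agent': 'demo-client/1.0'})

resp = conn.getresponse()
print(resp.status, resp.reason)            # e.g. 200 OK
print(resp.getheader('Content-Type'))      # read a single response header
body = resp.read().decode('utf-8')

conn.close()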

如何使用Python通过HTTP下载文件?

问题:如何使用Python通过HTTP下载文件?

我有一个小的实用程序,可以用来按计划从网站上下载MP3文件,然后构建/更新已添加到iTunes的播客XML文件。

创建/更新XML文件的文本处理是用Python编写的。但是,实际的MP3文件是我在Windows的.bat文件中用wget下载的。我希望整个实用程序都用Python编写。

我一直没能找到用Python实际下载文件的方法,这就是我诉诸wget的原因。

那么,如何使用Python下载文件?

I have a small utility that I use to download an MP3 file from a website on a schedule and then builds/updates a podcast XML file which I’ve added to iTunes.

The text processing that creates/updates the XML file is written in Python. However, I use wget inside a Windows .bat file to download the actual MP3 file. I would prefer to have the entire utility written in Python.

I struggled to find a way to actually download the file in Python, thus why I resorted to using wget.

So, how do I download the file using Python?


回答 0

在Python 2中,请使用标准库随附的urllib2。

import urllib2
response = urllib2.urlopen('http://www.example.com/')
html = response.read()

减去任何错误处理,这是使用该库的最基本方法。您还可以执行更复杂的操作,例如更改标题。该文档可在此处找到

In Python 2, use urllib2 which comes with the standard library.

import urllib2
response = urllib2.urlopen('http://www.example.com/')
html = response.read()

This is the most basic way to use the library, minus any error handling. You can also do more complex stuff such as changing headers. The documentation can be found here.
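Since the snippet above is described as being "minus any error handling", here is a hedged sketch of what that error handling could look like with urllib2 (the URL is just a placeholder):

import urllib2

try:
    response = urllib2.urlopen('http://www.example.com/')
    html = response.read()
except urllib2.HTTPError as e:
    # the server answered, but with an error status code
    print 'HTTP error:', e.code
except urllib2.URLError as e:
    # the server could not be reached at all (DNS failure, refused connection, ...)
    print 'Failed to reach server:', e.reason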


回答 1

使用urlretrieve

import urllib
urllib.urlretrieve ("http://www.example.com/songs/mp3.mp3", "mp3.mp3")

(对于Python 3+使用import urllib.requesturllib.request.urlretrieve

还有一个,带有“进度条”

import urllib2

url = "http://download.thinkbroadband.com/10MB.zip"

file_name = url.split('/')[-1]
u = urllib2.urlopen(url)
f = open(file_name, 'wb')
meta = u.info()
file_size = int(meta.getheaders("Content-Length")[0])
print "Downloading: %s Bytes: %s" % (file_name, file_size)

file_size_dl = 0
block_sz = 8192
while True:
    buffer = u.read(block_sz)
    if not buffer:
        break

    file_size_dl += len(buffer)
    f.write(buffer)
    status = r"%10d  [%3.2f%%]" % (file_size_dl, file_size_dl * 100. / file_size)
    status = status + chr(8)*(len(status)+1)
    print status,

f.close()

One more, using urlretrieve:

import urllib
urllib.urlretrieve ("http://www.example.com/songs/mp3.mp3", "mp3.mp3")

(for Python 3+ use import urllib.request and urllib.request.urlretrieve)

Yet another one, with a “progressbar”

import urllib2

url = "http://download.thinkbroadband.com/10MB.zip"

file_name = url.split('/')[-1]
u = urllib2.urlopen(url)
f = open(file_name, 'wb')
meta = u.info()
file_size = int(meta.getheaders("Content-Length")[0])
print "Downloading: %s Bytes: %s" % (file_name, file_size)

file_size_dl = 0
block_sz = 8192
while True:
    buffer = u.read(block_sz)
    if not buffer:
        break

    file_size_dl += len(buffer)
    f.write(buffer)
    status = r"%10d  [%3.2f%%]" % (file_size_dl, file_size_dl * 100. / file_size)
    status = status + chr(8)*(len(status)+1)
    print status,

f.close()

回答 2

在2012年,使用python请求库

>>> import requests
>>> 
>>> url = "http://download.thinkbroadband.com/10MB.zip"
>>> r = requests.get(url)
>>> print len(r.content)
10485760

您可以运行 pip install requests 来安装它。

与其他替代方案相比,requests具有许多优势,因为它的API简单得多。如果必须进行身份验证,则尤其如此;在这种情况下,urllib和urllib2非常不直观且令人痛苦。


2015-12-30

人们对进度条赞誉有加。它确实很酷。现在有几种现成的解决方案,包括tqdm:

from tqdm import tqdm
import requests

url = "http://download.thinkbroadband.com/10MB.zip"
response = requests.get(url, stream=True)

with open("10MB", "wb") as handle:
    for data in tqdm(response.iter_content()):
        handle.write(data)

这本质上就是@kvance在30个月前所描述的实现。

In 2012, use the python requests library

>>> import requests
>>> 
>>> url = "http://download.thinkbroadband.com/10MB.zip"
>>> r = requests.get(url)
>>> print len(r.content)
10485760

You can run pip install requests to get it.

Requests has many advantages over the alternatives because the API is much simpler. This is especially true if you have to do authentication. urllib and urllib2 are pretty unintuitive and painful in this case.
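As a hedged illustration of that point, basic authentication with requests is a single keyword argument; the httpbin endpoint and credentials below are only placeholders:

import requests

# httpbin echoes back whether the supplied basic-auth credentials matched the URL
r = requests.get('https://httpbin.org/basic-auth/user/passwd', auth=('user', 'passwd'))
print(r.status_code)   # 200 when the credentials are accepted, 401 otherwise
print(r.json())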


2015-12-30

People have expressed admiration for the progress bar. It’s cool, sure. There are several off-the-shelf solutions now, including tqdm:

from tqdm import tqdm
import requests

url = "http://download.thinkbroadband.com/10MB.zip"
response = requests.get(url, stream=True)

with open("10MB", "wb") as handle:
    for data in tqdm(response.iter_content()):
        handle.write(data)

This is essentially the implementation @kvance described 30 months ago.


回答 3

import urllib2
mp3file = urllib2.urlopen("http://www.example.com/songs/mp3.mp3")
with open('test.mp3','wb') as output:
  output.write(mp3file.read())

wbopen('test.mp3','wb')打开一个文件(并清除所有现有文件)以二进制模式,所以你可以用它来代替刚才保存的文本数据。

import urllib2
mp3file = urllib2.urlopen("http://www.example.com/songs/mp3.mp3")
with open('test.mp3','wb') as output:
  output.write(mp3file.read())

The wb in open('test.mp3','wb') opens a file (and erases any existing file) in binary mode so you can save data with it instead of just text.


回答 4

Python 3

  • urllib.request.urlopen

    import urllib.request
    response = urllib.request.urlopen('http://www.example.com/')
    html = response.read()
  • urllib.request.urlretrieve

    import urllib.request
    urllib.request.urlretrieve('http://www.example.com/songs/mp3.mp3', 'mp3.mp3')

    注意:根据文档,urllib.request.urlretrieve属于“遗留接口”,并且“将来可能会被弃用”(感谢gerrit)

Python 2

  • urllib2.urlopen(感谢科里

    import urllib2
    response = urllib2.urlopen('http://www.example.com/')
    html = response.read()
  • urllib.urlretrieve(感谢PabloG

    import urllib
    urllib.urlretrieve('http://www.example.com/songs/mp3.mp3', 'mp3.mp3')

Python 3

  • urllib.request.urlopen

    import urllib.request
    response = urllib.request.urlopen('http://www.example.com/')
    html = response.read()
    
  • urllib.request.urlretrieve

    import urllib.request
    urllib.request.urlretrieve('http://www.example.com/songs/mp3.mp3', 'mp3.mp3')
    

    Note: According to the documentation, urllib.request.urlretrieve is a “legacy interface” and “might become deprecated in the future” (thanks gerrit)

Python 2

  • urllib2.urlopen (thanks Corey)

    import urllib2
    response = urllib2.urlopen('http://www.example.com/')
    html = response.read()
    
  • urllib.urlretrieve (thanks PabloG)

    import urllib
    urllib.urlretrieve('http://www.example.com/songs/mp3.mp3', 'mp3.mp3')
    

回答 5

使用wget模块:

import wget
wget.download('url')

use wget module:

import wget
wget.download('url')

回答 6

用于Python 2/3的PabloG代码的改进版本:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import ( division, absolute_import, print_function, unicode_literals )

import sys, os, tempfile, logging

if sys.version_info >= (3,):
    import urllib.request as urllib2
    import urllib.parse as urlparse
else:
    import urllib2
    import urlparse

def download_file(url, dest=None):
    """ 
    Download and save a file specified by url to dest directory,
    """
    u = urllib2.urlopen(url)

    scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
    filename = os.path.basename(path)
    if not filename:
        filename = 'downloaded.file'
    if dest:
        filename = os.path.join(dest, filename)

    with open(filename, 'wb') as f:
        meta = u.info()
        meta_func = meta.getheaders if hasattr(meta, 'getheaders') else meta.get_all
        meta_length = meta_func("Content-Length")
        file_size = None
        if meta_length:
            file_size = int(meta_length[0])
        print("Downloading: {0} Bytes: {1}".format(url, file_size))

        file_size_dl = 0
        block_sz = 8192
        while True:
            buffer = u.read(block_sz)
            if not buffer:
                break

            file_size_dl += len(buffer)
            f.write(buffer)

            status = "{0:16}".format(file_size_dl)
            if file_size:
                status += "   [{0:6.2f}%]".format(file_size_dl * 100 / file_size)
            status += chr(13)
            print(status, end="")
        print()

    return filename

if __name__ == "__main__":  # Only run if this file is called directly
    print("Testing with 10MB download")
    url = "http://download.thinkbroadband.com/10MB.zip"
    filename = download_file(url)
    print(filename)

An improved version of the PabloG code for Python 2/3:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import ( division, absolute_import, print_function, unicode_literals )

import sys, os, tempfile, logging

if sys.version_info >= (3,):
    import urllib.request as urllib2
    import urllib.parse as urlparse
else:
    import urllib2
    import urlparse

def download_file(url, dest=None):
    """ 
    Download and save a file specified by url to dest directory,
    """
    u = urllib2.urlopen(url)

    scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
    filename = os.path.basename(path)
    if not filename:
        filename = 'downloaded.file'
    if dest:
        filename = os.path.join(dest, filename)

    with open(filename, 'wb') as f:
        meta = u.info()
        meta_func = meta.getheaders if hasattr(meta, 'getheaders') else meta.get_all
        meta_length = meta_func("Content-Length")
        file_size = None
        if meta_length:
            file_size = int(meta_length[0])
        print("Downloading: {0} Bytes: {1}".format(url, file_size))

        file_size_dl = 0
        block_sz = 8192
        while True:
            buffer = u.read(block_sz)
            if not buffer:
                break

            file_size_dl += len(buffer)
            f.write(buffer)

            status = "{0:16}".format(file_size_dl)
            if file_size:
                status += "   [{0:6.2f}%]".format(file_size_dl * 100 / file_size)
            status += chr(13)
            print(status, end="")
        print()

    return filename

if __name__ == "__main__":  # Only run if this file is called directly
    print("Testing with 10MB download")
    url = "http://download.thinkbroadband.com/10MB.zip"
    filename = download_file(url)
    print(filename)

回答 7

six库提供了一种简单且同时兼容Python 2和Python 3的方式:

from six.moves import urllib
urllib.request.urlretrieve("http://www.example.com/songs/mp3.mp3", "mp3.mp3")

Simple yet Python 2 & Python 3 compatible way comes with six library:

from six.moves import urllib
urllib.request.urlretrieve("http://www.example.com/songs/mp3.mp3", "mp3.mp3")

回答 8

import os,requests
def download(url):
    get_response = requests.get(url,stream=True)
    file_name  = url.split("/")[-1]
    with open(file_name, 'wb') as f:
        for chunk in get_response.iter_content(chunk_size=1024):
            if chunk: # filter out keep-alive new chunks
                f.write(chunk)


download("https://example.com/example.jpg")
import os,requests
def download(url):
    get_response = requests.get(url,stream=True)
    file_name  = url.split("/")[-1]
    with open(file_name, 'wb') as f:
        for chunk in get_response.iter_content(chunk_size=1024):
            if chunk: # filter out keep-alive new chunks
                f.write(chunk)


download("https://example.com/example.jpg")

回答 9

正是为此目的,我用纯Python编写了wget库。从2.0版开始,它就是一个具备这些功能的加强版urlretrieve。

I wrote the wget library in pure Python just for this purpose. As of version 2.0, it is a pumped-up urlretrieve with these features.
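A small usage sketch (the URL is a placeholder; the out parameter, which also appears in a benchmark answer further down, chooses where the file is saved):

import wget

url = "http://download.thinkbroadband.com/10MB.zip"
# wget.download shows a progress bar by default and returns the saved filename
filename = wget.download(url, out="10MB.zip")
print(filename)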


回答 10

以下是在python中下载文件的最常用调用:

  1. urllib.urlretrieve ('url_to_file', file_name)

  2. urllib2.urlopen('url_to_file')

  3. requests.get(url)

  4. wget.download('url', file_name)

注意:经测试,urlopen和urlretrieve在下载大文件(大于500 MB)时表现相对较差。requests.get会把文件一直保存在内存中,直到下载完成。

Following are the most commonly used calls for downloading files in python:

  1. urllib.urlretrieve ('url_to_file', file_name)

  2. urllib2.urlopen('url_to_file')

  3. requests.get(url)

  4. wget.download('url', file_name)

Note: urlopen and urlretrieve are found to perform relatively badly when downloading large files (size > 500 MB). requests.get stores the file in memory until the download is complete.
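To avoid holding a large file entirely in memory, requests can also stream the body to disk; the following is only a sketch (URL, filename and chunk size are placeholders):

import requests

url = "http://download.thinkbroadband.com/10MB.zip"
with requests.get(url, stream=True) as r:
    r.raise_for_status()
    with open("10MB.zip", "wb") as f:
        # iter_content yields the body in chunks instead of loading it all at once
        for chunk in r.iter_content(chunk_size=8192):
            f.write(chunk)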


回答 11

我同意Corey的观点:urllib2比urllib更完整,如果您想做更复杂的事情,就应该使用它;但为了让答案更完整,如果您只需要基础功能,urllib是一个更简单的模块:

import urllib
response = urllib.urlopen('http://www.example.com/sound.mp3')
mp3 = response.read()

将正常工作。或者,如果您不想处理“响应”对象,则可以直接调用read()

import urllib
mp3 = urllib.urlopen('http://www.example.com/sound.mp3').read()

I agree with Corey, urllib2 is more complete than urllib and should likely be the module used if you want to do more complex things, but to make the answers more complete, urllib is a simpler module if you want just the basics:

import urllib
response = urllib.urlopen('http://www.example.com/sound.mp3')
mp3 = response.read()

Will work fine. Or, if you don’t want to deal with the “response” object you can call read() directly:

import urllib
mp3 = urllib.urlopen('http://www.example.com/sound.mp3').read()

回答 12

在python3中,您可以使用urllib3和shutil库。使用pip或pip3下载它们(取决于python3是否为默认版本)

pip3 install urllib3 shutil

然后运行这段代码

import urllib.request
import shutil

url = "http://www.somewebsite.com/something.pdf"
output_file = "save_this_name.pdf"
with urllib.request.urlopen(url) as response, open(output_file, 'wb') as out_file:
    shutil.copyfileobj(response, out_file)

请注意,虽然下载的是urllib3,但代码中使用的是urllib

In python3 you can use the urllib3 and shutil libraries. Download them by using pip or pip3 (depending on whether python3 is the default or not)

pip3 install urllib3 shutil

Then run this code

import urllib.request
import shutil

url = "http://www.somewebsite.com/something.pdf"
output_file = "save_this_name.pdf"
with urllib.request.urlopen(url) as response, open(output_file, 'wb') as out_file:
    shutil.copyfileobj(response, out_file)

Note that you download urllib3 but use urllib in code


回答 13

您也可以使用urlretrieve获得进度反馈:

import sys, urllib

def report(blocknr, blocksize, size):
    current = blocknr*blocksize
    sys.stdout.write("\r{0:.2f}%".format(100.0*current/size))

def downloadFile(url):
    print "\n",url
    fname = url.split('/')[-1]
    print fname
    urllib.urlretrieve(url, fname, report)

You can get the progress feedback with urlretrieve as well:

import sys, urllib

def report(blocknr, blocksize, size):
    current = blocknr*blocksize
    sys.stdout.write("\r{0:.2f}%".format(100.0*current/size))

def downloadFile(url):
    print "\n",url
    fname = url.split('/')[-1]
    print fname
    urllib.urlretrieve(url, fname, report)

回答 14

如果已安装wget,则可以使用parallel_sync。

pip install parallel_sync

from parallel_sync import wget
urls = ['http://something.png', 'http://somthing.tar.gz', 'http://somthing.zip']
wget.download('/tmp', urls)
# or a single file:
wget.download('/tmp', urls[0], filenames='x.zip', extract=True)

Doc: https://pythonhosted.org/parallel_sync/pages/examples.html

它非常强大:可以并行下载文件,失败时自动重试,甚至可以把文件下载到远程计算机上。

If you have wget installed, you can use parallel_sync.

pip install parallel_sync

from parallel_sync import wget
urls = ['http://something.png', 'http://somthing.tar.gz', 'http://somthing.zip']
wget.download('/tmp', urls)
# or a single file:
wget.download('/tmp', urls[0], filenames='x.zip', extract=True)

Doc: https://pythonhosted.org/parallel_sync/pages/examples.html

This is pretty powerful. It can download files in parallel, retry upon failure, and it can even download files on a remote machine.
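If you prefer to stay inside the standard library, a comparable parallel download can be sketched with concurrent.futures instead of parallel_sync (the URLs below are the same placeholders as above):

import os
from concurrent.futures import ThreadPoolExecutor
from urllib.request import urlretrieve

urls = ['http://something.png', 'http://somthing.tar.gz', 'http://somthing.zip']

def fetch(url):
    # save each file under /tmp, using the last part of the URL as its name
    filename = os.path.join('/tmp', url.split('/')[-1])
    urlretrieve(url, filename)
    return filename

with ThreadPoolExecutor(max_workers=4) as pool:
    for name in pool.map(fetch, urls):
        print(name)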


回答 15

如果速度对您很重要,我对urllib和wget这两个模块做了一个小型性能测试;对于wget,我分别测试了带状态栏和不带状态栏两种情况。我使用了三个不同的500MB文件进行测试(之所以使用不同的文件,是为了排除底层缓存的影响)。测试在一台运行python2的debian机器上进行。

首先,这些是结果(在不同的运行中它们是相似的):

$ python wget_test.py 
urlretrive_test : starting
urlretrive_test : 6.56
==============
wget_no_bar_test : starting
wget_no_bar_test : 7.20
==============
wget_with_bar_test : starting
100% [......................................................................] 541335552 / 541335552
wget_with_bar_test : 50.49
==============

我执行测试的方式是使用“profile”装饰器。这是完整的代码:

import wget
import urllib
import time
from functools import wraps

def profile(func):
    @wraps(func)
    def inner(*args):
        print func.__name__, ": starting"
        start = time.time()
        ret = func(*args)
        end = time.time()
        print func.__name__, ": {:.2f}".format(end - start)
        return ret
    return inner

url1 = 'http://host.com/500a.iso'
url2 = 'http://host.com/500b.iso'
url3 = 'http://host.com/500c.iso'

def do_nothing(*args):
    pass

@profile
def urlretrive_test(url):
    return urllib.urlretrieve(url)

@profile
def wget_no_bar_test(url):
    return wget.download(url, out='/tmp/', bar=do_nothing)

@profile
def wget_with_bar_test(url):
    return wget.download(url, out='/tmp/')

urlretrive_test(url1)
print '=============='
time.sleep(1)

wget_no_bar_test(url2)
print '=============='
time.sleep(1)

wget_with_bar_test(url3)
print '=============='
time.sleep(1)

urllib 似乎是最快的

If speed matters to you, I made a small performance test for the modules urllib and wget, and regarding wget I tried once with status bar and once without. I took three different 500MB files to test with (different files- to eliminate the chance that there is some caching going on under the hood). Tested on debian machine, with python2.

First, these are the results (they are similar in different runs):

$ python wget_test.py 
urlretrive_test : starting
urlretrive_test : 6.56
==============
wget_no_bar_test : starting
wget_no_bar_test : 7.20
==============
wget_with_bar_test : starting
100% [......................................................................] 541335552 / 541335552
wget_with_bar_test : 50.49
==============

The way I performed the test is using “profile” decorator. This is the full code:

import wget
import urllib
import time
from functools import wraps

def profile(func):
    @wraps(func)
    def inner(*args):
        print func.__name__, ": starting"
        start = time.time()
        ret = func(*args)
        end = time.time()
        print func.__name__, ": {:.2f}".format(end - start)
        return ret
    return inner

url1 = 'http://host.com/500a.iso'
url2 = 'http://host.com/500b.iso'
url3 = 'http://host.com/500c.iso'

def do_nothing(*args):
    pass

@profile
def urlretrive_test(url):
    return urllib.urlretrieve(url)

@profile
def wget_no_bar_test(url):
    return wget.download(url, out='/tmp/', bar=do_nothing)

@profile
def wget_with_bar_test(url):
    return wget.download(url, out='/tmp/')

urlretrive_test(url1)
print '=============='
time.sleep(1)

wget_no_bar_test(url2)
print '=============='
time.sleep(1)

wget_with_bar_test(url3)
print '=============='
time.sleep(1)

urllib seems to be the fastest


回答 16

仅出于完整性考虑,也可以使用subprocess软件包调用任何外部程序来获取文件。专门用于下载文件的程序比urlretrieve之类的Python函数更强大。例如,wget可以递归下载目录(-r),可以处理FTP、重定向和HTTP代理,可以避免重新下载已有的文件(-nc),而aria2可以进行多连接下载,从而有可能加快下载速度。

import subprocess
subprocess.check_output(['wget', '-O', 'example_output_file.html', 'https://example.com'])

在Jupyter Notebook中,还可以使用以下!语法直接调用程序:

!wget -O example_output_file.html https://example.com

Just for the sake of completeness, it is also possible to call any program for retrieving files using the subprocess package. Programs dedicated to retrieving files are more powerful than Python functions like urlretrieve. For example, wget can download directories recursively (-r), can deal with FTP, redirects, HTTP proxies, can avoid re-downloading existing files (-nc), and aria2 can do multi-connection downloads which can potentially speed up your downloads.

import subprocess
subprocess.check_output(['wget', '-O', 'example_output_file.html', 'https://example.com'])

In Jupyter Notebook, one can also call programs directly with the ! syntax:

!wget -O example_output_file.html https://example.com
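Since the answer above mentions aria2's multi-connection downloads, the same subprocess approach can be used to call it; this is only a sketch and assumes aria2 is installed (-x sets the maximum connections per server, -o the output name):

import subprocess

subprocess.check_output([
    'aria2c', '-x', '8', '-o', 'example_output_file.html', 'https://example.com'
])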

回答 17

源代码可以是:

import urllib
sock = urllib.urlopen("http://diveintopython.org/")
htmlSource = sock.read()                            
sock.close()                                        
print htmlSource  

Source code can be:

import urllib
sock = urllib.urlopen("http://diveintopython.org/")
htmlSource = sock.read()                            
sock.close()                                        
print htmlSource  

回答 18

您可以在Python 2和3上使用PycURL

import pycurl

FILE_DEST = 'pycurl.html'
FILE_SRC = 'http://pycurl.io/'

with open(FILE_DEST, 'wb') as f:
    c = pycurl.Curl()
    c.setopt(c.URL, FILE_SRC)
    c.setopt(c.WRITEDATA, f)
    c.perform()
    c.close()

You can use PycURL on Python 2 and 3.

import pycurl

FILE_DEST = 'pycurl.html'
FILE_SRC = 'http://pycurl.io/'

with open(FILE_DEST, 'wb') as f:
    c = pycurl.Curl()
    c.setopt(c.URL, FILE_SRC)
    c.setopt(c.WRITEDATA, f)
    c.perform()
    c.close()

回答 19

我编写了以下代码,它在原生(vanilla)的Python 2或Python 3中均可运行。


import sys
try:
    import urllib.request
    python3 = True
except ImportError:
    import urllib2
    python3 = False


def progress_callback_simple(downloaded,total):
    sys.stdout.write(
        "\r" +
        (len(str(total))-len(str(downloaded)))*" " + str(downloaded) + "/%d"%total +
        " [%3.2f%%]"%(100.0*float(downloaded)/float(total))
    )
    sys.stdout.flush()

def download(srcurl, dstfilepath, progress_callback=None, block_size=8192):
    def _download_helper(response, out_file, file_size):
        if progress_callback!=None: progress_callback(0,file_size)
        if block_size == None:
            buffer = response.read()
            out_file.write(buffer)

            if progress_callback!=None: progress_callback(file_size,file_size)
        else:
            file_size_dl = 0
            while True:
                buffer = response.read(block_size)
                if not buffer: break

                file_size_dl += len(buffer)
                out_file.write(buffer)

                if progress_callback!=None: progress_callback(file_size_dl,file_size)
    with open(dstfilepath,"wb") as out_file:
        if python3:
            with urllib.request.urlopen(srcurl) as response:
                file_size = int(response.getheader("Content-Length"))
                _download_helper(response,out_file,file_size)
        else:
            response = urllib2.urlopen(srcurl)
            meta = response.info()
            file_size = int(meta.getheaders("Content-Length")[0])
            _download_helper(response,out_file,file_size)

import traceback
try:
    download(
        "https://geometrian.com/data/programming/projects/glLib/glLib%20Reloaded%200.5.9/0.5.9.zip",
        "output.zip",
        progress_callback_simple
    )
except:
    traceback.print_exc()
    input()

笔记:

  • 支持“进度条”回调。
  • 下载的是我网站上的一个4 MB测试.zip文件。

I wrote the following, which works in vanilla Python 2 or Python 3.


import sys
try:
    import urllib.request
    python3 = True
except ImportError:
    import urllib2
    python3 = False


def progress_callback_simple(downloaded,total):
    sys.stdout.write(
        "\r" +
        (len(str(total))-len(str(downloaded)))*" " + str(downloaded) + "/%d"%total +
        " [%3.2f%%]"%(100.0*float(downloaded)/float(total))
    )
    sys.stdout.flush()

def download(srcurl, dstfilepath, progress_callback=None, block_size=8192):
    def _download_helper(response, out_file, file_size):
        if progress_callback!=None: progress_callback(0,file_size)
        if block_size == None:
            buffer = response.read()
            out_file.write(buffer)

            if progress_callback!=None: progress_callback(file_size,file_size)
        else:
            file_size_dl = 0
            while True:
                buffer = response.read(block_size)
                if not buffer: break

                file_size_dl += len(buffer)
                out_file.write(buffer)

                if progress_callback!=None: progress_callback(file_size_dl,file_size)
    with open(dstfilepath,"wb") as out_file:
        if python3:
            with urllib.request.urlopen(srcurl) as response:
                file_size = int(response.getheader("Content-Length"))
                _download_helper(response,out_file,file_size)
        else:
            response = urllib2.urlopen(srcurl)
            meta = response.info()
            file_size = int(meta.getheaders("Content-Length")[0])
            _download_helper(response,out_file,file_size)

import traceback
try:
    download(
        "https://geometrian.com/data/programming/projects/glLib/glLib%20Reloaded%200.5.9/0.5.9.zip",
        "output.zip",
        progress_callback_simple
    )
except:
    traceback.print_exc()
    input()

Notes:

  • Supports a “progress bar” callback.
  • Download is a 4 MB test .zip from my website.

回答 20

这可能有点晚了,但是我看到了PabloG的代码,忍不住加了一个os.system('cls'),让它看起来棒极了!来看看吧:

    import urllib2,os

    url = "http://download.thinkbroadband.com/10MB.zip"

    file_name = url.split('/')[-1]
    u = urllib2.urlopen(url)
    f = open(file_name, 'wb')
    meta = u.info()
    file_size = int(meta.getheaders("Content-Length")[0])
    print "Downloading: %s Bytes: %s" % (file_name, file_size)
    os.system('cls')
    file_size_dl = 0
    block_sz = 8192
    while True:
        buffer = u.read(block_sz)
        if not buffer:
            break

        file_size_dl += len(buffer)
        f.write(buffer)
        status = r"%10d  [%3.2f%%]" % (file_size_dl, file_size_dl * 100. / file_size)
        status = status + chr(8)*(len(status)+1)
        print status,

    f.close()

如果在Windows以外的环境中运行,则必须使用“cls”以外的命令。在Mac OS X和Linux中,应该使用“clear”。

This may be a little late, But I saw pabloG’s code and couldn’t help adding a os.system(‘cls’) to make it look AWESOME! Check it out :

    import urllib2,os

    url = "http://download.thinkbroadband.com/10MB.zip"

    file_name = url.split('/')[-1]
    u = urllib2.urlopen(url)
    f = open(file_name, 'wb')
    meta = u.info()
    file_size = int(meta.getheaders("Content-Length")[0])
    print "Downloading: %s Bytes: %s" % (file_name, file_size)
    os.system('cls')
    file_size_dl = 0
    block_sz = 8192
    while True:
        buffer = u.read(block_sz)
        if not buffer:
            break

        file_size_dl += len(buffer)
        f.write(buffer)
        status = r"%10d  [%3.2f%%]" % (file_size_dl, file_size_dl * 100. / file_size)
        status = status + chr(8)*(len(status)+1)
        print status,

    f.close()

If running in an environment other than Windows, you will have to use something other than ‘cls’. In Mac OS X and Linux it should be ‘clear’.


回答 21

urlretrieve和requests.get用起来很简单,但实际情况往往没那么简单。我抓取过几个站点的数据,包括文本和图像,上面两种方法大概能解决大多数任务。但如果想要更通用的解决方案,我建议使用urlopen。由于它包含在Python 3标准库中,您的代码可以在任何安装了Python 3的计算机上运行,而无需预先安装第三方软件包。

import urllib.request
url_request = urllib.request.Request(url, headers=headers)
url_connect = urllib.request.urlopen(url_request)

#remember to open file in bytes mode
with open(filename, 'wb') as f:
    while True:
        buffer = url_connect.read(buffer_size)
        if not buffer: break

        #an integer value of size of written data
        data_wrote = f.write(buffer)

#you could probably use with-open-as manner
url_connect.close()

此答案为使用Python通过http下载文件时遇到的HTTP 403 Forbidden提供了解决方案。我只尝试了requests和urllib模块,其他模块也许能提供更好的功能,但这两个是我用来解决大多数问题的模块。

urlretrieve and requests.get are simple, but reality often isn't. I have fetched data from a couple of sites, including text and images, and the above two probably solve most tasks. But for a more universal solution I suggest the use of urlopen. As it is included in the Python 3 standard library, your code can run on any machine that runs Python 3 without pre-installing site packages.

import urllib.request
url_request = urllib.request.Request(url, headers=headers)
url_connect = urllib.request.urlopen(url_request)

#remember to open file in bytes mode
with open(filename, 'wb') as f:
    while True:
        buffer = url_connect.read(buffer_size)
        if not buffer: break

        #an integer value of size of written data
        data_wrote = f.write(buffer)

#you could probably use with-open-as manner
url_connect.close()

This answer provides a solution to HTTP 403 Forbidden when downloading a file over http using Python. I have tried only the requests and urllib modules; other modules may provide something better, but these are the ones I used to solve most of the problems.
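For reference, the header change that most often resolves such a 403 for plain scripts is sending a browser-like User-Agent; a hedged sketch (the URL, filename and User-Agent string are placeholders):

import urllib.request

headers = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64)'}
url = 'http://www.example.com/file.bin'

request = urllib.request.Request(url, headers=headers)
# stream the response straight into a local file opened in binary mode
with urllib.request.urlopen(request) as response, open('file.bin', 'wb') as f:
    f.write(response.read())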


回答 22

回答得有点晚,但对于python>=3.6,您可以使用:

import dload
dload.save(url)

安装dload方式:

pip3 install dload

Late answer, but for python>=3.6 you can use:

import dload
dload.save(url)

Install dload with:

pip3 install dload

回答 23

我想从某个网页下载所有文件。我试过wget,但失败了,所以我决定走Python这条路,然后找到了这个帖子。

阅读后,我做了一个命令行应用程序soupget,扩展了PabloGStan的出色答案,并添加了一些有用的选项。

它使用BeautifulSoup收集页面上的所有URL,然后下载具有所需扩展名的那些文件。最后,它还可以并行下载多个文件。

这里是:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import (division, absolute_import, print_function, unicode_literals)
import sys, os, argparse
from bs4 import BeautifulSoup

# --- insert Stan's script here ---
# if sys.version_info >= (3,): 
#...
#...
# def download_file(url, dest=None): 
#...
#...

# --- new stuff ---
def collect_all_url(page_url, extensions):
    """
    Recovers all links in page_url checking for all the desired extensions
    """
    conn = urllib2.urlopen(page_url)
    html = conn.read()
    soup = BeautifulSoup(html, 'lxml')
    links = soup.find_all('a')

    results = []    
    for tag in links:
        link = tag.get('href', None)
        if link is not None: 
            for e in extensions:
                if e in link:
                    # Fallback for badly defined links
                    # checks for missing scheme or netloc
                    if bool(urlparse.urlparse(link).scheme) and bool(urlparse.urlparse(link).netloc):
                        results.append(link)
                    else:
                        new_url=urlparse.urljoin(page_url,link)                        
                        results.append(new_url)
    return results

if __name__ == "__main__":  # Only run if this file is called directly
    # Command line arguments
    parser = argparse.ArgumentParser(
        description='Download all files from a webpage.')
    parser.add_argument(
        '-u', '--url', 
        help='Page url to request')
    parser.add_argument(
        '-e', '--ext', 
        nargs='+',
        help='Extension(s) to find')    
    parser.add_argument(
        '-d', '--dest', 
        default=None,
        help='Destination where to save the files')
    parser.add_argument(
        '-p', '--par', 
        action='store_true', default=False, 
        help="Turns on parallel download")
    args = parser.parse_args()

    # Recover files to download
    all_links = collect_all_url(args.url, args.ext)

    # Download
    if not args.par:
        for l in all_links:
            try:
                filename = download_file(l, args.dest)
                print(l)
            except Exception as e:
                print("Error while downloading: {}".format(e))
    else:
        from multiprocessing.pool import ThreadPool
        results = ThreadPool(10).imap_unordered(
            lambda x: download_file(x, args.dest), all_links)
        for p in results:
            print(p)

其用法的一个示例是:

python3 soupget.py -p -e <list of extensions> -d <destination_folder> -u <target_webpage>

还有一个实际示例,如果您想看到它的实际效果:

python3 soupget.py -p -e .xlsx .pdf .csv -u https://healthdata.gov/dataset/chemicals-cosmetics

I wanted to download all the files from a webpage. I tried wget but it was failing, so I decided to go the Python route and I found this thread.

After reading it, I have made a little command line application, soupget, expanding on the excellent answers of PabloG and Stan and adding some useful options.

It uses BeautifulSoup to collect all the URLs of the page and then downloads the ones with the desired extension(s). Finally, it can download multiple files in parallel.

Here it is:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import (division, absolute_import, print_function, unicode_literals)
import sys, os, argparse
from bs4 import BeautifulSoup

# --- insert Stan's script here ---
# if sys.version_info >= (3,): 
#...
#...
# def download_file(url, dest=None): 
#...
#...

# --- new stuff ---
def collect_all_url(page_url, extensions):
    """
    Recovers all links in page_url checking for all the desired extensions
    """
    conn = urllib2.urlopen(page_url)
    html = conn.read()
    soup = BeautifulSoup(html, 'lxml')
    links = soup.find_all('a')

    results = []    
    for tag in links:
        link = tag.get('href', None)
        if link is not None: 
            for e in extensions:
                if e in link:
                    # Fallback for badly defined links
                    # checks for missing scheme or netloc
                    if bool(urlparse.urlparse(link).scheme) and bool(urlparse.urlparse(link).netloc):
                        results.append(link)
                    else:
                        new_url=urlparse.urljoin(page_url,link)                        
                        results.append(new_url)
    return results

if __name__ == "__main__":  # Only run if this file is called directly
    # Command line arguments
    parser = argparse.ArgumentParser(
        description='Download all files from a webpage.')
    parser.add_argument(
        '-u', '--url', 
        help='Page url to request')
    parser.add_argument(
        '-e', '--ext', 
        nargs='+',
        help='Extension(s) to find')    
    parser.add_argument(
        '-d', '--dest', 
        default=None,
        help='Destination where to save the files')
    parser.add_argument(
        '-p', '--par', 
        action='store_true', default=False, 
        help="Turns on parallel download")
    args = parser.parse_args()

    # Recover files to download
    all_links = collect_all_url(args.url, args.ext)

    # Download
    if not args.par:
        for l in all_links:
            try:
                filename = download_file(l, args.dest)
                print(l)
            except Exception as e:
                print("Error while downloading: {}".format(e))
    else:
        from multiprocessing.pool import ThreadPool
        results = ThreadPool(10).imap_unordered(
            lambda x: download_file(x, args.dest), all_links)
        for p in results:
            print(p)

An example of its usage is:

python3 soupget.py -p -e <list of extensions> -d <destination_folder> -u <target_webpage>

And an actual example if you want to see it in action:

python3 soupget.py -p -e .xlsx .pdf .csv -u https://healthdata.gov/dataset/chemicals-cosmetics

Httpx-Python的下一代HTTP客户端。🦋

HTTPX是Python3的一个功能齐全的HTTP客户端,它提供同步和异步API,并同时支持HTTP/1.1和HTTP/2

注意事项:HTTPX目前应视为测试版。我们相信公共API已经达到了一个相当稳定的状态,但仍强烈建议将依赖固定到0.18.*系列版本,以便在升级软件包时正确地审查API changes between package updates。1.0版本预计将在2021年的某个时候发布


我们开始吧

>>> import httpx
>>> r = httpx.get('https://www.example.org/')
>>> r
<Response [200 OK]>
>>> r.status_code
200
>>> r.headers['content-type']
'text/html; charset=UTF-8'
>>> r.text
'<!doctype html>\n<html>\n<head>\n<title>Example Domain</title>...'

或者,使用异步API

使用IPython或Python 3.8+和python -m asyncio以交互方式尝试此代码

>>> import httpx
>>> async with httpx.AsyncClient() as client:
...     r = await client.get('https://www.example.org/')
...
>>> r
<Response [200 OK]>

功能

HTTPX构建在requests的基础之上,并为您提供:

加上requests的所有标准功能…

  • 国际域名和URL
  • 保活和连接池
  • 具有Cookie持久性的会话
  • 浏览器样式的SSL验证
  • 基本/摘要身份验证
  • 优雅的键/值Cookie
  • 自动解压
  • 自动内容解码
  • Unicode响应正文
  • 多部分文件上载
  • HTTP(S)代理支持
  • 连接超时
  • 流式下载(见下方示例)
  • netrc支持
  • 分块请求
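例如,针对上面列出的“流式下载”功能,下面是一个基于httpx.stream的示例草图(URL和文件名仅为占位符):

import httpx

url = "https://www.example.org/big-file.zip"
# 以流式方式获取响应并分块写入磁盘,避免把整个文件读进内存
with httpx.stream("GET", url) as response:
    with open("big-file.zip", "wb") as f:
        for chunk in response.iter_bytes():
            f.write(chunk)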

安装

使用pip安装:

$ pip install httpx

或者,要包括可选的HTTP/2支持,请使用:

$ pip install httpx[http2]

HTTPX需要Python 3.6+

文档

有关项目文档,请访问https://www.python-httpx.org/

要浏览所有基础知识,请访问QuickStart

有关更高级的主题,请参阅Advanced Usage部分、async support部分或HTTP/2部分。

这个Developer Interface提供全面的API参考

要了解有关与HTTPX集成的工具的信息,请参阅Third Party Packages

贡献力量

如果您想为HTTPX做出贡献,请查看Contributing Guide,了解如何开始。

依赖项

HTTPX项目依赖于这些优秀的库:

  • httpcore - httpx的底层传输实现
    • h11 - HTTP/1.1支持
    • h2 - HTTP/2支持。(可选)
  • certifi - SSL证书
  • rfc3986 - URL解析和规范化
    • idna - 支持国际化域名
  • sniffio - 异步库自动检测
  • async_generator - 为contextlib.asynccontextmanager提供向后移植支持(仅Python 3.6需要)
  • brotlicffi - 解码“brotli”压缩的响应。(可选)

本项目的API布局在很大程度上归功于requests,而较低层网络细节的许多设计灵感则来自urllib3,在此向它们致以诚挚的感谢。

-⭐️-

HTTPX是BSD licensed的代码。在英国布赖顿设计和构建。

Uvicorn-闪电般的ASGI服务器。🦄

闪电般的ASGI服务器

文档https://www.uvicorn.org

要求:Python 3.6+(要支持Python 3.5,请安装版本0.8.6。)

Uvicorn是一个闪电般的ASGI服务器实现,使用uvloophttptools

直到最近,Python一直缺乏供异步框架使用的最低级别的服务器/应用程序接口。ASGI specification填补了这一空白,这意味着我们现在能够开始构建一套可在所有asyncio框架中使用的通用工具。

Uvicorn目前支持HTTP/1.1和WebSockets。计划支持HTTP/2

快速入门

使用pip进行安装:

$ pip install uvicorn

这将安装具有最小(纯Python)依赖项的uvicorn

$ pip install uvicorn[standard]

这将安装带有“基于Cython的”依赖项(如果可能)和其他“可选的附加项”的uvicorn。

在此上下文中,“基于Cython”的含义如下:

  • 如果可能,将安装并使用uvloop作为事件循环
  • 如果可能,HTTP协议的解析将由httptools处理

此外,“可选附加项”的含义是:

  • WebSocket协议将由websockets处理(如果您想改用wsproto,则需要手动安装它)
  • 开发模式下的--reload标志将使用watchgod
  • Windows用户将安装colorama,以获得彩色的日志输出
  • 如果您想使用--env-file选项,将会安装python-dotenv
  • 将安装PyYAML,以便在需要时向--log-config提供.yaml文件

创建一个应用程序,保存为example.py:

async def app(scope, receive, send):
    assert scope['type'] == 'http'

    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [
            [b'content-type', b'text/plain'],
        ],
    })
    await send({
        'type': 'http.response.body',
        'body': b'Hello, world!',
    })

运行服务器:

$ uvicorn example:app
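除了命令行之外,也可以在Python代码中以编程方式启动服务器;下面是一个基于uvicorn.run的示例草图(主机和端口为示例值):

# main.py
import uvicorn

if __name__ == "__main__":
    # 等价于在命令行运行:uvicorn example:app --host 127.0.0.1 --port 8000
    uvicorn.run("example:app", host="127.0.0.1", port=8000)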

Uvicorn是BSD licensed的代码。
在英国布赖顿设计和构建。

-🦄-

Gunicorn ‘Green Unicorn’ 是一款适用于UNIX的WSGI HTTP服务器

Gunicorn ‘Green Unicorn’是用于UNIX的Python WSGI HTTP服务器。它是从Ruby的Unicorn项目移植而来的pre-fork worker模型。Gunicorn服务器与各种Web框架广泛兼容,实现简单,占用的服务器资源很少,而且速度相当快

欢迎加入我们在Freenode上的#gunicorn频道。

文档

文档位于https://docs.gunicorn.org

安装

Gunicorn要求Python 3.x>=3.5

从PyPI安装:

$ pip install gunicorn

用法

基本用法:

$ gunicorn [OPTIONS] APP_MODULE

其中APP_MODULE采用$(MODULE_NAME):$(VARIABLE_NAME)的形式。模块名称可以是完整的点分路径。变量名指向应能在指定模块中找到的WSGI可调用对象。

测试应用示例:

$ cd examples
$ gunicorn --workers=2 test:app
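上面的命令假设test.py中存在一个名为app的WSGI可调用对象;下面是一个最小的示意(并非仓库中examples/test.py的原文):

# test.py
def app(environ, start_response):
    data = b"Hello, World!\n"
    start_response("200 OK", [
        ("Content-Type", "text/plain"),
        ("Content-Length", str(len(data))),
    ])
    return [data]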

贡献

有关更多详细信息,请参阅our complete contributor’s guide。

许可证

Gunicorn是在MIT许可证下发布的。有关更多详细信息,请参阅LICENSE文件。

Locust-用Python编写的可伸缩用户负载测试工具

Locust是一个易于使用、可编写脚本和可扩展的性能测试工具。您可以使用常规Python代码定义用户的行为,而不是使用笨重的UI或特定于域的语言。这使得Locust具有无限的可扩展性,并且对开发人员非常友好

功能

用普通老式Python编写用户测试场景

如果希望用户循环、执行一些条件行为或进行一些计算,只需使用Python提供的常规编程构造即可。Locust在它自己的greenlet内运行每个用户(一个轻量级进程/协程)。这使您可以像编写普通(阻塞)Python代码一样编写测试,而不必使用回调或其他机制。因为您的场景“仅仅是python”,所以您可以使用常规IDE,并将测试作为常规代码进行版本控制(与使用XML或二进制格式的其他一些工具相反)

from locust import HttpUser, task, between

class QuickstartUser(HttpUser):
    wait_time = between(1, 2)

    def on_start(self):
        self.client.post("/login", json={"username":"foo", "password":"bar"})

    @task
    def hello_world(self):
        self.client.get("/hello")
        self.client.get("/world")

    @task(3)
    def view_item(self):
        for item_id in range(10):
            self.client.get(f"/item?id={item_id}", name="/item")

分布式和可扩展-支持数十万用户

Locust使运行分布在多台机器上的负载测试变得很容易。它是基于事件的(使用gevent),这使得单个进程可以处理数千个并发用户。虽然可能有其他工具能够在给定硬件上每秒执行更多请求,但每个Locust用户的低开销使其非常适合测试高并发工作负载

基于Web的用户界面

Locust有一个用户友好的Web界面,可以实时显示您的测试进度。您甚至可以在测试运行时更改负载。它还可以在没有UI的情况下运行,便于用于CI/CD测试

可以测试任何系统

即使Locust主要与网站/服务一起工作,它也可以用来测试几乎任何系统或协议。只是write a client您想要测试的内容,或者explore some created by the community

可随意定制(Hackable)

Locust很小,非常灵活,我们打算保持这种状态。如果你想send reporting data to that database & graphing system you like、封装对REST API的调用以处理你系统的细节,或者运行totally custom load pattern,没有什么能阻止你!

链接

作者

许可证

根据麻省理工学院许可许可的开放源码(请参阅许可证有关详细信息,请参阅文件)