Tag Archives: python-requests

Python requests library: how to pass an Authorization header with a single token

Question: Python requests library: how to pass an Authorization header with a single token


I have a request URI and a token. If I use:

curl -s "<MY_URI>" -H "Authorization: TOK:<MY_TOKEN>"

etc., I get a 200 and view the corresponding JSON data. So, I installed requests, and when I attempt to access this resource I get a 403, probably because I do not know the correct syntax to pass that token. Can anyone help me figure it out? This is what I have:

import sys,socket
import requests

r = requests.get('<MY_URI>','<MY_TOKEN>')
r.status_code

I already tried:

r = requests.get('<MY_URI>',auth=('<MY_TOKEN>'))
r = requests.get('<MY_URI>',auth=('TOK','<MY_TOKEN>'))
r = requests.get('<MY_URI>',headers=('Authorization: TOK:<MY_TOKEN>'))

But none of these work.


Answer 0


In Python:

('<MY_TOKEN>')

is equivalent to

'<MY_TOKEN>'

And requests interprets

('TOK', '<MY_TOKEN>')

as meaning you want requests to use Basic Authentication and craft an Authorization header like so:

'VE9LOjxNWV9UT0tFTj4K'

Which is the base64 representation of 'TOK:<MY_TOKEN>'
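
As a quick sanity check of that claim (a sketch; note that the encoded string includes a trailing newline, which is why it ends in K):

from base64 import b64encode

# 'TOK:<MY_TOKEN>' plus a trailing newline encodes to the string quoted above
print(b64encode(b'TOK:<MY_TOKEN>\n').decode())  # VE9LOjxNWV9UT0tFTj4K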

To pass your own header you pass in a dictionary like so:

r = requests.get('<MY_URI>', headers={'Authorization': 'TOK:<MY_TOKEN>'})

Answer 1


I was looking for something similar and came across this. It looks like in the first option you mentioned

r = requests.get('<MY_URI>', auth=('<MY_TOKEN>'))

“auth” takes two parameters: username and password, so the actual statement should be

r=requests.get('<MY_URI>', auth=('<YOUR_USERNAME>', '<YOUR_PASSWORD>'))

In my case, there was no password, so I left the second parameter in the auth field empty, as shown below:

r=requests.get('<MY_URI>', auth=('MY_USERNAME', ''))

Hope this helps somebody :)


Answer 2


This worked for me:

access_token = 'yourAccessTokenHere'  # replace with your access token

result = requests.post(url,
      headers={'Content-Type':'application/json',
               'Authorization': 'Bearer {}'.format(access_token)})

Answer 3


You can also set headers for the entire session:

TOKEN = 'abcd0123'
HEADERS = {'Authorization': 'token {}'.format(TOKEN)}

with requests.Session() as s:

    s.headers.update(HEADERS)
    resp = s.get('http://example.com/')

Answer 4


Requests natively supports basic auth only with user-pass params, not with tokens.

You could, if you wanted, add the following class to have requests support token based basic authentication:

import requests
from base64 import b64encode

class BasicAuthToken(requests.auth.AuthBase):
    def __init__(self, token):
        self.token = token
    def __call__(self, r):
        authstr = 'Basic ' + b64encode(('token:' + self.token).encode('utf-8')).decode('utf-8')
        r.headers['Authorization'] = authstr
        return r

Then, to use it, run the following request:

r = requests.get(url, auth=BasicAuthToken(api_token))

An alternative would be to formulate a custom header instead, just as was suggested by other users here.


Answer 5


I found this here, and it works for me for LinkedIn login: https://auth0.com/docs/flows/guides/auth-code/call-api-auth-code. Here is my code for the LinkedIn login:

ref = 'https://api.linkedin.com/v2/me'
headers = {"content-type": "application/json; charset=UTF-8", 'Authorization': 'Bearer {}'.format(access_token)}
Linkedin_user_info = requests.get(ref, headers=headers).json()

Answer 6


You can try something like this:

r = requests.get(ENDPOINT, params=params, headers={'Authorization': 'Basic %s' %  API_KEY})

Answer 7


This worked for me:

r = requests.get('http://127.0.0.1:8000/api/ray/musics/', headers={'Authorization': 'Token 22ec0cc4207ebead1f51dea06ff149342082b190'})

My code uses a user-generated token.


How to send cookies in a post request with the Python Requests library?

Question: How to send cookies in a post request with the Python Requests library?


I’m trying to use the Requests library to send cookies with a post request, but I’m not sure how to actually set up the cookies based on its documentation. The script is for use on Wikipedia, and the cookie(s) that need to be sent are of this form:

enwiki_session=17ab96bd8ffbe8ca58a78657a918558e; path=/; domain=.wikipedia.com; HttpOnly

However, the requests documentation quickstart gives this as the only example:

cookies = dict(cookies_are='working')

How can I encode a cookie like the above using this library? Do I need to make it with python’s standard cookie library, then send it along with the POST request?


Answer 0


The latest release of Requests will build CookieJars for you from simple dictionaries.

import requests

cookies = {'enwiki_session': '17ab96bd8ffbe8ca58a78657a918558'}

r = requests.post('http://wikipedia.org', cookies=cookies)

Enjoy :)


Answer 1


Just to extend on the previous answer, if you are linking two requests together and want to send the cookies returned from the first one to the second one (for example, maintaining a session alive across requests) you can do:

import requests
r1 = requests.post('http://www.yourapp.com/login')
r2 = requests.post('http://www.yourapp.com/somepage',cookies=r1.cookies)
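
A requests.Session does this cookie carrying for you automatically, so the same flow can also be written as follows (a sketch, reusing the placeholder URLs from above):

import requests

with requests.Session() as s:
    s.post('http://www.yourapp.com/login')           # cookies set by the response are stored on the session
    r2 = s.post('http://www.yourapp.com/somepage')   # and sent back automatically here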

Answer 2


If you want to pass the cookie to the browser, you have to append to the headers to be sent back. If you’re using wsgi:

import requests
...


def application(environ, start_response):
    # the Set-Cookie header value must be a string, not a dict
    cookie = 'enwiki_session=17ab96bd8ffbe8ca58a78657a918558'
    response_headers = [('Content-type', 'text/plain')]
    response_headers.append(('Set-Cookie', cookie))
...

    return [bytes(post_env),response_headers]

I'm successfully able to authenticate with Bugzilla and TWiki hosted on the same domain where my Python WSGI script is running, by passing the auth user/password to my Python script and passing the cookies on to the browser. This lets me open the Bugzilla and TWiki pages in the same browser and be authenticated. I'm trying to do the same with SuiteCRM, but I'm having trouble getting SuiteCRM to accept the session cookies obtained from the Python script, even though it has authenticated successfully.
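
For reference, a complete minimal WSGI application that sets such a cookie might look like this (a sketch; the session value is the one from the question):

def application(environ, start_response):
    headers = [
        ('Content-Type', 'text/plain'),
        ('Set-Cookie', 'enwiki_session=17ab96bd8ffbe8ca58a78657a918558e; Path=/; HttpOnly'),
    ]
    start_response('200 OK', headers)
    return [b'ok']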


Adding headers to the Python requests module

Question: Adding headers to the Python requests module


Earlier I used httplib module to add a header in the request. Now I am trying the same thing with the requests module.

This is the python request module I am using: http://pypi.python.org/pypi/requests

How can I add a header to requests.post() and requests.get()? Say I have to add a foobar key to the header of each request.


Answer 0


From http://docs.python-requests.org/en/latest/user/quickstart/

import json
import requests

url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}
headers = {'content-type': 'application/json'}

r = requests.post(url, data=json.dumps(payload), headers=headers)

You just need to create a dict with your headers (key: value pairs where the key is the name of the header and the value is, well, the value of the pair) and pass that dict to the headers parameter on the .get or .post method.

So more specific to your question:

headers = {'foobar': 'raboof'}
requests.get('http://himom.com', headers=headers)

Answer 1


You can also do this to set a header for all future gets for the Session object, where x-test will be in all s.get() calls:

s = requests.Session()
s.auth = ('user', 'pass')
s.headers.update({'x-test': 'true'})

# both 'x-test' and 'x-test2' are sent
s.get('http://httpbin.org/headers', headers={'x-test2': 'true'})

from: http://docs.python-requests.org/en/latest/user/advanced/#session-objects


Timeout for Python requests: get the entire response

Question: Timeout for Python requests: get the entire response


I’m gathering statistics on a list of websites and I’m using requests for it for simplicity. Here is my code:

data=[]
websites=['http://google.com', 'http://bbc.co.uk']
for w in websites:
    r= requests.get(w, verify=False)
    data.append( (r.url, len(r.content), r.elapsed.total_seconds(), str([(l.status_code, l.url) for l in r.history]), str(r.headers.items()), str(r.cookies.items())) )

Now, I want requests.get to timeout after 10 seconds so the loop doesn’t get stuck.

This question has been of interest before too but none of the answers are clean. I will be putting some bounty on this to get a nice answer.

I hear that maybe not using requests is a good idea, but then how should I get the nice things requests offers (the ones in the tuple)?


Answer 0


What about using eventlet? If you want the request to time out after 10 seconds, even if data is being received, this snippet will work for you:

import requests
import eventlet
eventlet.monkey_patch()

with eventlet.Timeout(10):
    requests.get("http://ipv4.download.thinkbroadband.com/1GB.zip", verify=False)

Answer 1


Set the timeout parameter:

r = requests.get(w, verify=False, timeout=10) # 10 seconds

As long as you don't set stream=True on that request, this will cause the call to requests.get() to time out if the connection takes more than ten seconds, or if the server doesn't send data for more than ten seconds.


Answer 2


UPDATE: https://requests.readthedocs.io/en/master/user/advanced/#timeouts

In new version of requests:

If you specify a single value for the timeout, like this:

r = requests.get('https://github.com', timeout=5)

The timeout value will be applied to both the connect and the read timeouts. Specify a tuple if you would like to set the values separately:

r = requests.get('https://github.com', timeout=(3.05, 27))

If the remote server is very slow, you can tell Requests to wait forever for a response, by passing None as a timeout value and then retrieving a cup of coffee.

r = requests.get('https://github.com', timeout=None)

My old (probably outdated) answer (which was posted a long time ago):

There are other ways to overcome this problem:

1. Use the TimeoutSauce internal class

From: https://github.com/kennethreitz/requests/issues/1928#issuecomment-35811896

import requests
from requests.adapters import TimeoutSauce

class MyTimeout(TimeoutSauce):
    def __init__(self, *args, **kwargs):
        connect = kwargs.get('connect', 5)
        read = kwargs.get('read', connect)
        super(MyTimeout, self).__init__(connect=connect, read=read)

requests.adapters.TimeoutSauce = MyTimeout

This code should cause us to set the read timeout as equal to the connect timeout, which is the timeout value you pass on your Session.get() call. (Note that I haven’t actually tested this code, so it may need some quick debugging, I just wrote it straight into the GitHub window.)

2. Use a fork of requests from kevinburke: https://github.com/kevinburke/requests/tree/connect-timeout

From its documentation: https://github.com/kevinburke/requests/blob/connect-timeout/docs/user/advanced.rst

If you specify a single value for the timeout, like this:

r = requests.get('https://github.com', timeout=5)

The timeout value will be applied to both the connect and the read timeouts. Specify a tuple if you would like to set the values separately:

r = requests.get('https://github.com', timeout=(3.05, 27))

kevinburke has requested it to be merged into the main requests project, but it hasn’t been accepted yet.


Answer 3


timeout = int(seconds)

Since requests >= 2.4.0, you can use the timeout argument, i.e.:

requests.get('https://duckduckgo.com/', timeout=10)

Note:

timeout is not a time limit on the entire response download; rather, an exception is raised if the server has not issued a response for timeout seconds ( more precisely, if no bytes have been received on the underlying socket for timeout seconds). If no timeout is specified explicitly, requests do not time out.


Answer 4


To create a timeout you can use signals.

The best way to solve this case is probably to

  1. Set an exception as the handler for the alarm signal
  2. Call the alarm signal with a ten second delay
  3. Call the function inside a try-except-finally block.
  4. The except block is reached if the function timed out.
  5. In the finally block you abort the alarm, so it's not signaled later.

Here is some example code:

import signal
from time import sleep

class TimeoutException(Exception):
    """ Simple Exception to be called on timeouts. """
    pass

def _timeout(signum, frame):
    """ Raise an TimeoutException.

    This is intended for use as a signal handler.
    The signum and frame arguments passed to this are ignored.

    """
    # Raise TimeoutException with system default timeout message
    raise TimeoutException()

# Set the handler for the SIGALRM signal:
signal.signal(signal.SIGALRM, _timeout)
# Send the SIGALRM signal in 10 seconds:
signal.alarm(10)

try:    
    # Do our code:
    print('This will take 11 seconds...')
    sleep(11)
    print('done!')
except TimeoutException:
    print('It timed out!')
finally:
    # Abort the sending of the SIGALRM signal:
    signal.alarm(0)

There are some caveats to this:

  1. It is not threadsafe, signals are always delivered to the main thread, so you can’t put this in any other thread.
  2. There is a slight delay after the scheduling of the signal and the execution of the actual code. This means that the example would time out even if it only slept for ten seconds.

But it's all in the standard Python library! Apart from the sleep function import, it's only one import. If you are going to use timeouts in many places, you can easily put the TimeoutException, _timeout and the signaling in a function and just call that. Or you can make a decorator and put it on functions; see the answer linked below.

You can also set this up as a “context manager” so you can use it with the with statement:

import signal
class Timeout():
    """ Timeout for use with the `with` statement. """

    class TimeoutException(Exception):
        """ Simple Exception to be called on timeouts. """
        pass

    def _timeout(signum, frame):
        """ Raise an TimeoutException.

        This is intended for use as a signal handler.
        The signum and frame arguments passed to this are ignored.

        """
        raise Timeout.TimeoutException()

    def __init__(self, timeout=10):
        self.timeout = timeout
        signal.signal(signal.SIGALRM, Timeout._timeout)

    def __enter__(self):
        signal.alarm(self.timeout)

    def __exit__(self, exc_type, exc_value, traceback):
        signal.alarm(0)
        return exc_type is Timeout.TimeoutException

# Demonstration:
from time import sleep

print('This is going to take maximum 10 seconds...')
with Timeout(10):
    sleep(15)
    print('No timeout?')
print('Done')

One possible downside of this context manager approach is that you can't know whether the code actually timed out or not.



Answer 5


Try this request with timeout & error handling:

import requests
try: 
    url = "http://google.com"
    r = requests.get(url, timeout=10)
except requests.exceptions.Timeout as e:
    print(e)

Answer 6


Set stream=True and use r.iter_content(1024). Yes, eventlet.Timeout just somehow doesn’t work for me.

from time import time
from requests import get, exceptions

try:
    start = time()
    timeout = 5
    with get(config['source']['online'], stream=True, timeout=timeout) as r:
        r.raise_for_status()
        content = bytes()
        content_gen = r.iter_content(1024)
        while True:
            if time()-start > timeout:
                raise TimeoutError('Time out! ({} seconds)'.format(timeout))
            try:
                content += next(content_gen)
            except StopIteration:
                break
        data = content.decode().split('\n')
        if len(data) in [0, 1]:
            raise ValueError('Bad requests data')
except (exceptions.RequestException, ValueError, IndexError, KeyboardInterrupt,
        TimeoutError) as e:
    print(e)
    with open(config['source']['local']) as f:
        data = [line.strip() for line in f.readlines()]

The discussion is here https://redd.it/80kp1h


Answer 7


This may be overkill, but the Celery distributed task queue has good support for timeouts.

In particular, you can define a soft time limit that just raises an exception in your process (so you can clean up) and/or a hard time limit that terminates the task when the time limit has been exceeded.

Under the covers, this uses the same signals approach as referenced in your “before” post, but in a more usable and manageable way. And if the list of web sites you are monitoring is long, you might benefit from its primary feature — all kinds of ways to manage the execution of a large number of tasks.
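
For illustration, a minimal sketch of what that might look like (the app name and broker URL below are placeholders, not part of the original answer):

from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded
import requests

app = Celery('stats', broker='redis://localhost:6379/0')  # placeholder broker URL

# soft_time_limit raises SoftTimeLimitExceeded inside the task so it can clean up;
# time_limit terminates the task outright once exceeded.
@app.task(soft_time_limit=10, time_limit=20)
def fetch(url):
    try:
        r = requests.get(url, verify=False)
        return (r.url, len(r.content), r.elapsed.total_seconds())
    except SoftTimeLimitExceeded:
        return (url, None, None)  # record the site as timed out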


Answer 8


I believe you can use multiprocessing and not depend on a 3rd party package:

import multiprocessing
import requests

def call_with_timeout(func, args, kwargs, timeout):
    manager = multiprocessing.Manager()
    return_dict = manager.dict()

    # define a wrapper of `return_dict` to store the result.
    def function(return_dict):
        return_dict['value'] = func(*args, **kwargs)

    p = multiprocessing.Process(target=function, args=(return_dict,))
    p.start()

    # Force a max. `timeout` or wait for the process to finish
    p.join(timeout)

    # If the process is still alive, it didn't finish: raise TimeoutError
    if p.is_alive():
        p.terminate()
        p.join()
        raise TimeoutError
    else:
        return return_dict['value']

call_with_timeout(requests.get, args=(url,), kwargs={'timeout': 10}, timeout=60)

The timeout passed in kwargs is the timeout to get any response from the server; the timeout argument is the timeout to get the complete response.


Answer 9


timeout=(connect timeout, read timeout), or give a single value (timeout=1):

import requests

try:
    req = requests.request('GET', 'https://www.google.com',timeout=(1,1))
    print(req)
except requests.ReadTimeout:
    print("READ TIME OUT")

Answer 10


This code works for socket errors 11004 and 10060:

# -*- encoding:UTF-8 -*-
__author__ = 'ACE'
import requests
from PyQt4.QtCore import *
from PyQt4.QtGui import *


class TimeOutModel(QThread):
    Existed = pyqtSignal(bool)
    TimeOut = pyqtSignal()

    def __init__(self, fun, timeout=500, parent=None):
        """
        @param fun: function or lambda
        @param timeout: ms
        """
        super(TimeOutModel, self).__init__(parent)
        self.fun = fun

        self.timeer = QTimer(self)
        self.timeer.setInterval(timeout)
        self.timeer.timeout.connect(self.time_timeout)
        self.Existed.connect(self.timeer.stop)
        self.timeer.start()

        self.setTerminationEnabled(True)

    def time_timeout(self):
        self.timeer.stop()
        self.TimeOut.emit()
        self.quit()
        self.terminate()

    def run(self):
        self.fun()


bb = lambda: requests.get("http://ipv4.download.thinkbroadband.com/1GB.zip")

a = QApplication([])

z = TimeOutModel(bb, 500)
print('timeout')

a.exec_()

Answer 11


Despite the question being about requests, I find this very easy to do with pycurl CURLOPT_TIMEOUT or CURLOPT_TIMEOUT_MS.

No threading or signaling required:

import pycurl
import StringIO
import traceback

url = 'http://www.example.com/example.zip'
timeout_ms = 1000
raw = StringIO.StringIO()
c = pycurl.Curl()
c.setopt(pycurl.TIMEOUT_MS, timeout_ms)  # total timeout in milliseconds
c.setopt(pycurl.WRITEFUNCTION, raw.write)
c.setopt(pycurl.NOSIGNAL, 1)
c.setopt(pycurl.URL, url)
c.setopt(pycurl.HTTPGET, 1)
try:
    c.perform()
except pycurl.error:
    traceback.print_exc() # error generated on timeout
    pass # or just pass if you don't want to print the error

Answer 12


In case you’re using the option stream=True you can do this:

import time
import requests

r = requests.get(
    'http://url_to_large_file',
    timeout=1,  # relevant only for the underlying socket
    stream=True)

with open('/tmp/out_file.txt', 'wb') as f:
    start_time = time.time()
    for chunk in r.iter_content(chunk_size=1024):
        if chunk:  # filter out keep-alive new chunks
            f.write(chunk)
        if time.time() - start_time > 8:
            raise Exception('Request took longer than 8s')

The solution does not need signals or multiprocessing.


Answer 13


Just another solution (got it from http://docs.python-requests.org/en/master/user/advanced/#streaming-uploads)

Before downloading, you can find out the content size:

TOO_LONG = 10*1024*1024  # 10 Mb
big_url = "http://ipv4.download.thinkbroadband.com/1GB.zip"
r = requests.get(big_url, stream=True)
print (r.headers['content-length'])
# 1073741824  

if int(r.headers['content-length']) < TOO_LONG:
    # download the content:
    content = r.content

But be careful: the sender can set an incorrect value in the Content-Length response field.


Answer 14


If it comes to that, create a watchdog thread that messes up requests’ internal state after 10 seconds, e.g.:

  • closes the underlying socket, and ideally
  • triggers an exception if requests retries the operation

Note that, depending on the system libraries, you may be unable to set a deadline on DNS resolution.
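
A very rough sketch of that idea, shown only to illustrate the watchdog concept (whether the in-flight read actually aborts depends on urllib3 internals, so treat this as an assumption rather than a guarantee):

import threading
import requests

r = requests.get('http://example.com/big-file', stream=True)
watchdog = threading.Timer(10, r.close)  # close the underlying connection after 10 seconds
watchdog.start()
try:
    content = r.content  # should fail once the watchdog closes the connection mid-read
finally:
    watchdog.cancel()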


Answer 15


Well, I tried many solutions on this page and still faced instabilities, random hangs, and poor connection performance.

I'm now using curl and I'm really happy about its "max time" functionality and its overall performance, even with such a crude implementation:

content=commands.getoutput('curl -m6 -Ss "http://mywebsite.xyz"')

Here, I defined a max time parameter of 6 seconds, covering both the connection and the transfer time.

I'm sure curl has a nice Python binding, if you prefer to stick to Pythonic syntax :)
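
Note that the commands module is Python 2 only; on Python 3 a rough equivalent using subprocess would be (a sketch):

import subprocess

# -m6: max total time of 6 seconds (connection + transfer); -Ss: silent, but show errors
content = subprocess.run(
    ['curl', '-m6', '-Ss', 'http://mywebsite.xyz'],
    capture_output=True, text=True,
).stdout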


Answer 16


There is a package called timeout-decorator that you can use to time out any python function.

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print("Start")
    for i in range(1,10):
        time.sleep(1)
        print("{} seconds have passed".format(i))

It uses the signals approach that some answers here suggest. Alternatively, you can tell it to use multiprocessing instead of signals (e.g. if you are in a multi-thread environment).


Answer 17


I'm using requests 2.2.1 and eventlet didn't work for me. Instead, I was able to use the gevent timeout, since gevent is already used in my service for gunicorn.

import gevent
import gevent.monkey
gevent.monkey.patch_all(subprocess=True)

import requests  # imported after monkey-patching so requests uses the patched sockets
try:
    with gevent.Timeout(5):
        ret = requests.get(url)
        print ret.status_code, ret.content
except gevent.timeout.Timeout as e:
    print "timeout: {}".format(e.message)

Please note that gevent.timeout.Timeout is not caught by general Exception handling. So either explicitly catch gevent.timeout.Timeout or pass in a different exception to be used like so: with gevent.Timeout(5, requests.exceptions.Timeout): although no message is passed when this exception is raised.


Answer 18


I came up with a more direct solution that is admittedly ugly but fixes the real problem. It goes a bit like this:

resp = requests.get(some_url, stream=True)
resp.raw._fp.fp._sock.settimeout(read_timeout)
# This will load the entire response even though stream is set
content = resp.content

You can read the full explanation here


Proxies with the Python 'Requests' module

Question: Proxies with the Python 'Requests' module


Just a short, simple one about the excellent Requests module for Python.

I can’t seem to find in the documentation what the variable ‘proxies’ should contain. When I send it a dict with a standard “IP:PORT” value it rejected it asking for 2 values. So, I guess (because this doesn’t seem to be covered in the docs) that the first value is the ip and the second the port?

The docs mention this only:

proxies – (optional) Dictionary mapping protocol to the URL of the proxy.

So I tried this… what should I be doing?

proxy = { ip: port}

and should I convert these to some type before putting them in the dict?

r = requests.get(url,headers=headers,proxies=proxy)

Answer 0


The proxies dict syntax is {"protocol": "ip:port", ...}. With it you can specify different (or the same) proxies for requests made over the http, https, and ftp protocols:

http_proxy  = "http://10.10.1.10:3128"
https_proxy = "https://10.10.1.11:1080"
ftp_proxy   = "ftp://10.10.1.10:3128"

proxyDict = { 
              "http"  : http_proxy, 
              "https" : https_proxy, 
              "ftp"   : ftp_proxy
            }

r = requests.get(url, headers=headers, proxies=proxyDict)

Deduced from the requests documentation:

Parameters:
method – method for the new Request object.
url – URL for the new Request object.

proxies – (optional) Dictionary mapping protocol to the URL of the proxy.


On linux you can also do this via the HTTP_PROXY, HTTPS_PROXY, and FTP_PROXY environment variables:

export HTTP_PROXY=10.10.1.10:3128
export HTTPS_PROXY=10.10.1.11:1080
export FTP_PROXY=10.10.1.10:3128

On Windows:

set http_proxy=10.10.1.10:3128
set https_proxy=10.10.1.11:1080
set ftp_proxy=10.10.1.10:3128

Thanks, Jay, for pointing this out:
The syntax changed with requests 2.0.0.
You'll need to add a scheme to the URL: https://2.python-requests.org/en/latest/user/advanced/#proxies


Answer 1


I have found that urllib has some really good code to pick up the system’s proxy settings and they happen to be in the correct form to use directly. You can use this like:

import urllib.request

...
r = requests.get('http://example.org', proxies=urllib.request.getproxies())

It works really well and urllib knows about getting Mac OS X and Windows settings as well.


Answer 2


You can refer to the proxy documentation here.

If you need to use a proxy, you can configure individual requests with the proxies argument to any request method:

import requests

proxies = {
  "http": "http://10.10.1.10:3128",
  "https": "https://10.10.1.10:1080",
}

requests.get("http://example.org", proxies=proxies)

To use HTTP Basic Auth with your proxy, use the http://user:password@host.com/ syntax:

proxies = {
    "http": "http://user:pass@10.10.1.10:3128/"
}

Answer 3


The accepted answer was a good start for me, but I kept getting the following error:

AssertionError: Not supported proxy scheme None

The fix was to specify the http:// in the proxy URL, thus:

http_proxy  = "http://194.62.145.248:8080"
https_proxy  = "https://194.62.145.248:8080"
ftp_proxy   = "10.10.1.10:3128"

proxyDict = {
              "http"  : http_proxy,
              "https" : https_proxy,
              "ftp"   : ftp_proxy
            }

I’d be interested as to why the original works for some people but not me.

Edit: I see the main answer is now updated to reflect this :)


Answer 4


If you'd like to persist cookies and session data, you'd best do it like this:

import requests

proxies = {
    'http': 'http://user:pass@10.10.1.0:3128',
    'https': 'https://user:pass@10.10.1.0:3128',
}

# Create the session and set the proxies.
s = requests.Session()
s.proxies = proxies

# Make the HTTP request through the session.
r = s.get('http://www.showmemyip.com/')

Answer 5


8 years late. But I like:

import os
import requests

os.environ['HTTP_PROXY'] = os.environ['http_proxy'] = 'http://http-connect-proxy:3128/'
os.environ['HTTPS_PROXY'] = os.environ['https_proxy'] = 'http://http-connect-proxy:3128/'
os.environ['NO_PROXY'] = os.environ['no_proxy'] = '127.0.0.1,localhost,.local'

r = requests.get('https://example.com')  # , verify=False

Answer 6


Here is my basic class in Python for the requests module, with some proxy configuration and a stopwatch:

import requests
import time
class BaseCheck():
    def __init__(self, url):
        self.http_proxy  = "http://user:pw@proxy:8080"
        self.https_proxy = "http://user:pw@proxy:8080"
        self.ftp_proxy   = "http://user:pw@proxy:8080"
        self.proxyDict = {
                      "http"  : self.http_proxy,
                      "https" : self.https_proxy,
                      "ftp"   : self.ftp_proxy
                    }
        self.url = url
        def makearr(tsteps):
            global stemps
            global steps
            stemps = {}
            for step in tsteps:
                stemps[step] = { 'start': 0, 'end': 0 }
            steps = tsteps
        makearr(['init','check'])
        def starttime(typ = ""):
            for stemp in stemps:
                if typ == "":
                    stemps[stemp]['start'] = time.time()
                else:
                    stemps[stemp][typ] = time.time()
        starttime()
    def __str__(self):
        return str(self.url)
    def getrequests(self):
        g=requests.get(self.url,proxies=self.proxyDict)
        print(g.status_code)
        print(g.content)
        print(self.url)
        stemps['init']['end'] = time.time()
        #print stemps['init']['end'] - stemps['init']['start']
        x= stemps['init']['end'] - stemps['init']['start']
        print(x)


test=BaseCheck(url='http://google.com')
test.getrequests()

Answer 7


I just made a proxy grabber that can also connect with the same grabbed proxy, without any input. Here it is:

#Import Modules

from termcolor import colored
from selenium import webdriver
import requests
import os
import sys
import time

#Proxy Grab

options = webdriver.ChromeOptions()
options.add_argument('headless')
driver = webdriver.Chrome(chrome_options=options)
driver.get("https://www.sslproxies.org/")
tbody = driver.find_element_by_tag_name("tbody")
cell = tbody.find_elements_by_tag_name("tr")
for column in cell:

        column = column.text.split(" ")
        print(colored(column[0]+":"+column[1],'yellow'))
driver.quit()
print("")

os.system('clear')
os.system('cls')

#Proxy Connection

print(colored('Getting Proxies from graber...','green'))
time.sleep(2)
os.system('clear')
os.system('cls')
proxy = {"http": "http://"+ column[0]+":"+column[1]}
url = 'https://mobile.facebook.com/login'
r = requests.get(url,  proxies=proxy)
print("")
print(colored('Connecting using proxy' ,'green'))
print("")
sts = r.status_code

Answer 8


It's a bit late, but here is a wrapper class that simplifies scraping proxies and then making an HTTP POST or GET:

ProxyRequests

https://github.com/rootVIII/proxy_requests

Answer 9


I'm sharing some code that fetches proxies from the site https://free-proxy-list.net and stores the data in a file compatible with tools like Elite Proxy Switcher (IP:PORT format):

##PROXY_UPDATER – get free proxies from https://free-proxy-list.net/

from lxml.html import fromstring
import requests

######################FIND PROXIES#########################################
def get_proxies():
    url = 'https://free-proxy-list.net/'
    response = requests.get(url)
    parser = fromstring(response.text)
    proxies = set()
    for i in parser.xpath('//tbody/tr')[:299]:   #299 proxies max
        proxy = ":".join([i.xpath('.//td[1]/text()') 
        [0],i.xpath('.//td[2]/text()')[0]])
        proxies.add(proxy)
    return proxies



######################write to file in format   IP:PORT######################
try:
    proxies = get_proxies()
    f=open('proxy_list.txt','w')
    for proxy in proxies:
        f.write(proxy+'\n')
    f.close()
    print ("DONE")
except:
    print ("MAJOR ERROR")

How to install packages offline?

Question: How to install packages offline?


What's the best way to download a Python package and its dependencies from PyPI for offline installation on another machine? Is there any easy way to do this with pip or easy_install? I'm trying to install the requests library on a FreeBSD box that is not connected to the internet.


Answer 0


If the package is on PYPI, download it and its dependencies to some local directory. E.g.

$ mkdir /pypi && cd /pypi
$ ls -la
  -rw-r--r--   1 pavel  staff   237954 Apr 19 11:31 Flask-WTF-0.6.tar.gz
  -rw-r--r--   1 pavel  staff   389741 Feb 22 17:10 Jinja2-2.6.tar.gz
  -rw-r--r--   1 pavel  staff    70305 Apr 11 00:28 MySQL-python-1.2.3.tar.gz
  -rw-r--r--   1 pavel  staff  2597214 Apr 10 18:26 SQLAlchemy-0.7.6.tar.gz
  -rw-r--r--   1 pavel  staff  1108056 Feb 22 17:10 Werkzeug-0.8.2.tar.gz
  -rw-r--r--   1 pavel  staff   488207 Apr 10 18:26 boto-2.3.0.tar.gz
  -rw-r--r--   1 pavel  staff   490192 Apr 16 12:00 flask-0.9-dev-2a6c80a.tar.gz

Some packages may have to be archived into similar looking tarballs by hand. I do it a lot when I want a more recent (less stable) version of something. Some packages aren’t on PYPI, so same applies to them.

Suppose you have a properly formed Python application in ~/src/myapp. ~/src/myapp/setup.py will have an install_requires list that mentions one or more things that you have in your /pypi directory. Like so:

  install_requires=[
    'boto',
    'Flask',
    'Werkzeug',
    # and so on

If you want to be able to run your app with all the necessary dependencies while still hacking on it, you’ll do something like this:

$ cd ~/src/myapp
$ python setup.py develop --always-unzip --allow-hosts=None --find-links=/pypi

This way your app will be executed straight from your source directory. You can hack on things, and then rerun the app without rebuilding anything.

If you want to install your app and its dependencies into the current python environment, you’ll do something like this:

$ cd ~/src/myapp
$ easy_install --always-unzip --allow-hosts=None --find-links=/pypi .

In both cases, the build will fail if one or more dependencies aren’t present in /pypi directory. It won’t attempt to promiscuously install missing things from Internet.

I highly recommend invoking setup.py develop ... and easy_install ... within an active virtual environment, to avoid contaminating your global Python environment. virtualenv is pretty much the way to go. Never install anything into the global Python environment.

If the machine on which you've built your app has the same architecture as the machine on which you want to deploy it, you can simply tarball the entire virtual environment directory into which you easy_install-ed everything. Just before tarballing, though, you must make the virtual environment directory relocatable (see the --relocatable option). NOTE: the destination machine needs to have the same version of Python installed, and any C-based dependencies your app may have must be preinstalled there too (e.g. if you depend on PIL, then libpng, libjpeg, etc. must be preinstalled).
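
For example (a sketch; the path is a placeholder, and note that the --relocatable option has been removed from recent virtualenv releases):

$ virtualenv --relocatable /path/to/myenv
$ tar -zcf myenv.tar.gz /path/to/myenv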


Answer 1

pip download命令使您无需安装即可下载软件包:

pip download -r requirements.txt

(在以前的pip版本中,拼写为 pip install --download -r requirements.txt。)

然后,您可以使用 pip install --no-index --find-links /path/to/download/dir/ -r requirements.txt 来安装那些已下载的 sdist,而无需访问网络。

The pip download command lets you download packages without installing them:

pip download -r requirements.txt

(In previous versions of pip, this was spelled pip install --download -r requirements.txt.)

Then you can use pip install --no-index --find-links /path/to/download/dir/ -r requirements.txt to install those downloaded sdists, without accessing the network.


回答 2

如果要脱机安装 python 库及其依赖项,请先在一台操作系统相同、已联网并装有 python 的机器上完成以下步骤:

1)创建一个内容类似下面的 requirements.txt 文件(注意:这些是您要下载的库):

Flask==0.12
requests>=2.7.0
scikit-learn==0.19.1
numpy==1.14.3
pandas==0.22.0

创建需求文件的一种方法是使用 pip freeze > requirements.txt。这将列出您环境中的所有库。然后您可以打开 requirements.txt,删除不需要的条目。

2)执行命令 mkdir wheelhouse && pip download -r requirements.txt -d wheelhouse,将库及其依赖项下载到 wheelhouse 目录中

3)将requirements.txt复制到 wheelhouse目录中

4)用 tar -zcf wheelhouse.tar.gz wheelhouse 将 wheelhouse 打包为 wheelhouse.tar.gz

然后上传wheelhouse.tar.gz到目标计算机:

1)执行tar -zxf wheelhouse.tar.gz以提取文件

2)执行pip install -r wheelhouse/requirements.txt --no-index --find-links wheelhouse以安装库及其依赖项

If you want install python libs and their dependencies offline, finish following these steps on a machine with the same os, network connected, and python installed:

1) Create a requirements.txt file with similar content (Note – these are the libraries you wish to download):

Flask==0.12
requests>=2.7.0
scikit-learn==0.19.1
numpy==1.14.3
pandas==0.22.0

One option for creating the requirements file is to use pip freeze > requirements.txt. This will list all libraries in your environment. Then you can go into requirements.txt and remove the unneeded ones.

2) Execute command mkdir wheelhouse && pip download -r requirements.txt -d wheelhouse to download libs and their dependencies to directory wheelhouse

3) Copy requirements.txt into wheelhouse directory

4) Archive wheelhouse into wheelhouse.tar.gz with tar -zcf wheelhouse.tar.gz wheelhouse

Then upload wheelhouse.tar.gz to your target machine:

1) Execute tar -zxf wheelhouse.tar.gz to extract the files

2) Execute pip install -r wheelhouse/requirements.txt --no-index --find-links wheelhouse to install the libs and their dependencies


回答 3

离线python。为此,我使用virtualenv(隔离的Python环境)

1)使用pip在线安装virtualenv:

pip install virtualenv --user

或使用whl脱机:转到此链接,下载最新版本(.whl或tar.gz),然后使用以下命令进行安装:

pip install virtualenv-15.1.0-py2.py3-none-any.whl --user

通过使用 --user,您就不需要使用 sudo pip…

2)使用virtualenv

在联网的计算机上,用终端 cd 进入一个目录,然后运行以下代码:

python -m virtualenv myenv
cd myenv
source bin/activate
pip install Flask

安装完所有软件包后,必须在 virtualenv 处于激活状态时生成一个 requirements.txt,即执行

pip freeze > requirements.txt

打开一个新终端,创建另一个环境,例如 myenv2。

python -m virtualenv myenv2
cd myenv2
source bin/activate
cd -
ls

现在,您可以转到存放 requirements.txt 和 tranferred_packages 文件夹的脱机文件夹。使用以下命令下载软件包,并将它们全部放入 tranferred_packages 文件夹。

pip download -r requirements.txt

将您的脱机文件夹移至脱机计算机,然后

python -m virtualenv myenv2
cd myenv2
source bin/activate
cd -
cd offline
pip install --no-index --find-links="./tranferred_packages" -r requirements.txt

离线文件夹中的内容:[requirements.txt, tranferred_packages {Flask-0.10.1.tar.gz, …}]

检查您的软件包列表

pip list

注意:现在是 2017 年,最好使用 python3。您可以使用此命令创建 python 3 的 virtualenv。

virtualenv -p python3 envname

offline python. for doing this I use virtualenv (isolated Python environment)

1) install virtualenv online with pip:

pip install virtualenv --user

or offline with whl: go to this link , download last version (.whl or tar.gz) and install that with this command:

pip install virtualenv-15.1.0-py2.py3-none-any.whl --user

by using --user you don’t need to use sudo pip….

2) use virtualenv

on online machine select a directory with terminal cd and run this code:

python -m virtualenv myenv
cd myenv
source bin/activate
pip install Flask

after installing all the packages, you have to generate a requirements.txt so while your virtualenv is active, write

pip freeze > requirements.txt

open a new terminal and create another env like myenv2.

python -m virtualenv myenv2
cd myenv2
source bin/activate
cd -
ls

now you can go to your offline folder, where your requirements.txt and tranferred_packages folder are. Download the packages with the following code and put all of them into the tranferred_packages folder.

pip download -r requirements.txt

take your offline folder to offline computer and then

python -m virtualenv myenv2
cd myenv2
source bin/activate
cd -
cd offline
pip install --no-index --find-links="./tranferred_packages" -r requirements.txt

Contents of the offline folder: [requirements.txt, tranferred_packages {Flask-0.10.1.tar.gz, …}]

check the list of your packages

pip list

note: as we are in 2017, it is better to use python 3. You can create a python 3 virtualenv with this command.

virtualenv -p python3 envname

回答 4

下载压缩包,将其转移到您的FreeBSD机器上并解压缩,然后运行python setup.py install就可以了!

编辑:只是要补充一点,您现在也可以使用pip安装tarball。

Download the tarball, transfer it to your FreeBSD machine and extract it, afterwards run python setup.py install and you’re done!

EDIT: Just to add on that, you can also install the tarballs with pip now.


回答 5

让我一步一步地完成该过程:

  1. 在连接到互联网的计算机上,创建一个文件夹。
   $ mkdir packages
   $ cd packages
  2. 打开命令提示符或shell并执行以下命令:

    假设您想要的软件包是 tensorflow

    $ pip download tensorflow

  3. 现在,在目标计算机上,复制packages文件夹并应用以下命令

  $ cd packages
  $ pip install 'tensorflow-xyz.whl' --no-index --find-links '.'

请注意,必须将 tensorflow-xyz.whl 替换为所需软件包的实际文件名。

Let me go through the process step by step:

  1. On a computer connected to the internet, create a folder.
   $ mkdir packages
   $ cd packages
  2. open up a command prompt or shell and execute the following command:

    Suppose the package you want is tensorflow

    $ pip download tensorflow

  3. Now, on the target computer, copy the packages folder and apply the following command

  $ cd packages
  $ pip install 'tensorflow-xyz.whl' --no-index --find-links '.'

Note that the tensorflow-xyz.whl must be replaced by the original name of the required package.


回答 6

使用wheel编译包。

打包:

$ tempdir=$(mktemp -d /tmp/wheelhouse-XXXXX)
$ pip wheel -r requirements.txt --wheel-dir=$tempdir
$ cwd=`pwd`
$ (cd "$tempdir"; tar -cjvf "$cwd/bundled.tar.bz2" *)

复制tarball并安装:

$ tempdir=$(mktemp -d /tmp/wheelhouse-XXXXX)
$ (cd $tempdir; tar -xvf /path/to/bundled.tar.bz2)
$ pip install --force-reinstall --ignore-installed --upgrade --no-index --no-deps $tempdir/*

注意:wheel 二进制软件包不能跨机器移植使用。

更多参考请见:https://pip.pypa.io/en/stable/user_guide/#installation-bundles

Using wheel compiled packages.

bundle up:

$ tempdir=$(mktemp -d /tmp/wheelhouse-XXXXX)
$ pip wheel -r requirements.txt --wheel-dir=$tempdir
$ cwd=`pwd`
$ (cd "$tempdir"; tar -cjvf "$cwd/bundled.tar.bz2" *)

copy tarball and install:

$ tempdir=$(mktemp -d /tmp/wheelhouse-XXXXX)
$ (cd $tempdir; tar -xvf /path/to/bundled.tar.bz2)
$ pip install --force-reinstall --ignore-installed --upgrade --no-index --no-deps $tempdir/*

Note: wheel binary packages are not portable across machines.

More ref. here: https://pip.pypa.io/en/stable/user_guide/#installation-bundles


回答 7

我遇到过类似的问题。而且我必须让它以与我们从 pypi 安装时相同的方式安装。

我做了以下事情:

  1. 在可以访问 Internet 的机器上创建一个目录,用于存放所有软件包。

    mkdir -p /path/to/packages/
  2. 将所有软件包下载到路径

    pip download -r requirements.txt -d /path/to/packages
    
    Eg:- ls /root/wheelhouse/  # /root/wheelhouse is my /path/to/packages/
    total 4524
    -rw-r--r--. 1 root root   16667 May 23  2017 incremental-17.5.0-py2.py3-none-any.whl
    -rw-r--r--. 1 root root   34713 Sep  1 10:21 attrs-18.2.0-py2.py3-none-any.whl
    -rw-r--r--. 1 root root 3088398 Oct 15 14:41 Twisted-18.9.0.tar.bz2
    -rw-r--r--. 1 root root  133356 Jan 28 15:58 chardet-3.0.4-py2.py3-none-any.whl
    -rw-r--r--. 1 root root  154154 Jan 28 15:58 certifi-2018.11.29-py2.py3-none-any.whl
    -rw-r--r--. 1 root root   57987 Jan 28 15:58 requests-2.21.0-py2.py3-none-any.whl
    -rw-r--r--. 1 root root   58594 Jan 28 15:58 idna-2.8-py2.py3-none-any.whl
    -rw-r--r--. 1 root root  118086 Jan 28 15:59 urllib3-1.24.1-py2.py3-none-any.whl
    -rw-r--r--. 1 root root   47229 Jan 28 15:59 tqdm-4.30.0-py2.py3-none-any.whl
    -rw-r--r--. 1 root root    7922 Jan 28 16:13 constantly-15.1.0-py2.py3-none-any.whl
    -rw-r--r--. 1 root root  164706 Jan 28 16:14 zope.interface-4.6.0-cp27-cp27mu-manylinux1_x86_64.whl
    -rw-r--r--. 1 root root  573841 Jan 28 16:14 setuptools-40.7.0-py2.py3-none-any.whl
    -rw-r--r--. 1 root root   37638 Jan 28 16:15 Automat-0.7.0-py2.py3-none-any.whl
    -rw-r--r--. 1 root root   37905 Jan 28 16:15 hyperlink-18.0.0-py2.py3-none-any.whl
    -rw-r--r--. 1 root root   52311 Jan 28 16:15 PyHamcrest-1.9.0-py2.py3-none-any.whl
    -rw-r--r--. 1 root root   10586 Jan 28 16:15 six-1.12.0-py2.py3-none-any.whl
  3. 压缩软件包目录,然后将其复制到没有Internet访问权限的计算机上。然后做,

    cd /path/to/packages/
    tar -cvzf packages.tar.gz .  # note the . (dot) at the end

    packages.tar.gz复制到没有Internet访问权限的目标计算机中。

  4. 在无法访问互联网的计算机上,执行以下操作(假设您将打包好的软件包复制到了当前计算机的 /path/to/package/ 中)

    cd /path/to/packages/
    tar -xvzf packages.tar.gz
    mkdir -p $HOME/.config/pip/
    
    vi $HOME/.config/pip/pip.conf

    并将以下内容粘贴并保存。

    [global]
    timeout = 10
    find-links = file:///path/to/package/
    no-cache-dir = true
    no-index = true
  5. 最后,我建议您使用某种形式的virtualenv安装软件包。

    virtualenv -p python2 venv # use python3, if you are on python3
    source ./venv/bin/activate
    pip install <package>

您应该能够下载目录 /path/to/package/ 中的所有模块。

注意:我之所以这样做,只是因为我无法添加选项或更改我们安装模块的方式。否则我会直接执行

    pip install --no-index --find-links /path/to/download/dir/ -r requirements.txt

I had a similar problem, and I had to make it install the same way we do from pypi.

I did the following things:

  1. On the machine that has internet access, make a directory to store all the packages.

    mkdir -p /path/to/packages/
    
  2. Download all the packages to the path

pip download -r requirements.txt -d /path/to/packages

Edit: You can also try:

python3 -m pip wheel --no-cache-dir -r requirements.txt -w /path/to/packages

Eg:- ls /root/wheelhouse/  # /root/wheelhouse is my /path/to/packages/
total 4524
-rw-r--r--. 1 root root   16667 May 23  2017 incremental-17.5.0-py2.py3-none-any.whl
-rw-r--r--. 1 root root   34713 Sep  1 10:21 attrs-18.2.0-py2.py3-none-any.whl
-rw-r--r--. 1 root root 3088398 Oct 15 14:41 Twisted-18.9.0.tar.bz2
-rw-r--r--. 1 root root  133356 Jan 28 15:58 chardet-3.0.4-py2.py3-none-any.whl
-rw-r--r--. 1 root root  154154 Jan 28 15:58 certifi-2018.11.29-py2.py3-none-any.whl
-rw-r--r--. 1 root root   57987 Jan 28 15:58 requests-2.21.0-py2.py3-none-any.whl
-rw-r--r--. 1 root root   58594 Jan 28 15:58 idna-2.8-py2.py3-none-any.whl
-rw-r--r--. 1 root root  118086 Jan 28 15:59 urllib3-1.24.1-py2.py3-none-any.whl
-rw-r--r--. 1 root root   47229 Jan 28 15:59 tqdm-4.30.0-py2.py3-none-any.whl
-rw-r--r--. 1 root root    7922 Jan 28 16:13 constantly-15.1.0-py2.py3-none-any.whl
-rw-r--r--. 1 root root  164706 Jan 28 16:14 zope.interface-4.6.0-cp27-cp27mu-manylinux1_x86_64.whl
-rw-r--r--. 1 root root  573841 Jan 28 16:14 setuptools-40.7.0-py2.py3-none-any.whl
-rw-r--r--. 1 root root   37638 Jan 28 16:15 Automat-0.7.0-py2.py3-none-any.whl
-rw-r--r--. 1 root root   37905 Jan 28 16:15 hyperlink-18.0.0-py2.py3-none-any.whl
-rw-r--r--. 1 root root   52311 Jan 28 16:15 PyHamcrest-1.9.0-py2.py3-none-any.whl
-rw-r--r--. 1 root root   10586 Jan 28 16:15 six-1.12.0-py2.py3-none-any.whl
  3. Tar the packages directory and copy it to the machine that doesn't have internet access. Then do,

    cd /path/to/packages/
    tar -cvzf packages.tar.gz .  # note the . (dot) at the end
    

Copy the packages.tar.gz into the destination machine that doesn’t have internet access.

  4. On the machine that doesn't have internet access, do the following (assuming you copied the tarred packages to /path/to/package/ on the current machine)

    cd /path/to/packages/
    tar -xvzf packages.tar.gz
    mkdir -p $HOME/.config/pip/
    vi $HOME/.config/pip/pip.conf
    

and paste the following content inside and save it.

[global]
timeout = 10
find-links = file:///path/to/package/
no-cache-dir = true
no-index = true
  5. Finally, I suggest you use some form of virtualenv for installing the packages.

    virtualenv -p python2 venv # use python3, if you are on python3
    source ./venv/bin/activate
    pip install <package>
    

You should be able to download all the modules that are in the directory /path/to/package/.

Note: I only did this because I couldn't add options or change the way we install the modules. Otherwise I'd have done

pip install --no-index --find-links /path/to/download/dir/ -r requirements.txt

回答 8

对于 pip 8.1.2,您可以使用 pip download -r requ.txt 将软件包下载到本地计算机。

For Pip 8.1.2 you can use pip download -r requ.txt to download packages to your local machine.


在Python请求库的get方法中使用标头

问题:在Python请求库的get方法中使用标头

因此,我最近偶然发现了这个用于在 Python 中处理 HTTP 请求的强大库,可在此处找到:http://docs.python-requests.org/en/latest/index.html

我很喜欢用它,但是我不知道如何在我的 get 请求中添加标头。有人能帮忙吗?

So I recently stumbled upon this great library for handling HTTP requests in Python; found here http://docs.python-requests.org/en/latest/index.html.

I love working with it, but I can’t figure out how to add headers to my get requests. Help?


回答 0

根据 api 文档,这些标头都可以通过 requests.get 传入:

r=requests.get("http://www.example.com/", headers={"content-type":"text"})

According to the api, the headers can all be passed in using requests.get:

r=requests.get("http://www.example.com/", headers={"content-type":"text"})

回答 1

根据您链接的页面上的文档,这似乎非常简单(强调是我加的)。

requests.get(url, params=None, headers=None, cookies=None, auth=None, timeout=None)

发送GET请求。返回Response对象。

参数:

  • url – 新 Request 对象的 URL。
  • params –(可选)与 Request 一起发送的 GET 参数字典。
  • headers –(可选)与 Request 一起发送的 HTTP 标头字典。
  • cookies –(可选)与 Request 一起发送的 CookieJar 对象。
  • auth –(可选)AuthObject,用于启用基本 HTTP 身份验证。
  • timeout –(可选)浮点数,描述请求的超时时间。

Seems pretty straightforward, according to the docs on the page you linked (emphasis mine).

requests.get(url, params=None, headers=None, cookies=None, auth=None, timeout=None)

Sends a GET request. Returns Response object.

Parameters:

  • url – URL for the new Request object.
  • params – (optional) Dictionary of GET Parameters to send with the Request.
  • headers – (optional) Dictionary of HTTP Headers to send with the Request.
  • cookies – (optional) CookieJar object to send with the Request.
  • auth – (optional) AuthObject to enable Basic HTTP Auth.
  • timeout – (optional) Float describing the timeout of the request.
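
As a quick sketch exercising several of those parameters together (httpbin.org is used here as a harmless echo endpoint; it is not part of the original answer):

import requests

r = requests.get(
    'http://httpbin.org/get',
    params={'q': 'python'},               # appended to the URL as ?q=python
    headers={'User-Agent': 'my-script'},  # custom HTTP headers
    timeout=5.0,                          # seconds to wait before giving up
)
print(r.status_code, r.json()['args'])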

回答 2

这个答案告诉我,您可以为整个会话设置标题:

s = requests.Session()
s.auth = ('user', 'pass')
s.headers.update({'x-test': 'true'})

# both 'x-test' and 'x-test2' are sent
s.get('http://httpbin.org/headers', headers={'x-test2': 'true'})

奖励:会话还可以处理Cookie。

This answer taught me that you can set headers for an entire session:

s = requests.Session()
s.auth = ('user', 'pass')
s.headers.update({'x-test': 'true'})

# both 'x-test' and 'x-test2' are sent
s.get('http://httpbin.org/headers', headers={'x-test2': 'true'})

Bonus: Sessions also handle cookies.
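
To illustrate that cookie handling, a minimal sketch (again assuming httpbin.org as a test endpoint):

import requests

with requests.Session() as s:
    # the first response sets a cookie; the Session stores it in s.cookies
    s.get('http://httpbin.org/cookies/set/sessioncookie/123456789')
    # later requests on the same Session automatically send it back
    r = s.get('http://httpbin.org/cookies')
    print(r.json())  # {'cookies': {'sessioncookie': '123456789'}}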


请求中的URL超过了最大重试次数

问题:请求中的URL超过了最大重试次数

我正在尝试获取App Store> Business的内容:

import requests
from lxml import html

page = requests.get("https://itunes.apple.com/in/genre/ios-business/id6000?mt=8")
tree = html.fromstring(page.text)

flist = []
plist = []
for i in range(0, 100):
    app = tree.xpath("//div[@class='column first']/ul/li/a/@href")
    ap = app[0]
    page1 = requests.get(ap)

当我使用 range(0, 2) 时它可以工作,但当我把 range 放大到 100 时,它就显示此错误:

Traceback (most recent call last):
  File "/home/preetham/Desktop/eg.py", line 17, in <module>
    page1 = requests.get(ap)
  File "/usr/local/lib/python2.7/dist-packages/requests/api.py", line 55, in get
    return request('get', url, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/requests/api.py", line 44, in request
    return session.request(method=method, url=url, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/requests/sessions.py", line 383, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python2.7/dist-packages/requests/sessions.py", line 486, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/requests/adapters.py", line 378, in send
    raise ConnectionError(e)
requests.exceptions.ConnectionError: HTTPSConnectionPool(host='itunes.apple.com', port=443): Max retries exceeded with url: /in/app/adobe-reader/id469337564?mt=8 (Caused by <class 'socket.gaierror'>: [Errno -2] Name or service not known)

I’m trying to get the content of App Store > Business:

import requests
from lxml import html

page = requests.get("https://itunes.apple.com/in/genre/ios-business/id6000?mt=8")
tree = html.fromstring(page.text)

flist = []
plist = []
for i in range(0, 100):
    app = tree.xpath("//div[@class='column first']/ul/li/a/@href")
    ap = app[0]
    page1 = requests.get(ap)

When I try the range with (0,2) it works, but when I put the range in 100s it shows this error:

Traceback (most recent call last):
  File "/home/preetham/Desktop/eg.py", line 17, in <module>
    page1 = requests.get(ap)
  File "/usr/local/lib/python2.7/dist-packages/requests/api.py", line 55, in get
    return request('get', url, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/requests/api.py", line 44, in request
    return session.request(method=method, url=url, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/requests/sessions.py", line 383, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python2.7/dist-packages/requests/sessions.py", line 486, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/requests/adapters.py", line 378, in send
    raise ConnectionError(e)
requests.exceptions.ConnectionError: HTTPSConnectionPool(host='itunes.apple.com', port=443): Max retries exceeded with url: /in/app/adobe-reader/id469337564?mt=8 (Caused by <class 'socket.gaierror'>: [Errno -2] Name or service not known)

回答 0

这里发生的是itunes服务器拒绝您的连接(您在短时间内从同一ip地址发送了太多请求)

Max retries exceeded with url: /in/app/adobe-reader/id469337564?mt=8

错误跟踪具有误导性,它本应是类似“由于目标计算机主动拒绝,无法建立连接”的信息。

GitHub 上有一个关于 python requests 库的相关 issue,请在此处查看

要克服此问题(与其说是问题,不如说是误导性的调试跟踪),您应该像下面这样捕获与连接有关的异常:

try:
    page1 = requests.get(ap)
except requests.exceptions.ConnectionError:
    page1 = None  # 连接被服务器拒绝

解决此问题的另一种方法是在向服务器发送请求时留出足够的时间间隔,这可以通过 python 中的 sleep(timeinsec) 函数来实现(不要忘记导入 sleep):

from time import sleep

总而言之,requests 是一个很棒的 python 库,希望这能解决您的问题。

What happened here is that itunes server refuses your connection (you’re sending too many requests from same ip address in short period of time)

Max retries exceeded with url: /in/app/adobe-reader/id469337564?mt=8

The error trace is misleading; it should be something like “No connection could be made because the target machine actively refused it”.

There is an issue about the python requests lib on Github, check it out here

To overcome this issue (not so much an issue as it is misleading debug trace) you should catch connection related exceptions like so:

try:
    page1 = requests.get(ap)
except requests.exceptions.ConnectionError:
    page1 = None  # the server refused the connection

Another way to overcome this problem is if you use enough time gap to send requests to server this can be achieved by sleep(timeinsec) function in python (don’t forget to import sleep)

from time import sleep

All in all, requests is an awesome python lib; hope that solves your problem.
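
Putting the two ideas together, a minimal retry loop might look like this (a sketch only; get_with_retries is a hypothetical helper, and the 3 attempts / 5-second delay are arbitrary choices, not values from the original answer):

import time
import requests

def get_with_retries(url, attempts=3, delay=5):
    # hypothetical helper: retry a GET a few times, sleeping between attempts
    for _ in range(attempts):
        try:
            return requests.get(url)
        except requests.exceptions.ConnectionError:
            print("Connection refused, sleeping for {} seconds...".format(delay))
            time.sleep(delay)
    return None  # all attempts failed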


回答 1

只需使用 requests 自带的功能:

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry


session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)

session.get(url)

这将对该 URL 发起 GET 请求,并在出现 requests.exceptions.ConnectionError 时重试 3 次。backoff_factor 有助于在两次尝试之间加入延迟,以免在服务器有周期性请求配额的情况下再次失败。

看一下requests.packages.urllib3.util.retry.Retry,它有许多选项可以简化重试。

Just use requests' features:

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry


session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)

session.get(url)

This will GET the URL and retry 3 times in case of requests.exceptions.ConnectionError. backoff_factor will help to apply delays between attempts to avoid to fail again in case of periodic request quota.

Take a look at requests.packages.urllib3.util.retry.Retry, it has many options to simplify retries.
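
For example, among those options, status_forcelist lets you retry on specific HTTP status codes in addition to connection errors (a sketch; the exact codes and counts below are arbitrary choices):

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

# retry on connection errors and on 429/500/502/503/504 responses
retry = Retry(total=5, connect=3, backoff_factor=0.5,
              status_forcelist=(429, 500, 502, 503, 504))
session = requests.Session()
session.mount('https://', HTTPAdapter(max_retries=retry))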


回答 2

只需这样做,

用以下代码替换 page = requests.get(url):

import time

page = ''
while page == '':
    try:
        page = requests.get(url)
        break
    except:
        print("Connection refused by the server..")
        print("Let me sleep for 5 seconds")
        print("ZZzzzz...")
        time.sleep(5)
        print("Was a nice sleep, now let me continue...")
        continue

别客气 :)

Just do this,

Paste the following code in place of page = requests.get(url):

import time

page = ''
while page == '':
    try:
        page = requests.get(url)
        break
    except:
        print("Connection refused by the server..")
        print("Let me sleep for 5 seconds")
        print("ZZzzzz...")
        time.sleep(5)
        print("Was a nice sleep, now let me continue...")
        continue

You’re welcome :)


回答 3

pip install pyopenssl 似乎为我解决了这个问题。

https://github.com/requests/requests/issues/4246

pip install pyopenssl seemed to solve it for me.

https://github.com/requests/requests/issues/4246


回答 4

我遇到了类似的问题,但是以下代码对我有用。

url = <some REST url>    
page = requests.get(url, verify=False)

“verify=False” 会禁用 SSL 验证。try/except 可以照常添加。

I got similar problem but the following code worked for me.

url = <some REST url>    
page = requests.get(url, verify=False)

“verify=False” disables SSL verification. Try and catch can be added as usual.
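
Rather than disabling verification outright, note that verify can also take the path to a CA bundle, which keeps certificate checking enabled (a sketch; the bundle path below is a placeholder):

import requests

# point verify at a trusted CA bundle instead of turning verification off
page = requests.get('https://example.com', verify='/path/to/ca-bundle.pem')
print(page.status_code)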


回答 5

实现异常处理总是好的。它不仅有助于避免脚本意外退出,还可以帮助记录错误和信息通知。在使用 Python requests 时,我更喜欢像这样捕获异常:

    try:
        res = requests.get(adress,timeout=30)
    except requests.ConnectionError as e:
        print("OOPS!! Connection Error. Make sure you are connected to Internet. Technical Details given below.\n")
        print(str(e))            
        renewIPadress()
        continue
    except requests.Timeout as e:
        print("OOPS!! Timeout Error")
        print(str(e))
        renewIPadress()
        continue
    except requests.RequestException as e:
        print("OOPS!! General Error")
        print(str(e))
        renewIPadress()
        continue
    except KeyboardInterrupt:
        print("Someone closed the program")

这里的 renewIPadress() 是一个用户自定义函数,可以在 IP 被封锁时更换 IP 地址。您也可以不使用这个函数。

It is always good to implement exception handling. It does not only help to avoid unexpected exit of script but can also help to log errors and info notification. When using Python requests I prefer to catch exceptions like this:

    try:
        res = requests.get(adress,timeout=30)
    except requests.ConnectionError as e:
        print("OOPS!! Connection Error. Make sure you are connected to Internet. Technical Details given below.\n")
        print(str(e))            
        renewIPadress()
        continue
    except requests.Timeout as e:
        print("OOPS!! Timeout Error")
        print(str(e))
        renewIPadress()
        continue
    except requests.RequestException as e:
        print("OOPS!! General Error")
        print(str(e))
        renewIPadress()
        continue
    except KeyboardInterrupt:
        print("Someone closed the program")

Here renewIPadress() is a user-defined function which can change the IP address if it gets blocked. You can go without this function.


回答 6

在公司环境中指定代理解决了我的问题。

page = requests.get("http://www.google.com:80", proxies={"http": "http://111.233.225.166:1234"})

完整的错误是:

requests.exceptions.ConnectionError: HTTPSConnectionPool(host='www.google.com', port=80): Max retries exceeded with url: / (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond'))

Specifying the proxy in a corporate environment solved it for me.

page = requests.get("http://www.google.com:80", proxies={"http": "http://111.233.225.166:1234"})

The full error is:

requests.exceptions.ConnectionError: HTTPSConnectionPool(host='www.google.com', port=80): Max retries exceeded with url: / (Caused by NewConnectionError(': Failed to establish a new connection: [WinError 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond'))
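
For completeness, proxies can be given per scheme, and combining this with a timeout is common (a sketch; proxy.example.com is a placeholder, not a real proxy):

import requests

proxies = {
    'http': 'http://proxy.example.com:8080',   # placeholder proxy address
    'https': 'http://proxy.example.com:8080',
}
page = requests.get('http://www.google.com', proxies=proxies, timeout=10)
print(page.status_code)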


回答 7

即使安装了 pyopenssl 并尝试了各种 python 版本,我也无法让它在 Windows 上运行(虽然它在 Mac 上运行良好),所以我切换到了 urllib,它在 python 3.6(来自 python.org)和 3.7(anaconda)上都能工作。

import urllib 
from urllib.request import urlopen
html = urlopen("http://pythonscraping.com/pages/page1.html")
contents = html.read()
print(contents)

I wasn't able to make it work on Windows even after installing pyopenssl and trying various python versions (while it worked fine on mac), so I switched to urllib and it works on python 3.6 (from python.org) and 3.7 (anaconda).

import urllib 
from urllib.request import urlopen
html = urlopen("http://pythonscraping.com/pages/page1.html")
contents = html.read()
print(contents)

回答 8

在编写 Selenium 浏览器测试脚本时,我在某次 JS api 调用之前调用了 driver.quit(),结果遇到了这个错误。请记住,退出 webdriver 是最后才做的事情!

When I was writing a selenium browser test script, I encountered this error when calling driver.quit() before a JS api call. Remember that quitting the webdriver is the last thing to do!


回答 9

为以后遇到这种情况的人增加我自己的经验。我的具体错误是

Failed to establish a new connection: [Errno 8] nodename nor servname provided, or not known'

事实证明,这实际上是因为我已达到系统上打开文件的最大数量。它与失败的连接或指示的DNS错误无关。

Adding my own experience for those who are experiencing this in the future. My specific error was

Failed to establish a new connection: [Errno 8] nodename nor servname provided, or not known'

It turns out that this was actually because I had reached the maximum number of open files on my system. It had nothing to do with failed connections, or even a DNS error as indicated.


回答 10

添加我自己的经验:

r = requests.get(download_url)

当我尝试下载url中指定的文件时。

错误是

HTTPSConnectionPool(host, port=443): Max retries exceeded with url (Caused by SSLError(SSLError("bad handshake: Error([('SSL routines', 'tls_process_server_certificate', 'certificate verify failed')])")))

我通过在函数调用中添加 verify=False 来更正它,如下所示:

r = requests.get(download_url + filename, verify=False)
open(filename, 'wb').write(r.content)

Adding my own experience :

r = requests.get(download_url)

when I tried to download a file specified in the url.

The error was

HTTPSConnectionPool(host, port=443): Max retries exceeded with url (Caused by SSLError(SSLError("bad handshake: Error([('SSL routines', 'tls_process_server_certificate', 'certificate verify failed')])")))

I corrected it by adding verify=False in the function call, as follows:

r = requests.get(download_url + filename, verify=False)
open(filename, 'wb').write(r.content)

回答 11

添加此请求的标头。

headers={
'Referer': 'https://itunes.apple.com',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36'
}

requests.get(ap, headers=headers)

Add headers for this request.

headers={
'Referer': 'https://itunes.apple.com',
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36'
}

requests.get(ap, headers=headers)

带有参数数据的Python请求发布

问题:带有参数数据的Python请求发布

这是对API调用的原始请求:

POST http://192.168.3.45:8080/api/v2/event/log?sessionKey=b299d17b896417a7b18f46544d40adb734240cc2&format=json HTTP/1.1
Accept-Encoding: gzip,deflate
Content-Type: application/json
Content-Length: 86
Host: 192.168.3.45:8080
Connection: Keep-Alive
User-Agent: Apache-HttpClient/4.1.1 (java 1.5)

{"eventType":"AAS_PORTAL_START","data":{"uid":"hfe3hf45huf33545","aid":"1","vid":"1"}}"""

该请求返回成功(2xx)响应。

现在,我尝试使用发送此请求requests

>>> import requests
>>> headers = {'content-type' : 'application/json'}
>>> data ={"eventType":"AAS_PORTAL_START","data{"uid":"hfe3hf45huf33545","aid":"1","vid":"1"}}
>>> url = "http://192.168.3.45:8080/api/v2/event/log?sessionKey=9ebbd0b25760557393a43064a92bae539d962103&format=xml&platformId=1"
>>> requests.post(url,params=data,headers=headers)
<Response [400]>

在我看来一切正常,我不太确定是哪里发错了才得到 400 响应。

This is the raw request for an API call:

POST http://192.168.3.45:8080/api/v2/event/log?sessionKey=b299d17b896417a7b18f46544d40adb734240cc2&format=json HTTP/1.1
Accept-Encoding: gzip,deflate
Content-Type: application/json
Content-Length: 86
Host: 192.168.3.45:8080
Connection: Keep-Alive
User-Agent: Apache-HttpClient/4.1.1 (java 1.5)

{"eventType":"AAS_PORTAL_START","data":{"uid":"hfe3hf45huf33545","aid":"1","vid":"1"}}"""

This request returns a success (2xx) response.

Now I am trying to post this request using requests:

>>> import requests
>>> headers = {'content-type' : 'application/json'}
>>> data ={"eventType":"AAS_PORTAL_START","data{"uid":"hfe3hf45huf33545","aid":"1","vid":"1"}}
>>> url = "http://192.168.3.45:8080/api/v2/event/log?sessionKey=9ebbd0b25760557393a43064a92bae539d962103&format=xml&platformId=1"
>>> requests.post(url,params=data,headers=headers)
<Response [400]>

Everything looks fine to me and I am not quite sure what I posting wrong to get a 400 response.


回答 0

params 用于 GET 风格的 URL 参数,data 用于 POST 风格的正文信息。在一个请求中同时提供这两种信息是完全合法的,您的请求也确实如此,只不过您已经把 URL 参数编码进 URL 里了。

不过,您的原始请求中包含的是 JSON 数据。requests 可以为您处理 JSON 编码,并且也会设置正确的 Content-Type 标头;您需要做的就是把要编码为 JSON 的 Python 对象传给 json 关键字参数。

您也可以拆分URL参数:

params = {'sessionKey': '9ebbd0b25760557393a43064a92bae539d962103', 'format': 'xml', 'platformId': 1}

然后通过以下方式发布您的数据:

import requests

url = 'http://192.168.3.45:8080/api/v2/event/log'

data = {"eventType": "AAS_PORTAL_START", "data": {"uid": "hfe3hf45huf33545", "aid": "1", "vid": "1"}}
params = {'sessionKey': '9ebbd0b25760557393a43064a92bae539d962103', 'format': 'xml', 'platformId': 1}

requests.post(url, params=params, json=data)

json 关键字参数是 requests 2.4.2 版本新增的;如果您仍然必须使用旧版本,请用 json 模块手动编码 JSON,并将编码结果作为 data 关键字传入;在这种情况下,您必须显式设置 Content-Type 标头:

import requests
import json

headers = {'content-type': 'application/json'}
url = 'http://192.168.3.45:8080/api/v2/event/log'

data = {"eventType": "AAS_PORTAL_START", "data": {"uid": "hfe3hf45huf33545", "aid": "1", "vid": "1"}}
params = {'sessionKey': '9ebbd0b25760557393a43064a92bae539d962103', 'format': 'xml', 'platformId': 1}

requests.post(url, params=params, data=json.dumps(data), headers=headers)

params is for GET-style URL parameters, data is for POST-style body information. It is perfectly legal to provide both types of information in a request, and your request does so too, but you encoded the URL parameters into the URL already.

Your raw post contains JSON data though. requests can handle JSON encoding for you, and it'll set the correct Content-Type header too; all you need to do is pass in the Python object to be encoded as JSON into the json keyword argument.

You could split out the URL parameters as well:

params = {'sessionKey': '9ebbd0b25760557393a43064a92bae539d962103', 'format': 'xml', 'platformId': 1}

then post your data with:

import requests

url = 'http://192.168.3.45:8080/api/v2/event/log'

data = {"eventType": "AAS_PORTAL_START", "data": {"uid": "hfe3hf45huf33545", "aid": "1", "vid": "1"}}
params = {'sessionKey': '9ebbd0b25760557393a43064a92bae539d962103', 'format': 'xml', 'platformId': 1}

requests.post(url, params=params, json=data)

The json keyword is new in requests version 2.4.2; if you still have to use an older version, encode the JSON manually using the json module and post the encoded result as the data key; you will have to explicitly set the Content-Type header in that case:

import requests
import json

headers = {'content-type': 'application/json'}
url = 'http://192.168.3.45:8080/api/v2/event/log'

data = {"eventType": "AAS_PORTAL_START", "data": {"uid": "hfe3hf45huf33545", "aid": "1", "vid": "1"}}
params = {'sessionKey': '9ebbd0b25760557393a43064a92bae539d962103', 'format': 'xml', 'platformId': 1}

requests.post(url, params=params, data=json.dumps(data), headers=headers)

回答 1

将数据设置为此:

data ={"eventType":"AAS_PORTAL_START","data":{"uid":"hfe3hf45huf33545","aid":"1","vid":"1"}}

Set data to this:

data ={"eventType":"AAS_PORTAL_START","data":{"uid":"hfe3hf45huf33545","aid":"1","vid":"1"}}

回答 2

把响应赋值给一个变量,然后检查它的属性。这些属性应该能告诉您一些有用的信息。

response = requests.post(url,params=data,headers=headers)
response.status_code
response.text
  • status_code 应该只是再次确认您之前得到的状态码

Assign the response to a value and test the attributes of it. These should tell you something useful.

response = requests.post(url,params=data,headers=headers)
response.status_code
response.text
  • status_code should just reconfirm the code you were given before, of course
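
Related to inspecting those attributes, response.raise_for_status() turns 4xx/5xx codes into exceptions, which makes the failure explicit (a sketch; the URL and payload below are placeholders):

import requests

response = requests.post('http://httpbin.org/post', json={'key': 'value'})
print(response.status_code, response.reason)
try:
    response.raise_for_status()  # raises requests.HTTPError on 4xx/5xx codes
except requests.HTTPError as err:
    print('Request failed:', err)
    print(response.text)  # the body often explains what was wrong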

带有Python请求的异步请求

问题:带有Python请求的异步请求

我尝试了 python requests 库文档中提供的示例。

使用async.map(rs),我得到了响应代码,但是我想获得所请求的每个页面的内容。例如,这不起作用:

out = async.map(rs)
print out[0].content

I tried the sample provided within the documentation of the requests library for python.

With async.map(rs), I get the response codes, but I want to get the content of each page requested. This, for example, does not work:

out = async.map(rs)
print out[0].content

回答 0

注意

下面的答案不适用于 requests v0.13.0+。在这个问题提出之后,异步功能被移到了 grequests 中。不过,您只需把下面的 requests 替换为 grequests,它应该就能工作。

我保留这个答案原样,以反映最初的问题,即关于使用 requests < v0.13.0 的问题。


要使用 async.map 异步执行多个任务,您必须:

  1. 为您想对每个对象执行的操作定义一个函数(您的任务)
  2. 将该函数添加为请求中的事件挂钩
  3. 在包含所有请求/操作的列表上调用 async.map

例:

from requests import async
# If using requests > v0.13.0, use
# from grequests import async

urls = [
    'http://python-requests.org',
    'http://httpbin.org',
    'http://python-guide.org',
    'http://kennethreitz.com'
]

# A simple task to do to each response object
def do_something(response):
    print response.url

# A list to hold our things to do via async
async_list = []

for u in urls:
    # The "hooks = {..." part is where you define what you want to do
    # 
    # Note the lack of parentheses following do_something, this is
    # because the response will be used as the first argument automatically
    action_item = async.get(u, hooks = {'response' : do_something})

    # Add the task to our list of things to do via async
    async_list.append(action_item)

# Do our list of things to do via async
async.map(async_list)

Note

The below answer is not applicable to requests v0.13.0+. The asynchronous functionality was moved to grequests after this question was written. However, you could just replace requests with grequests below and it should work.

I’ve left this answer as is to reflect the original question which was about using requests < v0.13.0.


To do multiple tasks with async.map asynchronously you have to:

  1. Define a function for what you want to do with each object (your task)
  2. Add that function as an event hook in your request
  3. Call async.map on a list of all the requests / actions

Example:

from requests import async
# If using requests > v0.13.0, use
# from grequests import async

urls = [
    'http://python-requests.org',
    'http://httpbin.org',
    'http://python-guide.org',
    'http://kennethreitz.com'
]

# A simple task to do to each response object
def do_something(response):
    print response.url

# A list to hold our things to do via async
async_list = []

for u in urls:
    # The "hooks = {..." part is where you define what you want to do
    # 
    # Note the lack of parentheses following do_something, this is
    # because the response will be used as the first argument automatically
    action_item = async.get(u, hooks = {'response' : do_something})

    # Add the task to our list of things to do via async
    async_list.append(action_item)

# Do our list of things to do via async
async.map(async_list)

回答 1

async现在是一个独立的模块:grequests

参见这里:https://github.com/kennethreitz/grequests

以及这里:通过 Python 发送多个 HTTP 请求的理想方法?

安装:

$ pip install grequests

用法:

建立一个堆栈:

import grequests

urls = [
    'http://www.heroku.com',
    'http://tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://kennethreitz.com'
]

rs = (grequests.get(u) for u in urls)

发送堆栈

grequests.map(rs)

结果看起来像

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

grequests似乎没有为并发请求设置限制,即当多个请求发送到同一服务器时。

async is now an independent module : grequests.

See here : https://github.com/kennethreitz/grequests

And there: Ideal method for sending multiple HTTP requests over Python?

installation:

$ pip install grequests

usage:

build a stack:

import grequests

urls = [
    'http://www.heroku.com',
    'http://tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://kennethreitz.com'
]

rs = (grequests.get(u) for u in urls)

send the stack

grequests.map(rs)

result looks like

[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]

grequests doesn't seem to set a limitation for concurrent requests, i.e. when multiple requests are sent to the same server.
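
If you do need to throttle, grequests.map accepts a size argument that caps the underlying gevent pool (a sketch, assuming a reasonably recent grequests):

import grequests

urls = ['http://httpbin.org/get'] * 10
rs = (grequests.get(u) for u in urls)
# size=2 means at most 2 requests are in flight at any moment
responses = grequests.map(rs, size=2)
print([r.status_code for r in responses if r is not None])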


回答 2

我同时测试了 requests-futures 和 grequests。grequests 速度更快,但会带来 monkey patching 以及依赖方面的额外问题。requests-futures 比 grequests 慢好几倍。于是我决定自己写一个,把 requests 简单地包进 ThreadPoolExecutor,结果几乎和 grequests 一样快,而且没有外部依赖。

import requests
import concurrent.futures

def get_urls():
    return ["url1","url2"]

def load_url(url, timeout):
    return requests.get(url, timeout = timeout)

resp_ok, resp_err = 0, 0  # 计数器必须先初始化,循环里才能累加
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

    future_to_url = {executor.submit(load_url, url, 10): url for url in     get_urls()}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            resp_err = resp_err + 1
        else:
            resp_ok = resp_ok + 1

I tested both requests-futures and grequests. Grequests is faster but brings monkey patching and additional problems with dependencies. requests-futures is several times slower than grequests. I decided to write my own and simply wrapped requests into ThreadPoolExecutor and it was almost as fast as grequests, but without external dependencies.

import requests
import concurrent.futures

def get_urls():
    return ["url1","url2"]

def load_url(url, timeout):
    return requests.get(url, timeout = timeout)

resp_ok, resp_err = 0, 0  # the counters must exist before the loop increments them
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

    future_to_url = {executor.submit(load_url, url, 10): url for url in     get_urls()}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            resp_err = resp_err + 1
        else:
            resp_ok = resp_ok + 1

回答 3

也许 requests-futures 是另一种选择。

from requests_futures.sessions import FuturesSession

session = FuturesSession()
# first request is started in background
future_one = session.get('http://httpbin.org/get')
# second requests is started immediately
future_two = session.get('http://httpbin.org/get?foo=bar')
# wait for the first request to complete, if it hasn't already
response_one = future_one.result()
print('response one status: {0}'.format(response_one.status_code))
print(response_one.content)
# wait for the second request to complete, if it hasn't already
response_two = future_two.result()
print('response two status: {0}'.format(response_two.status_code))
print(response_two.content)

官方文档中也推荐了它。如果您不想引入 gevent,它是一个不错的选择。

maybe requests-futures is another choice.

from requests_futures.sessions import FuturesSession

session = FuturesSession()
# first request is started in background
future_one = session.get('http://httpbin.org/get')
# second requests is started immediately
future_two = session.get('http://httpbin.org/get?foo=bar')
# wait for the first request to complete, if it hasn't already
response_one = future_one.result()
print('response one status: {0}'.format(response_one.status_code))
print(response_one.content)
# wait for the second request to complete, if it hasn't already
response_two = future_two.result()
print('response two status: {0}'.format(response_two.status_code))
print(response_two.content)

It is also recommended in the official documentation. If you don't want to involve gevent, it's a good one.


回答 4

我对已发布的大多数答案都有不少意见:它们要么使用功能有限、已被弃用的移植库,要么在请求执行上使用了太多魔法,导致难以进行错误处理。如果它们不属于上述类别,那就是第三方库或已被弃用。

某些解决方案纯粹在 http 请求上还能正常工作,但对其他任何类型的请求就力不从心了,这很荒谬。这里并不需要高度定制的解决方案。

只需使用 python 内置库 asyncio,就足以执行任何类型的异步请求,并为复杂的、特定于用例的错误处理提供足够的灵活性。

import asyncio

loop = asyncio.get_event_loop()

def do_thing(params):
    async def get_rpc_info_and_do_chores(id):
        # do things
        response = perform_grpc_call(id)
        do_chores(response)

    async def get_httpapi_info_and_do_chores(id):
        # do things
        response = requests.get(URL)
        do_chores(response)

    async_tasks = []
    for element in list(params.list_of_things):
       async_tasks.append(loop.create_task(get_rpc_info_and_do_chores(element)))
       async_tasks.append(loop.create_task(get_httpapi_info_and_do_chores(element)))

    loop.run_until_complete(asyncio.gather(*async_tasks))

它的工作原理很简单。您创建一系列想要异步执行的任务,然后让事件循环执行这些任务并在完成后退出。不依赖可能缺乏维护的额外库,功能上也没有缺失。

I have a lot of issues with most of the answers posted – they either use deprecated libraries that have been ported over with limited features, or provide a solution with too much magic on the execution of the request, making it difficult to error handle. If they do not fall into one of the above categories, they’re 3rd party libraries or deprecated.

Some of the solutions works alright purely in http requests, but the solutions fall short for any other kind of request, which is ludicrous. A highly customized solution is not necessary here.

Simply using the python built-in library asyncio is sufficient enough to perform asynchronous requests of any type, as well as providing enough fluidity for complex and usecase specific error handling.

import asyncio

loop = asyncio.get_event_loop()

def do_thing(params):
    async def get_rpc_info_and_do_chores(id):
        # do things
        response = perform_grpc_call(id)
        do_chores(response)

    async def get_httpapi_info_and_do_chores(id):
        # do things
        response = requests.get(URL)
        do_chores(response)

    async_tasks = []
    for element in list(params.list_of_things):
       async_tasks.append(loop.create_task(get_rpc_info_and_do_chores(element)))
       async_tasks.append(loop.create_task(get_httpapi_info_and_do_chores(element)))

    loop.run_until_complete(asyncio.gather(*async_tasks))

How it works is simple. You're creating a series of tasks you'd like to occur asynchronously, and then asking a loop to execute those tasks and exit upon completion. No extra libraries subject to lack of maintenance, and no required functionality missing.
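
One caveat worth flagging: requests.get is a blocking call, so inside a coroutine it will stall the event loop unless it is handed off to a thread. A minimal sketch of that hand-off using run_in_executor (example.com is a placeholder URL):

import asyncio
import requests

async def fetch(loop, url):
    # run the blocking requests.get in the default thread pool executor
    return await loop.run_in_executor(None, requests.get, url)

loop = asyncio.get_event_loop()
urls = ['http://example.com'] * 3
responses = loop.run_until_complete(
    asyncio.gather(*(fetch(loop, u) for u in urls)))
print([r.status_code for r in responses])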


回答 5

我知道这个问题已经关闭了一段时间,但我认为推荐另一个基于 requests 库构建的异步解决方案可能会有用。

list_of_requests = ['http://moop.com', 'http://doop.com', ...]

from simple_requests import Requests
for response in Requests().swarm(list_of_requests):
    print response.content

文档在这里:http://pythonhosted.org/simple-requests/

I know this has been closed for a while, but I thought it might be useful to promote another async solution built on the requests library.

list_of_requests = ['http://moop.com', 'http://doop.com', ...]

from simple_requests import Requests
for response in Requests().swarm(list_of_requests):
    print response.content

The docs are here: http://pythonhosted.org/simple-requests/


回答 6

from threading import Thread
import urllib2

threads=list()

for requestURI in requests:
    t = Thread(target=self.openURL, args=(requestURI,))
    t.start()
    threads.append(t)

for thread in threads:
    thread.join()

...

def openURL(self, requestURI):
    o = urllib2.urlopen(requestURI, timeout = 600)
    o...
from threading import Thread
import urllib2

threads=list()

for requestURI in requests:
    t = Thread(target=self.openURL, args=(requestURI,))
    t.start()
    threads.append(t)

for thread in threads:
    thread.join()

...

def openURL(self, requestURI):
    o = urllib2.urlopen(requestURI, timeout = 600)
    o...

回答 7

如果你想使用 asyncio,那么 requests-async 为 requests 提供了 async/await 功能:https://github.com/encode/requests-async

If you want to use asyncio, then requests-async provides async/await functionality for requestshttps://github.com/encode/requests-async


回答 8

一段时间以来,我一直在用 python requests 对 github 的 gist API 进行异步调用。

有关示例,请参见此处的代码:

https://github.com/davidthewatson/flasgist/blob/master/views.py#L60-72

这种风格的 python 可能不是最清晰的示例,但我可以向您保证代码是可以工作的。如果它让您感到困惑,请告诉我,我会为它补充文档。

I have been using python requests for async calls against github’s gist API for some time.

For an example, see the code here:

https://github.com/davidthewatson/flasgist/blob/master/views.py#L60-72

This style of python may not be the clearest example, but I can assure you that the code works. Let me know if this is confusing to you and I will document it.


回答 9

您可以使用 httpx 来做到这一点。

import asyncio
import httpx

async def get_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

urls = ["http://google.com", "http://wikipedia.org"]

# Note that you need an async context to use `await`.
await asyncio.gather(*map(get_async, urls))

如果您想要函数式的语法,gamla 库会把它包装成 get_async。

然后您可以这样做:


await gamla.map(gamla.get_async(10), ["http://google.com", "http://wikipedia.org"])

其中 10 是以秒为单位的超时时间。

(免责声明:我是它的作者)

You can use httpx for that.

import asyncio
import httpx

async def get_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

urls = ["http://google.com", "http://wikipedia.org"]

# Note that you need an async context to use `await`.
await asyncio.gather(*map(get_async, urls))

if you want a functional syntax, the gamla lib wraps this into get_async.

Then you can do


await gamla.map(gamla.get_async(10), ["http://google.com", "http://wikipedia.org"])

The 10 is the timeout in seconds.

(disclaimer: I am its author)
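
Since a bare await only works inside an async context (such as a notebook), a script version might wrap the calls like this (a sketch; asyncio.run requires Python 3.7+):

import asyncio
import httpx

async def get_async(url):
    async with httpx.AsyncClient() as client:
        return await client.get(url)

async def main():
    urls = ["http://google.com", "http://wikipedia.org"]
    return await asyncio.gather(*map(get_async, urls))

responses = asyncio.run(main())
print([r.status_code for r in responses])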


回答 10

我也尝试过用 python 中的异步方法做一些事情,不过用 twisted 进行异步编程时运气要好得多。它的问题更少,而且文档完善。这里有一个链接,内容与您在 twisted 中尝试的东西类似。

http://pythonquirks.blogspot.com/2011/04/twisted-asynchronous-http-request.html

I have also tried some things using the asynchronous methods in python, however I have had much better luck using twisted for asynchronous programming. It has fewer problems and is well documented. Here is a link to something similar to what you are trying, in twisted.

http://pythonquirks.blogspot.com/2011/04/twisted-asynchronous-http-request.html