问题:什么是好的速率限制算法?

我可以使用一些伪代码,或者更好的Python。我正在尝试为Python IRC机器人实现一个限速队列,并且部分起作用,但是如果有人触发的消息少于限制(例如,限速为每8秒5条消息,而该人仅触发4条消息),并且下一个触发时间超过8秒(例如16秒后),机器人将发送消息,但队列已满,机器人将等待8秒,即使由于8秒钟的时间已过去也不需要它。

I could use some pseudo-code, or better, Python. I am trying to implement a rate-limiting queue for a Python IRC bot, and it partially works, but if someone triggers less messages than the limit (e.g., rate limit is 5 messages per 8 seconds, and the person triggers only 4), and the next trigger is over the 8 seconds (e.g., 16 seconds later), the bot sends the message, but the queue becomes full and the bot waits 8 seconds, even though it’s not needed since the 8 second period has lapsed.


回答 0

这是最简单的算法,如果您只想在消息到达太快时就丢弃它们(而不是对其进行排队,这很有意义,因为队列可能会变得任意大):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

此解决方案中没有数据结构,计时器等,它可以正常工作:)看到这一点,“津贴”最多以每秒5/8个单位的速度增长,即每八秒最多五个单位。转发的每封邮件都会扣除一个单位,因此每八秒钟发送的邮件不能超过五个。

请注意,该值rate应为整数,即不含非零的小数部分,否则该算法将无法正常工作(实际费率将不是rate/per)。例如,rate=0.5; per=1.0;它无法正常工作,因为allowance它将永远不会增长到1.0。但是rate=1.0; per=2.0;效果很好。

Here the simplest algorithm, if you want just to drop messages when they arrive too quickly (instead of queuing them, which makes sense because the queue might get arbitrarily large):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

There are no datastructures, timers etc. in this solution and it works cleanly :) To see this, ‘allowance’ grows at speed 5/8 units per seconds at most, i.e. at most five units per eight seconds. Every message that is forwarded deducts one unit, so you can’t send more than five messages per every eight seconds.

Note that rate should be an integer, i.e. without non-zero decimal part, or the algorithm won’t work correctly (actual rate will not be rate/per). E.g. rate=0.5; per=1.0; does not work because allowance will never grow to 1.0. But rate=1.0; per=2.0; works fine.


回答 1

在函数加入之前,使用此装饰器@RateLimited(ratepersec)。

基本上,这检查自上次以来是否过去了1 / rate秒,如果没有,则等待其余时间,否则不等待。这有效地限制了您的速率/秒。装饰器可以应用于您要限制速率的任何功能。

对于您的情况,如果每8秒最多需要5条消息,请在sendToQueue函数之前使用@RateLimited(0.625)。

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)

Use this decorator @RateLimited(ratepersec) before your function that enqueues.

Basically, this checks if 1/rate secs have passed since the last time and if not, waits the remainder of the time, otherwise it doesn’t wait. This effectively limits you to rate/sec. The decorator can be applied to any function you want rate-limited.

In your case, if you want a maximum of 5 messages per 8 seconds, use @RateLimited(0.625) before your sendToQueue function.

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)

回答 2

令牌桶很容易实现。

从带有5个令牌的存储桶开始。

每5/8秒:如果存储桶中的令牌少于5个,则添加一个。

每次您要发送消息时:如果存储桶中的令牌≥1,则取出一个令牌并发送消息。否则,请等待/丢弃消息/任何内容。

(显然,在实际代码中,您将使用整数计数器代替实际的令牌,并且可以通过存储时间戳来优化每5/8秒的步长)


再次阅读问题,如果速率限制每8秒被完全重置一次,则可以进行以下修改:

last_send很久以前的某个时间(例如,纪元)开始,以一个时间戳记开始。同样,从相同的5令牌桶开始。

每5/8秒执行一次规则。

每次发送消息时:首先,检查是否last_send≥8秒。如果是这样,请填充存储桶(将其设置为5个令牌)。其次,如果存储桶中有令牌,则发送消息(否则,丢弃/等待/等)。第三,设置last_send为现在。

那应该适合那种情况。


我实际上已经使用这种策略(第一种方法)编写了IRC机器人。它在Perl中而不是Python中,但是这里有一些代码来说明:

第一部分处理将令牌添加到存储桶的过程。您可以看到基于时间(从第二行到最后一行)添加令牌的优化,然后最后一行将存储桶内容限制为最大值(MESSAGE_BURST)

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$ conn是一个传递的数据结构。这是在常规运行的方法中进行的(它计算下一次要执行的操作,并休眠很长时间或直到获得网络流量为止)。该方法的下一部分处理发送。这非常复杂,因为消息具有与之关联的优先级。

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

那是第一个队列,无论如何运行。即使它使我们的连接因洪灾而被杀死。用于极其重要的事情,例如响应服务器的PING。接下来,其余队列:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

最后,将存储区状态保存回$ conn数据结构(实际上是该方法的稍后部分;它首先计算将有多长时间进行更多工作)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

如您所见,实际的存储桶处理代码非常小-大约四行。其余代码是优先级队列处理。该机器人具有优先级队列,因此,例如,与它聊天的人无法阻止其执行重要的踢/禁止任务。

A Token Bucket is fairly simple to implement.

Start with a bucket with 5 tokens.

Every 5/8 seconds: If the bucket has less than 5 tokens, add one.

Each time you want to send a message: If the bucket has ≥1 token, take one token out and send the message. Otherwise, wait/drop the message/whatever.

(obviously, in actual code, you’d use an integer counter instead of real tokens and you can optimize out the every 5/8s step by storing timestamps)


Reading the question again, if the rate limit is fully reset each 8 seconds, then here is a modification:

Start with a timestamp, last_send, at a time long ago (e.g., at the epoch). Also, start with the same 5-token bucket.

Strike the every 5/8 seconds rule.

Each time you send a message: First, check if last_send ≥ 8 seconds ago. If so, fill the bucket (set it to 5 tokens). Second, if there are tokens in the bucket, send the message (otherwise, drop/wait/etc.). Third, set last_send to now.

That should work for that scenario.


I’ve actually written an IRC bot using a strategy like this (the first approach). Its in Perl, not Python, but here is some code to illustrate:

The first part here handles adding tokens to the bucket. You can see the optimization of adding tokens based on time (2nd to last line) and then the last line clamps bucket contents to the maximum (MESSAGE_BURST)

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$conn is a data structure which is passed around. This is inside a method that runs routinely (it calculates when the next time it’ll have something to do, and sleeps either that long or until it gets network traffic). The next part of the method handles sending. It is rather complicated, because messages have priorities associated with them.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

That’s the first queue, which is run no matter what. Even if it gets our connection killed for flooding. Used for extremely important things, like responding to the server’s PING. Next, the rest of the queues:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

Finally, the bucket status is saved back to the $conn data structure (actually a bit later in the method; it first calculates how soon it’ll have more work)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

As you can see, the actual bucket handling code is very small — about four lines. The rest of the code is priority queue handling. The bot has priority queues so that e.g., someone chatting with it can’t prevent it from doing its important kick/ban duties.


回答 3

为了阻止处理,直到消息可以发送为止,从而使更多消息排队,antti的漂亮解决方案也可以这样修改:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

它只是等待直到有足够的余量来发送消息。为了不以两倍的比率开始,津贴也可以用0初始化。

to block processing until the message can be sent, thus queuing up further messages, antti’s beautiful solution may also be modified like this:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

it just waits until enough allowance is there to send the message. to not start with two times the rate, allowance may also initialized with 0.


回答 4

保留最后五行的发送时间。保留排队的消息,直到最近的第五条消息(如果存在)过去至少8秒(以last_five作为时间数组)为止:

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()

Keep the time that the last five lines were sent. Hold the queued messages until the time the fifth-most-recent message (if it exists) is a least 8 seconds in the past (with last_five as an array of times):

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()

回答 5

一种解决方案是将时间戳记附加到每个队列项目,并在经过8秒后丢弃该项目。您可以在每次添加队列时执行此检查。

仅当将队列大小限制为5并在队列已满时丢弃所有添加项时,此方法才有效。

One solution is to attach a timestamp to each queue item and to discard the item after 8 seconds have passed. You can perform this check each time the queue is added to.

This only works if you limit the queue size to 5 and discard any additions whilst the queue is full.


回答 6

如果仍然有兴趣,我可以将此简单的可调用类与定时LRU键值存储结合使用,以限制每个IP的请求速率。使用双端队列,但可以重写为与列表一起使用。

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")

If someone still interested, I use this simple callable class in conjunction with a timed LRU key value storage to limit request rate per IP. Uses a deque, but can rewritten to be used with a list instead.

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")

回答 7

只是接受的答案中的代码的python实现。

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler

Just a python implementation of a code from accepted answer.

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler

回答 8

这个怎么样:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}

How about this:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}

回答 9

我需要一个Scala版本。这里是:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A  B) extends (A  B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

使用方法如下:

val f = Limiter((5d, 8d), { 
  _: Unit  
    println(System.currentTimeMillis) 
})
while(true){f(())}

I needed a variation in Scala. Here it is:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

Here is how it can be used:

val f = Limiter((5d, 8d), { 
  _: Unit ⇒ 
    println(System.currentTimeMillis) 
})
while(true){f(())}

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。