Question: Retrieving the list of tasks in a Celery queue

How can I retrieve a list of tasks in a queue that are yet to be processed?


Answer 0

EDIT: See other answers for getting a list of tasks in the queue.

You should look here: Celery Guide – Inspecting Workers

Basically this:

# Get the inspector from your configured Celery app
# (replace yourproject.celery with the module that defines your app).
from yourproject.celery import app

# Inspect all nodes.
i = app.control.inspect()

# Show the items that have an ETA or are scheduled for later processing
i.scheduled()

# Show tasks that are currently active.
i.active()

# Show tasks that have been claimed by workers
i.reserved()

Use whichever of these calls fits what you want.
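
Each call returns a dict keyed by worker name (or None if no worker replied). A small sketch of walking the result, continuing from i above:

# Each call returns a dict mapping worker name -> list of task-info dicts
reserved = i.reserved() or {}
for worker, tasks in reserved.items():
    for t in tasks:
        print(worker, t['id'], t['name'])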


Answer 1

If you are using RabbitMQ, run this in a terminal:

sudo rabbitmqctl list_queues

It will print a list of queues with the number of pending tasks. For example:

Listing queues ...
0b27d8c59fba4974893ec22d478a7093    0
0e0a2da9828a48bc86fe993b210d984f    0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7    0
15c036ad25884b82839495fb29bd6395    1
celerey_mail_worker@torob2.celery.pidbox    0
celery  166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa   0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6   0

The number in the right column is the number of messages in the queue. In the output above, the celery queue has 166 pending tasks.
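
If you only care about the default queue, you can filter the output (assuming your queue is actually named celery):

sudo rabbitmqctl list_queues name messages | grep -w '^celery'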


Answer 2


If you don’t use prioritized tasks, this is actually pretty simple if you’re using Redis. To get the task counts:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

Prioritized tasks, however, use different keys in Redis, so the full picture is slightly more complicated: you need to query Redis once for every task priority. In Python (and from the Flower project), this looks like:

import redis  # pip install redis
from django.conf import settings  # assumes Django-style settings for the Redis connection info

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this on GitHub, but it doesn't look like it
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum([r.llen(x) for x in priority_names])

If you want to get an actual task, you can use something like:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

From there you’ll have to deserialize the returned list. In my case I was able to accomplish this with something like:

import base64, json, pickle
import redis
from django.conf import settings  # same Django-style settings assumption as above

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
# each entry is a JSON envelope whose base64-encoded 'body' unpickles to the task message
pickle.loads(base64.b64decode(json.loads(l[0])['body']))

Just be warned that deserialization can take a moment, and you’ll need to adjust the commands above to work with various priorities.
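
Putting the pieces together, a minimal sketch (reusing make_queue_name_for_pri and the same assumed Django-style settings) that pulls the raw, still-serialized payloads from every priority sub-queue:

def get_queue_items(queue_name='celery'):
    """Return the raw payloads waiting in all priority sub-queues of queue_name."""
    names = [make_queue_name_for_pri(queue_name, pri) for pri in DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    items = []
    for name in names:
        items.extend(r.lrange(name, 0, -1))
    return items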


Answer 3


To retrieve a count of the tasks waiting on the broker, use this (with the amqplib client):

from amqplib import client_0_8 as amqp

conn = amqp.Connection(host="localhost:5672", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
# a passive declare returns (queue name, message count, consumer count)
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)

Answer 4

If you are using Celery + Django, the simplest way to inspect tasks is with commands run directly from your terminal in your virtual environment (or using the full path to celery):

Doc: http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers

$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled

Also, if you are using Celery + RabbitMQ, you can inspect the list of queues with the following command:

More info: https://linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues
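
If the bare celery command cannot locate your application, point it at your project explicitly with -A (replace your_project with your actual module name):

$ celery -A your_project inspect active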

Answer 5

A copy-paste solution for Redis with json serialization:

def get_celery_queue_items(queue_name):
    import base64
    import json

    # Get a configured instance of a celery app:
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
        decoded_tasks = []

    # The connection is released here; tasks is already a plain list in memory.
    for task in tasks:
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks

It works with Django. Just don’t forget to change yourproject.celery.


Answer 6

The celery inspect module appears to only be aware of the tasks from the workers' perspective. If you want to view the messages that are still in the queue (yet to be pulled by the workers), I suggest using pyrabbit, which can interface with the RabbitMQ HTTP API to retrieve all kinds of information from the queue.

An example can be found here: Retrieve queue length with Celery (RabbitMQ, Django)
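
A minimal sketch, assuming the RabbitMQ management plugin is enabled on port 15672, default guest credentials, and the default celery queue name:

from pyrabbit.api import Client  # pip install pyrabbit

cl = Client('localhost:15672', 'guest', 'guest')
# number of messages currently sitting in the 'celery' queue on the default vhost
print(cl.get_queue_depth('/', 'celery'))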


Answer 7

I think the only way to get the tasks that are waiting is to keep a list of tasks you started and let the task remove itself from the list when it’s started.

With rabbitmqctl and list_queues you can get an overview of how many tasks are waiting, but not the tasks themselves: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

If what you want includes tasks that are being processed but not finished yet, you can keep a list of your tasks and check their states:

from tasks import add
result = add.delay(4, 4)

result.ready() # True if finished

Or you can let Celery store the results with CELERY_RESULT_BACKEND and check which of your tasks are not in there.
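
With a result backend configured, you can also look a task up later by the id you saved when calling delay (a small sketch):

from celery.result import AsyncResult

result = AsyncResult(task_id)  # task_id saved from add.delay(...)
result.state                   # e.g. 'PENDING', 'SUCCESS', 'FAILURE'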


Answer 8

This worked for me in my application:

def get_celery_queue_active_jobs(queue_name):
    connection = <CELERY_APP_INSTANCE>.connection()

    try:
        channel = connection.channel()
        name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
        active_jobs = []

        def dump_message(message):
            active_jobs.append(message.properties['application_headers']['task'])

        channel.basic_consume(queue=queue_name, callback=dump_message)

        for job in range(jobs):
            connection.drain_events()

        return active_jobs
    finally:
        connection.close()

active_jobs will be a list of strings that correspond to tasks in the queue.

Don’t forget to swap out CELERY_APP_INSTANCE with your own.

Thanks to @ashish for pointing me in the right direction with his answer here: https://stackoverflow.com/a/19465670/9843399


Answer 9

As far as I know, Celery does not give an API for examining tasks that are waiting in the queue. This is broker-specific. If you use Redis as a broker, for example, then examining the tasks that are waiting in the celery (default) queue is as simple as:

  1. connect to the broker database
  2. list the items in the celery list (with the LRANGE command, for example; see the sketch below)
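
A minimal sketch with redis-py, assuming the broker runs on localhost with the default database and the default celery queue name:

import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)
print(r.llen('celery'))              # number of waiting messages
for raw in r.lrange('celery', 0, -1):
    print(raw)                       # each entry is a serialized task message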

Keep in mind that these are tasks WAITING to be picked up by available workers. Your cluster may have some tasks running – those will not be in this list, as they have already been picked up.


Answer 10

I've come to the conclusion that the best way to get the number of jobs on a queue is to use rabbitmqctl, as has been suggested several times here. To allow any chosen user to run the command with sudo I followed the instructions here (I did skip editing the profile part, as I don't mind typing sudo before the command).

I also grabbed jamesc’s grep and cut snippet and wrapped it up in subprocess calls.

from subprocess import Popen, PIPE

p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtual host]"], stdout=PIPE)
p2 = Popen(["grep", "-e", r"^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))

Answer 11

from celery.task.control import inspect  # on Celery 5+ use app.control.inspect() instead


def key_in_list(k, l):
    return bool([True for i in l if k in i.values()])


def check_task(task_id):
    # inspect().active() maps each worker name to a list of task-info dicts
    task_value_dict = inspect().active().values()
    for task_list in task_value_dict:
        if key_in_list(task_id, task_list):
            return True
    return False

Answer 12

If you control the code of the tasks, then you can work around the problem by letting a task trigger a trivial retry the first time it executes, and then checking inspect().reserved(). The retry registers the task with the result backend, and Celery can see that. The task must accept self or context as its first parameter so we can access the retry count.

@task(bind=True)  # `task` here is your Celery task decorator, e.g. app.task
def mytask(self):
    if self.request.retries == 0:
        # MyTrivialError is any harmless exception class defined in your own code
        raise self.retry(exc=MyTrivialError(), countdown=1)
    ...

This solution is broker agnostic, i.e. you don't have to worry about whether you are using RabbitMQ or Redis to store the tasks.

EDIT: after testing I’ve found this to be only a partial solution. The size of reserved is limited to the prefetch setting for the worker.


Answer 13

With subprocess.run:

import subprocess
import re

def count_active_tasks():
    # ask a running worker for its active tasks and count the worker_pid entries
    active_process_txt = subprocess.run(
        ['celery', '-A', 'my_proj', 'inspect', 'active'],
        stdout=subprocess.PIPE).stdout.decode('utf-8')
    return len(re.findall(r'worker_pid', active_process_txt))

Be careful to change my_proj to your own project name.

