Question: Retrieve a list of tasks in a queue in Celery
How can I retrieve a list of tasks in a queue that are yet to be processed?
Answer 0
EDIT: See the other answers for ways to list the tasks that are still waiting in the queue.
You should look here:
Celery Guide – Inspecting Workers
Basically this:
from celery.app.control import Inspect
# Inspect all nodes. Recent Celery versions expect the inspector to be
# bound to your app, e.g. i = app.control.inspect().
i = Inspect()
# Show the tasks that have an ETA or are scheduled for later processing.
i.scheduled()
# Show the tasks that are currently active.
i.active()
# Show the tasks that have been claimed (prefetched) by workers.
i.reserved()
Use whichever call matches what you want to see.
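Each of the calls above returns a mapping from worker name to a list of task descriptions, or None when no worker replies. Here is a minimal sketch of flattening such a reply, assuming the usual shape of active() and reserved() results, where every task is a dict with "id" and "name" keys. The helper name and the sample data are made up for illustration:

```python
def flatten_inspect_reply(reply):
    """Turn {worker: [task, ...]} into a flat list of (worker, id, name)."""
    rows = []
    # Inspect methods return None when no worker answered in time.
    for worker, tasks in (reply or {}).items():
        for task in tasks:
            rows.append((worker, task.get("id"), task.get("name")))
    return rows


# Hypothetical reply, shaped like the result of i.active() or i.reserved().
sample_reply = {
    "celery@host1": [{"id": "a1", "name": "tasks.add", "args": [4, 4]}],
    "celery@host2": [],
}
print(flatten_inspect_reply(sample_reply))
```

Entries returned by scheduled() nest the task under a "request" key, so they would need one more lookup before being passed through a helper like this.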
Answer 1
If you are using RabbitMQ, run this in a terminal:
sudo rabbitmqctl list_queues
It prints the list of queues along with the number of pending tasks in each. For example:
Listing queues ...
0b27d8c59fba4974893ec22d478a7093 0
0e0a2da9828a48bc86fe993b210d984f 0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7 0
15c036ad25884b82839495fb29bd6395 1
celerey_mail_worker@torob2.celery.pidbox 0
celery 166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa 0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6 0
The number in the right-hand column is the number of tasks in the queue. In the output above, the celery queue has 166 pending tasks.
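If you need those numbers in a script, the text output can be parsed. The following is a rough sketch that assumes each data line ends with the message count, as in the listing above; the function name is hypothetical, and the sample is a shortened copy of that listing:

```python
def parse_list_queues(output):
    """Map queue name -> pending message count from list_queues text."""
    counts = {}
    for line in output.splitlines():
        parts = line.rsplit(None, 1)  # queue name, then the count
        # Skip banners such as "Listing queues ..." whose last field
        # is not a number.
        if len(parts) == 2 and parts[1].isdigit():
            counts[parts[0]] = int(parts[1])
    return counts


sample = """Listing queues ...
15c036ad25884b82839495fb29bd6395 1
celery 166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa 0
"""
print(parse_list_queues(sample)["celery"])
```

Splitting from the right keeps the sketch tolerant of both tab-separated and space-separated output.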
Answer 2
If you don't use prioritized tasks, this is actually pretty simple with Redis. To get the task count:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
Prioritized tasks, however, are stored under different keys in Redis, so the full picture is slightly more complicated: you need to query Redis for every task priority. In Python (adapted from the Flower project), this looks like:
import redis

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]

def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this in Github, but it doesn't look like it
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))

def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    # settings is your own configuration object (for example Django settings).
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum([r.llen(x) for x in priority_names])
If you want to get the actual tasks, you can use something like:
redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1
From there you'll have to deserialize the returned list. In my case I was able to accomplish this with something like:
import base64
import json
import pickle

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
# base64.decodestring was removed in Python 3.9; b64decode replaces it.
# Only unpickle messages that come from a broker you trust.
pickle.loads(base64.b64decode(json.loads(l[0])['body']))
Just be warned that deserialization can take a moment, and you'll need to adjust the commands above to work with the various priorities.
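One way to cover the various priorities is to repeat the LRANGE call for every priority key. The sketch below assumes the separator and priority steps quoted in this answer, and it accepts any client object that offers an lrange method (such as redis.StrictRedis). The helper names and the FakeRedis stand-in are hypothetical and exist only so the example can run without a server:

```python
PRIORITY_SEP = "\x06\x16"
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def priority_keys(queue_name):
    """List the Redis keys that may hold messages for one Celery queue."""
    # Priority 0 lives under the plain queue name; others get a suffix.
    return [
        f"{queue_name}{PRIORITY_SEP}{pri}" if pri else queue_name
        for pri in DEFAULT_PRIORITY_STEPS
    ]


def all_raw_messages(client, queue_name="celery"):
    """Collect the raw messages from every priority key of a queue."""
    messages = []
    for key in priority_keys(queue_name):
        messages.extend(client.lrange(key, 0, -1))
    return messages


class FakeRedis:
    """Stand-in for a redis client, used only to demonstrate the helper."""

    def __init__(self, data):
        self.data = data

    def lrange(self, key, start, end):
        # The real command honours start/end; this fake returns everything.
        return list(self.data.get(key, []))


fake = FakeRedis({"celery": [b"m0"], "celery\x06\x163": [b"m3a", b"m3b"]})
print(len(all_raw_messages(fake)))
```

With a real client you would pass the connection in place of the fake and then deserialize each returned entry.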
Answer 3
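To retrieve the number of pending tasks from the broker, use this:
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
# A passive declare does not create the queue; it only reports the queue
# name, the number of waiting messages and the number of consumers.
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)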
Answer 4
Answer 5
A copy-paste solution for Redis with JSON serialization:
def get_celery_queue_items(queue_name):
    import base64
    import json

    # Get a configured instance of a celery app:
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)

    decoded_tasks = []
    for task in tasks:
        # Each entry is a JSON envelope whose body is base64-encoded JSON.
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks
It works with Django. Just don't forget to change yourproject.celery to your own project's Celery module.
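The decoded body holds the task arguments, while the task name and id normally travel in the envelope headers. As a hedged sketch, assuming Celery's message protocol version 2 and the JSON serializer, one entry could be summarized as follows. The function name and the hand-built sample envelope are illustrative, not taken from a real broker:

```python
import base64
import json


def describe_message(raw):
    """Return (task_id, task_name, args, kwargs) for one raw queue entry."""
    envelope = json.loads(raw)
    headers = envelope.get("headers", {})
    body = envelope["body"]
    # kombu's Redis transport normally base64-encodes the body.
    if envelope.get("properties", {}).get("body_encoding") == "base64":
        body = base64.b64decode(body)
    # Assumes the JSON serializer; a protocol 2 body is [args, kwargs, embed].
    args, kwargs, _embed = json.loads(body)
    return headers.get("id"), headers.get("task"), args, kwargs


# Hand-built sample envelope, for illustration only.
sample_body = base64.b64encode(json.dumps([[4, 4], {}, {}]).encode()).decode()
sample_raw = json.dumps({
    "body": sample_body,
    "headers": {"id": "abc123", "task": "tasks.add"},
    "properties": {"body_encoding": "base64"},
})
print(describe_message(sample_raw))
```

Messages produced with another serializer or an older protocol version would need a different decoding step.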
Answer 6
The celery inspect module appears to be aware of tasks only from the workers' perspective. If you want to view the messages that are in the queue (yet to be pulled by the workers), I suggest using pyrabbit, which can talk to the RabbitMQ HTTP API to retrieve all kinds of information about the queue.
An example can be found here:
Retrieve queue length with Celery (RabbitMQ, Django)
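For a rough idea of what such a lookup involves, here is a sketch that calls the RabbitMQ management HTTP API directly with the standard library instead of going through pyrabbit. It assumes the management plugin is enabled and reachable at the given base URL. The host, port, credentials and function names are placeholders, not values from the answer:

```python
import base64
import json
import urllib.parse
import urllib.request


def queue_api_url(base_url, vhost, queue):
    """Build the management API URL for one queue."""
    # The default vhost "/" must be percent-encoded as %2F.
    return "{}/api/queues/{}/{}".format(
        base_url.rstrip("/"),
        urllib.parse.quote(vhost, safe=""),
        urllib.parse.quote(queue, safe=""),
    )


def fetch_queue_stats(base_url, vhost, queue, user, password):
    """Fetch the ready and unacknowledged message counts for one queue."""
    request = urllib.request.Request(queue_api_url(base_url, vhost, queue))
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request.add_header("Authorization", "Basic " + token)
    with urllib.request.urlopen(request) as response:
        info = json.load(response)
    return info.get("messages_ready"), info.get("messages_unacknowledged")


# Only the URL is built here; fetch_queue_stats needs a running broker.
print(queue_api_url("http://localhost:15672", "/", "celery"))
```

Like rabbitmqctl, this reports counts only; it does not return the task messages themselves.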
Answer 7
I think the only way to get the tasks that are waiting is to keep a list of the tasks you started and let each task remove itself from the list when it starts.
With rabbitmqctl and list_queues you can get an overview of how many tasks are waiting, but not the tasks themselves: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
If what you want includes tasks that are being processed but are not finished yet, you can keep a list of your tasks and check their states:
from tasks import add
result = add.delay(4, 4)
result.ready() # True if finished
Or you can let Celery store the results with CELERY_RESULT_BACKEND and check which of your tasks are not in there.
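Keeping your own list could look roughly like this. The sketch assumes you hold on to every result object returned by delay(). The helper name is made up, and FakeResult merely imitates the ready() method of a Celery result so the example can run without a broker:

```python
def unfinished(results):
    """Keep only the results whose tasks have not finished yet."""
    return [r for r in results if not r.ready()]


class FakeResult:
    """Stand-in for a Celery result object, for demonstration only."""

    def __init__(self, task_id, done):
        self.id = task_id
        self._done = done

    def ready(self):
        return self._done


tracked = [FakeResult("a", True), FakeResult("b", False)]
print([r.id for r in unfinished(tracked)])
```

Note that an unfinished result does not tell you whether the task is still waiting in the queue or already running on a worker.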
Answer 8
This worked for me in my application:
def get_celery_queue_active_jobs(queue_name):
    connection = <CELERY_APP_INSTANCE>.connection()

    try:
        channel = connection.channel()
        # Passive declare: only reports the current message count.
        name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
        active_jobs = []

        def dump_message(message):
            active_jobs.append(message.properties['application_headers']['task'])

        # Nothing is acknowledged here, so the broker should put the
        # messages back on the queue once the connection closes.
        channel.basic_consume(queue=queue_name, callback=dump_message)

        for job in range(jobs):
            connection.drain_events()

        return active_jobs
    finally:
        connection.close()
active_jobs will be a list of strings that correspond to the tasks in the queue.
Don't forget to swap out CELERY_APP_INSTANCE with your own.
Thanks to @ashish for pointing me in the right direction with his answer here: https://stackoverflow.com/a/19465670/9843399
Answer 9
As far as I know, Celery does not provide an API for examining tasks that are waiting in the queue. This is broker-specific. If you use Redis as a broker, for example, then examining the tasks waiting in the celery (default) queue is as simple as:
- connect to the broker database
- list the items in the celery list (with the LRANGE command, for example)
Keep in mind that these are tasks WAITING to be picked up by available workers. Your cluster may have some tasks running; those will not be in this list, as they have already been picked up.
Answer 10
I've come to the conclusion that the best way to get the number of jobs on a queue is to use rabbitmqctl, as has been suggested several times here. To allow any chosen user to run the command with sudo, I followed the instructions here (I skipped editing the profile part, as I don't mind typing sudo before the command).
I also grabbed jamesc's grep and cut snippet and wrapped it up in subprocess calls.
from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtual host]"], stdout=PIPE)
# Raw string so that \s reaches grep unchanged.
p2 = Popen(["grep", "-e", r"^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))
Answer 11
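# This import path exists only in older Celery releases; newer ones
# expose the same inspector as app.control.inspect().
from celery.task.control import inspect

def key_in_list(k, l):
    # True if any task dict in l has k among its values (e.g. as its id).
    return bool([True for i in l if k in i.values()])

def check_task(task_id):
    # active() maps each worker name to the list of tasks it is running.
    task_value_dict = inspect().active().values()
    for task_list in task_value_dict:
        # Plain function call; there is no self outside a class.
        if key_in_list(task_id, task_list):
            return True
    return False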
Answer 12
If you control the code of the tasks, you can work around the problem by letting a task trigger a trivial retry the first time it executes, then checking inspect().reserved(). The retry registers the task with the result backend, and Celery can see that. The task must accept self or context as its first parameter so we can access the retry count.
@task(bind=True)
def mytask(self):
    if self.request.retries == 0:
        raise self.retry(exc=MyTrivialError(), countdown=1)
    ...
This solution is broker-agnostic, i.e. you don't have to worry about whether you are using RabbitMQ or Redis to store the tasks.
EDIT: After testing, I've found this to be only a partial solution. The size of reserved is limited to the worker's prefetch setting.
Answer 13
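With subprocess.run:
import subprocess
import re

# Wrapped in a function (the name is illustrative) so that return is valid.
def count_active_tasks():
    active_process_txt = subprocess.run(['celery', '-A', 'my_proj', 'inspect', 'active'],
                                        stdout=subprocess.PIPE).stdout.decode('utf-8')
    # Each active task in the output reports a worker_pid, so count those.
    return len(re.findall(r'worker_pid', active_process_txt))
Be careful to replace my_proj with the name of your own project.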