Tag archive: celery

How do you unit test a Celery task?

Question: How do you unit test a Celery task?

The Celery documentation mentions testing Celery within Django but doesn’t explain how to test a Celery task if you are not using Django. How do you do this?


Answer 0

It is possible to test tasks synchronously using any unittest lib out there. I normally do two different test sessions when working with Celery tasks. The first one (as I'm suggesting below) is completely synchronous and should be the one that makes sure the algorithm does what it should do. The second session uses the whole system (including the broker) and makes sure I'm not having serialization issues or any other distribution or communication problems.

So:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

And your test:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)

Hope that helps!


Answer 1

I use this:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...

Docs: http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER lets you run your tasks synchronously, so you don't need a Celery server.


Answer 2

Depends on what exactly you want to be testing.

  • Test the task code directly: don't call task.delay(…), just call task(…) from your unit tests (see the sketch below).
  • Use CELERY_ALWAYS_EAGER. This will cause your tasks to be called immediately at the point you say task.delay(…), so you can test the whole path (but not any asynchronous behavior).
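A minimal sketch of both options, reusing the add task and app from the question above (the module name tasks is an assumption, and plain asserts stand in for whatever test framework you use):

# test_add.py -- a sketch, not part of the original answer
from tasks import add, celery   # assumed module holding the question's add task and Celery app

def test_add_directly():
    # Option 1: call the task function directly; Celery is not involved at all
    assert add(4, 4) == 8

def test_add_eager():
    # Option 2: eager mode makes .delay() run the task locally and synchronously
    celery.conf.update(CELERY_ALWAYS_EAGER=True)
    assert add.delay(4, 4).get() == 8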

Answer 3

unittest

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

  def setUp(self):
      celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

py.test fixtures

# conftest.py
import pytest

from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

Addendum: make send_task respect eager

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task

Answer 4

For those on Celery 4 it’s:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

Because the settings names have been changed and need updating if you choose to upgrade, see

https://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html?highlight=what%20is%20new#lowercase-setting-names
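Outside Django, the same switch with the Celery 4 lowercase setting names is just a matter of setting the option on the app's configuration (a sketch, assuming your Celery app is named app):

app.conf.task_always_eager = True   # Celery 4+ name for CELERY_ALWAYS_EAGER
# or, when flipping several settings at once:
app.conf.update(task_always_eager=True, task_eager_propagates=True)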


Answer 5

As of Celery 3.0, one way to set CELERY_ALWAYS_EAGER in Django is:

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())

Answer 6

Since Celery v4.0, py.test fixtures are provided to start a celery worker just for the test and are shut down when done:

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: gen93553@gnpill.local (running)>
    assert myfunc.delay().wait(3)

Among other fixtures described on http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test, you can change the celery default options by redefining the celery_config fixture this way:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

By default, the test worker uses an in-memory broker and result backend. No need to use a local Redis or RabbitMQ if not testing specific features.


Answer 7

Using pytest (see the Celery testing docs for reference):

def test_add(celery_worker):
    mytask.delay()

If you use Flask, set the app config:

    CELERY_BROKER_URL = 'memory://'
    CELERY_RESULT_BACKEND = 'cache+memory://'

and in conftest.py

import pytest

@pytest.fixture
def app():
    yield app   # Your actual Flask application

@pytest.fixture
def celery_app(app):
    from celery.contrib.testing import tasks   # needed so the ping task used by the worker fixture is registered
    yield celery_app    # Your actual Flask-Celery application
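A fuller conftest.py sketch (an assumption, not the answer author's code) that feeds the same in-memory settings to Celery's own pytest fixtures by overriding celery_config:

# conftest.py
import pytest

@pytest.fixture(scope='session')
def celery_config():
    # in-memory broker and result backend; no external service required
    return {
        'broker_url': 'memory://',
        'result_backend': 'cache+memory://',
    }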

Answer 8

In my case (and I assume many others), all I wanted was to test the inner logic of a task using pytest.

TL;DR: I ended up mocking everything away (Option 2).


Example Use Case:

proj/tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a+b;

tests/test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

but, since the shared_task decorator does a lot of Celery-internal work, it isn't really a unit test.

So, for me, there were 2 options:

OPTION 1: Separate internal logic

proj/tasks_logic.py

def internal_add(a, b):
    return a + b;

proj/tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b);

This looks very odd, and besides making the code less readable, it requires manually extracting and passing attributes that are part of the request (for instance task_id, in case you need it), which makes the logic less pure.

OPTION 2: mocks
mocking away celery internals

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

which then allows me to mock the request object (again, in case you need things from the request, like the id or the retries counter).

tests/test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

This solution is much more manual, but it gives me the control I need to actually unit test, without repeating myself and without losing the Celery scope.


Pros and cons of using Celery vs. RQ [closed]

Question: Pros and cons of using Celery vs. RQ [closed]

Currently I'm working on a Python project that requires implementing some background jobs (mostly for email sending and heavy database updates). I use Redis as the task broker. So at this point I have two candidates: Celery and RQ. I have some experience with these job queues, but I want to ask you guys to share your experience of using these tools. So:

  1. What are the pros and cons of using Celery vs. RQ?
  2. Any examples of projects/tasks suited to Celery vs. RQ?

Celery looks pretty complicated, but it's a full-featured solution. Actually, I don't think I need all those features. On the other hand, RQ is very simple (e.g. configuration, integration), but it seems to lack some useful features (e.g. task revoking, code auto-reloading).


Answer 0

Here is what I have found while trying to answer this exact same question. It’s probably not comprehensive, and may even be inaccurate on some points.

In short, RQ is designed to be simpler all around. Celery is designed to be more robust. They are both excellent.

  • Documentation. RQ’s documentation is comprehensive without being complex, and mirrors the project’s overall simplicity – you never feel lost or confused. Celery’s documentation is also comprehensive, but expect to be re-visiting it quite a lot when you’re first setting things up as there are too many options to internalize
  • Monitoring. Celery's Flower and the RQ dashboard are both very simple to set up and give you at least 90% of all the information you would ever want

  • Broker support. Celery is the clear winner, RQ only supports Redis. This means less documentation on “what is a broker”, but also means you cannot switch brokers in the future if Redis no longer works for you. For example, Instagram considered both Redis and RabbitMQ with Celery. This is important because different brokers have different guarantees e.g. Redis cannot (as of writing) guarantee 100% that your messages are delivered.

  • Priority queues. RQ's priority queue model is simple and effective – workers read from queues in order. Celery requires spinning up multiple workers to consume from different queues. Both approaches work

  • OS Support. Celery is the clear winner here, as RQ only runs on systems that support fork e.g. Unix systems

  • Language support. RQ only supports Python, whereas Celery lets you send tasks from one language to a different language

  • API. Celery is extremely flexible (multiple result backends, nice config format, workflow canvas support), but naturally this power can be confusing. By contrast, the RQ API is simple.

  • Subtask support. Celery supports subtasks (e.g. creating new tasks from within existing tasks). I don’t know if RQ does

  • Community and Stability. Celery is probably more established, but they are both active projects. As of writing, Celery has ~3500 stars on Github while RQ has ~2000 and both projects show active development

In my opinion, Celery is not as complex as its reputation might lead you to believe, but you will have to RTFM.

So, why would anyone be willing to trade the (arguably more full-featured) Celery for RQ? In my mind, it all comes down to simplicity. By restricting itself to Redis + Unix, RQ provides simpler documentation, a simpler codebase, and a simpler API. This means you (and potential contributors to your project) can focus on the code you care about, instead of having to keep details about the task queue system in your working memory. We all have a limit on how many details can be in our heads at once, and by removing the need to keep task-queue details in there, RQ lets you get back to the code you care about. That simplicity comes at the expense of features like inter-language task queues, wide OS support, 100% reliable message guarantees, and the ability to switch message brokers easily.


Answer 1

Celery is not that complicated. At its core, you do the step-by-step configuration from the tutorials, create a Celery instance, decorate your function with @celery.task, then run the task with my_task.delay(*args, **kwargs) (see the sketch below).
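A minimal sketch of those steps (module name, task name, and the Redis broker URL are illustrative, not from the answer):

# tasks.py
from celery import Celery

celery = Celery('tasks', broker='redis://localhost:6379/0')

@celery.task
def my_task(x, y):
    return x + y

# somewhere in the application:
# result = my_task.delay(2, 3)    # enqueue asynchronously
# print(result.get(timeout=10))   # wait for the worker's result (needs a result backend)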

Judging from your own assessment, it seems you have to choose between lacking (key) features or having some excess hanging around. That is not too hard of a choice in my book.


Retrieve a list of tasks in a queue in Celery

Question: Retrieve a list of tasks in a queue in Celery

How can I retrieve a list of tasks in a queue that are yet to be processed?


Answer 0

EDIT: See other answers for getting a list of tasks in the queue.

You should look here: Celery Guide – Inspecting Workers

Basically this:

from celery.app.control import Inspect

# Inspect all nodes.
i = Inspect()

# Show the items that have an ETA or are scheduled for later processing
i.scheduled()

# Show tasks that are currently active.
i.active()

# Show tasks that have been claimed by workers
i.reserved()

Depending on what you want.


Answer 1

If you are using RabbitMQ, use this in the terminal:

sudo rabbitmqctl list_queues

It will print a list of queues with the number of pending tasks. For example:

Listing queues ...
0b27d8c59fba4974893ec22d478a7093    0
0e0a2da9828a48bc86fe993b210d984f    0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7    0
15c036ad25884b82839495fb29bd6395    1
celerey_mail_worker@torob2.celery.pidbox    0
celery  166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa   0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6   0

The number in the right column is the number of tasks in the queue. Above, the celery queue has 166 pending tasks.


Answer 2

If you don’t use prioritized tasks, this is actually pretty simple if you’re using Redis. To get the task counts:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

But, prioritized tasks use a different key in redis, so the full picture is slightly more complicated. The full picture is that you need to query redis for every priority of task. In python (and from the Flower project), this looks like:

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this in Github, but it doesn't look like it 
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum([r.llen(x) for x in priority_names])

If you want to get an actual task, you can use something like:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

From there you’ll have to deserialize the returned list. In my case I was able to accomplish this with something like:

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))

Just be warned that deserialization can take a moment, and you’ll need to adjust the commands above to work with various priorities.


Answer 3

To retrieve tasks from backend, use this

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)

Answer 4

If you are using Celery + Django, the simplest way to inspect tasks is with these commands, run directly from your terminal in your virtual environment or using a full path to celery:

Doc: http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers

$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled

Also, if you are using Celery + RabbitMQ, you can inspect the list of queues using the following command:

More info: https://linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues

Answer 5

A copy-paste solution for Redis with json serialization:

def get_celery_queue_items(queue_name):
    import base64
    import json  

    # Get a configured instance of a celery app:
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
        decoded_tasks = []

    for task in tasks:
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks

It works with Django. Just don’t forget to change yourproject.celery.


Answer 6

The celery inspect module appears to only be aware of the tasks from the workers' perspective. If you want to view the messages that are in the queue (yet to be pulled by the workers), I suggest using pyrabbit, which can interface with the RabbitMQ HTTP API to retrieve all kinds of information from the queue.

An example can be found here: Retrieve queue length with Celery (RabbitMQ, Django)
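For illustration, a small sketch that queries the RabbitMQ management HTTP API directly with requests instead of pyrabbit (host, port, credentials, and vhost are assumptions; the management plugin must be enabled):

import requests

def queue_depth(queue='celery', vhost='%2F', host='localhost', port=15672):
    # GET /api/queues/<vhost>/<name> returns the queue's stats as JSON
    url = f'http://{host}:{port}/api/queues/{vhost}/{queue}'
    resp = requests.get(url, auth=('guest', 'guest'))
    resp.raise_for_status()
    return resp.json()['messages']   # messages still sitting in the queue

print(queue_depth())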


Answer 7

I think the only way to get the tasks that are waiting is to keep a list of the tasks you started and let each task remove itself from the list when it starts.

With rabbitmqctl and list_queues you can get an overview of how many tasks are waiting, but not the tasks themselves: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

If what you want includes tasks that are being processed but not yet finished, you can keep a list of your tasks and check their states:

from tasks import add
result = add.delay(4, 4)

result.ready() # True if finished

Or you let Celery store the results with CELERY_RESULT_BACKEND and check which of your tasks are not in there.
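A small sketch of that result-backend approach (the bookkeeping of task ids is an assumption; any configured result backend works):

from tasks import add   # hypothetical module defining the task

# record the ids when the tasks are dispatched
task_ids = [add.delay(4, 4).id for _ in range(3)]

for task_id in task_ids:
    result = add.AsyncResult(task_id)
    # PENDING means the backend knows nothing about it yet, i.e. it is still
    # waiting in the queue (or the id is unknown); SUCCESS/FAILURE means finished
    print(task_id, result.state)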


Answer 8

This worked for me in my application:

def get_celery_queue_active_jobs(queue_name):
    connection = <CELERY_APP_INSTANCE>.connection()

    try:
        channel = connection.channel()
        name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
        active_jobs = []

        def dump_message(message):
            active_jobs.append(message.properties['application_headers']['task'])

        channel.basic_consume(queue=queue_name, callback=dump_message)

        for job in range(jobs):
            connection.drain_events()

        return active_jobs
    finally:
        connection.close()

active_jobs will be a list of strings that correspond to tasks in the queue.

Don’t forget to swap out CELERY_APP_INSTANCE with your own.

Thanks to @ashish for pointing me in the right direction with his answer here: https://stackoverflow.com/a/19465670/9843399


Answer 9

As far as I know, Celery does not provide an API for examining tasks that are waiting in the queue. This is broker-specific. If you use Redis as a broker, for example, then examining tasks that are waiting in the celery (default) queue is as simple as:

  1. connect to the broker database
  2. list the items in the celery list (with the LRANGE command, for example; see the sketch below)

Keep in mind that these are tasks WAITING to be picked up by available workers. Your cluster may have some tasks running – those will not be in this list, as they have already been picked up.
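A minimal sketch of those two steps with redis-py (host, port, and database number are assumptions):

import redis

r = redis.StrictRedis(host='localhost', port=6379, db=0)
# each entry in the 'celery' list is one queued message not yet picked up by a worker
waiting = r.lrange('celery', 0, -1)
print(len(waiting), 'tasks waiting')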


Answer 10

I’ve come to the conclusion the best way to get the number of jobs on a queue is to use rabbitmqctl as has been suggested several times here. To allow any chosen user to run the command with sudo I followed the instructions here (I did skip editing the profile part as I don’t mind typing in sudo before the command.)

I also grabbed jamesc’s grep and cut snippet and wrapped it up in subprocess calls.

from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))

Answer 11

from celery.task.control import inspect

def key_in_list(k, l):
    return bool([True for i in l if k in i.values()])

def check_task(task_id):
    task_value_dict = inspect().active().values()
    for task_list in task_value_dict:
        if key_in_list(task_id, task_list):
            return True
    return False

Answer 12

If you control the code of the tasks then you can work around the problem by letting a task trigger a trivial retry the first time it executes, then checking inspect().reserved(). The retry registers the task with the result backend, and celery can see that. The task must accept self or context as first parameter so we can access the retry count.

@task(bind=True)
def mytask(self):
    if self.request.retries == 0:
        raise self.retry(exc=MyTrivialError(), countdown=1)
    ...

This solution is broker agnostic, i.e. you don't have to worry about whether you are using RabbitMQ or Redis to store the tasks.

EDIT: after testing I’ve found this to be only a partial solution. The size of reserved is limited to the prefetch setting for the worker.


Answer 13

With subprocess.run:

import subprocess
import re

def count_active_tasks():
    active_process_txt = subprocess.run(['celery', '-A', 'my_proj', 'inspect', 'active'],
                                        stdout=subprocess.PIPE).stdout.decode('utf-8')
    return len(re.findall(r'worker_pid', active_process_txt))

Be careful to change my_proj to your own project's name.


Source-Code Deep Dive: How Celery Beat Works

Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is a task queue focused on real-time processing that also supports task scheduling.

To explain Celery Beat's periodic scheduling mechanism and how it is implemented, we will start from a simple periodic task built on Django, then take apart Celery Beat's source code step by step.

For background on the applications involved, see the following articles:

1. A Hands-On Tutorial on Django Celery Async and Scheduled Tasks
2. Fast Asynchronous Stock-Data Downloads with Python Celery

1. A Simple Celery Periodic Task Example

Add the following task in celery_app.tasks.py:

@shared_task
def pythondict_task():
    print("pythondict_task")

Add the following configuration in the django.celery.py file:

from celery_django import settings
from datetime import timedelta


app.autodiscover_tasks(lambda : settings.INSTALLED_APPS)

CELERYBEAT_SCHEDULE = {
    'pythondict_task': {
        'task': 'celery_app.tasks.pythondict_task',
        'schedule': timedelta(seconds=3),
    },
}

app.conf.update(CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE)

With that, the configuration is complete. Now start the Celery Beat scheduler first:

celery beat -A celery_django -S django

Then open a second terminal and start the consumer (the worker):

celery -A celery_django worker

The worker's terminal will then print output like the following:

    [2021-07-11 16:34:11,546: WARNING/PoolWorker-3] pythondict_task
    [2021-07-11 16:34:11,550: WARNING/PoolWorker-4] pythondict_task
    [2021-07-11 16:34:11,551: WARNING/PoolWorker-2] pythondict_task
    [2021-07-11 16:34:11,560: WARNING/PoolWorker-1] pythondict_task

Seeing this output means the task is being executed on schedule.

2. Source-Code Analysis

To understand how Celery Beat implements periodic task scheduling, we need to start from the Celery source code.

What actually happens when you run the Celery Beat startup command?

celery beat -A celery_django -S django

When you run this command, the CeleryCommand class in celery/bin/celery.py receives it, selects the class registered for beat, and executes the following code:

# Python 实用宝典
# https://pythondict.com

from celery.bin.beat import beat

class CeleryCommand(Command):
    commands = {
        # ...
        'beat': beat,
        # ...
    }
    # ...
    def execute(self, command, argv=None):
        try:
            cls = self.commands[command]
        except KeyError:
            cls, argv = self.commands['help'], ['help']
        cls = self.commands.get(command) or self.commands['help']
        try:
            return cls(
                app=self.app, on_error=self.on_error,
                no_color=self.no_color, quiet=self.quiet,
                on_usage_error=partial(self.on_usage_error, command=command),
            ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

Here cls is the beat class. Looking at the beat class in bin/beat.py, you can see that it only overrides the run and add_arguments methods.

So the run_from_argv method executed here is the one beat inherits from Command:

# Python 实用宝典
# https://pythondict.com

def run_from_argv(self, prog_name, argv=None, command=None):
    return self.handle_argv(prog_name, sys.argv if argv is None else argv, command)

This method calls Command's handle_argv method, which, after processing the arguments, calls self(*args, **options), landing in the __call__ function:

    # Python 实用宝典
    # https://pythondict.com
    
    def handle_argv(self, prog_name, argv, command=None):
        """Parse command-line arguments from ``argv`` and dispatch
        to :meth:`run`.

        :param prog_name: The program name (``argv[0]``).
        :param argv: Command arguments.

        Exits with an error message if :attr:`supports_args` is disabled
        and ``argv`` contains positional arguments.

        """
        options, args = self.prepare_args(
            *self.parse_options(prog_name, argv, command))
        return self(*args, **options)

The __call__ function of the Command class:

    # Python 实用宝典
    # https://pythondict.com
    
    def __call__(self, *args, **kwargs):
        random.seed()  # maybe we were forked.
        self.verify_args(args)
        try:
            ret = self.run(*args, **kwargs)
            return ret if ret is not None else EX_OK
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

As you can see, this function calls run, and the run called here is the one overridden in the beat class. Let's look at it:

# Python 实用宝典
# https://pythondict.com
    
class beat(Command):
    """Start the beat periodic task scheduler.

    Examples::

        celery beat -l info
        celery beat -s /var/run/celery/beat-schedule --detach
        celery beat -S djcelery.schedulers.DatabaseScheduler

    """
    doc = __doc__
    enable_config_from_cmdline = True
    supports_args = False

    def run(self, detach=False, logfile=None, pidfile=None, uid=None,
            gid=None, umask=None, working_directory=None, **kwargs):
        # run detached (in the background)?
        if not detach:
            maybe_drop_privileges(uid=uid, gid=gid)
        workdir = working_directory
        kwargs.pop('app', None)
        # build a partial function
        beat = partial(self.app.Beat,
                       logfile=logfile, pidfile=pidfile, **kwargs)

        if detach:
            with detached(logfile, pidfile, uid, gid, umask, workdir):
                return beat().run() # run detached in the background
        else:
            return beat().run() # run in the foreground

This uses the idea of a partial function: a partial function is a new function created from a base function with some arguments pre-filled. For details, see Liao Xuefeng's introduction:
https://www.liaoxuefeng.com/wiki/1016959663602400/1017454145929440
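A quick illustration of the same idea with functools.partial (not taken from the Celery source):

from functools import partial

def power(base, exp):
    return base ** exp

square = partial(power, exp=2)   # new function with exp pre-filled
print(square(7))                 # 49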

So a partial of the app's Beat is created here, and calling .run() on it starts the beat process. First, let's look at this Beat property:

    # Python 实用宝典
    # https://pythondict.com
    @cached_property
    def Beat(self, **kwargs):
        # import the celery.apps.beat:Beat class
        return self.subclass_with_self('celery.apps.beat:Beat')

You can see that this instantiates the Beat class from celery.apps.beat and then calls that instance's run method:

    # Python 实用宝典
    # https://pythondict.com
    def run(self):
        print(str(self.colored.cyan(
            'celery beat v{0} is starting.'.format(VERSION_BANNER))))
        # initialize the loader
        self.init_loader()
        # set the process title
        self.set_process_title()
        # start the task scheduler
        self.start_scheduler()

init_loader imports the default modules, which is where the periodic tasks get pulled in; that is not the focus of this article. Let's focus on how start_scheduler starts the task scheduling:

    # Python 实用宝典
    # https://pythondict.com
    def start_scheduler(self):
        c = self.colored
        if self.pidfile: # was a pid file configured?
            platforms.create_pidlock(self.pidfile)  # create the pid lock file
        # initialize the beat Service
        beat = self.Service(app=self.app,
                            max_interval=self.max_interval,
                            scheduler_cls=self.scheduler_cls,
                            schedule_filename=self.schedule)
        
        # print the startup banner
        print(str(c.blue('__    ', c.magenta('-'),
                  c.blue('    ... __   '), c.magenta('-'),
                  c.blue('        _\n'),
                  c.reset(self.startup_info(beat)))))
        # set up logging
        self.setup_logging()
        if self.socket_timeout:
            logger.debug('Setting default socket timeout to %r',
                         self.socket_timeout)
            # set the default socket timeout
            socket.setdefaulttimeout(self.socket_timeout)
        try:
            # register the sync handlers
            self.install_sync_handler(beat)
            # start beat
            beat.start()
        except Exception as exc:
            logger.critical('beat raised exception %s: %r',
                            exc.__class__, exc,
                            exc_info=True)

Now let's see how beat is started:

    # Python 实用宝典
    # https://pythondict.com
    def start(self, embedded_process=False, drift=-0.010):
        info('beat: Starting...')
        # log the maximum tick interval
        debug('beat: Ticking with max interval->%s',
              humanize_seconds(self.scheduler.max_interval))
        
        # notify receivers registered for this signal
        signals.beat_init.send(sender=self)
        if embedded_process:
            signals.beat_embedded_init.send(sender=self)
            platforms.set_process_title('celery beat')

        try:
            while not self._is_shutdown.is_set():
                # call scheduler.tick() to see how long until the next task is due
                interval = self.scheduler.tick()
                interval = interval + drift if interval else interval
                # if there is time left before the next task
                if interval and interval > 0:
                    debug('beat: Waking up %s.',
                          humanize_seconds(interval, prefix='in '))
                    # sleep until then
                    time.sleep(interval)
                    if self.scheduler.should_sync():
                        self.scheduler._do_sync()
        except (KeyboardInterrupt, SystemExit):
            self._is_shutdown.set()
        finally:
            self.sync()

The key part here is the self.scheduler.tick() method:

    # Python 实用宝典
    # https://pythondict.com
    def tick(self):
        """Run a tick, that is one iteration of the scheduler.

        Executes all due tasks.

        """
        remaining_times = []
        try:
            # iterate over every periodic-task entry
            for entry in values(self.schedule):
                # time remaining until the next run
                next_time_to_run = self.maybe_due(entry, self.publisher)
                if next_time_to_run:
                    remaining_times.append(next_time_to_run)
        except RuntimeError:
            pass

        return min(remaining_times + [self.max_interval])

Here self.schedule returns all the periodic tasks stored (via shelve) in the celerybeat-schedule file. It iterates over every entry and calls self.maybe_due:

    # Python 实用宝典
    # https://pythondict.com
    def maybe_due(self, entry, publisher=None):
        # has the scheduled time arrived?
        is_due, next_time_to_run = entry.is_due()

        if is_due:
            # log that the due task is being sent
            info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
            try:
                # send the task for execution
                result = self.apply_async(entry, publisher=publisher)
            except Exception as exc:
                error('Message Error: %s\n%s',
                      exc, traceback.format_stack(), exc_info=True)
            else:
                debug('%s sent. id->%s', entry.task, result.id)
        return next_time_to_run

As you can see, it checks whether each task has reached its scheduled time. If so, it calls apply_async to hand the task to a worker; if not, it returns the time of the next run so the Beat process can sleep, keeping resource consumption low.
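As a side note, you can peek at those persisted entries yourself (a sketch, assuming the default persistent scheduler and the default celerybeat-schedule file path; the scheduler keeps them under the 'entries' key of the shelve file):

import shelve

store = shelve.open('celerybeat-schedule')   # run from the directory where beat writes it
for name, entry in store['entries'].items():
    print(name, entry.schedule, entry.last_run_at)
store.close()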

That wraps up how Celery Beat detects and schedules periodic tasks.

A Hands-On Tutorial on Django Celery Async and Scheduled Tasks

Django plus Celery is a core pairing for Python web backend development, and it is especially common in internal (operations-facing) tooling.

Below is a hands-on tutorial on Celery asynchronous tasks and scheduled tasks with Django.

1. Configuring Django Celery

Configuring Celery comes down to a few main points:

1. In the same directory as settings.py, create a celery.py file (the name is up to you). This file is mainly used to create the Celery instance app.

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'NBAsite.settings')

app = Celery('NBAsite',broker='redis://localhost:6379/0',backend='redis://localhost')
app.config_from_object('django.conf:settings',namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

We set both the broker and the backend of the Celery instance to Redis.

The broker is the middleman: like a stockbroker facilitating trades, its job in Celery is to push tasks to the workers.

The backend stores execution information and results, data that needs to be persisted in a database. To keep things simple, though, we put it in Redis as well, alongside the broker.

2. In the directory of an app you have already created (a Django project app, not the Celery app), create a tasks.py file (the file name must be exactly this),

because Celery discovers tasks from the tasks module of every app.

3. Write the tasks in tasks.py

Here is what a task in tasks.py looks like:

from __future__ import absolute_import, unicode_literals
from NBAsite.celery import app
from celery import shared_task
import time

@shared_task
def waste_time():
    time.sleep(3)
    return "Run function 'waste_time' finished."

The task simply sleeps for 3 seconds and then returns a sentence.

4. Settings in __init__.py

This is the crucial point: how do we get Django to start Celery as well when it starts up?
The answer is to import the Celery app in the project's __init__.py file:

from __future__ import absolute_import, unicode_literals
import pymysql

from .celery import app as celery_app

pymysql.install_as_MySQLdb()
__all__ = ('celery_app',)

2. Other Django Configuration

To be able to trigger the async task, we now set up the usual plumbing, views and URLs. First the view function:

from django.http import JsonResponse

from .tasks import waste_time

def test_c(request):
    result = waste_time.delay().get()
    return JsonResponse({'status':'successful'})

Then the URL:

path('test_c', test_c, name='test_c'),

3. Testing

First, run the Django project:

python manage.py runserver

This starts the Django project and the Celery app together, but at this point the task still cannot be executed, because no worker has been started. We can try it:

Visiting http://127.0.0.1:8000/stats/test_c returns an error.

So what is the right way? Start the worker first, then hit the API:

celery -A NBAsite worker -l info

The worker log shows that after the 3-second delay the task runs and returns the string, and the page shows a successful response as well.

Note that if you change the contents of tasks.py, you need to restart Celery for the change to take effect; the simplest way is to restart the Django project.

With that, we have configured and used a simple asynchronous task.

4. Configuring Scheduled Tasks

For async tasks we only needed the worker; for scheduled tasks we also need Celery's beat scheduler.

First, let's look at how to configure a scheduled task, or rather how to configure this scheduler.

The configuration again goes in celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'NBAsite.settings')

app = Celery('NBAsite',broker = 'redis://localhost:6379/0',backend='redis://localhost')

app.config_from_object('django.conf:settings',namespace='CELERY')

app.conf.beat_schedule ={
        'autosc':{                           
            'task':'stats.tasks.auto_sc',    
            'schedule':crontab(hour=20,minute=47),   
        },
}
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

The key addition is the app.conf.beat_schedule configuration, which points to the auto_sc function in tasks.py under the stats folder and schedules it to run at 20:47.

5. The Task Itself

Add the task that the schedule refers to:

@shared_task
def auto_sc():
    print ('sc test?')
    return 'halo'

6. Commands and Results

You can start the worker and beat together with a single command:

celery -A NBAsite worker -B -l info

However, Windows does not allow this, so on Windows you need to start the worker and beat separately:

celery -A NBAsite worker -l info
celery -A NBAsite beat -l info

The worker log shows that the scheduled task is executed successfully.

References:
https://www.jianshu.com/p/173070bcdfaf
https://www.jianshu.com/p/ee32074a10de

What? Python Celery Can Schedule Go Workers Too?

We previously looked at how to have Python and Go call each other, and found that by packaging a Go module as a shared library you can call it from Python:

Complementary Strengths: Combining Python and Go Development

Go's advantage is obvious: counting down from 100 million to 1 takes only about 50 ms in Go on my machine, while Python needs close to 100 times as long.

But that approach has a downside: it is simply too cumbersome and greatly increases the coupling of the whole project.

So is there a way in Python to schedule Go tasks without packaging them as a shared library? The answer is gocelery:

https://github.com/gocelery/gocelery

We can write a worker for a compute-intensive task in Go and schedule it with Python's Celery beat. Here is a demonstration.

1. Writing the Go Worker

The biggest gains come from porting the compute-intensive tasks to Go.

Here I reuse the countdown from 100 million to 1 from last time.

// filename: main.go
// Python实用宝典
package main

import (
	"fmt"
	"time"

	"github.com/gocelery/gocelery"
	"github.com/gomodule/redigo/redis"
)

func minus() {
    start := time.Now()
    decrement(100000000)
    fmt.Println(time.Since(start))
}

func decrement(n int) {
    for n > 0 {
        n -= 1
    }
}


func main() {

	// create redis connection pool
	redisPool := &redis.Pool{
		MaxIdle:     3,                 // maximum number of idle connections in the pool
		MaxActive:   0,                 // maximum number of connections allocated by the pool at a given time
		IdleTimeout: 240 * time.Second, // close connections after remaining idle for this duration
		Dial: func() (redis.Conn, error) {
			c, err := redis.DialURL("redis://")
			if err != nil {
				return nil, err
			}
			return c, err
		},
		TestOnBorrow: func(c redis.Conn, t time.Time) error {
			_, err := c.Do("PING")
			return err
		},
	}

	// initialize celery client
	cli, _ := gocelery.NewCeleryClient(
		gocelery.NewRedisBroker(redisPool),
		&gocelery.RedisCeleryBackend{Pool: redisPool},
		5, // number of workers
	)

	// register task
	cli.Register("go_tasks.minus", minus)

	// start workers (non-blocking call)
	cli.StartWorker()

	// wait for client request
	time.Sleep(1000 * time.Second)

	// stop workers gracefully (blocking call)
	cli.StopWorker()
}

Run the following command:

go run main.go

and the worker is up and running.

2. Writing the Python Client

# filename: go_tasks.py
# Python实用宝典
from celery import Celery

app = Celery('go_tasks',broker='redis://127.0.0.1:6379')

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ENABLE_UTC=True,
    CELERY_TASK_PROTOCOL=1,
)


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Schedule the 100-million countdown every 5 seconds. We do not run a Python worker;
    # since the Go worker is running, this minus task will be consumed by the Go worker.
    sender.add_periodic_task(5.0, minus.s())


@app.task
def minus():
    x = 100000000
    while x > 1:
        x = x-1

The countdown is scheduled every 5 seconds, but we do not run a Python worker; since the Go worker is running, minus is consumed by the Go worker.

Also note that the Python minus function here exists only so the task can be registered under that name; its body is meaningless, and a plain pass would work just as well (because it is actually the Go worker doing the consuming).

Once that is written, start beat for the go_tasks module:

celery -A go_tasks beat

The scheduler then dispatches the tasks to the Go worker.

As you can see, we have successfully scheduled a Go worker from Python's Celery Beat.

Next, let's see how long the same computation takes with a plain Python worker:

# filename: python_tasks
# Python实用宝典
from celery import Celery

app = Celery('python_tasks')


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(1.0, minus.s())

@app.task
def minus():
    x = 100000000
    while x > 1:
        x = x-1

Start the worker:

celery worker -A python_tasks -l info --pool=eventlet

Start the beat scheduler:

celery -A python_tasks beat

The result: Python takes about 5.2 seconds on average to count down from 100 million to 1, roughly 100 times slower than the Go version.

If we raised the scheduling frequency to once per second, the Python worker's task queue would inevitably back up, because the worker cannot consume tasks fast enough. The Go worker, by contrast, handles it with ease.

So if your project has compute-intensive tasks like this, try extracting them into a Go version; you might be pleasantly surprised.

Fast Asynchronous Stock-Data Downloads with Python Celery

In the previous stock article, we covered how to download stock data with tushare and store it in MongoDB:

A Hands-On Tutorial on Fetching Stock Data with Python and Storing It in MongoDB

There is plenty of room for optimization there. For example, single-threaded downloading is too slow; can we parallelize it? Network connections sometimes fail; can we add a retry mechanism?

These are the problems we will address in this article.

1. Preparation

Before starting, make sure Python and pip are installed on your computer; if not, see: The Ultra-Detailed Python Installation Guide. If you mainly use Python for data analysis, you can install Anaconda directly: Anaconda, a Great Helper for Python Data Analysis and Mining.

On Windows, open Cmd (Start - Run - CMD); on macOS, open Terminal (Command+Space, then type Terminal), and get ready to run the commands that install the dependencies.

Of course, I recommend using the VSCode editor: copy the code from this article and run the install commands in the terminal at the bottom of the editor. See: VSCode, the Best Companion for Python Programming.

In addition, this article is an optimized version of the tutorial on fetching stock data and storing it in MongoDB, so please install tushare and MongoDB following that article first.

Beyond that, you also need to install celery and eventlet:

pip install celery
pip install eventlet

Seeing "Successfully installed xxx" means the installation succeeded.

2. Downloading Stock Data Asynchronously with Celery

Celery is a powerful asynchronous task queue. It lets task execution run completely outside the main program, and tasks can even be dispatched to other hosts.

We typically use it to implement asynchronous tasks (async tasks) and scheduled tasks (crontab).

To download stock data asynchronously, we need to split the previous article's code into two parts:

1. The Celery task: fetch the daily data of one stock
2. The dispatcher: dispatch fetches for the specified stocks

In our scenario, using Celery for asynchronous tasks then involves three main steps:

1. Create the Celery task (fetch one stock's daily data)
2. Start a Celery worker to run the tasks
3. Have the application dispatch the tasks (dispatch fetches for the specified stocks)

With the approach clear, let's put it into practice.

2.1 Creating the Celery task:

Most of the code here is similar to the previous article, so only the Celery-related core code is shown below:

# Python实用宝典
# https://pythondict.com
from celery import Celery

# configure the broker
BROKER_URL = 'mongodb://127.0.0.1:27017/celery'
# create the Celery app
app = Celery('my_task', broker=BROKER_URL)

@app.task
def get_stock_daily(start_date, end_date, code):
    """
    Celery task: fetch the daily data of one stock

    Args:
        start_date (str): start date
        end_date (str): end date
        code (str): stock code
    """


    # request data from tushare and convert it to JSON
    df = pro.daily(ts_code=code, start_date=start_date, end_date=end_date)
    data = json.loads(df.T.to_json()).values()

    # insert/update one row at a time to keep the data absolutely consistent
    for row in data:
        daily.update({"_id": f"{row['ts_code']}-{row['trade_date']}"}, row, upsert=True)

    print(f"{code}: 插入\更新 完毕 - {start_date} to {end_date}")

2.2 Starting the worker

Run the following command in cmd to start the Celery worker:

python -m celery worker -A tasks --loglevel=info --pool=eventlet

Note that --pool=eventlet is used here so that the worker can run tasks concurrently on Windows.

2.3 Dispatching fetches for the specified stocks

Iterate over the stock list and call the Celery task via delay to hand the tasks to the worker:

# Python实用宝典
# https://pythondict.com
from tasks import get_stock_daily

def delay_stock_data(start_date, end_date):
    """
    Fetch daily data for all A-share stocks

    Args:
        start_date (str): start date
        end_date (str): end date
    """

    codes = open('./codes.csv', 'r', encoding='utf-8').readlines()

    # iterate over all stock codes
    for code in codes:
        get_stock_daily.delay(start_date, end_date, code)

delay_stock_data("20180101", "20200725")

The worker then executes these tasks asynchronously in the background; switch to the worker's terminal and you will see the output flowing along smoothly.

The good times don't last, though: before long you will run into tushare's rate-limit (CPS) error:

Exception: 抱歉,您每分钟最多访问该接口800次,权限的具体详情访问:https://tushare.pro/document/1?doc_id=108。
(Sorry, you may call this endpoint at most 800 times per minute; see https://tushare.pro/document/1?doc_id=108 for permission details.)

3. Rate Limits and the Retry Mechanism

I spent quite a bit of time trying to solve this rate-limit problem by throttling the worker, without success, and finally settled on a workaround that is not pretty but works:

When tushare raises the error, catch it and retry 60 seconds later:

    # request data from tushare and convert it to JSON
    try:
        df = pro.daily(ts_code=code, start_date=start_date, end_date=end_date)
    except Exception as e:
        # hit the free-tier rate limit; retry after 60 seconds
        print(e)
        get_stock_daily.retry(countdown=60)

Simple, but effective.
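For reference, a commonly used equivalent pattern (a sketch, not the article's code; pro is the tushare client from the task above) binds the task so the retry is explicit and capped:

@app.task(bind=True, max_retries=5)
def get_stock_daily(self, start_date, end_date, code):
    try:
        df = pro.daily(ts_code=code, start_date=start_date, end_date=end_date)
    except Exception as exc:
        # wait 60 seconds and re-run this task; give up after max_retries attempts
        raise self.retry(exc=exc, countdown=60)
    return df.to_json()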

In addition, to keep a task from being lost when a network request goes wrong, we can also use apply_async when dispatching and enable retrying on failure; by default it retries three times:

def delay_stock_data(start_date, end_date):
    """
    Fetch daily data for all A-share stocks

    Args:
        start_date (str): start date
        end_date (str): end date
    """

    codes = open('./codes.csv', 'r', encoding='utf-8').readlines()

    # iterate over all stock codes
    for code in codes:
        get_stock_daily.apply_async(
            (start_date, end_date, code), retry=True
        )

With that, a reasonably robust asynchronous stock-data downloader is complete.

Downloading all A-share stocks this way should take no more than 5 minutes.

If you have a paid tushare plan with no rate limit, the download takes under a minute.

There doesn't seem to be a project like this on GitHub yet, so I have open-sourced it:

https://github.com/Ckend/stock_download_celery

It currently supports daily data only; contributions are welcome.
