How do you unit test Celery tasks?

Question: How do you unit test Celery tasks?

The Celery documentation mentions testing Celery within Django but doesn't explain how to test a Celery task if you are not using Django. How do you do this?


Answer 0

It is possible to test tasks synchronously using any unittest lib out there. I normally do two different test sessions when working with Celery tasks. The first one (as I'm suggesting below) is completely synchronous and should be the one that makes sure the algorithm does what it should do. The second session uses the whole system (including the broker) and makes sure I'm not having serialization issues or any other distribution or communication problem.

So:

from celery import Celery

celery = Celery()

@celery.task
def add(x, y):
    return x + y

And your test:

from nose.tools import eq_

def test_add_task():
    rst = add.apply(args=(4, 4)).get()
    eq_(rst, 8)
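
For that second, broker-backed session, a minimal sketch might look like the following; it assumes a broker, a result backend, and a worker are already configured and running, and that the add task above lives in a hypothetical tasks module:

from nose.tools import eq_

from tasks import add  # hypothetical module exposing the task above

def test_add_task_through_broker():
    # delay() sends the task through the broker; get() waits for a worker
    # to execute it, so serialization and transport are exercised too.
    rst = add.delay(4, 4).get(timeout=10)
    eq_(rst, 8)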

Hope that helps!


Answer 1

I use this:

with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
    ...

Docs: http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager

CELERY_ALWAYS_EAGER lets you run your tasks synchronously, and you don't need a Celery server.
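
Fleshing that snippet out into a complete test might look roughly like this; the tasks module and the add task are assumptions, and the patch only has an effect if your app still reads celeryconfig at the point the task is dispatched:

from unittest import mock

from tasks import add  # hypothetical task

def test_add_runs_eagerly():
    # create=True adds the attribute even if celeryconfig does not define it.
    with mock.patch('celeryconfig.CELERY_ALWAYS_EAGER', True, create=True):
        result = add.delay(2, 3)
        # With ALWAYS_EAGER in effect this is an EagerResult, so no broker is needed.
        assert result.get() == 5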


Answer 2

Depends on what exactly you want to be testing.

  • Test the task code directly. Don't call task.delay(…); just call task(…) from your unit tests (see the sketch after this list).
  • Use CELERY_ALWAYS_EAGER. This will cause your tasks to be called immediately at the point you say task.delay(…), so you can test the whole path (but not any asynchronous behavior).
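
A minimal sketch of the first bullet, assuming a plain (unbound) add task in a hypothetical tasks module:

from tasks import add  # hypothetical task module

def test_add_directly():
    # Calling the task object runs it in-process;
    # no broker, no worker, no serialization involved.
    assert add(2, 3) == 5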

Answer 3

unittest

import unittest

from myproject.myapp import celeryapp

class TestMyCeleryWorker(unittest.TestCase):

    def setUp(self):
        celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

py.test fixtures

# conftest.py
import pytest

from myproject.myapp import celeryapp

@pytest.fixture(scope='module')
def celery_app(request):
    celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
    return celeryapp

# test_tasks.py
def test_some_task(celery_app):
    ...

Addendum: make send_task respect eager

from celery import current_app

def send_task(name, args=(), kwargs={}, **opts):
    # https://github.com/celery/celery/issues/581
    task = current_app.tasks[name]
    return task.apply(args, kwargs, **opts)

current_app.send_task = send_task
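
With the patch above in place, a test can dispatch through send_task by name and still run eagerly; the task name below is an assumption standing in for one of your registered tasks:

from celery import current_app

def test_send_task_runs_eagerly(celery_app):
    # The patched send_task calls task.apply(), which returns an EagerResult,
    # so get() needs neither a broker nor a worker.
    result = current_app.send_task('myproject.myapp.tasks.add', args=(2, 3))
    assert result.get() == 5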

Answer 4

For those on Celery 4 it’s:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)

This is because the setting names were changed and need updating if you choose to upgrade; see

https://docs.celeryproject.org/en/latest/history/whatsnew-4.0.html?highlight=what%20is%20new#lowercase-setting-names
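
Mirroring the Django example in the next answer with the Celery 4+ name, a hedged sketch (assuming your Celery app is configured from Django settings with the CELERY_ namespace, and a hypothetical foo_celery_task):

from django.test import TestCase, override_settings

from .foo import foo_celery_task  # hypothetical task

class MyCelery4Test(TestCase):

    @override_settings(CELERY_TASK_ALWAYS_EAGER=True)
    def test_foo(self):
        # With the renamed setting active, delay() runs the task in-process.
        self.assertTrue(foo_celery_task.delay())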


Answer 5

As of Celery 3.0, one way to set CELERY_ALWAYS_EAGER in Django is:

from django.test import TestCase, override_settings

from .foo import foo_celery_task

class MyTest(TestCase):

    @override_settings(CELERY_ALWAYS_EAGER=True)
    def test_foo(self):
        self.assertTrue(foo_celery_task.delay())

Answer 6

Since Celery v4.0, py.test fixtures are provided that start a Celery worker just for the test and shut it down when done:

def test_myfunc_is_executed(celery_session_worker):
    # celery_session_worker: <Worker: gen93553@gnpill.local (running)>
    assert myfunc.delay().wait(3)

Among the other fixtures described at http://docs.celeryproject.org/en/latest/userguide/testing.html#py-test, you can change Celery's default options by redefining the celery_config fixture this way:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'accept_content': ['json', 'pickle'],
        'result_serializer': 'pickle',
    }

By default, the test worker uses an in-memory broker and result backend. There is no need to run a local Redis or RabbitMQ unless you are testing broker-specific features.
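
If you do want the test worker to talk to a real transport, the same celery_config fixture can point it elsewhere; the URLs here are assumptions for a local Redis:

import pytest

@pytest.fixture(scope='session')
def celery_config():
    # Overrides the plugin's default in-memory broker and result backend.
    return {
        'broker_url': 'redis://localhost:6379/0',
        'result_backend': 'redis://localhost:6379/1',
    }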


Answer 7

Reference: using pytest.

def test_add(celery_worker):
    mytask.delay()   # mytask: one of your registered Celery tasks

If you use Flask, set the app config:

    CELERY_BROKER_URL = 'memory://'
    CELERY_RESULT_BACKEND = 'cache+memory://'

and in conftest.py:

@pytest.fixture
def app():
    yield app   # Your actual Flask application

@pytest.fixture
def celery_app(app):
    from celery.contrib.testing import tasks   # registers the ping task the worker fixture needs
    yield celery_app    # Your actual Flask-Celery application
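
A fuller, hedged sketch of that conftest.py; create_app() and make_celery() are assumptions standing in for however your project builds its Flask app and its Celery instance:

# conftest.py
import pytest

from myproject import create_app, make_celery  # hypothetical factories

@pytest.fixture
def app():
    flask_app = create_app()
    flask_app.config.update(
        CELERY_BROKER_URL='memory://',
        CELERY_RESULT_BACKEND='cache+memory://',
    )
    yield flask_app

@pytest.fixture
def celery_app(app):
    # Registers the ping task used by the celery_worker readiness check.
    from celery.contrib.testing import tasks  # noqa: F401
    yield make_celery(app)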

Answer 8

In my case (and I assume many others), all I wanted was to test the inner logic of a task using pytest.

TL;DR: I ended up mocking everything away (OPTION 2).


Example Use Case:

proj/tasks.py

@shared_task(bind=True)
def add_task(self, a, b):
    return a + b

tests/test_tasks.py

from proj import add_task

def test_add():
    assert add_task(1, 2) == 3, '1 + 2 should equal 3'

But since the shared_task decorator does a lot of Celery-internal logic, this isn't really a unit test.

So, for me, there were 2 options:

OPTION 1: Separate internal logic

proj/tasks_logic.py

def internal_add(a, b):
    return a + b

proj/tasks.py

from .tasks_logic import internal_add

@shared_task(bind=True)
def add_task(self, a, b):
    return internal_add(a, b)

This looks very odd, and besides making the code less readable, it requires you to manually extract and pass attributes that belong to the request (for instance the task_id, in case you need it), which makes the logic less pure.

OPTION 2: Mocks
Mocking away Celery internals

tests/__init__.py

# noinspection PyUnresolvedReferences
from celery import shared_task

from mock import patch


def mock_signature(**kwargs):
    return {}


def mocked_shared_task(*decorator_args, **decorator_kwargs):
    # Replacement decorator: returns the undecorated function, with the
    # signature helpers (s/si/signature) stubbed out so code that builds
    # signatures still works.
    def mocked_shared_decorator(func):
        func.signature = func.si = func.s = mock_signature
        return func

    return mocked_shared_decorator

patch('celery.shared_task', mocked_shared_task).start()

This then allows me to mock the request object (again, in case you need things from the request, like the id or the retries counter).

tests/test_tasks.py

from proj import add_task

class MockedRequest:
    def __init__(self, id=None):
        self.id = id or 1


class MockedTask:
    def __init__(self, id=None):
        self.request = MockedRequest(id=id)


def test_add():
    mocked_task = MockedTask(id=3)
    assert add_task(mocked_task, 1, 2) == 3, '1 + 2 should equal 3'

This solution is much more manual, but it gives me the control I need to actually unit test, without repeating myself and without losing the Celery scope.