Environment

  • Python 2.7
  • Django 1.10
  • celery 4.1.0
  • RabbitMQ 3.2.4

Getting started

Create a virtual environment and set up the Django project

$ virtualenv django-exp
$ source django-exp/bin/activate
$ pip install django==1.10
$ django-admin startproject myproject
$ cd myproject/
$ django-admin startapp myapp
$ tree
.
├── manage.py
├── myapp
│   ├── admin.py
│   ├── apps.py
│   ├── __init__.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tests.py
│   └── views.py
└── myproject
    ├── __init__.py
    ├── settings.py
    ├── urls.py
    └── wsgi.py
$ pip install celery
$ pip install pysqlite
$ sudo apt-get install rabbitmq-server

Configure Celery

  • myproject/celery.py
$ cat myproject/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
  • myproject/__init__.py
$ cat myproject/__init__.py
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ['celery_app']
  • myapp/tasks.py
$ cat myapp/tasks.py
from __future__ import absolute_import, unicode_literals
from celery.task.schedules import crontab
from celery.decorators import periodic_task
@periodic_task(run_every=(crontab(minute="*/1")))  # append "hello world" once every minute
def hello_world():
    with open("/tmp/output.txt", "a") as f:
        f.write("hello world")
        f.write("\n")
  • myproject/settings.py
$ cat myproject/settings.py
...
CELERY_BROKER_URL = 'amqp://localhost'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'myapp',
]
...
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_L10N = True
USE_TZ = False
...
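
The `CELERY_` prefix on the settings above goes with the `namespace='CELERY'` comment in celery.py: only keys carrying that prefix are treated as Celery configuration, and the rest of the name maps to Celery's lowercase setting name. The sketch below only illustrates that naming convention in plain Python. It is not Celery's actual loading code, and `celery_keys_from_settings` is a hypothetical helper invented for this example.

```python
def celery_keys_from_settings(settings, namespace="CELERY"):
    """Return {celery_key: value} for every setting starting with '<namespace>_'.

    Hypothetical helper: it mimics the naming convention only.
    """
    prefix = namespace + "_"
    return {
        name[len(prefix):].lower(): value
        for name, value in settings.items()
        if name.startswith(prefix)
    }


# A few values copied from the settings listing above.
django_settings = {
    "CELERY_BROKER_URL": "amqp://localhost",
    "CELERY_ACCEPT_CONTENT": ["application/json"],
    "CELERY_RESULT_SERIALIZER": "json",
    "CELERY_TASK_SERIALIZER": "json",
    "TIME_ZONE": "Asia/Shanghai",  # no CELERY_ prefix, so it is skipped
}

celery_config = celery_keys_from_settings(django_settings)
print(sorted(celery_config))
# ['accept_content', 'broker_url', 'result_serializer', 'task_serializer']
```

A setting without the prefix, such as `TIME_ZONE`, stays a plain Django setting and does not appear in the result.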

Sync the database

$ python manage.py makemigrations
$ python manage.py migrate

Start the worker and beat

$ celery -A myproject worker -l info -B
...
[2018-02-25 16:59:00,000: INFO/Beat] Scheduler: Sending due task myapp.tasks.hello_world (myapp.tasks.hello_world)
[2018-02-25 16:59:00,008: INFO/MainProcess] Received task: myapp.tasks.hello_world[e1aa8b70-586d-4a2b-b598-64c322b211a7]
[2018-02-25 16:59:00,010: INFO/MainProcess] Task myapp.tasks.hello_world[e1aa8b70-586d-4a2b-b598-64c322b211a7] succeeded in 0.00109826028347s: None
[2018-02-25 17:00:00,056: INFO/Beat] Scheduler: Sending due task myapp.tasks.hello_world (myapp.tasks.hello_world)
[2018-02-25 17:00:00,058: INFO/MainProcess] Received task: myapp.tasks.hello_world[78ec725f-61b7-4fe6-a1c1-b2c60fb9ffed]
[2018-02-25 17:00:00,060: INFO/MainProcess] Task myapp.tasks.hello_world[78ec725f-61b7-4fe6-a1c1-b2c60fb9ffed] succeeded in 0.000685669481754s: None
$ tail /tmp/output.txt
hello world
hello world
hello world
hello world
hello world
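
The log timestamps (16:59:00, then 17:00:00) follow from `crontab(minute="*/1")`, which is due at the start of every minute. The timing can be sketched with the standard library alone. This is not Celery's scheduler; `next_minute_boundary` is a hypothetical helper and the starting time is an assumed value chosen to match the log.

```python
from datetime import datetime, timedelta


def next_minute_boundary(now):
    """Return the first whole-minute timestamp strictly after `now`."""
    return now.replace(second=0, microsecond=0) + timedelta(minutes=1)


# Assume beat was started 30 seconds before the first run seen in the log.
start = datetime(2018, 2, 25, 16, 58, 30)
first_run = next_minute_boundary(start)
second_run = next_minute_boundary(first_run)

print(first_run)   # 2018-02-25 16:59:00
print(second_run)  # 2018-02-25 17:00:00
```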

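Besides this approach, periodic tasks can also be managed from the Django admin. That requires installing django-celery and enabling its database-backed scheduler (for Celery 4.x, the django-celery-beat package fills this role); see the official documentation for details.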
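
A third option that needs no extra package is to declare the schedule in configuration. The fragment below is a minimal sketch for settings.py under two assumptions: the Celery app loads Django settings with `namespace='CELERY'`, so that `CELERY_BEAT_SCHEDULE` maps to Celery's `beat_schedule` setting, and `hello_world` is registered as an ordinary task rather than with `@periodic_task` (otherwise it would be scheduled twice). The entry name `hello-world-every-minute` is made up for this example.

```python
# settings.py fragment (sketch): schedule the task from configuration.
CELERY_BEAT_SCHEDULE = {
    "hello-world-every-minute": {          # arbitrary, illustrative entry name
        "task": "myapp.tasks.hello_world",  # task name as it appears in the worker log
        "schedule": 60.0,                   # interval in seconds
    },
}
```

An interval of 60 seconds is counted from when beat starts, so unlike `crontab(minute="*/1")` the runs are not aligned to the start of each minute.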

References

Using Celery with Django

How to install Celery on Django and Create a Periodic Task

Celery 4 Periodic Task in Django