Developing distributed applications with Django and Celery
Packages to install
pip install celery django-celery
Adding the Celery configuration to Django
Add the settings Celery needs to settings.py:
BROKER_URL='redis://192.168.5.203:6379'
CELERY_RESULT_BACKEND = 'redis://192.168.5.203:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
Celery consists of three main parts: the broker, consumers, and producers. Producers create tasks and push them to the broker; consumers receive tasks from the broker and execute them. The broker supports several kinds of message middleware, such as RabbitMQ and Redis.
Setting up the Celery app
Create a new file in the project root named celery_task.py. (Do not name it celery.py — we will import from the celery package in a moment, and that filename would shadow the package and break the import.)
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ITPlatform.settings')
# tell Celery where the Django settings module lives
app = Celery('Nebula')
# create the app
app.config_from_object('django.conf:settings')
# have the app read its configuration from the Django settings
app.autodiscover_tasks(['Nebula'])
# modules in which the app auto-discovers tasks
Importing the app automatically
In the __init__.py of the module whose tasks should be auto-discovered, add the following:
from celery_task import app as celery_app
With this in place, the Celery app is imported automatically at startup. Without it, the worker may come up but fail to discover your tasks.
Creating tasks
There are two common kinds of consumers: function-based and class-based. Function-based consumers are the simpler kind — a decorator is enough for Celery to discover them:
from celery import task
@task()
def startApprove(approveId):
    print 'celery is working'
Class-based consumers can be implemented in several ways. The only one I found with a low migration cost for my code was to create a class that inherits from Task and put the logic in its run method:
from celery import Task
from ITPlatform.celery_task import app

class changeStep(Task):
    def run(self, approveId, step):
        '''
        Launch the asynchronous step-change task
        :param approveId:
        :param step:
        :return:
        '''
        approve().changeStep(approveId, step)

ChangeStep = changeStep()
app.tasks.register(ChangeStep)
We then create an instance of the class and register that instance with the app created above. Once Celery starts, the Task subclass shows up in its task list.
Creating a producer
Creating a producer is very simple: import the task and call its delay method:
from approve import startApprove
startApprove.delay(ticketId)
Producing with a class-based task:
ChangeStep.delay(ticketId, step + 1)
Starting Celery
From the project root, run:
python manage.py celery worker --loglevel=info
/usr/lib/python2.7/site-packages/requests/__init__.py:80: RequestsDependencyWarning: urllib3 (1.22) or chardet (2.2.1) doesn't match a supported version!
RequestsDependencyWarning)
/usr/lib/python2.7/site-packages/celery/platforms.py:812: RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!
Please specify a different user using the -u option.
User information: uid=0 euid=0 gid=0 egid=0
uid=uid, euid=euid, gid=gid, egid=egid,
-------------- celery@202 v3.1.25 (Cipater)
---- **** -----
--- * *** * -- Linux-3.10.0-693.el7.x86_64-x86_64-with-centos-7.4.1708-Core
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: Nebula:0x3519990
- ** ---------- .> transport: redis://192.168.5.203:6379//
- ** ---------- .> results: redis://192.168.5.203:6379/
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
[tasks]
. ITPlatform.celery_task.debug_task
. Nebula.approve.changeStep
. Nebula.approve.startApprove
. Nebula.approve.stopApprove
[2018-04-02 10:40:01,370: INFO/MainProcess] Connected to redis://192.168.5.203:6379//
[2018-04-02 10:40:01,379: INFO/MainProcess] mingle: searching for neighbors
[2018-04-02 10:40:02,432: WARNING/MainProcess] /usr/lib/python2.7/site-packages/celery/app/control.py:36: DuplicateNodenameWarning: Received multiple replies from node name: celery@0e7e76ed0bdf.
Please make sure you give each node a unique nodename using the `-n` option.
pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),
[2018-04-02 10:40:02,432: INFO/MainProcess] mingle: sync with 1 nodes
[2018-04-02 10:40:02,432: INFO/MainProcess] mingle: sync complete
[2018-04-02 10:40:02,442: WARNING/MainProcess] /usr/lib/python2.7/site-packages/celery/fixups/django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2018-04-02 10:40:02,443: WARNING/MainProcess] celery@202 ready.