IOTXING

记录技术学习之路

0%

django开发分布式应用

django配合celery开发分布式应用

需要安装的包

pip install -y celery djcelery

django设置celery配置

在setting.py中添加celery需要的相关配置

BROKER_URL='redis://192.168.5.203:6379'
CELERY_RESULT_BACKEND = 'redis://192.168.5.203:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'

celery主要有三部分组成,broker,消费者和生产者 生产者生产任务,传入 到broker。 消费者从borker处接受到任务,然后开始执行 其中broker支持多种消息中间件,例如RabbitMQ,Redis等

设置celery app

在项目的根目录下面新建文件,命名为celery_task.py(此处不能命名为celery.py,因为一会我们还会从celery导入包,这样 会导致导入失败)

import os
from celery import  Celery
os.environ.setdefault('DJANGO_SETTING_MODULE','ITPlatform.settings')
##设置django的setting所在位置
app = Celery('Nebula')
##新建app
app.config_from_object('django.conf:settings')
##设置app从setting读取配置信息
app.autodiscover_tasks(['Nebula'])
##设置app自动发现的模块

自动引入app

在需要自动发现task的模块的init文件中添加以下内容

from celery_task import app as celery_app

这样在启动的时候就能自动的引入celery的app了 如果不做的话,可能会出现worker起来之后,发现不了task

创建任务

我们常见的消费者主要有两种,一种是基于方法的,一种是基于class的 基于方法的消费者比较简单,只需要使用装饰器就能让celery发现到

from celery import task

@task()
def startApprove(approveId):

    print 'celery is working'

基于class的消费者有多种方法能够实现,我只找到了一种对我来说改造成本不高的方法,那就是新建一个class继承与Task,然后里面书写方法

from celery import task, Task
from ITPlatform.celery_task import app

class changeStep(Task):
    def run(self, approveId, step):
        '''
        发起更改步骤异步任务
        :param approveId:
        :param step:
        :return:
        '''
        approve().changeStep(approveId, step)

ChangeStep = changeStep()
app.tasks.register(ChangeStep)

然后我们生成一个该class的实例,并将这个实例注册到我们上面创建的app里面 在celery启动之后,我们就能看到我们继承与Task的这个class了

创建生产者

生产者的创建十分简单,只需要引入这个函数,然后使用delay函数就行

from approve import startApprove

startApprove.delay(ticketId)

class的生产者创建

ChangeStep.delay(ticketId, step + 1)

启动celery

在项目的根目录输入

python manage.py celery worker --loglevel=info


usr/lib/python2.7/site-packages/requests/__init__.py:80: RequestsDependencyWarning: urllib3 (1.22) or chardet (2.2.1) doesn't match a supported version!
  RequestsDependencyWarning)
/usr/lib/python2.7/site-packages/celery/platforms.py:812: RuntimeWarning: You are running the worker with superuser privileges, which is
absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

  uid=uid, euid=euid, gid=gid, egid=egid,

 -------------- celery@202 v3.1.25 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.10.0-693.el7.x86_64-x86_64-with-centos-7.4.1708-Core
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         Nebula:0x3519990
- ** ---------- .> transport:   redis://192.168.5.203:6379//
- ** ---------- .> results:     redis://192.168.5.203:6379/
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . ITPlatform.celery_task.debug_task
  . Nebula.approve.changeStep
  . Nebula.approve.startApprove
  . Nebula.approve.stopApprove

[2018-04-02 10:40:01,370: INFO/MainProcess] Connected to redis://192.168.5.203:6379//
[2018-04-02 10:40:01,379: INFO/MainProcess] mingle: searching for neighbors
[2018-04-02 10:40:02,432: WARNING/MainProcess] /usr/lib/python2.7/site-packages/celery/app/control.py:36: DuplicateNodenameWarning: Received multiple replies from node name: celery@0e7e76ed0bdf.
Please make sure you give each node a unique nodename using the `-n` option.
  pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)),
[2018-04-02 10:40:02,432: INFO/MainProcess] mingle: sync with 1 nodes
[2018-04-02 10:40:02,432: INFO/MainProcess] mingle: sync complete
[2018-04-02 10:40:02,442: WARNING/MainProcess] /usr/lib/python2.7/site-packages/celery/fixups/django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments!
  warnings.warn('Using settings.DEBUG leads to a memory leak, never '
[2018-04-02 10:40:02,443: WARNING/MainProcess] celery@202 ready.