一、簡介
Celery是由Python開發、簡單、靈活、可靠的分布式任務佇列,其本質是生產者消費者模型,生產者發送任務到訊息佇列,消費者負責處理任務,Celery側重于實時操作,但對調度支持也很好,其每天可以處理數以百萬計的任務,特點:
簡單:熟悉celery的作業流程后,配置使用簡單
高可用:當任務執行失敗或執行程序中發生連接中斷,celery會自動嘗試重新執行任務
快速:一個單行程的celery每分鐘可處理上百萬個任務
靈活:幾乎celery的各個組件都可以被擴展及自定制
應用場景舉例:
1.web應用:當用戶在網站進行某個操作需要很長時間完成時,我們可以將這種操作交給Celery執行,直接回傳給用戶,等到Celery執行完成以后通知用戶,大大提好網站的并發以及用戶的體驗感,
2.任務場景:比如在運維場景下需要批量在幾百臺機器執行某些命令或者任務,此時Celery可以輕松搞定,
3.定時任務:向定時導資料報表、定時發送通知類似場景,雖然Linux的計劃任務可以幫我實作,但是非常不利于管理,而Celery可以提供管理介面和豐富的API,
二、架構&作業原理
Celery由以下三部分構成:訊息中間件(Broker)、任務執行單元Worker、結果存盤(Backend),如下圖:

作業原理:
任務模塊Task包含異步任務和定時任務,其中,異步任務通常在業務邏輯中被觸發并發往訊息佇列,而定時任務由Celery Beat行程周期性地將任務發往訊息佇列;
任務執行單元Worker實時監視訊息佇列獲取佇列中的任務執行;
Woker執行完任務后將結果保存在Backend中;
訊息中間件Broker
訊息中間件Broker官方提供了很多備選方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached 等,官方推薦RabbitMQ,
任務執行單元Worker
Worker是任務執行單元,負責從訊息佇列中取出任務執行,它可以啟動一個或者多個,也可以啟動在不同的機器節點,這就是其實作分布式的核心,
結果存盤Backend
Backend結果存盤官方也提供了諸多的存盤方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch,
celery.py
此檔案跟settings檔案同級
# 將代碼里的runyi,改成自己的專案名
import os
from celery import Celery
from celery.utils.log import get_task_logger
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'runyi.settings')
app = Celery('runyi')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
settings檔案的配置
# Celery settings
# 本地連接有可能會導致celery連接錯誤,將CELERY_BROKER_URL和CELERY_RESULT_BACKEND,加入到本地環境變數,值為:'redis://localhost'
# 經濟人
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://:'+ REDIS_PASSWORD + '@127.0.0.1' + ':' + REDIS_PORT+ '/' +REDIS_DB)
# 存盤結果后端
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'redis://:'+ REDIS_PASSWORD + '@127.0.0.1' + ':' + REDIS_PORT+ '/' +REDIS_DB)
# 時區
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_TASK_SERIALIZER = 'json'
各個子應用下如果需要異步任務,創建task.py檔案
task.py
import logging
import time
from celery import shared_task
# from management.aliyun.utils import bytes_to_oss
# 如果logging.getLogger()加上子應用,日志會生成2行一模一樣的
logger = logging.getLogger()
#測驗celery異步
@shared_task
def test_celery(str):
logger.info('當前已經進入到celery中')
time.sleep(5)
logger.info('傳遞過來的值為:%s'%str)
logger.info('從celery中出來了')
在views.py檔案中呼叫
class Test_token(APIView):
def get(self,request):
mes = {'code':200}
test_celery.apply_async(args=['hello,celery'])
return Response(mes)
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/248077.html
標籤:其他
上一篇:LNMP架構部署(附:部署Discuz社區論壇Web應用)
下一篇:搭建LNMP基礎框架
