情景: 用戶發起request,并等待response回傳,在本些views中,可能需要執行一段耗時的程式,那么用戶就會等待很長時間,造成不好的用戶體驗,比如發送郵件、手機驗證碼等,
使用celery后,情況就不一樣了,
解決: 將耗時的程式放到celery中執行,
Celery 是一個包含一系列的訊息任務佇列,您可以不用了解內部的原理直接使用,它的使用時非常簡單的,
- 選擇并且安裝一個訊息中間件(Broker)
- 安裝 Celery 并且創建第一個任務
- 運行職程(Worker)以及呼叫任務
- 跟蹤任務的情況以及回傳值
celery官方網站: http://www.celeryproject.org/
celery中文檔案: https://www.celerycn.io/
Celery名詞:
- 任務
task:就是一個Python函式, - 佇列
queue:將需要執行的任務加入到佇列中, - 工人
worker:在一個新行程中,負責執行佇列中的任務, - 代理人
broker:負責調度,在布置環境中使用redis,
選擇中間人(Broker)
Celery 需要一個中間件來進行接收和發送訊息,通常以獨立的服務形式出現,成為 訊息中間人(Broker)
以下有幾種選擇:
- RabbitMQ:RabbitMQ 的功能比較齊全、穩定、便于安裝,在生產環境來說是首選的,
- Redis: Redis 功能比較全,但是如果突然停止運行或斷電會造成資料丟失,
安裝 Celery
可以用標準的 Python 工具,諸如 pip 或 easy_install 來安裝:
$ pip install celery
Celery單應用
創建Celery實體物件及task任務
創建第一個 Celery 實體程式,我們把創建 Celery 程式成為 Celery 應用或直接簡稱 為 app,創建的第一個實體程式可能需要包含 Celery 中執行操作的所有入口點,例如創建任務、管理職程(Worker)等,所以必須要匯入 Celery 模塊,
在本教程中將所有的內容,保存為一個 app 檔案中,針對大型的專案,可能需要創建 獨立的模塊,
首先創建 utils/tasks.py:
import os
import sys
import django
from celery import Celery
from django.conf import settings
from django.core.mail import send_mail
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
project_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # 確定專案路徑
sys.path.append(project_path) # 將專案路徑添加到系統搜索路徑中
os.environ['DJANGO_SETTINGS_MODULE'] = 'django_test.settings' # 設定專案的組態檔
django.setup() # Django初始化
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.task
def task_send_email(user_id, username, email):
# 加密用戶資訊,生成token
info = {'confirm': user_id}
serializer = Serializer(settings.SECRET_KEY, 3600)
token = serializer.dumps(info).decode()
# 發送郵件
'''發送重置密碼郵件'''
reset_url = "{}/user/reset/password/?token={}".format(settings.HOST_URL, token)
subject = "重置密碼"
message = '郵件正文'
sender = settings.DEFAULT_FROM_EMAIL
receiver = [email]
html_message = '<h1>{}, 您好!</h1>請在1小時內點擊下面鏈接重置密碼:<br/><a href="https://www.cnblogs.com/gaogang/p/{}">{}</a>'.format(username, reset_url, reset_url)
send_mail(subject, message, sender, receiver, html_message=html_message)
-
第一個引數為當前模塊的名稱,只有在
__main__模塊中定義任務時才會生產名稱, -
第二個引數為中間人(Broker)的鏈接 URL ,實體中使用的 RabbitMQ(Celery默認使用的也是RabbitMQ),
更多相關的 Celery 中間人(Broker)的選擇方案,可查閱上面的中間人(Broker),
例如,對于 RabbitMQ 可以寫為
amqp://localhost,使用 Redis 可以寫為redis://localhost, -
創建了一個名稱為
task_send_email的任務,實作郵件的發送,
注意: 在 單檔案中,想要匯入 Django專案的
settings.py組態檔
- 先確定Django專案所在的絕對路徑
- 追加Django所在的路徑到系統搜索路徑
- 配置系統的Django環境變數
運行 Celery 子行程(Worker)服務
現在可以使用 worker 引數進行執行我們剛剛創建職程(Worker):
注意: 此時運行celery,是在Django專案下根目錄中,*此時啟動檔案路徑必須和 呼叫 異步任務的導包路徑一致,否則,不識別*,-A celery實體名,即為當前檔案的名
$ celery -A utils.task worker --loglevel=info
Celery集成django使用
注意: 以前版本的Celery需要一個單獨的庫才能與Django一起使用,但是從3.1開始,情況不再如此,現在就已支持Django,
注意: Celery 4.0支持Django 1.8和更高版本,對于Django 1.8之前的版本,請使用Celery 3.1,
創建Celery實體物件
要將Celery與Django專案一起使用,必須首先定義Celery庫的實體(稱為 “app”)
如果您擁有當前主流的Django專案結構,例如:
- django_test/
- manage.py
- django_test/
- __init__.py
- settings.py
- urls.py
那么建議的方法是創建一個新的django_test/django_test/celery.py模塊,該模塊定義Celery實體
file: django_test/django_test/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_test.settings')
app = Celery('django_test')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
Django加載Celery
然后,需要將此程式匯入到django_test/django_test/__ init__.py模塊中,這樣可以確保在Django啟動時加載應用程式,以便@shared_task裝飾器將使用該應用程式:
file:django_test/django_test/__init__.py:
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
請注意,此示例專案結構適用于較大的專案,對于簡單專案,可以使用一個包含的模塊來定義應用程式和任務,
步驟決議
讓我們分解一下第一個模塊中發生的情況,首先我們從__freture__匯入absolute_import匯入unicode_literals,以使我們的celery.py模塊不會與庫沖突:
from __future__ import absolute_import
然后,為celery命令列程式設定默認的DJANGO_SETTINGS_MODULE環境變數:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django_test.settings')
不需要此行,但可以避免始終將設定模塊傳遞到celery程式中,它必須始終在創建應用程式實體之前出現,我們接下來要做的是:
app = Celery('django_test')
這是我們的庫實體,可以有很多實體,但是使用Django時可能沒有理由, 我們還將Django設定模塊添加為Celery的配置源,這意味著不必使用多個組態檔,而直接從Django設定中配置Celery;但也可以根據需要將它們分開,
app.config_from_object('django.conf:settings', namespace='CELERY')
大寫名稱空間意味著所有Celery配置選項必須以大寫而不是小寫指定,并以CELERY_開頭,因此例如task_always_eager設定變為CELERY_TASK_ALWAYS_EAGER,broker_url設定變為CELERY_BROKER_URL,
可以直接傳遞設定物件,但是使用字串更好,因為這樣一來,作業人員就不必序列化該物件, CELERY_名稱空間也是可選的,但建議使用(以防止與其他Django設定重疊),
接下來,可重用應用程式的常見做法是在單獨的task.py模塊中定義所有任務,而Celery可以自動發現這些模塊:
app.autodiscover_tasks()
在上述行中,Celery將按照task.py約定自動發現所有已安裝應用程式中的任務:
- app1/
- tasks.py
- models.py
- app2/
- tasks.py
- models.py
這樣,不必手動將各個模塊添加到CELERY_IMPORTS設定中, 最后,debug_task示例是一個轉儲其自己的請求資訊的任務,這是使用Celery 3.1中引入的新的bind = True任務選項來輕松參考當前任務實體,
使用@shared_task裝飾器
撰寫的任務可能會存在于可重用應用程式中,并且可重用應用程式不能依賴于專案本身,因此不能直接匯入應用程式實體, @shared_task裝飾器可讓您創建任務而無需任何具體的應用程式實體:
file: user/tasks.py:
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from django.conf import settings
from django.core.mail import send_mail
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
@shared_task
def task_send_email(user_id, username, email):
# 加密用戶資訊,生成token
info = {'confirm': user_id}
serializer = Serializer(settings.SECRET_KEY, 3600)
token = serializer.dumps(info).decode()
# 發送郵件
'''發送重置密碼郵件'''
reset_url = "{}/user/reset/password/?token={}".format(settings.HOST_URL, token)
subject = "重置密碼"
message = '郵件正文'
sender = settings.DEFAULT_FROM_EMAIL
receiver = [email]
html_message = '<h1>{}, 您好!</h1>請在1小時內點擊下面鏈接重置密碼:<br/><a href="https://www.cnblogs.com/gaogang/p/{}">{}</a>'.format(username, reset_url,reset_url)
send_mail(subject, message, sender, receiver, html_message=html_message)
運行 Celery 子行程(Worker)服務
現在可以使用 worker 引數進行執行我們剛剛創建職程(Worker):
注意: 此時運行celery,是在Django專案下,-A celery實體名,即為Django專案的同名目錄
$ celery -A django_test worker --loglevel=info
呼叫任務
需要呼叫我們創建的實體任務,可以通過 delay() 進行呼叫,
delay() 是 apply_async() 的快捷方法,可以更好的控制任務的執行(詳情:呼叫任務:Calling Tasks):
from django.contrib.auth import authenticate
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from user.tools.auth import JSONWebTokenAuthentication # 自定義token認證類
# from .task import task_send_email # 匯入Django集成的task任務
from utils.task import task_send_email # 匯入 celery單應用的task任務
class ResetPassword(APIView):
authentication_classes = [JSONWebTokenAuthentication]
permission_classes = [IsAuthenticated]
def post(self, request):
user = request.user
task_send_email.delay(user_id=user.id, username=user.username, email=user.email)
return Response('發送成功')
該任務已經有職程(Worker)開始處理,可以通過控制臺輸出的日志進行查看執行情況,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/285823.html
標籤:其他
