以前版本的 Celery 需要一個單獨的庫(django-celery)才能與 Django 一起作業, 但從 Celery 3.1 開始,情況便不再如此,我們可以直接通過 Celery 庫來完成在 Django 中的任務,
安裝 Redis 服務端
以 Docker 安裝為例,安裝一個密碼為 mypassword 的 Redis 服務端
docker run -itd --name redis -p 127.0.0.1:6379:6379 redis:alpine redis-server --requirepass mypassword
在 Python 中安裝 Celery 和 Redis
pip install celery redis
在 Django 專案中添加 Celery 配置
在 Django 專案中創建一個 celery.py 檔案,并配置 Celery 應用程式,這個檔案應該與 settings.py 檔案位于同一目錄下:
import os
from celery import Celery
# 設定 Django 的默認環境變數
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# 使用 Django 的 settings.py 檔案配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')
# 從所有已安裝的應用中自動發現并加載任務模塊
app.autodiscover_tasks()
然后在 settings.py 檔案中添加配置:
# 使用 Redis 作為訊息代理(broker)來傳遞任務訊息,連接地址為 localhost:6379/0,并提供密碼 mypassword 進行身份驗證,
CELERY_BROKER_URL = 'redis://:mypassword@localhost:6379/0'
# 使用 Redis 作為結果存盤后端,連接地址同上,使用相同的密碼進行身份驗證,
CELERY_RESULT_BACKEND = 'redis://:mypassword@localhost:6379/0'
# 指定發送到代理(broker)的任務訊息序列化格式為 JSON 格式,
CELERY_TASK_SERIALIZER = 'json'
# 指定從結果后端獲取的結果序列化格式為 JSON 格式,
CELERY_RESULT_SERIALIZER = 'json'
# 指定支持接收的內容型別為 JSON 格式,
CELERY_ACCEPT_CONTENT = ['json']
# 將時區設定為亞洲/上海時區,
CELERY_TIMEZONE = 'Asia/Shanghai'
# 啟用 UTC 時間,
CELERY_ENABLE_UTC = True
在 Django 應用程式中創建一個 tasks.py 檔案,并撰寫要運行的任務函式,例如,此處我們將撰寫一個名為 send_email() 的任務,來定期發送電子郵件:
from django.core.mail import send_mail
from celery import shared_task
@shared_task
def send_email():
# 發送電子郵件的代碼
pass
如果想要實作異步任務的功能,在 Django 專案中的任何位置呼叫任務函式即可,例如,在 views.py 檔案中,我們可以從視圖函式中啟動任務,如下所示:
from myapp.tasks import send_email
def my_view(request):
send_email.delay()
return HttpResponse('任務已經在后臺執行,')
如果想要實作定時任務的功能,可以在 Celery 的組態檔中設定定時任務的調度方式,例如,要每小時運行一次 send_email() 任務,我們可以添加以下代碼:
from celery.task.schedules import crontab
app.conf.beat_schedule = {
'send-email-every-hour': {
'task': 'myapp.tasks.send_email',
'schedule': crontab(minute=0, hour='*/1'),
},
}
定時任務的具體寫法可以參考官方檔案:https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html?highlight=crontab
運行 Celery-worker 與 Celery-beat
Celery是一個分布式任務佇列,由三個主要組件組成:Celery worker、Celery beat 和訊息代理(例如 Redis 或 RabbitMQ),這些組件一起協作,讓開發者能夠輕松地執行異步任務和定時任務,
Celery worker:負責接收任務請求并執行任務,當您在 Django 應用程式中呼叫 apply_async 方法時,任務將被發送到 Celery worker,然后由 worker 執行,
Celery beat:負責調度定時任務,它會根據定義的規則定期觸發任務,并將其發送到 Celery worker 處理,
所以,對于需要運行定時任務的情況,我們需要同時啟動 Celery worker 和 Celery beat 行程來確保所有任務都可以被正確地處理和執行,
如果只需要使用 Celery 來執行異步任務,那么只需啟動 Celery worker 即可,但如果需要周期性地執行任務,那么需要啟動 Celery beat 來幫助完成調度這些任務,
# 運行 worker 與 beat
celery -A proj worker --loglevel=info --detach --pidfile=worker.pid --logfile=./logs/worker.log
celery -A proj beat --loglevel=info --detach --pidfile=beat.pid --logfile=./logs/beat.log
- -A proj:指定 Celery 應用程式所在的模塊或包,這里假設其名為 proj,
- worker 或 beat:啟動的行程名稱,分別對應 worker 和 beat 兩種型別的 Celery 行程,
- --loglevel=info:設定日志級別為 info,即只記錄 info 級別及以上的日志資訊,
- --detach:以守護行程(daemonized)方式啟動 Celery 行程,使其在后臺運行,
- --pidfile=worker.pid 或 --pidfile=beat.pid:將行程 ID(PID)寫入指定的 PID 檔案,方便后續管理和監控,
- --logfile=./logs/worker.log 或 --logfile=./logs/beat.log:指定日志檔案路徑,所有日志資訊都會輸出到該檔案中,
隨后我們設定的定時任務便會按規則執行,可以通過指定的日志檔案查看執行結果,當我們需要停止 Celery worker 與 Celery beat 時,可以執行以下操作:
kill -TERM $(cat worker.pid)
kill -TERM $(cat beat.pid)
參考
[1] [Using Celery with Django]: https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html#using-celery-with-django
[2] [Periodic Tasks]: https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html?highlight=crontab
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/551159.html
標籤:Python
上一篇:簡述PHP中trait的使用和同時引入多個trait時同名方法沖突的處理
下一篇:返回列表
