前提:需要做個對某個任務暫停(撤銷)的同時且不影響其他任務繼續執行的功能
查找資料得知celery3.0版本后加有revoke 功能
celery的官方檔案對revoke的描述
下面開始
Django=3.2.8 (這個版本不重要,重要的是下面的那兩個)
django-celery=3.3.1
celery=3.1.26.post2
兩個celery版本不搭的話用起來很麻煩
django-celery這個包支持的最大celery好像就是celery3…
他這個django-celery好久不更新了,有個新的叫django-celery-base,有時間再看
還有個重要的地方!!!
還有個重要的地方!!!
還有個重要的地方!!!
忘記了python幾之后async成了內置關鍵詞了
而celery版本比較早,所以和這個async關鍵詞沖突了,如果不能升/降級python版本的話
建議修改celery中的這個async關鍵詞(個人推薦這種方法,升/降python版本太麻煩,保不齊還有別的不能用的)
File "/home/lice/.local/lib/python3.8/site-packages/celery/utils/timer2.py", line 19
from kombu.async.timer import Entry, Timer as Schedule, to_timestamp, logger
^
SyntaxError: invalid syntax
#這里一定要看,甭管他彈出多少行錯誤,找準這個async這一行給他改了

不止這一處需要修改
不止這一處需要修改
不止這一處需要修改
就多啟動專案幾次,他報一次改一次,直到改的不報錯就好了(挺扯的= =||)
代碼功能描述
發布celery任務,同時記錄任務id(task.id),在任務還在運行的時候revoke(task.id),進行銷毀,就這

模型檔案

專案根目錄下的celery

我這里用的時rabbitMQ.

redis這樣配一下就好了
下面開始任務和視圖
from .tasks import *
from Dw_test.celery import app # 這里是你的根目錄(setting的那個目錄)
from django.http import HttpResponse
from .models import Tasks
from .tasks import *
def ads(request, ls):
tasks = test_app.apply_async(args=[ls])
# 記住這個tasks,把他的ID記下來,后面撤銷用
print(tasks.id)
Tasks.objects.create(task_id=tasks.id)
# 資料庫記錄操作
return HttpResponse(tasks.id)
def dels(req, ls):
tasks_all = Tasks.objects.get(id=ls)
app.control.revoke(tasks_all.task_id, terminate=True)
# terminate的加上,不加上不會撤銷= =,還可以加個signal='SIGKILL',但是我沒測出來區別
tasks_all.is_delete = True
tasks_all.save()
# 都是我的資料庫記錄操作
return HttpResponse(tasks_all.task_id + " is delete")
views.py
from __future__ import absolute_import
from time import sleep
from billiard.exceptions import Terminated
from celery import shared_task
# 這里推薦使用shared_task,他能把你的任務函式變成全域的共享的,task不能,task還得指定目標celery
@shared_task(throws=(Terminated,)) # 這里這個terminated有用的,下面說
def test_app(x):
print('-----Start-----')
for i in range(x):
sleep(1)
print(i)
print('-----Over-----')
tasks.py(得自己在app下手動建)
from django.urls import path
from .views import *
urlpatterns = [
path('task/<int:ls>', ads),
path('task_del/<int:ls>', dels),
]
這是app下的urls.py(自己手動建)
順利的話就開始啟動celery和專案了
Celery命令:celery -A 專案名 worker -l info
專案啟動命令:python manage.py runserver
celery成功可見↓

下面開始測驗

數越大時間越長,前面寫的,可以改,回傳的這個是任務id

可以看到已成功開始運行了
下面嘗試終止這個任務


不出意外的話,會出現報錯并成功撤銷任務
解決方法就是我上面說的那個任務函式(task.py里的)加個拋出例外

這樣
再試一次

成功停止
針對暫停和開啟功能,可通過把未處理的資料保存的資料庫,開啟的時候從資料庫處理未完成任務
以上!~
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/317935.html
標籤:python
