celery是什么?
我的理解比較簡單,它是一個「任務佇列」,我主要拿他來做兩件事情:
1.處理異步任務
2.處理定時任務
一個簡單任務
安裝相應的pip包
pip install celery[redis]
準備專案檔案
專案檔案結構如下:
.
├── caller.py
├── tasks.py
tasks.py中存放任務函式:
import time
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
time.sleep(30)
print(f'add end, res:{x+y}')
return x + y
caller.py中存放呼叫函式:
from tasks import add
for i in range(0, 20):
add.delay(4, 4)
運行celery
在當前目錄下運行一個worker:
celery -A tasks worker --loglevel=info
如何查看任務狀態
一些原理
此處使用redis做celery的訊息佇列(broker),函式觸發一次任務呼叫時,發送訊息給redis,woker從redis中獲取任務進行處理,如果任務過多,worker處理不過來,worker不會領取任務,任務會先存放在redis中,
默認運行的worker啟動的行程數等于cpu核數(引),例如我這里是4,也就是說此處的celery一次只能處理4個任務,如果一次有多個任務發送過來,有的任務就需要排隊,此處將add函式的處理時間模擬為30s,
celery有一個「prefetch」的動作,例如上面的例子,雖然這個worker一次只能處理4個任務,但是這個worker除了接收它能處理的4個任務立即去執行,還要再接收一些任務準備運行,它打算再接收多少任務預備著呢?這取決于一個配置引數:worker_prefetch_multiplier,這個引數默認是4(引),那么他會多接收的數量為:worker_prefetch_multiplier * 并發行程數,放在此處就是:4 * 4 = 16,也就是說,如果redis中有很多等待處理的任務,其實worker運行起來會一次拿走4 + 16 = 20個任務,
測驗
查看有多少任務在訊息佇列中
celery在redis中的存放任務的佇列key默認名稱是celery(引),這個key只有當redis中有積壓任務時才會存在,如果它不存在就代表當前訊息佇列中無訊息,
運行caller.py,然后登陸redis查詢:
127.0.0.1:6379> llen celery
(integer) 0
可以看到,盡管celery一次只能處理4個任務,但它把20個任務全領走了,由于我們的任務要30s才能處理完成,立即再運行一次caller.py,然后redis查詢:
127.0.0.1:6379> llen celery
(integer) 20
可以看到,有20個任務還在訊息佇列中等待處理,
查看有多少任務正在運行
celery -A tasks inspect active
查看有多少任務接收了但還未運行
這種任務在celery中叫做reserved task,
celery -A tasks inspect reserved
# 統計個數(數量為下面的結果-1)
celery -A tasks inspect reserved | wc -l
查看worker狀態
運行:
$ celery -A tasks status
[email protected]: OK
1 node online.
可以看到,提示有一個worker(node)是在線的(online),
使用flower在線查看
flower可以實時監控celery的狀態,并且還能修改一些配置(生產環境慎用),(引)
pip install flower
celery -A tasks flower
瀏覽器打開http://localhost:5555即可看到一個現實celery狀態的網頁,
參考
celery官方檔案
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/151102.html
標籤:Python
上一篇:經典問題
