本文首發于公眾號:Hunter后端
原文鏈接:celery筆記五之訊息佇列的介紹
前面我們介紹過 task 的處理方式,將 task 發送到佇列 queue,然后 worker 從 queue 中一個個的獲取 task 進行處理,
task 的佇列 queue 可以是多個,處理 task 的 worker 也可以是多個,worker 可以處理任意 queue 的 task,也可以處理指定 queue 的 task,這個我們在介紹 queue 的時候再做介紹,
這一篇我們來介紹一下存盤 task 的佇列 queue,
- 默認佇列 task_default_queue
- 定義佇列
- 將 task 指定到佇列 queue 消費
以下的操作都是在 Django 系統的配置中使用,
1、默認佇列 task_default_queue
當我們運行一個最簡單的延時任務比如 add.delay(1, 2) 時,并沒有設定一個訊息佇列,因為如果我們沒有指定,系統會為我們創建一個默認佇列,
這個默認的佇列被命名為 celery,值在 app.conf.task_default_queue,我們可以查看一下:
from hunter.celery import app
app.conf.task_default_queue
# 輸出為 'celery'
2、定義佇列
我們可以設想一下這個場景,我們只有一個 worker 處理 task,每個 task 需要處理的時間很長,因為 worker 被占用,這樣在我們的任務佇列里就會積壓很多的 task,
有一些需要即時處理的任務則會被推遲處理,這樣的情況下,我們理想的設計是設定多個 worker,多個 worker 分別處理指定佇列的 task,
關于 worker 的設定,比如添加多個 worker,給 worker 消費指定佇列的 task,我們在 worker 的筆記中再介紹,這里我們介紹一下如何定義佇列,
任務佇列的定義如下:
# hunter/celery.py
from kombu import Queue
app.conf.task_queues = (
Queue('blog_tasks', ),
)
當我們定義了任務佇列之后,我們可以將 task 指定輸出到對應的 queue,假設 blog/tasks.py 下有這樣一個 task:
# blog/tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
接下來我們呼叫這個 task 的時候,需要指定佇列:
from blog.tasks import add
add.apply_async((1, 2), queue='blog_tasks')
如果我們就這樣配置 celery,這個時候如果我們直接再呼叫 delay() 函式,也就是不指定 queue 的話,會發現我們發出的 task 是不能被 worker 處理的,
也就是說,下面的操作是不起作用的:
from blog.tasks import add
add.delay(1, 2) # 此時,我們的呼叫不會被佇列接收到
如果需要在呼叫 task 的時候不指定佇列,使用系統默認的佇列,這個時候我們需要額外來指定一個 task_default_queue,celery 的配置如下:
# hunter/celery.py
app.conf.task_queues = (
Queue('blog_tasks'),
Queue('default_queue'),
)
app.conf.task_default_queue = 'default_queue'
這樣,我們在使用延時任務的時候,就不需要指定 queue 引數了,都會走我們的默認 task 佇列:
from blog.tasks import add
add.delay(1, 2) # 佇列會被 default_queue 接收到
而如果我們想實作 add 的延時任務走的是 blog_tasks 這個佇列,但是我們在呼叫的時候不想那么麻煩每次都指定 queue 引數,這個就需要用到 task_routes 配置項了,
3、將 task 指定到佇列 queue 消費
如果我們想某些函式使用指定的 queue,我們可以使用 task_routes 配置項來操作,
現在我們有兩個 application,blog 和 polls,這兩個 application 下都有各自的 tasks,檔案的內容如下:
# blog/tasks.py
from celery import shared_task
@shared_task
def add(x, y):
return x + y
@shared_task
def minus(x, y):
return x - y
# polls/tasks.py
from celery import shared_task
@shared_task
def multi(x, y):
return x * y
我們想要實作的最終的目的是在呼叫延時任務的時候,可以直接使用 delay() 的方式,不需要使用 apply_async(queue='xx'),
我們想要實作的功能是,polls/tasks.py 下的所有的延時任務以及 blog/tasks.py 下的 add() 函式進入 queue_1 佇列
blog 下的 minus() 函式進入 queue_2 佇列
其他所有的 task 都走默認的佇列,default_queue,
我們可以如下配置:
app.conf.task_queues = (
Queue('queue_1'),
Queue('queue_2'),
Queue('default_queue'),
)
app.conf.task_routes = {
'polls.tasks.*': {
'queue': 'queue_1',
},
'blog.tasks.add': {
'queue': 'queue_1',
},
'blog.tasks.minus': {
'queue': 'queue_2',
},
}
app.conf.task_default_queue = 'default_queue'
如果想獲取更多后端相關文章,可掃碼關注閱讀:

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/555737.html
標籤:Python
下一篇:返回列表
