一、簡介
作為分布式任務平臺的基本框架,Celery 是由 Python 語言開發的,Celery 本身不是任務佇列,是管理分布式任務佇列的工具,它封裝了操作常見任務佇列的各種操作,我們使用它可以快速進行任務佇列的使用與管理,本文主要說明如何更規范的配置和管理任務,
更詳細的技術檔案,請訪問官網 http://www.celeryproject.org/,
二、一些概念
在使用 Celery 之前請務必理解以下概念:
a. Celery Beat: 任務調度器,Beat 行程會讀取組態檔的內容,周期性的將配置中到期需要執行的任務發送給任務佇列
b. Celery Worker: 執行任務的消費者,通常會在多臺服務器運行多個消費者來提高運行效率,
c. Broker: 訊息代理,也是任務佇列本身(通常是訊息佇列或者資料庫),通常稱為訊息中間件,接收任務生產者發送過來的任務訊息,存進佇列再按序分發給任務消費方,
d. Producer: 任務生產者,呼叫 Celery API 的函式或者裝飾器而產生任務并交給任務佇列處理的都是任務生產者,
三、配置項說明
Celery 是通過組態檔中的配置項來定制任務的,
CELERY_IMPORTS: 配置匯入哥哥任務的代碼模塊
CELERY_QUEUES: 定義任務執行的各個任務佇列(如按照執行時間分slow、fast等),默認有一個佇列,暫稱為一般任務佇列,
CELERY_ROUTES: 配置各個任務分配到不同的任務佇列
CELERY_SCHEDULE: 配置各個任務執行的時機引數
CELERY_TIMEZONE: 設定時區
CELERY_ENABLE_UTC: 是否啟動時區設定,默認值是True
CELERY_CONCURRENCY: 并發的worker數量
CELERY_PREFETCH_MULTIPLIER: 每次去訊息佇列讀取任務的數量,默認值是4
CELERY_MAX_TASKS_PRE_CHILD: 每個worker執行多少次任務后會死掉
BROKER_URL: 使用redis作為任務佇列
CELERY_TASK_RESULT_EXPIRES: 任務執行結果的超時時間
CELERY_TASK_TIME_LIMIT: 單個任務運行的時間限制,超時會被殺死,不建議使用該引數,而用CELERY_TASK_SOFT_TIME_LIMIT
CELERY_RESULT_TACKEND: 使用redis存盤執行結果
CELERY_TASK_SERIALIZER: 任務序列化方式
CELERY_RESULT_SERIALIZER: 任務執行結果序列化方式
CELERY_DISABLE_RATE_LIMITS: 關閉執行限速
四、實用中的目錄結構
background
|---celery_task
|----daemons
|----day
|----hour
|----minute
|----week
|----test
|----test.py
|----CeleryConfig.py # 組態檔
|----__init__.py # 定義celery任務的名稱及組態檔位置
五、簡要配置
__init__.py內容:
1 #!/usr/bin/python 2 # -*- coding: utf-8 -*- 3 # by MR 4 5 from celery import Celery 6 7 app = Celery("MyCelery") 8 app.config_from_object("background.celery_task.CeleryConfig")
CeleryConfig.py 內容:
1 #!/usr/bin/python 2 # -*- coding: utf-8 -*- 3 4 from celery.schedulers import crontab 5 from kombu import Exchange, Queue 6 7 BROKER_URL = "redis://127.0.0.1:6379/10" 8 CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/11" 9 CELERY_TASK_SERIALIZER = "json" 10 CELERY_RESULT_SERIALIZER = "json" 11 CELERY_TSKK_RESULT_EXPIRES = 60 * 60 * 24 * 3 12 CELERY_ACCEPT_CONTENT = ["json"] 13 CELERY_TIMEZONE = "Asia/Shanghai" 14 15 CELERY_CONCURRENCY = 20 16 CELERY_MAX_TASK_PER_CHILD = 100 17 CELERY_TASK_SOFT_TIME_LIMIT = 300 18 CELERY_DISABLE_RATE_LIMITS = True 19 20 21 CELERY_IMPORTS = ( 22 "background.celery_task.test.TestTask", 23 ) 24 25 CELERY_QUEUES = ( 26 Queue('task_slow', exchange=Exchange('task_slow'), routing_key='task_slow'), 27 ) 28 29 CELERY_ROUTES = { 30 'background.celery_task.test.TestTask.test': {'queue': 'task_slow, 'routing_key': 'task_slow'}, 31 } 32 33 34 CELERYBEAT_SCHEDULE = { 35 '1_minute_test_task_test': { 36 'task': 'background.celery_task.test.TestTask.test', 37 'schedule': crontab(minute='*/1') 38 'args': (), 39 } 40 }
test.py的內容:
#!/usr/bin/python # -*- coding: utf-8 -*- import random
from background.celery_task import app app.task(soft_time_limit=300) def test(): a = random.randint(1, 10) b = random.randint(20, 30) print ("%(a)s+%(b)s=%(c)s" % {"a": a, "b":b, "c": a+b}) print 'OK'
六、Celry啟動
安裝一個叫flower的webui,提供任務查詢,worker生命管理,以及路由管理等(底層是通過tornado框架封裝的)
flower -A background.celery_task --loglevel=info --url-prefix=task_manager --basic_auth=celery:celerywd --log_file_prefix=~/MR/celery/flower.log >/dev/null 2>&1 &
celery -A background.celery_task worker -Q celery -f ~/MR/celery/celery.log --loglevel=info --beat >/dev/null 2>&1 &
七、demo地址
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/162027.html
標籤:Python
上一篇:旋轉陣列-python
