我試圖在celery中安排一項任務。
主專案目錄下的celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab
os.environ.setdefault('DJANGO_SETTINGS_MODULE','example_api.settings')
app = Celery('example_api')
app.config_from_object('django.conf:settings',namespace="CELERY")
app.conf.beat_schedule = {
'add_trades_to_database_periodically': {
'task': 'transactions.tasks.add_trades_to_database',
'schedule': crontab(minute='*/1')。
# 'args': (16,16),
},
}
app.autodiscover_tasks()
該專案有一個名為 transactions 的應用程式。
transactions/tasks.py里面的函式
@task(name="add_trades_to_database"/span>)
def add_trades_to_database() 。
start_date = '20000101' #YYYYDDMM
end_date ='20150101'
url = f'https://api.example.com/trade-retriever-api/v1/fx/trades?fromDate={start_date}& toDate={end_date}'/span>
content = get_json(url)
print(content)
save_data_to_model(content,BulkTrade)
settings.py
""
nordea_api專案的Django設定。
由'django-admin startproject'生成,使用Django 3.2.7。
關于這個檔案的更多資訊,見
https://docs.djangoproject.com/en/3.2/topics/settings/
完整的設定串列和它們的值,見
https://docs.djangoproject.com/en/3.2/ref/settings/
"""
from pathlib import Path
import os
import environ
env = environ.Env()
# 像這樣在專案內部建立路徑。BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
env.read_env(env.str('BASE_DIR', ' .env')
# 快速啟動開發設定 - 不適合生產。
# See https://docs.djangoproject.com/en/3.2/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'example'。
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
allowed_hosts = []
#應用程式定義
install_apps = []
'django.contrib.admin'。
'django.contrib.auth',
'django.contrib.contenttypes'。
'django.contrib.session'。
'django.contrib. messages'。
'django.contrib.staticfiles'。
'django.contrib.sites'。
'rest_framework'。
'rest_framework.authtoken'。
'rest_auth'。
'rest_auth.registration'。
'allauth'。
'allauth.account'。
'allauth.socialaccount'。
'corsheaders'。
'transactions.apps.TransactionsConfig'。
'django_celery_beat'。
]
# REST_FRAMEWORK = {.
# 'DEFAULT_PERMISSION_CLASSES':[)
# 'rest_framework.permissions.IsAuthenticated',/span>
# ]
# }
CELERY_TIMEZONE = "UTC"/span>
# CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'。
CELERY_TASK_SERIALIZER = 'json'/span>
CELERY_RESULT_SERIALIZER = 'json'/span>
default_authentication_classes = [
'rest_framework.authentication.SessionAuthentication'。
'rest_framework.authentication.TokenAuthentication',
]
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'。
EMAIL_USE_TLS = True
EMAIL_PORT = 587。
EMAIL_HOST = 'smtp.gmail.com'
EMAIL_HOST_USER = os.environ.get('EMAIL')
EMAIL_HOST_PASSWORD = os.environ.get('EMAIL_PASSWORD')
DEFAULT_FROM_EMAIL = os.environ.get('EMAIL')
SITE_ID = 1
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware'。
'django.contrib.session.middleware.SessionMiddleware'。
'corsheaders.middleware.CorsMiddleware'。
'django.middleware.common.CommonMiddleware'。
'django.middleware.csrf.CsrfViewMiddleware'。
'django.contrib.auth.middleware.AuthenticationMiddleware'。
'django.contrib. messages.middleware.MessageMiddleware'。
'django.middleware.clickjacking.XFrameOptionsMiddleware'。
]
ROOT_URLCONF = 'example_api.urls', >。
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth'。
'django.contrib. messages.context_processors.messages'。
],
},
},
]
WSGI_APPLICATION='example_api.wsgi.application'。
#資料庫
# https://docs.djangoproject.com/en/3.2/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'NAME': 'example_transaction',
'USER': 'myUser',
'PASSWORD': os.environ.get('DATABASE_PASSWORD')。
'HOST': 'localhost',
'PORT':''。
}
}
# 密碼驗證# https://docs.djangoproject.com/en/3.2/ref/settings/#auth-password-validators
auth_password_validators = [
{
'NAME'。'django.contrib.auth.password_validation.UserAttributeSimilarityValidator'。
},
{
'NAME'。'django.contrib.auth.password_validation.MinimumLengthValidator'。
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator'。
},
{
'NAME'。'django.contrib.auth.password_validation.NumericPasswordValidator'。
},
]
# Internationalization
# https://docs.djangoproject.com/en/3.2/topics/i18n/
LANGUAGE_CODE = 'en-us'/span>
TIME_ZONE = 'UTC'/span>
USE_I18N = True
USE_L10N = True
USE_TZ = True[/span
# 靜態檔案(CSS、JavaScript、圖片)
# https://docs.djangoproject.com/en/3.2/howto/static-files/
STATIC_URL = '/static/'/span>
# 默認主鍵欄位型別
# https://docs.djangoproject.com/en/3.2/ref/settings/#default-auto-field
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'/span>
我在使用rabbitmq-server做任務佇列。
- 我的rabbitmq-server一直處于活動狀態。
- 其他的 celery 任務作業得非常好。(我試著實作了一個電子郵件功能,使用 celery 作業得很好)。
我啟動了celery worker,并使用
擊敗了celery -A project worker -l info
celery -A project beat -l info
作業者終端出現以下錯誤
訊息體的全部內容是。
'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}' (77b)
回溯(最近一次呼叫)。
檔案 "/home/......./env/lib/python3.8/Site-packages/celery/worker/consumer/consumer.py", 行 581, in on_task_received
策略 = 策略[type_]
關鍵錯誤。'transactions.tasks.add_trades_to_database'
我使用ubuntu.
uj5u.com熱心網友回復:
你已經明確命名了任務 "add_trades_to_database"
@task(name="add_trades_to_database")
def add_trades_to_database() 。
...
然而,你是以"transactions.tasks.add_trades_to_database"
app.conf.beat_schedule = {
'add_trades_to_database_periodically': {
'task': 'transactions.tasks.add_trades_to_database',
'schedule': crontab(minute='*/1')。
},
}
你可以選擇的解決方案:
- 不要明確地為任務設定一個名稱。Celery會根據模塊名稱和函式名稱為其設定一個默認的名字,作為documented。節拍表保持不變(假設
add_trades_to_database位于my_proj/transactions/tasks.py::add_trades_to_database)。@task def add_trades_to_database()。 ... - 或者你可以直接改變節拍時間表來參考明確設定的名稱。
app.conf.beat_schedule = { 'add_trades_to_database_periodically': { 'task': 'add_trades_to_database', 'schedule': crontab(minute='*/1')。 }, }
要強調的是,在這兩個解決方案中只選擇一個,因為同時做這兩個解決方案顯然還是會因為同樣的原因而失敗。
另外,請注意,當使用Django時,慣例是使用裝飾器@shared_task,所以你可能想將你的任務改為:
標籤:from celery import shared_task
@shared_task
def add_trades_to_database() 。
...
