我有一個使用Celery的Flask應用程式,當該應用程式在本地運行時,異步處理作業正常。然而,當我試圖測驗(pytest)一個使用Celery任務的路由時,我得到了這樣的錯誤:
app/bp_dir/routes.py:12。in <module>
from app import db, celery_tasks
app/celery_tasks.py:13: in <module>
E AttributeError: 'NoneType' object沒有屬性'task'。
在我測驗時,Celery似乎沒有被正確地旋轉起來。我認為潛在的問題是如何設定 pytest 固定程式,以便對匯入 Celery 任務的路由進行測驗。
app/celery_tasks.py看起來是這樣的:
from app import celeryapp
celery = celeryapp.celery
@celery.task()
def example_task()。
time.sleep(3)
return 1
app/init.py
...
from app import celeryapp
def create_app(config_class=Config)。
app = Flask(__name__)
app.config.from_object(Config)
...
# Celery[/span
celery = celeryapp.create_celery_app(app)
celeryapp.celery = celery
...
return app
app/celeryapp/init.py
from celery import Celery
celery_task_list = [
'app.celery_tasks',
]
db_session = NoneNone
def create_celery_app(_app=None) 。
""
創建一個新的Celery物件,并將Celery配置與應用程式的配置聯系起來。
在Flask應用程式的背景關系中包裝所有任務。
:param _app: Flask應用程式
:回傳。Celery應用程式
""
from app import db
celery = Celery(_app.import_name,
backend=_app.config['CELERY_BACKEND_URL'] 。
broker=_app.config['CELERY_BROKER_URL']。
include=CELERY_TASK_LIST)
celery.conf.update(_app.config)
always_eager = _app.config['TESTING'] or False[/span
celery.conf.update({'TASK_ALWAYS_EAGER'/span>: always_eager,
'CELERY_RESULT_BACKEND': 'redis'})
TaskBase = celery.Task
class ContextTask(TaskBase)。
abstract = True def __call__(self, *args, **kwargs) 。
if not celery.conf.CELERY_ALWAYS_EAGER:
with _app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
else:
# 特殊的pytest設定。
db.session = db_session
return TaskBase.__call__(self, *args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo)。
""
在每個Celery任務之后,拆解我們的db會話。
FMI: https://gist.github.com/twolfson/a1b329e9353f9b575131
Flask-SQLAlchemy在啟動時使用create_scoped_session,這避免了在每個請求基礎上的任何設定。
每一個請求的基礎上進行設定。這意味著Celery可以從這個初始化中搭便車。
"""
if _app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'/span>]。
if not isinstance(retval, Exception)。
db.session.commit()
# 如果我們不是在一個急切的請求中(即Flask將執行拆分),那么拆分。
if not celery.conf.CELERY_ALWAYS_EAGER:
db.session.remove()
celery.Task = ContextTask
return celery
app/celeryapp/celery_worker.py
from app import celeryapp, create_app
app = create_app()
celery = celeryapp.create_celery_app(app)
celeryapp.celery = celery
Celery設定的代碼主要是從這個[repo]https://github.com/kwiersma/flask-celery-sqlalchemy中借用的。該 repo 有測驗 [here]https://github.com/kwiersma/flask-celery-sqlalchemy/tree/master/tests,但我真的看不出我錯過了什么。
/testing_fixtures.py
import re
import flask
import pytest
from app import create_app, db
EMAIL = '[email protected]'。
密碼='Password123!'
def get_codes(data_structure)。
return [(str(x)) for x in range(len(data_structure))]
def extract_csrf_token(response: flask.Response) -> str:
# 是否有更好的方法來獲取CSRF令牌?我沒有找到。
return re.search('name="csrf_token" type="hidden" value="([^"]*)"'/span>, str(response.data)).group(1)
@pytest.fixture[/span]。
def flask_app()。
app = create_app()
app.config['SQLALCHEMY_DATABASE_URI'/span>] = "sqlite://"/span>
app.config['TESTING'] = True。
app.config['SECRET_KEY'] = '這對存盤部分至關重要'。
app.testing = True[/span]。
return app
@pytest.fixture
def app_context(flask_app)。
with flask_app.app_context():
yield: yield.
@pytest.fixture[/span]。
def setup_db(app_context)。
db.create_all()
@pytest.fixture: DB.create_all()
def test_client(setup_db, flask_app)。
with flask_app.test_client() as c:
yield c
@pytest.fixture[/span
def registered_user(test_client, flask_app)。
resp = test_client.get("/auth/register") # 獲得csrf_token。
csrf_token = extract_csrf_token(respon)。
# follow_redirect=False很重要--否則你將無法區分成功與否。
# 和錯誤的注冊。
resp = test_client.post('/auth/register',
data={'email': EMAIL, 'password': PASSWORD, 'password2': PASSWORD。
'csrf_token': csrf_token, 'is_test':True}。
follow_redirects=False)
assert resp.status_code == 302, "/auth/register應該在注冊成功后重定向"。
@pytest.fixture。
def logged_client(registered_user,test_client)。
resp = test_client.get("/auth/login") # 獲得csrf_token。
csrf_token = extract_csrf_token(resp)
resp = test_client.post('/auth/login'/span>, data={'email'/span>: EMAIL, 'password'/span>: PASSWORD, 'csrf_token': csrf_token})
assert resp.status_code == 302, "/auth/login應該在登錄成功后重定向到其他頁面"。
yield test_client
resp = test_client.get('/auth/logout')
assert resp.status_code == 302
測驗示例:
from testing_fixtures import *
def test_example_route(logged_client)。
response = logged_client.get('/example_route')
assert response.status_code == 200
示例路由:
from app import db, celery_tasks
...
@example_bp.route('/example_route')/span>
def example_route() 。
return 1
uj5u.com熱心網友回復:
要解開一個有點棘手的結,所以我將提供一個輕微的結構調整。
首先,請注意,你不需要Celery的實體來宣告任務。
import celery
@celery.task
def mytask()。
...
這就足夠了。
現在,考慮在app/__init__.py中創建一個Celery的實體,并將其初始化推遲到create_app()被呼叫。類似于
celery = Celery(__name__)
def create_app(config_class=...) 。
app = Flask(__name__)
app.config.from_object(config_class)
...
celery.config_from_object(config_class)
可能會讓你完全摒棄celeryapp,避免了大部分匯入順序對你造成困擾的機會。
我有一個作業實體,你可以從這里借鑒。 或者看看Flask大型教程中的做法,當它 介紹 Rq,它是 Celery 的替代品。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/329828.html
標籤:
上一篇:如何在haskell中從兩個選項中正確做出隨機選擇?
下一篇:用flask做CRUD的菜鳥錯誤
