我在 Heroku 上部署了一個 Flask 應用程式,並使用 Celery 搭配 Redis 來處理 Stripe 的付款事件。一開始處理 Stripe 事件,我就立即遇到記憶體超出配額的問題 (R14)。我只有 1 個 worker dyno。
- 我使用 Stripe 結賬
- 我的端點按預期接收事件
- 端點發送要異步處理的事件
celery_tasks.process_webhook_event.apply_async(args=[event, stripe_api_key]) - 我可以看到正在發生的事件,但記憶體已滿:
2021-11-12T00:49:07.801006+00:00 app[worker.1]: redis.exceptions.ConnectionError: max number of clients reached
2021-11-12T00:49:18.170270+00:00 heroku[worker.1]: Process running mem=549M(107.4%)
2021-11-12T00:49:18.224263+00:00 heroku[worker.1]: Error R14 (Memory quota exceeded)
我想知道是不是我的 Celery 任務太占用記憶體了。如果真是這樣,我不知道該如何解決。儘管代碼還可以優化,但我只用 1 次結帳進行了測試——Celery 搭配 Redis 的設定理應能處理少量(約 10 次)同時結帳吧?我也可以限制發送到端點的事件,但我不確定這是否是擴大規模的最佳方式。
端點代碼:
@payments_bp.route('/wh', methods=['POST'])
def wh():
    """Receive a Stripe webhook, verify it and queue it for processing.

    Returns:
        ``('Success', 200)`` once the verified event has been handed to
        Celery, or ``({}, 400)`` when the payload cannot be parsed or its
        signature does not verify. Oversized requests are aborted with 400.
    """
    stripe_api_key = current_app.config['STRIPE_SECRET_KEY']
    endpoint_secret = current_app.config['STRIPE_ENDPOINT_SECRET']

    # Payloads should be small (<1MB). The Content-Length header may be
    # missing, in which case ``content_length`` is None and must not be
    # compared against an int; the size of the body actually read is
    # checked as well so a missing header cannot bypass the limit.
    max_payload_bytes = 1024 ** 2
    content_length = request.content_length
    if content_length is not None and content_length > max_payload_bytes:
        abort(400)
    payload = request.get_data()
    if len(payload) > max_payload_bytes:
        abort(400)

    sig_header = request.environ.get('HTTP_STRIPE_SIGNATURE')

    # Only the verification step is guarded: a failure while queueing the
    # task is a server-side problem and must not be reported as a bad request.
    try:
        event = stripe.Webhook.construct_event(
            payload, sig_header, endpoint_secret
        )
    except (ValueError, stripe.error.SignatureVerificationError):
        return {}, 400

    celery_tasks.process_webhook_event.apply_async(args=[event, stripe_api_key])
    return 'Success', 200
celery 任務的代碼:
def _first_event(events, event_type, allowed_ids=None, predicate=None):
    """Return the first event in ``events`` of ``event_type``, or ``None``.

    Args:
        events: Iterable of event dicts with at least ``'type'`` and ``'id'``.
        event_type: Value the event's ``'type'`` must equal.
        allowed_ids: Optional container of event ids; when given, events
            whose ``'id'`` is not in it are skipped.
        predicate: Optional callable taking the event; events for which it
            returns a falsy value are skipped. It is only called for events
            that already passed the type and id checks.
    """
    for candidate in events:
        if candidate['type'] != event_type:
            continue
        if allowed_ids is not None and candidate['id'] not in allowed_ids:
            continue
        if predicate is not None and not predicate(candidate):
            continue
        return candidate
    return None


@celery_app.task
def process_webhook_event(event, stripe_api_key):
    """Sync local records with the Stripe events related to one customer.

    The task resolves the customer id from ``event``, lists the Stripe events
    related to that customer, stores every not-yet-recorded event as a
    ``StripeEvent`` row, and then applies the first customer / subscription /
    invoice / charge event of each handled type to the local models. Finally
    it sets the user's ``membership_type`` from the subscription status.

    Args:
        event: The webhook event as a mapping (``event['type']`` and
            ``event['data']['object']`` are read).
        stripe_api_key: Stripe secret key; assigned to ``stripe.api_key``.

    Returns:
        None.

    Raises:
        KeyError: If the event's object is not a customer and carries no
            ``'customer'`` field.
    """
    stripe.api_key = stripe_api_key

    # Decide by the payload's own object type, not by the event type string:
    # 'customer.subscription.created' also contains 'customer', but its
    # object is a subscription whose 'id' is NOT a customer id.
    stripe_object = event['data']['object']
    if stripe_object.get('object') == 'customer':
        customer_id = stripe_object['id']
    else:
        customer_id = stripe_object['customer']

    all_events_dict = json.loads(str(stripe.Event.list(related_object=customer_id)))
    related_events = all_events_dict['data']

    # Record each event once; ids stored during this run are the only ones
    # the handlers below are allowed to act on.
    new_event_ids = set()
    for related_event in related_events:
        event_id = related_event['id']
        if event_id in new_event_ids:
            continue
        # This ensures we don't process events multiple times
        if StripeEvent.query.filter_by(stripe_event_id=event_id).first():
            continue
        stripe_event = StripeEvent(stripe_event_id=event_id,
                                   event_type=related_event['type'],
                                   event_created=related_event['created'],
                                   event_json=str(related_event))
        try:
            db.session.add(stripe_event)
            db.session.commit()
        except Exception:
            # Best effort, as before (presumably another worker stored the
            # same event first), but roll back so the session stays usable
            # for the queries and commits that follow.
            db.session.rollback()
        else:
            new_event_ids.add(event_id)

    customer_created = _first_event(related_events, 'customer.created', new_event_ids)
    subscription_created = _first_event(
        related_events, 'customer.subscription.created', new_event_ids,
        lambda item: not Subscription.query.filter_by(id=item['data']['object']['id']).first(),
    )
    subscription_updated = _first_event(related_events, 'customer.subscription.updated', new_event_ids)
    invoice_created = _first_event(
        related_events, 'invoice.created', new_event_ids,
        lambda item: not Invoice.query.filter_by(id=item['data']['object']['id']).first(),
    )
    invoice_updated = _first_event(related_events, 'invoice.updated', new_event_ids)
    # Not restricted to newly stored events; the Payment lookup below is what
    # prevents recording the same charge twice.
    charge_succeeded = _first_event(related_events, 'charge.succeeded')

    if customer_created:
        customer_id = customer_created['data']['object']['id']
        customer_email = customer_created['data']['object']['email']
        user = User.query.filter_by(email=customer_email).first()
        # A customer may have no matching site user; skip instead of failing
        # on ``None.stripe_customer_id``.
        if user and not user.stripe_customer_id:
            user.add_stripe_customer_id(customer_id)
            db.session.commit()

    if subscription_created:
        subscription_id = subscription_created['data']['object']['id']
        current_subscription = Subscription.query.filter_by(id=subscription_id).first()
        if not current_subscription:
            subscription = Subscription()
            subscription.add_subscription(subscription_created)
            db.session.add(subscription)
        else:
            current_subscription.update_subscription(subscription_created)
        db.session.commit()

    if subscription_updated:
        subscription_id = subscription_updated['data']['object']['id']
        event_created = subscription_updated['created']
        current_subscription = Subscription.query.filter_by(id=subscription_id).first()
        # Only apply updates newer than what is already stored.
        if current_subscription and event_created > current_subscription.last_updated:
            current_subscription.update_subscription(subscription_updated)
            db.session.commit()

    if invoice_created:
        invoice_id = invoice_created['data']['object']['id']
        current_invoice = Invoice.query.filter_by(id=invoice_id).first()
        if not current_invoice:
            invoice = Invoice()
            invoice.add_invoice(invoice_created)
            db.session.add(invoice)
            db.session.commit()

    if invoice_updated:
        invoice_id = invoice_updated['data']['object']['id']
        current_invoice = Invoice.query.filter_by(id=invoice_id).first()
        if current_invoice:
            current_invoice.update_invoice(invoice_updated)
        else:
            invoice = Invoice()
            invoice.add_invoice(invoice_updated)
            # The new row must be added to the session or the commit below
            # would not persist it (same as the invoice.created branch).
            db.session.add(invoice)
        db.session.commit()

    if charge_succeeded:
        payment_id = charge_succeeded['data']['object']['id']
        invoice_id = charge_succeeded['data']['object']['invoice']
        if not Payment.query.filter_by(id=payment_id).first():
            payment = Payment()
            payment.add_payment(charge_succeeded)
            invoice = Invoice.query.filter_by(id=invoice_id).first()
            # Explicit None check so a genuine AttributeError raised inside
            # pay_invoice is not mistaken for a missing invoice.
            if invoice is not None:
                invoice.pay_invoice(charge_succeeded)
            else:
                print('Invoice not found')
            db.session.add(payment)
            db.session.commit()

    # Derive the membership level from the customer's subscription status.
    user = User.query.filter_by(stripe_customer_id=customer_id).first()
    subscription = Subscription.query.filter_by(stripe_customer_id=customer_id).first()
    if user and subscription:
        if subscription.status in ['Active', 'Trial']:
            user.membership_type = 'Premium'
        else:
            user.membership_type = 'Free'
        db.session.commit()
    return None
非常感謝任何幫助,謝謝
uj5u.com熱心網友回復:
解決了這個問題。設計全錯了:
沒有必要處理一堆事件。你實際上只需要少數幾個基本事件(customer.subscription.created、customer.subscription.updated)。
其他值應通過 API 請求取得。訂閱物件不附帶客戶電子郵件,而您需要將站點用戶與訂閱相關聯;客戶電子郵件可以通過以下方式輕鬆取得:
# Resolve the Stripe customer referenced by the event, then find the site
# user with the same email address (User is the local DB model).
customer_id = event['data']['object']['customer']
customer = stripe.Customer.retrieve(customer_id, api_key=stripe_api_key)
users_with_email = User.query.filter_by(email=customer.email)
user = users_with_email.first()
通過 API 請求創建和更新其他資料庫資訊可能比嘗試傳遞事件更容易。
新的 celery 任務如下所示:
@celery_app.task()
def activate_subscription(customer_id, stripe_api_key):
    """Mark the site user matching a Stripe customer as a Premium member.

    The customer is fetched from Stripe and matched to a ``User`` row by
    email address. Nothing is changed when the customer has no email or no
    user with that email exists.

    Args:
        customer_id: Stripe customer id to retrieve.
        stripe_api_key: Stripe secret key used for the API request.

    Returns:
        None.
    """
    import logging

    log = logging.getLogger(__name__)

    customer = stripe.Customer.retrieve(customer_id, api_key=stripe_api_key)

    # Without an email, ``filter_by(email=None)`` could match an unrelated
    # user whose email column is empty, so stop here instead.
    email = customer.email
    if not email:
        log.warning('Stripe customer %s has no email; membership not activated', customer_id)
        return None

    user = User.query.filter_by(email=email).first()
    if user is None:
        log.warning('No user found for Stripe customer %s; membership not activated', customer_id)
        return None

    user.membership_type = 'Premium'
    db.session.commit()
    return None
端點僅傳遞客戶 ID 而不是完整事件:
try:
event = stripe.Webhook.construct_event(
payload, sig_header, endpoint_secret
)
event_type = event['type']
celery_tasks.activate_subscription.apply_async(args=[customer_id, stripe_api_key])
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/361268.html
