我在處理其中包含 GlueJob 任務的 StateMachine (Step Function) 的并發運行時遇到了很多問題。
狀態機由 Lambda 啟動,該 Lambda 由 FIFO SQS 佇列觸發。
lambda 獲取訊息,檢查有多少狀態機實體正在運行,如果此數量低于 GlueJob 并發運行閾值,它會啟動狀態機。
我遇到的問題是這個檢查大部分時間都失敗了。盡管沒有足夠的并發性可供我的 GlueJob 使用,但狀態機仍會啟動。顯然,SQS 佇列傳遞給 lambda 的訊息會得到處理,因此如果狀態機因此而失敗,則該訊息將永遠消失(除非我捕獲例外并將新訊息發送回佇列)。
我相信這種行為是由于我的 lambda 處理訊息的速度(盡管它是一個 FIFO 佇列,所以一次只有 1 條訊息),以及我的檢查器無法跟上的事實。
我在這里和那里實施了一些 time.sleep() 以查看情況是否有所好轉,但沒有實質性改善。
我想問你是否曾經遇到過這樣的問題,以及你是如何以編程方式解決它們的。
提前致謝!
這是我的檢查器:
def get_running_workflows(sf_client, sfn_arn, cnt=0, next_token=None):
if next_token:
response = sf_client.list_executions(stateMachineArn=sfn_arn, statusFilter='RUNNING', nextToken=next_token)
else:
response = sf_client.list_executions(stateMachineArn=sfn_arn, statusFilter='RUNNING')
cnt = len(response['executions'])
if cnt > 0 and 'nextToken' in response:
return get_running_workflows(cnt, response['nextToken'])
return cnt
uj5u.com熱心網友回復:
您將遇到這種方法的問題,因為啟動新流程的呼叫可能不會立即導致list_executions()顯示新號碼。在請求新的作業流開始和作業流實際開始之間可能有幾秒鐘的時間。據我所知, API 呼叫沒有強一致性保證。list_executions()
您需要具有強一致性的東西,而DynamoDB 原子計數器是解決此問題的絕佳解決方案。亞馬遜發布了一篇博文,詳細介紹了 DynamoDB 在這個確切場景中的使用。要點是您將嘗試在 DynamoDB 中增加一個原子計數器,limit如果它會導致計數器超過某個值,則該運算式會導致增量失敗。捕獲該故障/例外是您的 Lambda 函式知道將訊息發送回佇列的方式。然后在作業流結束時呼叫另一個 Lambda 函式來遞減計數器。
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/420360.html
標籤:
上一篇:通過組件URL訪問門戶時,Angular重定向到403
下一篇:無法應用yum更新或安裝任何東西
