我正在嘗試通過 kinesis 傳輸流將 Punchh webhook JSON 資料流式傳輸到多個 S3 存盤桶,但它給了我這個錯誤。但是,當我流式傳輸 Iterable webhook 資料時,Iterable S3 存盤桶已成功填充來自 Iterable wehbook 發送的可交付流中的資料。
這是錯誤:
呼叫 PutRecord 操作時發生錯誤 (ValidationException):檢測到 1 個驗證錯誤:“deliveryStreamName”處的值“ data/landing/vendor/punchh/punchh_to_dd/ex_points_reminder ”未能滿足約束:成員必須滿足正則運算式模式:[a- zA-Z0-9_.-]
注意:上面提到的錯誤的S3前綴值-粗體-類似于我已經成功測驗過的Iterable prefixes。
這是導致錯誤的流之一的 Punchh S3 前綴:
資料/著陸/供應商/punchh/punchh_to_dd/ex-points-reminder/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH }/動態的
這是 Punch S3 流目的地(注 X 用作占位符以隱藏不相關的真實地址):
arn:aws:firehose:us-west-2:XXXXXXXXXXXX:deliverystream/lou-punchh-ex-points-reminder-events
這是代理(后端)Lambda 的 Python 代碼(Punchh as Iterable 的類似代碼)。我有幾個動態 S3 存盤桶與它們各自的交付流配對:
import boto3
import json
import base64
import os
client = boto3.client('firehose', region_name= 'us-west-2')
delivery_stream_lou = {}
def lambda_handler(event, context):
try:
print(event)
print('<< EVENT BODY >>',event['event_name'])
try:
print('<< EVENT NAME >> ',event['event_name'])
eventName = event['event_name']
eventType = event['event_type']
if 'users' == eventName.lower():
#ADD firehose KINESIS for USERS
print('USERS FOUND')
delivery_stream_lou = os.environ['DSTREAMUSERS']
elif 'loyalty' in eventType.lower():
#ADD firehose KINESIS for LOYALTY
print('LOYALTY FOUND')
delivery_stream_lou = os.environ['DSTREAMLOYALTY']
elif 'gift' == eventType.lower():
#ADD firehose KINESIS for LOYALTY
print('GIFT FOUND')
delivery_stream_lou = os.environ['DSTREAMGIFT']
elif 'redemptions' == eventName.lower():
#ADD firehose KINESIS for REDEMPTIONS
print('REDEMPTIONS FOUND')
delivery_stream_lou = os.environ['DSTREAMREDEMPTIONS']
elif 'rewards' == eventName.lower():
#ADD firehose KINESIS for REWARDS
print('REWARDS FOUND')
delivery_stream_lou = os.environ['DSTREAMREWARDS']
elif 'redeemables' == eventName.lower():
#ADD firehose KINESIS for REDEEMABLES
print('REDEEMABLES FOUND')
delivery_stream_lou = os.environ['DSTREAMREDEEMABLES']
elif 'points_expiry_reminder' == eventType.lower():
#ADD firehose KINESIS for EXPIRY PT REMINDER
print('EXPIRY PT REMINDER FOUND')
delivery_stream_lou = os.environ['DSTREAMEXPOINTSTREMINDER']
print('<< DELIVERY STREAM LOU >> ', delivery_stream_lou)
elif 'points_expiry' == eventType.lower():
#ADD firehose KINESIS for EXPIRY PT REMINDER
print('EXPIRY PT FOUND')
delivery_stream_lou = os.environ['DSTREAMEXPOINTS']
elif 'signup_campaign' == eventType.lower():
#ADD firehose KINESIS for SIGNUP CAMPAIGN
print('SIGNUP FOUND')
delivery_stream_lou = os.environ['DSTREAMSIGNUP']
else:
print('<< ERROR - NO VALID EVENT NAME FOUND !! >>')
if delivery_stream_lou !='' and delivery_stream_lou is not None and delivery_stream_lou != 'null':
jsonData = json.dumps(event) '\n'
print('<< JSON DATA >> ',jsonData)
response = client.put_record(DeliveryStreamName=delivery_stream_lou, Record={'Data': jsonData})
print(response)
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({"message":"success"})
}
except Exception as e:
print(str(e))
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({"message":"process failed"})
}
except Exception as e:
print(str(e))
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({"message":"process failed 2"})
}
注意:除了標識不同 buck 前綴的不同 os.environment 變數之外,此 Punchh lambda 的代碼與 Iterable Lambda 的代碼幾乎相同。
謝謝你的幫助。
uj5u.com熱心網友回復:
根據錯誤訊息,python 代碼正在將data/landing/vendor/punchh/punchh_to_dd/ex_points_reminder值傳遞給請求中的deliveryStreamName引數。PutRecord我不認為那是您的交付流的名稱。檢查填充deliveryStreamName引數的環境變數的值
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/431198.html
