我正在構建一個 Dataflow 管道,該管道從我的 Cloud Storage 存盤桶中讀取一個 CSV 檔案(包含 250,000 行),修改每一行的值,然后將修改后的內容寫入同一個存盤桶中的新 CSV。使用下面的代碼,我可以讀取和修改原始檔案的內容,但是當我嘗試在 GCS 中寫入新檔案的內容時,出現以下錯誤:
google.api_core.exceptions.TooManyRequests: 429 POST https://storage.googleapis.com/upload/storage/v1/b/my-bucket/o?uploadType=multipart: {
"error": {
"code": 429,
"message": "The rate of change requests to the object my-bucket/product-codes/URL_test_codes.csv exceeds the rate limit. Please reduce the rate of create, update, and delete requests.",
"errors": [
{
"message": "The rate of change requests to the object my-bucket/product-codes/URL_test_codes.csv exceeds the rate limit. Please reduce the rate of create, update, and delete requests.",
"domain": "usageLimits",
"reason": "rateLimitExceeded"
}
]
}
}
: ('Request failed with status code', 429, 'Expected one of', <HTTPStatus.OK: 200>) [while running 'Store Output File']
我在資料流中的代碼:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import traceback
import sys
import pandas as pd
from cryptography.fernet import Fernet
import google.auth
from google.cloud import storage
fernet_secret = 'aD4t9MlsHLdHyuFKhoyhy9_eLKDfe8eyVSD3tu8KzoP='
bucket = 'my-bucket'
inputFile = f'gs://{bucket}/product-codes/test_codes.csv'
outputFile = 'product-codes/URL_test_codes.csv'
#Pipeline Logic
def product_codes_pipeline(project, env, region='us-central1'):
options = PipelineOptions(
streaming=False,
project=project,
region=region,
staging_location="gs://my-bucket-dataflows/Templates/staging",
temp_location="gs://my-bucket-dataflows/Templates/temp",
template_location="gs://my-bucket-dataflows/Templates/Generate_Product_Codes.py",
subnetwork='https://www.googleapis.com/compute/v1/projects/{}/regions/us-central1/subnetworks/{}-private'.format(project, env)
)
# Transform function
def genURLs(code):
f = Fernet(fernet_secret)
encoded = code.encode()
encrypted = f.encrypt(encoded)
decrypted = f.decrypt(encrypted.decode().encode())
decoded = decrypted.decode()
if code != decoded:
print(f'Error: Code {code} and decoded code {decoded} do not match')
sys.exit(1)
url = 'https://some-url.com/redeem/product-code=' encrypted.decode()
return url
class WriteCSVFIle(beam.DoFn):
def __init__(self, bucket_name):
self.bucket_name = bucket_name
def start_bundle(self):
self.client = storage.Client()
def process(self, urls):
df = pd.DataFrame([urls], columns=['URL'])
bucket = self.client.get_bucket(self.bucket_name)
bucket.blob(f'{outputFile}').upload_from_string(df.to_csv(index=False), 'text/csv')
# End function
p = beam.Pipeline(options=options)
(p | 'Read Input CSV' >> beam.io.ReadFromText(inputFile, skip_header_lines=1)
| 'Map Codes' >> beam.Map(genURLs)
| 'Store Output File' >> beam.ParDo(WriteCSVFIle(bucket)))
p.run()
該代碼URL_test_codes.csv在我的存盤桶中生成,但該檔案僅包含一行(不包括“URL”標頭),這告訴我我的代碼在處理每一行時正在寫入/覆寫檔案。有沒有辦法批量寫入整個檔案的內容,而不是發出一系列更新檔案的請求?我是 Python/Dataflow 的新手,因此非常感謝任何幫助。
uj5u.com熱心網友回復:
讓我們指出問題:明顯的一個是來自 GCS 方面的配額問題,反映在“429”錯誤代碼上。但正如您所指出的,這源于固有問題,這與您嘗試將資料寫入 blob 的方式更相關。
由于 Beam 管道會生成元素的并行集合,因此當您將元素添加到 PCollection 時,將為每個元素執行每個管道步驟,換句話說,您的 ParDo 函式將嘗試向您的輸出檔案中的每個元素寫入一次內容收集。
因此,您的 WriteCSVFIle 函式存在一些問題。例如,為了將您的 PCollection 寫入 GCS,最好使用專注于撰寫整個 PCollection 的單獨管道任務,如下所示:
首先,您可以匯入 Apache Beam 中已包含的此函式:
from apache_beam.io import WriteToText
然后,您在管道的末尾使用它:
| 'Write PCollection to Bucket' >> WriteToText('gs://{0}/{1}'.format(bucket_name, outputFile))
使用此選項,您無需創建存盤客戶端或參考 blob,該函式只需要接收 GCS URI,它將寫入最終結果,您可以根據檔案中的引數進行調整.
有了這個,您只需要處理在您的 WriteCSVFIle 函式中創建的資料框。每個管道步驟都會創建一個新的 PCollection,因此如果 Dataframe-creator 函式應該從 URL 的 PCollection 接收一個元素,那么根據您當前的邏輯,從 Dataframe 函式產生的新 PCollection 元素每個 url 將具有 1 個資料幀,但因為它看起來考慮到“URL”是資料框中唯一的列,您只想從 genURLs 中寫入結果,也許直接從 genURLs 到 WriteToText 可以輸出您要查找的內容。
無論哪種方式,您都可以相應地調整您的管道,但至少通過 WriteToText 轉換,它會負責將您的整個最終 PCollection 寫入您的 Cloud Storage 存盤桶。
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/410619.html
標籤:
