我正在將 Apache Beam 與 Python 一起使用,并且我有一個 ppl 檔案,它看起來像這樣:
FileExample.ppl:
{"name":"Julio"}
{"name":"Angel", "Age":35}
{"name":"Maria","cellphone":NULL}
{"name":NULL,"cellphone":"3451-14-12"}
等等...
我不需要為每一行而是為每個json拆分檔案(在真實檔案中,jsons不僅是一行,而且是多行且未定義的行數)。
然后我需要驗證每個 json 的內容(因為在檔案中有 6 種型別的 json,所有鍵都有值的,沒有的,等等)。之后,我需要為每種型別的 json 設定不同的 pcollection。我正在考慮使用 beam.flatmap() 來實作這最后一步,但首先我需要有這樣的東西:
jsons = pipeline | "splitElements" >> ReadFromText(file)
在此先感謝您,請記住,我是新手。
uj5u.com熱心網友回復:
ReadFromText總是一次讀取一行文本檔案;如果您的 json 物件跨行拆分,您將不得不進行不同型別的讀取。
一種選擇是在 DoFn 中完整讀取每個檔案,例如
with beam.Pipeline() as p:
readable_files = (
p
| beam.Create([...set of files to read...]). # or use fileio.MatchAll
| fileio.ReadMatches)
file_contents = paths | beam.ParDo(ReadFileDoFn())
在哪里ReadFileDoFn可以使用相同的底層庫ReadFromText,例如
class ReadFileDoFn(beam.DoFn):
def process(self, readable_file):
with readable_file.open() as handle:
yield handle.read()
這將產生一個 PCollection,其元素是每個檔案的全部內容。現在要將您的文本檔案拆分為單個 json 物件,您可以執行以下操作
def text_blob_to_json_objects(text):
# Turns a concatenated set of objects like '{...} {...}' into
# a single json array '[{...}, {...}]'.
as_json_array = '[%s]' % re.sub(r'}\S*{', '},{', text, re.M)
# Returns the parsed array.
return json.loads(as_json_array)
file_contents | beam.FlatMap(text_blob_to_json_objects)
然后,您可以使用多輸出 DoFn來分離出各種型別。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/431225.html
上一篇:獲取JSON的多個屬性
