我遇到了以下問題。 我試圖在Apache Beam(Python)上使用來自Google BigQuery的兩個表的INNER JOIN來處理一個特定情況。然而,我還沒有找到一個原生的方法來輕松處理它。
這個查詢輸出我將在Google BigQuery上填寫第三個表,對于這種情況,我真的需要在Google Dataflow上查詢它。第一個表(客戶)的關鍵是 "id "列,第二個表(購買)的關鍵是 "client_id "列。
1.表的例子(考慮'client_table.id = purchase_table.client_id'):
client_table
| id | name | country
|----|-------------|---------|
| 1 | 第一個用戶 | 美國 |
| 2 | 第二個用戶 | 美國
購買_表
| id | client_id | value
|----|-------------|---------|
| 1 | 1 | 15 |
| 2 | 1 | 120 |
| 3 | 2 | 190 |
2.我想開發的代碼(問題在'輸出'的第二行):
2.我想開發的代碼(問題在'輸出'的第二行):
options = {'project': PROJECT。
'runner': RUNNER。
'region': REGION,
'staging_location': 'gs://bucket/temp',
'temp_location': 'gs://bucket/temp',
'plate_location': 'gs://bucket/temp/test_join'}
pipeline_options = beam.pipeline.PipelineOptions(flags=[], **options)
pipeline = beam.Pipeline(options = pipeline_options)
query_results_1 = (
串列
| 'ReadFromBQ_1' >> beam.io.Read(beam.io.ReadFromBigQuery(query="select id as client_id, name from client_table", use_standard_sql=True))
query_results_2 = (
管線
| 'ReadFromBQ_2' >> beam.io.Read(beam.io.ReadFromBigQuery(query="select * from purchase_table", use_standard_sql=True))
output = ( {'query_results_1':query_results_1,'query_results_2':query_results_2}| 'join' > .
| 'join' >> beam.GroupBy('client_id')
| 'writeToBQ' >> beam.io.WriteToBigQuery(
table=TABLE,
dataset=DATASET,
project=PROJECT,
schema=SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
pipeline.run()
3.在SQL中的等效期望輸出:
SELECT a.name, b.value * from client_table as a INNER JOIN purchase_table as b on a.id = b.client_id。
uj5u.com熱心網友回復:
你可以使用CoGroupByKey或側面輸入(作為一個廣播連接),這取決于你的鍵的數量。如果你有幾個鍵,每個鍵有很多元素,我建議使用廣播連接。
你需要做的第一件事是在BQ讀取后給你的PCollections添加一個鍵:
kv_1 = query_results_1 | Map(lambda x: (x["id"], x))
kv_2 = query_results_1 | Map(lambda x: (x["client_id"], x))
然后你可以直接做CoGBK或廣播連接。作為一個例子(因為這將更容易理解),我將使用本節課的代碼Beam College。請注意,在你的例子中,KV的值是一個字典,所以你需要做一些修改。
資料
jobs = [
("John", "Data Scientist"),
("Rebecca", "Full Stack Engineer"),
("John", "資料工程師"),
("Alice", "CEO"),
("Charles", "Web Designer"),
("Ruben", "技術作家")
]
興趣愛好 = [
("John", "Baseball"),
("麗貝卡", "足球"),
("約翰", "鋼琴"),
("Alice", "Photoshop"),
("Charles", "Coding"),
("Rebecca", "Acting"),
("麗貝卡", "閱讀")
]
與CGBK連接
def inner_join(element):
name = element[0].
作業 = element[1]["作業"]
hobbies = element[1]["hobbies"]
joined = [{"name": name,
"作業": 作業。
"obbie": hobbie}。
for jobs in jobs for hobbie in hobbies)
回傳 連接
jobs_create = p | "創建作業" >> Create(jobs)
hobbies_create = p | "Create Hobbies" >> Create(hobbies)
cogbk = {"jobs": jobs_create, "hobbies": hobbies_create} | CoGroupByKey()
join = cogbk | FlatMap(inner_join)
帶有側面輸入的廣播連接
def broadcast_inner_join(element, side_input):
name = element[0]
作業 = element[1]
hobbies = side_input.get(name, [])
joined = [{"name": name,
"作業":作業。
"愛好": hobbie}: hobbie
for hobbie in hobbies]。
返還Join
hobbies_create = (p | "Create Hobbies" >> Create(hobbies)
| beam.GroupByKey()
)
jobs_create = p | "創建作業" >> Create(jobs)
boardcast_join = jobs_create | FlatMap(broadcast_inner_join,
side_input=pvalue.AsDict(hobbies_create))
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/331275.html
標籤:
上一篇:當我更改元素的id時,通過ConstraintSet以編程方式添加障礙不起作用?
下一篇:無法搜刮懸停顯示的隱藏資料
