我是 Spark 的新手,我遇到了一個愚蠢的“最佳方法是什么”的問題。基本上,我有一個我想回圈的 map(dict)。在每次迭代期間,我想使用正則運算式搜索 spark 資料框中的一列rlike,并將 dict 的鍵分配給一個新列withColumn
maps = {"groceries": ["hot chocolate", "milk", "sugar", "flour"],
"laundry": ["soap", "detergent", "fabric softener"]
}
資料樣本如下所示
-------------------- -----------
| id|item_bought|
-------------------- -----------
|uiq7Zq52Bww4pZXc3xri| Soap|
|fpJatwxTeObcbuJH25UI| Detergent|
|MdK1q5gBygIGFYyvbz8J| Milk|
-------------------- -----------
我想得到一個看起來像這樣的資料框:
-------------------- ----------- ---------
| id|item_bought| class|
-------------------- ----------- ---------
|uiq7Zq52Bww4pZXc3xri| Soap| Laundry|
|fpJatwxTeObcbuJH25UI| Detergent| Laundry|
|MdK1q5gBygIGFYyvbz8J| Milk|Groceries|
-------------------- ----------- ---------
我有超過 1 億條記錄,我想要一種使用 Spark 最佳實踐(分布式計算)的方法。想到的一種方法是遍歷地圖并使用rlikeorstr.contains進行正則運算式搜索,如下所示:
for key, value in maps.items():
pattern = '|'.join([f'(?i){x}' for x in value]). # ignore case
df.withColumn("class", col("item_bought").rlike(pattern))
但這會回傳true或false用于 rlike 搜索。我想用key值替換真或假。
另外,考慮到我有 100M(最多 150M)條記錄,回圈遍歷地圖是最好的方法嗎?
編輯
如果items_boughtindf有特殊字符(或一些額外的文本)怎么辦?
-------------------- ----------------
| id| item_bought|
-------------------- ----------------
|uiq7Zq52Bww4pZXc3xri| Soap -&ju10kg|
|fpJatwxTeObcbuJH25UI|Detergent x.ju2i|
|MdK1q5gBygIGFYyvbz8J| Milk|
-------------------- ----------------
我不想先清理文本,只需根據正則運算式關鍵字搜索分配類
uj5u.com熱心網友回復:
根據您的情況,我會將地圖變成資料框。我假設生成的資料框會相對較小。使用外播加入。這樣做的目的是將小 df 分配給每個作業節點,從而避免洗牌。
#Create df from maps
df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought')).withColumn('item_bought',explode('item_bought')).withColumn('item_bought', initcap('item_bought'))
#Broadcast join
df.join(broadcast(df_ref), how='left', on='item_bought').show()
----------- -------------------- ---------
|item_bought| id| class|
----------- -------------------- ---------
| Soap|uiq7Zq52Bww4pZXc3xri| laundry|
| Detergent|fpJatwxTeObcbuJH25UI| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J|groceries|
----------- -------------------- ---------
按照您的編輯
df_ref = spark.createDataFrame(maps.items(), schema =('class','item_bought1')).withColumn('item_bought1',explode('item_bought1')).withColumn('item_bought1', initcap('item_bought1'))
df.withColumn('item_bought1',regexp_extract('item_bought','^[A-Za-z] ',0)).join(broadcast(df_ref), how='left', on='item_bought1').show()
------------ -------------------- ---------------- ---------
|item_bought1| id| item_bought| class|
------------ -------------------- ---------------- ---------
| Soap|uiq7Zq52Bww4pZXc3xri| Soap| laundry|
| Detergent|fpJatwxTeObcbuJH25UI| Detergent| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J| Milk|groceries|
| Soap|uiq7Zq52Bww4pZXc3xri| Soap -&ju10kg| laundry|
| Detergent|fpJatwxTeObcbuJH25UI|Detergent x.ju2i| laundry|
| Milk|MdK1q5gBygIGFYyvbz8J| Milk|groceries|
------------ -------- ------------- - ----------
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/447320.html
