我正在嘗試創建一個函式,該函式接受列名并在處理資料后回傳 col。
我堅持的功能之一如下
如果/輸入中存在
- 拆分給定的輸入
/ - 拆分后選擇第一個元素和最后一個元素
- 如果第一個元素的長度是
3或10然后處理,否則將 col 值設定為 null - 如果最后一個元素的長度是
7或10然后處理,否則將 col 值設定為 null
如果/輸入中不存在
- 從輸入中獲取前 10 個字符
下面是我的功能。對 df 的任何直接處理也有幫助嗎?
def phone_number_processor(col):
if isinstance(col, str):
col = F.col(col)
remove_unnecessary_chars = "[^0-9/]"
col = F.regexp_replace(col, remove_unnecessary_chars, '')
col = F.when(F.length(col) <= 10, '').otherwise(col) # ignore if length less than 10
...
# if input has '/', then
# ?????
# if input doesn't have '/' then
col = F.substring(col, 1, 10) # get first 10 chars
...
return col
樣本輸出:
df.withColumn('PROCESSED_PHONE', phone_number_processor('PHONE')).show()
---------------- ---------------
| PHONE|PROCESSED_PHONE|
---------------- ---------------
| 1234567890| 1234567890| #-> as is
|123/2345/1234567| 1231234567| #-> first and last elements after split with '/'
| 123/1234567| 1231234567| #-> same as above
| 123/12345| null| #-> since length last element after split is != 7
| 1234/1234567| null| #-> since length first element after split is != 3
---------------- ---------------
PS。我嘗試使用 spark 函式 - contains,split但是我無法做我想做的事。我已經為此作業了很長一段時間,任何輸入/建議也受到贊賞。
uj5u.com熱心網友回復:
當您實際上可以僅使用 Spark 內置函式執行相同操作時,無需定義 UDF 函式。只需拆分列PHONE,然后在結果陣列的第一個和最后一個元素上使用一些when運算式即可獲得所需的輸出,如下所示:
from pyspark.sql import functions as F
df = spark.createDataFrame([("1234567890",), ("123/2345/1234567",), ("123/1234567",), ("123/12345",), ("1234/1234567",)], ["PHONE"])
df1 = df.withColumn("split", F.split("PHONE", "/")) \
.withColumn("first_part", F.element_at("split", 1)) \
.withColumn("last_part", F.element_at("split", -1)) \
.withColumn(
"PROCESSED_PHONE",
F.when(
F.size("split") == 1,
F.substring("first_part", 0, 10)
).otherwise(
F.concat(
F.when(F.length("first_part") == 3, F.col("first_part")),
F.when(F.length("last_part") == 7, F.col("last_part"))
)
)
).drop("first_part", "last_part", "split")
df1.show()
# ---------------- ---------------
#| PHONE|PROCESSED_PHONE|
# ---------------- ---------------
#| 1234567890| 1234567890|
#|123/2345/1234567| 1231234567|
#| 123/1234567| 1231234567|
#| 123/12345| null|
#| 1234/1234567| null|
# ---------------- ---------------
uj5u.com熱心網友回復:
根據@blackbishop 的回答,如果您有興趣,可以創建該功能
def phone_number_validator(col):
if isinstance(col, str):
col = F.col(col)
remove_unnecessary_chars = "[^0-9/]"
col = F.regexp_replace(col, remove_unnecessary_chars, '')
col = F.when(F.length(col) <= 10, '').otherwise(col) # ignore if length less than 10
split_col = F.split(col, '/')
first_part= F.element_at(split_col, 1)
last_part = F.element_at(split_col, -1)
col = F.when(
F.size(split_col) == 1 & F.length(col) >= 10,
F.substring(col, 1, 10) # get first 10 chars
).otherwise(
F.concat(
F.when(F.length(first_part) == 3, first_part),
F.when(F.length(last_part ) == 7, last_part ),
) # concatenate any string with a NULL, will result in NULL
)
return col
用法:
df = df.withColumn('PROCESSED_PHONE', phone_number_validator('PHONE'))
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/421903.html
標籤:
下一篇:PySpark找不到Kafka源
