我有一個列RESULT,每個列都有長度為 11 的數字,并且相同的模式是:
RESULT: string (nullable = true)

現在,我想執行以下操作并更新一個新列,最后將添加一個額外的數字。下面顯示的示例是第一個數字03600024145
注意:我不想將表的格式更改為 Pandas,而是在 Pyspark 資料框中執行所有操作。
- 將奇數位數相加:0 6 0 2 1 5 = 14。
- 將結果乘以 3:14 × 3 = 42。
- 將偶數位相加:3 0 0 4 4 = 11。
- 將兩個結果相加:42 11 = 53。
- 計算校驗位,取(53 / 10)的余數,也稱為(53 modulo 10),如果不是0,則減去10,因此校驗位值為7。即(53 / 10 ) = 5 余數 3; 10 - 3 = 7。
- 最后加上這個校驗位。所以數字變成
036000241457
因此,如果將此邏輯應用于整個列,結果將變為 UPDATED RESULT

進一步澄清邏輯:https : //en.wikipedia.org/wiki/Check_digit#UPC
有一個類似的 python 代碼,但在第 5 步有點不同:python: create check digit function
uj5u.com熱心網友回復:
我們可以將邏輯轉換為 Spark 函式。
- 首先在不同位置提取數字并將它們轉換為整數。
- 然后分別對奇數和偶數位置求和。
- 將奇數乘以 3 并加上偶數和。
- 應用模運算。
- 將步驟4的結果減去10,然后應用模10,以模擬步驟4的結果為0時校驗位為0的行為。
- 最后,將
RESULT列與check digit.
作業示例
import pyspark.sql.functions as F
from pyspark.sql import Column
from typing import List
df = spark.createDataFrame([("03600024145",), ("01010101010",)], ("RESULT",))
def sum_digits(c: Column, pos: List[int]):
sum_col = F.lit(0)
for p in pos:
sum_col = sum_col F.substring(c, p, 1).cast("int")
return sum_col
def check_digit(c: Column) -> Column:
odd_sum = sum_digits(c, [1, 3, 5, 7, 9, 11])
even_sum = sum_digits(c, [2, 4, 6, 8, 10])
sum_result = (3 * odd_sum) even_sum
modulo = sum_result % 10
return (10 - modulo) % 10
df.withColumn("UPDATED_RESULT", F.concat(F.col("RESULT"), check_digit(F.col("RESULT")))).show()
輸出
----------- --------------
| RESULT|UPDATED_RESULT|
----------- --------------
|03600024145| 036000241457|
|01010101010| 010101010105|
----------- --------------
uj5u.com熱心網友回復:
使用用戶定義函式 (udf) 的解決方案。
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col
df = spark.createDataFrame([('03600024145',), ('01010101010',)], ['RESULT'])
@udf(StringType())
def add_check_digit(val):
odd = sum(int(i) for i in val[::2])
even = sum(int(i) for i in val[1::2])
check_val = (odd * 3 even) % 10
return val str((10 - check_val) % 10)
df = df.withColumn('UPDATED_RESULT', add_check_digit(col('RESULT')))
df.show()
----------- --------------
| RESULT|UPDATED_RESULT|
----------- --------------
|03600024145| 036000241457|
|01010101010| 010101010105|
----------- --------------
uj5u.com熱心網友回復:
RESULT與使用一些高階函式相比,您可以將列拆分為一個數字陣列,transform并且 aggregate可以計算checkdigit連接到原始字串的值:
import pyspark.sql.functions as F
df1 = df.withColumn(
"digits",
F.expr("slice(split(RESULT, ''), 1, size(split(RESULT, '')) - 1)")
).withColumn(
"digits",
F.expr("transform(digits, (x, i) -> struct(int(x) as d, i 1 as i))")
).withColumn(
"odd_even",
F.expr(
"""aggregate(digits,
array(0, 0),
(acc, x) ->
IF (x.i%2 = 1,
array(acc[0] x.d, acc[1]),
array(acc[0], acc[1] x.d)
)
)""")
).withColumn(
"UPDATED RESULT",
F.concat(F.col("RESULT"), 10 - ((F.col("odd_even")[0] * 3 F.col("odd_even")[1]) % 10))
).select(
"RESULT", "UPDATED RESULT"
)
df1.show(truncate=False)
# ----------- --------------
#|RESULT |UPDATED RESULT|
# ----------- --------------
#|03600024145|036000241457 |
#|01010101010|010101010105 |
# ----------- --------------
說明:
- 第 1 步:拆分列并對結果陣列進行切片以洗掉最后一個空值。然后通過添加其索引來轉換陣列的每個元素。(示例
0 -> struct(0, 1)) - 第2步:使用我們在第一步中添加的索引,使用聚合、求和偶數和奇數位置數字
- 步驟3:計算校驗位并將其與結果列連接
您可以顯示所有中間列以了解邏輯。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/360776.html
