我有一個像這樣的Spark資料框架...
| ID | A | B | C | D | |
|---|---|---|---|---|---|
| id1 | 1 | 0 | 0 | id2 | 1 |
我想有一個新的資料框架,它是基于這個邏輯的......
我想有一個新的資料框架,它基于這樣的邏輯。
| ID | NewColumn |
|---|---|
| id1 | A,D |
id3
id4
我的努力:
A) 對于第一步,我想我將把整數轉換為列的名稱......。 因此,它將看起來像這樣......
ID
A
B
C
D
我正在嘗試使用UDF,但它沒有作業......
def CountSelect(colname, x)。
if x>0 :
return colname
else:
countUDF = UserDefinedFunction(CountSelect, T.StringType())
cols = inoutDF.columns
cols.remove("ID")
intermediateDF = inputDF.select("ID", *(countDF(c, col(c)).alias(c) for c in cols)
但它不作業...
你們誰能幫幫我?
B) 然后我將對所有的列使用字串連接函式
這部分應該比較容易,但如果你能把這兩個邏輯合并成一個更簡單的作業代碼,我真的會很感謝你。非常感謝
uj5u.com熱心網友回復:
我們的想法是在各列中標記出正數的行,并回傳各列的值。
你可以使用reduce來標記列并創建一個新的DataFrame,最后使用concat_ws來形成需要的值
@anky提供的一個更簡潔的解決方案
簡潔的解決方案 -sparkDF.withColumn("GreaterThanZero",F.concat_ws(" ,",*[F. when(F.col(col)>0,col) for col in to_concat] )
.select("id","GreaterThanZero") .show()
--- ---------------
| id|GreaterThanZero||。
--- ---------------
|id1| A,D|
|id2| B,D|
|id3| A,B,C|
|id4|A,D|
--- ---------------
資料準備
input_str = ""
id1 1 0 0 2
id2 0 3 0 1
id3 1 2 5 0
id4 4 0 0 1
"".split()
input_values = list(map(lambdax: x.strip() if x. strip() != 'null' else None, input_str>)
cols = list(map(lambda x: x.strip() if x. strip() != 'null' else None, "ID A B C D".split())
n = len(input_values)
n_cols = 5.
input_list = [tuple(input_values[i: i n_cols]) for i in range(0,n,n_cols)]
sparkDF = sql.createDataFrame(input_list, cols)
sparkDF.show()
--- --- --- --- ---
| id| a| b| c| d| 。
--- --- --- --- ---
|id1| 1| 0| 0| 2|
|id2| 0| 3| 0| 1|
|id3| 1| 2| 5| 0|
|id4| 4| 0| 0| 1|
--- --- --- --- ---
Reduce
to_check = ['id', 'A'/span>,'B'/span>,'C'/span>,'D'/span>]
sparkDF_marked = reduce(lambda df
, x: df.withColumn(x,F.when(F.col(x) > 0 ,x).otherwise(None)
if x != 'id' else df.withColumn(x,F.col(x) )
,to_check, sparkDF
)
sparkDF_marked.show()
--- ---- ---- ---- ----
| id| A| B| C| D||
--- ---- ---- ---- ----
|id1|A|null|null|D|
|id2|null|B|null|D|
|id3|A|B|C|無
|id4| A|null|null| D|
--- ---- ---- ---- ----
Concat
to_concat = ['A'/span>,'B'/span>,'C'/span>,'D'/span>]
sparkDF_marked.select(['id',F.concat_ws(',',*to_concat).alias('GreaterThanZero')].show()
--- ---------------
| id|GreaterThanZero||。
--- ---------------
|id1| A,D|
|id2| B,D|
|id3| A,B,C|
|id4|A,D|
--- ---------------
這個解決方案雖然可行,但有一些細微的差別,你需要注意,特別是reduce代碼段和to_check以及to_concat。
to_check可以很容易地被替換為 - sparkDF.columns的實際資料,但請讓我知道更大的資料集的性能。
uj5u.com熱心網友回復:
我已經用UDF解決了這個問題,讓我也把它貼在這里。
BTW:我做了一點修改,沒有把最后一列做成用", "隔開的字串,而是創建了一個串列,這對我的專案的下一步作業更有幫助。
from pyspark.sql.function import lit, col, UserDefinedFunction, array
import pyspark.sql.type as T
def MakeOne(colname, x)。
return colname if x> 0 else None >。
makeOneUDF = UserDefinedFunction(MakeOne, T.StringType())
cols = inputDF.columns
cols.remove("ID")
def MakeList(arr)。
return [a for a in arr if a is not None]
makeListUDF = UserDefinedFunction(MakeList, T.ArrayType(T.StringType()))
outputDF = (inputDF.select("ID", *(makeOneUDF(lit(c), col(c)).alias(c) for c in cols) )。 withColumn("NewColumn", makeListUDF(array(*cols) )).select("ID", "NewColumn")
同樣,NewColumn的型別是陣列型別或字串型別,它存盤了列名的串列。
| ID | NewColumn|
|------|----------|
| id1 | [A,D] |
| id2 | [B,D] |
| id3 | [A,B,C] |
| id4 | [A,D] |
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/328337.html
標籤:
上一篇:solidity資料型別(三)msg tx hash kill payable new
下一篇:如何解決從S3讀取時的火花錯誤
