考慮一個employees像這樣命名的火花資料框:
---------- -----
| name | age |
---------- -----
| John | 32 |
| Elizabeth| 28 |
| Eric | 41 |
---------- -----
和一個字串陣列state = ["LA", "AZ", "OH"],我想將此陣列附加df為一個新列,以便資料框看起來像:
---------- ----- -------
| name | age | state |
---------- ----- -------
| John | 32 | LA |
| Elizabeth| 28 | AZ |
| Eric | 41 | OH |
---------- ----- -------
我如何在 Scala(或 Java,幾乎相同)中實作這一點?我只看到了如何為網路上的所有行添加相同的值,在這里我希望為每一行添加不同的值。
謝謝 !:)
uj5u.com熱心網友回復:
由于 spark 在分布式模式下運行,您無法在帶有索引的陣列上添加基于列的值。假設火花運行有兩個工人,約翰和伊麗莎白交付給工人一個和埃里克交付給工人乙。事實上,當保存在資料幀中時,它們會分裂。工人不知道John、Elizabeth或Eric的索引是多少。您可以在普通的 java 單個程式中簡單地做您想做的事情。
在您的示例中,您需要將陣列轉換為資料框并用于join基于具有相同值的列合并兩個資料框。但是,您可以使用crossJoin在您的桌子上做笛卡爾積。
Dataset<Row> ndf = df.crossJoin(df2);
如果您只需要在同一資料幀上添加具有常量值的列或基于另一列的值,請使用withColumn如下:
Dataset<Row> ndf = df.withColumn("city",functions.lit(1));
Dataset<Row> ndf = df.withColumn("city",functions.rand());
Dataset<Row> ndf = df.withColumn("city",functions.col("name"));
最后,你可以像這樣使用 Atomic 來得到你想要的。我在火花單模式下測驗它。
public static void main(String[] args) {
System.setProperty("hadoop.home.dir", "H:\\work\\HadoopWinUtils\\");
SparkSession spark = SparkSession
.builder()
.master("local[*]")
.getOrCreate();
Dataset<Row> df = spark.read().json("H:\\work\\HadoopWinUtils\\people.json");
List<String> city_array = Arrays.asList("LA", "AZ", "OH");
// Displays the content of the DataFrame to stdout
df.show();
df = df.withColumn("city",functions.col("name"));
AtomicInteger i= new AtomicInteger();
Dataset<Row> df3 = df.map((MapFunction<Row, Row>) value -> {
return RowFactory.create(value.get(0),value.get(1),city_array.get(i.getAndIncrement()));
//return city_array.get(i.getAndIncrement());
}, RowEncoder.apply(df.schema()));
df3.show();
}
人們是
---- -------
| age| name|
---- -------
|null|Michael|
| 30| Andy|
| 19| Justin|
---- -------
結果是
---- ------- ----
| age| name|city|
---- ------- ----
|null|Michael| LA|
| 30| Andy| AZ|
| 19| Justin| OH|
---- ------- ----
uj5u.com熱心網友回復:
您可以在 pyspark 中嘗試類似的操作。
>>> _TRANSFORMED_DF_SCHEMA = StructType([
... StructField('name', StringType(), False),
... StructField('age', IntegerType(), False),
... StructField('id', IntegerType(), False),
... StructField('state', StringType(), False),
... ])
>>>
>>> state = ['LA', 'AZ', 'OH']
>>> data = (['John', 32], ['Eli', 28], ['Eric', 41])
>>> df = spark.createDataFrame(data, schema=['name', 'age'])
>>> rdd1 = df.rdd.zipWithIndex()
>>> df1 = rdd1.toDF()
>>> df1.show()
---------- ---
| _1| _2|
---------- ---
|[John, 32]| 0|
| [Eli, 28]| 1|
|[Eric, 41]| 2|
---------- ---
>>> df_final = df1.select(df1['_1']['name'].alias('name'), df1['_1']['age'].alias('age'), df1['_2'].alias('id'))
>>> df_final.show()
---- --- ---
|name|age| id|
---- --- ---
|John| 32| 0|
| Eli| 28| 1|
|Eric| 41| 2|
---- --- ---
>>> def add_state(row_dict):
... new_dict = dict()
... new_dict['name'] = row_dict['name']
... new_dict['age'] = row_dict['age']
... new_dict['id'] = row_dict['id']
... new_dict['state'] = state[row_dict['id']]
... return new_dict
...
>>> df_rdd = df_final.rdd.map(add_state)
>>> df_final = spark.createDataFrame(df_rdd, schema=_TRANSFORMED_DF_SCHEMA)
>>> df_final.show()
---- --- --- -----
|name|age| id|state|
---- --- --- -----
|John| 32| 0| LA|
| Eli| 28| 1| AZ|
|Eric| 41| 2| OH|
---- --- --- -----
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/331457.html
下一篇:如何處理期貨的尾遞回
