I have a dataframe named timeDF with the following schema:
root
|-- Id: long (nullable = true)
|-- Model: timestamp (nullable = true)
|-- Prevision: timestamp (nullable = true)
I want to add a new row at the end of timeDF built from 2 Calendar objects, c1 and c2. I know I can first convert them to Timestamp like this:
import java.sql.Timestamp

val t1 = new Timestamp(c1.getTimeInMillis)
val t2 = new Timestamp(c2.getTimeInMillis)
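However, I don't know how to write these variables into timeDF as a new row, or how to have Spark increment the Id column value.
Should I create a List object from t1 and t2, build a temporary dataframe from that list, and then union the two dataframes? If so, how do I manage the Id column? Isn't that too messy for such a simple operation?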
Can someone explain this to me?
Thanks.
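A minimal sketch of the conversion the question already describes, in plain Java with no Spark involved. It assumes c1 and c2 are ordinary java.util.Calendar instances; the sample date and the UTC zone below are made up for illustration. A Timestamp only stores epoch milliseconds, so the Calendar's time zone matters when its fields are set, not for the instant that ends up in the Timestamp column.

```java
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.TimeZone;

final class CalendarToTimestamp {
    // Same conversion as in the question: the Timestamp wraps the Calendar's
    // epoch milliseconds, so the instant is preserved exactly.
    static Timestamp toTimestamp(Calendar c) {
        return new Timestamp(c.getTimeInMillis());
    }

    public static void main(String[] args) {
        // Hypothetical values standing in for c1 and c2 from the question.
        Calendar c1 = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        c1.clear();
        c1.set(2021, Calendar.OCTOBER, 1, 12, 0, 0);
        Calendar c2 = (Calendar) c1.clone();
        c2.add(Calendar.HOUR_OF_DAY, 6);

        Timestamp t1 = toTimestamp(c1);
        Timestamp t2 = toTimestamp(c2);
        // The printed text uses the JVM default time zone; the millis do not.
        System.out.println(t1 + " (" + t1.getTime() + " ms)");
        System.out.println(t2 + " (" + t2.getTime() + " ms)");
    }
}
```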
Reply:
In short, here is a solution you can try:
- Ingest your file.
- Create the extra record with your data and merge it using unionByName().
- Correct the ID.
- Clean up.
Create the extra record
First, you create the extra record from scratch. As you are mixing several types, I used a POJO; here is the code:
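import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// Build the extra record; -1 is a placeholder id that gets corrected later
List<ModelPrevisionRecord> data = new ArrayList<>();
ModelPrevisionRecord b = new ModelPrevisionRecord(
-1L,
new Timestamp(System.currentTimeMillis()),
new Timestamp(System.currentTimeMillis()));
data.add(b);
Dataset<ModelPrevisionRecord> ds = spark.createDataset(data,
Encoders.bean(ModelPrevisionRecord.class));
// Append it to the existing dataframe, matching columns by name
timeDf = timeDf.unionByName(ds.toDF());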
ModelPrevisionRecord is a very basic POJO:
package net.jgp.labs.spark.l999_scrapbook.l000;
import java.sql.Timestamp;
public class ModelPrevisionRecord {
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public Timestamp getModel() {
return model;
}
public void setModel(Timestamp model) {
this.model = model;
}
public Timestamp getPrevision() {
return prevision;
}
public void setPrevision(Timestamp prevision) {
this.prevision = prevision;
}
private long id;
private Timestamp model;
private Timestamp prevision;
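// No-arg constructor, following the JavaBean convention; Spark needs it
// if it has to turn rows back into ModelPrevisionRecord objects
public ModelPrevisionRecord() {
}
public ModelPrevisionRecord(long id, Timestamp model, Timestamp prevision) {
this.id = id;
this.model = model;
this.prevision = prevision;
}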
}
Correct the ID
The new record's id is -1, so the next step is to create a new column, id2, holding the correct id:
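import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.max;
import static org.apache.spark.sql.functions.when;
// Compute the next id once, then give it to the row holding the -1 placeholder
long nextId = timeDf.agg(max("id")).head().getLong(0) + 1;
timeDf = timeDf.withColumn("id2",
when(col("id").equalTo(-1), nextId)
.otherwise(col("id")));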
Clean up the dataframe
Finally, clean up your dataframe:
timeDf = timeDf.drop("id").withColumnRenamed("id2", "id");
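Important notes
- This solution only works if you add one record at a time; otherwise, the new records will all end up with the same ID.
- You can see the whole example here: https://github.com/jgperrin/net.jgp.labs.spark/tree/master/src/main/java/net/jgp/labs/spark/l999_scrapbook/l000 (cloning it may be easier...)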
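The first note can be reproduced without Spark. Below is a plain-Java sketch in which a List<Long> stands in for the id column (an assumption for illustration). replaceWithMaxPlusOne mirrors the answer's when/otherwise rule; replaceConsecutively is a hypothetical alternative, not part of the answer, that numbers the new rows one after another. Porting the second rule to Spark would need a per-row counter and is not shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SentinelIds {
    static final long SENTINEL = -1L;

    // Mirrors the answer's rule: every row whose id is the sentinel
    // receives max(id) + 1, so several new rows share the same id.
    static List<Long> replaceWithMaxPlusOne(List<Long> ids) {
        long max = ids.stream().mapToLong(Long::longValue).max().orElse(SENTINEL);
        List<Long> out = new ArrayList<>();
        for (long id : ids) {
            out.add(id == SENTINEL ? max + 1 : id);
        }
        return out;
    }

    // Hypothetical alternative: number the sentinel rows consecutively,
    // starting after the largest real id (or at 0 if there is none).
    static List<Long> replaceConsecutively(List<Long> ids) {
        long next = ids.stream().mapToLong(Long::longValue)
                .filter(id -> id != SENTINEL).max().orElse(-1L) + 1;
        List<Long> out = new ArrayList<>();
        for (long id : ids) {
            out.add(id == SENTINEL ? next++ : id);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(0L, 1L, 2L, SENTINEL, SENTINEL);
        System.out.println(replaceWithMaxPlusOne(ids)); // [0, 1, 2, 3, 3]
        System.out.println(replaceConsecutively(ids));  // [0, 1, 2, 3, 4]
    }
}
```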
Reply:
If your first dataframe can be sorted by ID and you need to add rows one at a time, you can find the maximum ID in the dataframe:
long max = timeDF.agg(functions.max("Id")).head().getLong(0);
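Then increment it and add the new row to your dataframe with a union. The example below shows how; in it, age plays the role of the id. people.json is the sample file that ships with Spark.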
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;

Dataset<Row> df = spark.read().json("H:\\work\\HadoopWinUtils\\people.json");
df.show();
// age plays the role of the id column in this example
long max = df.agg(functions.max("age")).head().getLong(0);
List<Row> rows = Arrays.asList(RowFactory.create(max + 1, "test"));
StructType schema = DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("age", DataTypes.LongType, false, Metadata.empty()),
DataTypes.createStructField("name", DataTypes.StringType, false, Metadata.empty())));
Dataset<Row> df2 = spark.createDataFrame(rows, schema);
df2.show();
// union() matches columns by position, so df2 must list its columns in the same order as df
Dataset<Row> df3 = df.union(df2);
df3.show();
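Both the answer above and the reply below read the maximum with agg(max(...)).head().getLong(0). On an empty dataframe that aggregate returns one row whose value is null, and getLong(0) throws on a null value, which is why the reply checks head(1).isEmpty first. A minimal plain-Java sketch of the same guard, with a List<Long> standing in for the id column and a hypothetical firstId parameter for the starting value:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

final class NextId {
    // Returns max(existing) + 1, or firstId when there is no existing id.
    // firstId is a hypothetical parameter standing in for the starting id.
    static long nextId(List<Long> existing, long firstId) {
        OptionalLong max = existing.stream().mapToLong(Long::longValue).max();
        return max.isPresent() ? max.getAsLong() + 1 : firstId;
    }

    public static void main(String[] args) {
        System.out.println(nextId(Collections.emptyList(), 0L));   // 0
        System.out.println(nextId(Arrays.asList(3L, 7L, 5L), 0L)); // 8
    }
}
```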
Reply:
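I tried this, but for some reason, when I print the saved table, only the last 2 rows are kept; all the other rows are deleted.
This is how I initialize the Delta table: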
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}

val schema = StructType(
StructField("Id", LongType, false) ::
StructField("Model", TimestampType, false) ::
StructField("Prevision", TimestampType, false) :: Nil
)
var timestampDF = spark.createDataFrame(sc.emptyRDD[Row], schema)
val write_format = "delta"
val partition_by = "Model"
val save_path = "/mnt/path/to/folder"
val table_name = "myTable"
spark.sql("DROP TABLE IF EXISTS " + table_name)
dbutils.fs.rm(save_path, true)
timestampDF.write.partitionBy(partition_by)
.format(write_format)
.save(save_path)
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")
This is how I add a new item to it:
import java.sql.Timestamp
import java.util.{Arrays, Calendar}
import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}

def addTimeToData(model: Calendar, target: Calendar): Unit = {
var timeDF = spark.read
.format("delta")
.load("/mnt/path/to/folder")
val modelTS = new Timestamp(model.getTimeInMillis)
val targetTS = new Timestamp(target.getTimeInMillis)
// Next id: 0 for an empty table, otherwise max(Id) + 1
var id: Long = 0
if (!timeDF.head(1).isEmpty) {
id = timeDF.agg(max("Id")).head().getLong(0) + 1
}
val newTime = Arrays.asList(RowFactory.create(id, modelTS, targetTS))
val schema = StructType(
StructField("Id", LongType, false) ::
StructField("Model", TimestampType, false) ::
StructField("Prevision", TimestampType, false) :: Nil
)
var newTimeDF = spark.createDataFrame(newTime, schema)
val unionTimeDF = timeDF.union(newTimeDF)
timeDF = unionTimeDF
unionTimeDF.show
// Note: this is not the path loaded at the top of the function ("/mnt/path/to/folder")
val save_path = "/mnt/datalake/Exploration/Provisionning/MeteoFrance/Timestamps/"
val table_name = "myTable"
spark.sql("DROP TABLE IF EXISTS " + table_name)
// timeDF is evaluated lazily: if it was loaded from save_path, deleting save_path here removes its input before the write below runs
dbutils.fs.rm(save_path, true)
timeDF.write.partitionBy("Model")
.format("delta")
.save(save_path)
spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")
}
I'm not very familiar with Delta tables, so I don't know if I can just use SQL on it to add values like this:
spark.sql("INSERT INTO 'myTable' VALUES (" + id + ", " + modelTS + ", " + previsionTS + ")");
And I don't know if just putting the timestamp variables in like this will work.
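Two parts of that INSERT INTO line look problematic. In Spark SQL, 'myTable' in single quotes is a string literal rather than a table name, and concatenating a java.sql.Timestamp produces unquoted text such as 2021-10-01 12:00:00.0, which does not parse as a SQL value on its own. A minimal sketch, in Java, that builds the statement with typed TIMESTAMP '...' literals instead. It assumes the table is registered as myTable (as in the reply above) and that the JVM default time zone matches Spark's session time zone, since Timestamp.toString() formats in the JVM default zone. The resulting string would then be passed to spark.sql(...).

```java
import java.sql.Timestamp;

final class InsertSql {
    // Builds an INSERT for the three columns (Id, Model, Prevision).
    // Assumptions: the table name is an unquoted identifier, and each
    // timestamp is rendered as a TIMESTAMP '...' typed literal.
    static String insertStatement(String table, long id, Timestamp model, Timestamp prevision) {
        return "INSERT INTO " + table + " VALUES ("
                + id + ", "
                + "TIMESTAMP '" + model + "', "
                + "TIMESTAMP '" + prevision + "')";
    }

    public static void main(String[] args) {
        Timestamp model = Timestamp.valueOf("2021-10-01 12:00:00");
        Timestamp prevision = Timestamp.valueOf("2021-10-01 18:00:00");
        // INSERT INTO myTable VALUES (3, TIMESTAMP '2021-10-01 12:00:00.0', TIMESTAMP '2021-10-01 18:00:00.0')
        System.out.println(insertStatement("myTable", 3L, model, prevision));
    }
}
```

Building SQL by string concatenation is only reasonable for values the program produces itself, as here.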
Please credit the source when reposting. Article link: https://www.uj5u.com/qukuanlian/345677.html
