小檔案合并決議
執行代碼:
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val t1 = "t1"
val basePath = "file:///tmp/hudi_data/"
val dataGen = new DataGenerator(Array("2020/03/11"))
// 生成隨機資料100條
val updates = convertToStringList(dataGen.generateInserts(100))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 1));
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, t1).
// 每次寫入的資料都生成一個新的檔案
option("hoodie.parquet.small.file.limit", "0").
// 每次操作之后都會進行clustering操作
option("hoodie.clustering.inline", "true").
// 每4次提交就做一次clustering操作
option("hoodie.clustering.inline.max.commits", "4").
// 指定生成檔案最大大小
option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1073741824").
// 指定小檔案大小限制,當檔案小于該值時,可用于被 clustering 操作
option("hoodie.clustering.plan.strategy.small.file.limit", "629145600").
mode(Append).
save(basePath+t1);
// 創建臨時視圖,查看當前表內資料總個數
spark.read.format("hudi").load(basePath+t1).createOrReplaceTempView("t1_table")
spark.sql("select count(*) from t1_table").show()
以上示例中,指定了進行 clustering 的觸發頻率:每4次提交就觸發一次,并指定了檔案相關大小:生成新檔案的最大大小、小檔案最小大小,
執行步驟:
1、生成資料,插入資料,
查看當前磁盤上的檔案:

查看表內資料個數:

查看 spark-web 上 該 sql 執行讀取的檔案個數:

所以,當前表中共100條資料,磁盤上生成一個資料檔案,在查詢該表資料時,只讀取了一個檔案,
2、重復上面操作兩次,
查看當前磁盤上的檔案:

查看表內資料個數:

查看 spark-web 上 該 sql 執行讀取的檔案個數:

所以,目前為止,我們提交了3次寫操作,每次生成1個資料檔案,共生成了3個資料檔案,當查詢所有的資料時,需要從3個檔案中讀取資料,
3、再進行一次資料插入:
查看當前磁盤上的檔案:

查看表內資料個數:

查看 spark-web 上 該 sql 執行讀取的檔案個數:

結論:
1、配置了hoodie.parquet.small.file.limit之后,每次提交新資料,都會生成一個資料檔案,
2、在 clustering 之前,每次讀取表所有資料的時候,都需要讀取所有檔案,
3、提交第4次資料之后,觸發了 clustering ,生成了一個更大的檔案,此時再讀取所有資料的時候,就只需要讀取合并后的大檔案即可,在.hoodie檔案夾下,也可以看到 replacecommit 的提交:

小檔案合并+sort columns決議
執行代碼:
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val t1 = "t1"
val basePath = "file:///tmp/hudi_data/"
val dataGen = new DataGenerator(Array("2020/03/11"))
var a = 0;
for (a <- 1 to 8) {
val updates = convertToStringList(dataGen.generateInserts(10000))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 1));
df.write.format("org.apache.hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, t1).
// 每次寫入的資料都生成一個新的檔案
option("hoodie.parquet.small.file.limit", "0").
// 每次操作之后都會進行clustering操作
option("hoodie.clustering.inline", "true").
// 每4次提交就做一次clustering操作
option("hoodie.clustering.inline.max.commits", "8").
// 指定生成檔案最大大小
option("hoodie.clustering.plan.strategy.target.file.max.bytes", "1400000").
// 指定小檔案大小限制,當檔案小于該值時,可用于被 clustering 操作
option("hoodie.clustering.plan.strategy.small.file.limit", "1400000").
// 指定排序的列
option("hoodie.clustering.plan.strategy.sort.columns", "fare").
mode(Append).
save(basePath+t1);
// 創建臨時視圖,查看當前表內資料總個數
spark.read.format("hudi").load(basePath+t1).createOrReplaceTempView("t1_table")
spark.sql("select count(*) from t1_table where fare > 50").show()
}
執行代碼分析
該代碼比之前代碼修改了幾個地方:
1、增加了for回圈:
因為我們已經知道了在8次提交之后,小檔案會合并大檔案,所以一個for回圈,做8次提交,我們直接看結果就行,
2、增加了 hoodie.clustering.plan.strategy.sort.columns 配置:
這是本次主要的測驗點,該配置可以對指定的列進行排序,
即,當做 clustering 的時候,hudi 會重新讀取所有檔案,并根據指定的列做排序,這樣可以把相關的資料聚集在一起,可以做更好的查詢過濾(后面會演示說明),而我們要做的對比,就是以 fare 為條件查詢資料,觀察在 clustering 前后,hudi 會讀取的檔案個數,
我們想要的結果是,在 clustering 之前,由于沒有根據 fare 對資料任何處理,符合過濾條件的資料會分布在各個檔案,所以會讀取的檔案個數很多,過濾效果差,而在 clustering 之后,會根據 fare 列對資料做重新分布,符合過濾條件的資料較為集中,那么讀取的資料就會比較少,過濾效果較好,
3、修改了 hoodie.clustering.plan.strategy.target.file.max.bytes 和 hoodie.clustering.plan.strategy.small.file.limit
我們想測的是,clustering 前后過濾的效果,所以檔案個數不能夠被改變(否則4個檔案合并成1個檔案后,讀取資料時也只會讀取1個檔案,就看不出來sort是否有效果),所以這里把該值設定成兩個較為近似的值,使其既能夠觸發 clustering,又能夠在 clustering 前后檔案個數相同,
執行結果:
查看當前磁盤檔案:

查看第5次的sql過濾結果:

查看第6次的sql過濾結果:

查看第7次的sql過濾結果:

查看最后一次的sql過濾結果:

結論:
1、在 clustering 之前,過濾 fare 列時,會讀取所有的資料,
比如,在執行第5次過濾時,此時表總共有50000行資料,hudi就會掃描50000行資料;在執行第6次過濾時,此時表總共有60000行資料,hudi就會掃描60000行資料;在執行第7次過濾時,此時表總共有70000行資料,hudi就會掃描70000行資料,
2、在 clustering 之后,資料檔案個數不變的情況下(前后都是8個資料檔案),在第8次過濾時,能夠有效應用sort columns的重排列資料,將本應掃描80000行資料降低到只掃描了50405行資料,過濾效果明顯提升很多!!
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/356698.html
標籤:大數據
上一篇:~/和../之間的區別
