原文地址https://devblogs.microsoft.com/azure-sql/partitioning-on-spark-fast-loading-clustered-columnstore-index/
介紹
SQL Server的Bulk load默認為串行,這意味著例如,一個BULK INSERT陳述句將生成一個執行緒將資料插入表中,但是,對于并發負載,您可以使用多個批量插入陳述句插入同一張表,前提是需要閱讀多個檔案,
考慮要求所在的情景:
- 從大檔案加載資料(比如,超過 20 GB)
- 拆分檔案不是一個選項,因為它將是整個大容量負載操作中的一個額外步驟,
- 每個傳入的資料檔案大小不同,因此很難識別大塊數(將檔案拆分為)并動態定義為每個大塊執行的批量插入陳述句,
- 要加載的多個檔案跨越多個 GB(例如超過 20 GB 及以上),每個GB 包含數百萬條記錄,
在這種情況下,使用 Apache Spark是并行批量資料加載到 SQL 表的流行方法之一,
在本文中,我們使用 Azure Databricks spark engine使用單個輸入檔案將資料以并行流(多個執行緒將資料加載到表中)插入 SQL Server,目標表可能是Heap、Clustered Index或Clustered Columnstore Index,本文旨在展示如何利用Spark提供的高度分布式框架,在加載到 SQL Server或 Azure SQL的聚集列存盤索引表之前仔細對資料磁區,
本文中分享的最有趣的觀察是展示使用Spark默認配置時列存盤表的行組質量降低,以及如何通過高效使用Spark磁區來提高質量,從本質上講,提高行組質量是決定查詢性能的重要因素,
環境設定
資料集:
- 單張表的一個自定義資料集,一個 27 GB 的 CSV 檔案,110 M 記錄,共 36 列,其中列的型別有int, nvarchar, datetime等,
資料庫:
- Azure SQL Database – Business Critical, Gen5 80vCores
ELT 平臺:
- Azure Databricks – 6.6 (includes Apache Spark 2.4.5, Scala 2.11)
- Standard_DS3_v2 14.0 GB Memory, 4 Cores, 0.75 DBU (8 Worker Nodes Max)
存盤:
- Azure Data Lake Storage Gen2
先決條件:
在進一步瀏覽本文之前,請花一些時間了解此處將資料加載到聚集列存盤表中的概述:Data Loading performance considerations with Clustered Columnstore indexes
在此測驗中,資料從位于 Azure Data Lake Storage Gen 2的 CSV 檔案中加載,CSV 檔案大小為 27 GB,有 110 M 記錄,有 36 列,這是一個帶有隨機資料的自定義資料集,
批量加載或預處理(ELT\ETL)的典型架構看起來與下圖相似:

使用BULK INSERTS
在第一次測驗中,單個BULK INSERT用于將資料加載到帶有聚集列存盤索引的 Azure SQL 表中,這里沒有意外,根據所使用的 BATCHSIZE,它花了 30 多分鐘才完成,請記住,BULK INSERT是一個單一的執行緒操作,因此單個流會讀取并將其寫入表中,從而降低負載吞吐量,


使用Azure Databricks
為了實作寫入到 SQL Server和讀取ADLS (Azure Data Lake Storage) Gen 2的最大并發性和高吞吐量,Azure Databricks 被選為平臺的選擇,盡管我們還有其他選擇,即 Azure Data Factory或其他基于Spark引擎的平臺,
使用Azure Databricks加載資料的優點是 Spark 引擎通過專用的 Spark API并行讀取輸入檔案,這些 API將使用一定數量的磁區,這些磁區映射到單個或多個輸入檔案,映射是在檔案的一部分或整個檔案上完成的,資料讀入Spark DataFrame or, DataSet or RDD (Resilient Distributed Dataset) ,在這種情況下,資料被加載到DataFrame中,然后進行轉換(設定與目標表匹配的DataFrame schema),然后資料準備寫入 SQL 表,
要將DataFrame中的資料寫入 SQL Server中,必須使用Microsoft's Apache Spark SQL Connector,這是一個高性能的連接器,使您能夠在大資料分析中使用事務資料,和持久化結果用于即席查詢或報告,連接器允許您使用任何 SQL Server(本地資料庫或云中)作為 Spark 作業的輸入資料源或輸出目標,
GitHub repo: Fast Data Loading in Azure SQL DB using Azure Databricks
請注意,目標表具有聚集列存盤索引,以實作高負載吞吐量,但是,您也可以將資料加載到Heap,這也將提供良好的負載性能,對于本文的相關性,我們只討論加載到列存盤表,我們使用不同的 BATCHSIZE 值將資料加載到Clustered Columnstore Index中 -請參閱此檔案,了解 BATCHSIZE 在批量加載到聚集列存盤索引表期間的影響,
以下是Clustered Columnstore Index上的資料加載測驗運行,BATCHSIZE為 102400 和 1048576:

請注意,我們正在使用 Azure Databricks使用的默認并行和磁區,并將資料直接推至 SQL Server聚集列存盤索引表,我們沒有調整 Azure Databricks使用的任何默認配置,無論所定義的批次大小,我們所有的測驗都大致在同一時間完成,
將資料加載到 SQL 中的 32 個并發執行緒是由于上述已提供的資料磚群集的大小,該集群最多有 8 個節點,每個節點有 4 個內核,即 8*4 = 32 個內核,最多可運行 32 個并發執行緒,
查看行組(Row Groups)
有關我們使用 BATCHSIZE 1048576 插入資料的表格,以下是在 SQL 中創建的行組數:
行組總數:
SELECT COUNT(1) FROM sys.dm_db_column_store_row_group_physical_stats WHERE object_id = OBJECT_ID('largetable110M_1048576') 216
行組的質量:
SELECT * FROM sys.dm_db_column_store_row_group_physical_stats WHERE object_id = OBJECT_ID('largetable110M_1048576')

在這種情況下,我們只有一個delta store在OPEN狀態 (total_rows = 3810) 和 215 行組處于壓縮狀態, 這是有道理的, 因為如果插入的批次大小是>102400 行, 資料不再delta store存盤, 而是直接插入一個壓縮行組的列存盤,在這種情況下,壓縮狀態中的所有行組都有 >102400 條記錄,現在,有關行組的問題是:
為什么我們有216行組?
為什么當我們的BatchSize設定為 1048576 時,每個行組的行數不同?
請注意,每個行組的資料大約等于上述結果集中的 500,000 條記錄,
這兩個問題的答案是 Azure Databricks Spark引擎對資料磁區控制了寫入聚集列存盤索引表行組的資料行數,讓我們來看看 Azure Databricks為有關資料集創建的磁區數:
# Get the number of partitions before re-partitioning print(df_gl.rdd.getNumPartitions()) 216
因此,我們為資料集創建了 216 個磁區,請記住,這些是磁區的默認數,每個磁區都有大約 500000 條記錄,
# Number of records in each partition from pyspark.sql.functions import spark_partition_id df_gl.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show(10000)

將Spark磁區中的記錄數與行組中的記錄數進行比較,您就會發現它們是相等的,甚至磁區數也等于行組數,因此,從某種意義上說,1048576 的 BATCHSIZE 正被每個磁區中的行數過度拉大,
sqldbconnection = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbconn") sqldbuser = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbuser") sqldbpwd = dbutils.secrets.get(scope = "sqldb-secrets", key = "sqldbpwd") servername = "jdbc:sqlserver://" + sqldbconnection url = servername + ";" + "database_name=" + <Your Database Name> + ";" table_name = "<Your Table Name>" # Write data to SQL table with BatchSize 1048576 df_gl.write \ .format("com.microsoft.sqlserver.jdbc.spark") \ .mode("overwrite") \ .option("url", url) \ .option("dbtable", table_name) \ .option("user", sqldbuser) \ .option("password", sqldbpwd) \ .option("schemaCheckEnabled", False) \ .option("BatchSize", 1048576) \ .option("truncate", True) \ .save()
行組質量
行組質量由行組數和每個行組記錄決定,由于聚集列存盤索引通過掃描單行組的列段掃描表,則最大化每個行組中的行數可增強查詢性能,當行組具有大量行數時,資料壓碩訓改善,這意味著從磁盤中讀取的資料更少,為了獲得最佳的查詢性能,目標是最大限度地提高聚集列索引中每個行組的行數,行組最多可有 1048576 行,但是,需要注意的是,由于聚集列索引,行組必須至少有 102400 行才能實作性能提升,此外,請記住,行組的最大大小(100萬)可能在每一個情況下都達到,檔案行組大小不只是最大限制的一個因素,但受到以下因素的影響,
- 字典大小限制,即 16 MB
- 插入指定的批次大小
- 表的磁區方案,因為行組不跨磁區
- 記憶體壓力導致行組被修剪
- 索引重組,重建
話雖如此,現在一個重要的考慮是讓行組大小盡可能接近 100 萬條記錄,在此測驗中,由于每個行組的大小接近 500000 條記錄,我們有兩個選項可以達到約 100 萬條記錄的大小:
- 在Spark中,更改磁區數,使每個磁區盡可能接近 1048576 條記錄,
- 保持Spark磁區(默認值),一旦資料加載到表中,就運行 ALTER INDEX REORG,將多個壓縮行組組合成一組,
選項#1很容易在Python或Scala代碼中實作,該代碼將在Azure Databricks上運行,負載相當低,
選項#2是資料加載后需要采取的額外步驟,當然,這將消耗 SQL 上的額外 CPU ,并增加整個加載程序所需的時間,
為了保持本文的相關性,讓我們來討論更多關于Spark磁區,以及如何從其默認值及其在下一節的影響中更改它,
Spark Partitioning
Spark 引擎最典型的輸入源是一組檔案,這些檔案通過將每個節點上的適當磁區劃分為一個或多個 Spark API來讀取這些檔案,這是 Spark 的自動磁區,將用戶從確定磁區數量的憂慮中抽象出來,如果用戶想挑戰,就需控制磁區的配置,根據環境和環境設定計算的磁區的默認數通常適用于大多數情況下,但是,在某些情況下,更好地了解磁區是如何自動計算的,如果需要,用戶可以更改磁區計數,從而在性能上產生明顯差異,
注意:大型Spark群集可以生成大量并行執行緒,這可能導致 Azure SQL DB 上的記憶體授予爭議,由于記憶體超時,您必須留意這種可能性,以避免提前修剪,請參閱本文以了解更多詳細資訊,了解表的模式和行數等也可能對記憶體授予產生影響,
spark.sql.files.maxPartitionBytes是控制磁區大小的重要引數,默認設定為128 MB,它可以調整以控制磁區大小,因此也會更改由此產生的磁區數,
spark.default.parallelism這相當于worker nodes核心的總數,
最后,我們有coalesce()和repartition(),可用于增加/減少磁區數,甚至在資料已被讀入Spark,
只有當您想要減少磁區數時,才能使用coalesce() ,因為它不涉及資料的重排,請考慮此data frame的磁區數為 16,并且您希望將其增加到 32,因此您決定運行以下命令,
df = df.coalesce(32) print(df.rdd.getNumPartitions())
但是,磁區數量不會增加到 32 個,并且將保持在 16 個,因為coalesce()不涉及資料重排,這是一個性能優化的實作,因為無需昂貴的資料重排即可減少磁區,
如果您想將上述示例的磁區數減少到 8,則會獲得預期的結果,
df = df.coalesce(8) print(df.rdd.getNumPartitions())
這將合并資料并產生 8 個磁區,
repartition() 是另一個幫助調整磁區的函式,對于同一示例,您可以使用以下命令將資料放入 32 個磁區,
df = df.repartition(32) print(df.rdd.getNumPartitions())
最后,還有其他功能可以改變磁區數,其中是groupBy(), groupByKey(), reduceByKey() 和 join(),當在 DataFrame 上呼叫這些功能時,會導致跨機器或通常跨執行器對資料進行重排,最終在默認情況下將資料重新劃分為 200 個磁區,此默認 數字可以使用spark.sql.shuffle.partitions配置進行控制,
資料加載
現在,了解磁區在 Spark 中的作業原理以及如何更改磁區,是時候實施這些學習了,在上述實驗中,磁區數為 216(默認情況下),這是因為檔案的大小為 27 GB,因此將 27 GB 除以 128 MB(默認情況下由 Spark 定義的最大磁區位元組)提供了216 個磁區,
Spark重新磁區的影響
對 PySpark 代碼的更改是重新磁區資料并確保每個磁區現在有 1048576 行或接近它,為此,首先在DataFrame中獲取記錄數量,然后將其除以 1048576,此劃分的結果將是用于加載資料的磁區數,假設磁區數為n,但是,可能有一些磁區現在有 >=1048576 行,因此,為了確保每個磁區都<=1048576行,我們將磁區數作為n+1,使用n+1在磁區結果為 0 的情況下也很重要,在這種情況下,您將有一個磁區,
由于資料已加載到DataFrame中,而 Spark 默認已創建磁區,我們現在必須再次重新磁區資料,磁區數等于n+1,
# Get the number of partitions before re-partitioning print(df_gl.rdd.getNumPartitions()) 216 # Get the number of rows of DataFrame and get the number of partitions to be used. rows = df_gl.count() n_partitions = rows//1048576
# Re-Partition the DataFrame df_gl_repartitioned = df_gl.repartition(n_partitions+1) # Get the number of partitions after re-partitioning print(df_gl_repartitioned.rdd.getNumPartitions()) 105 # Get the partition id and count of partitions df_gl_repartitioned.withColumn("partitionId",spark_partition_id()).groupBy("partitionId").count().show(10000)

因此,在重新劃分磁區后,磁區數量從216 個減少到 105 (n+1),因此每個磁區現在都有接近1048576行,
此時,讓我們將資料再次寫入 SQL 表中,并驗證行組質量,這一次,每個行組的行數將接近每個磁區中的行數(略低于 1048576),讓我們看看下面:
重新磁區后的行組
SELECT COUNT(1) FROM sys.dm_db_column_store_row_group_physical_stats WHERE object_id = OBJECT_ID('largetable110M_1048576') 105
重新磁區后的行組質量

從本質上講,這次整體資料加載比之前慢了 2 秒,但行組的質量要好得多,行組數量減少到一半,行組幾乎已填滿到最大容量,請注意,由于DataFrame的重新劃分,將消耗額外的時間,這取決于資料幀的大小和磁區數,
請注意,您不會總是獲得每row_group 100 萬條記錄,它將取決于資料型別、列數等,以及之前討論的因素-請參閱sys.dm_db_column_store_row_group_physical_stats
關鍵點
- 建議在將資料批量加載到 SQL Server時使用BatchSize(無論是 CCI 還是Heap),但是,如果 Azure Databricks 或任何其他 Spark 引擎用于加載資料,則資料磁區在確定聚集列存盤索引中的行組質量方面起著重要作用,
- 使用BULK INSERT命令加載資料將遵守命令中提到的BATCHSIZE,除非其他因素影響插入行組的行數,
- Spark 中的資料磁區不應基于某些亂數,最好動態識別磁區數,并將n+1 用作磁區數,
- 由于聚集列存盤索引通過掃描單行組的列段掃描表,則最大化每個行組中的記錄數可增強查詢性能,為了獲得最佳的查詢性能,目標是最大限度地提高聚集列存盤索引中每個行組的行數,
- Azure Databricks的資料加載速度在很大程度上取決于選擇的集群型別及其配置,此外,請注意,到目前為止,Azure Databricks連接器僅支持Apache Spark 2.4.5,微軟已經發布了對Spark 3.0的支持,它目前在預覽版中,我們建議您在開發測驗環境中徹底測驗此連接器,
- 根據data frame的大小、列數、資料型別等,進行重新劃分的時間會有所不同,因此您必須從端端角度考慮這次對整體資料加載的考慮,
Azure Data Factory
這是一篇非常好的資料ETL文章,Spark和SQL Server列存盤表功能的組合,
Azure Data Factory是當前最成熟,功能最強大的ETL/ELT資料集成服務,其架構就是使用Spark作為計算引擎,

https://github.com/mrpaulandrew/A-Day-Full-of-Azure-Data-Factory
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/265829.html
標籤:其他
上一篇:SQL語言的奇怪想法
