概述
什么是Shuffle?
在講到Spark Shuffle實作機制之前,需要了解下什么是Shuffle,Shuffle按字面意思,也就是洗牌,把牌視作資料,那么洗的程序也就是按照某種規則改變資料的次序,接著發牌員發牌給玩家,發牌的程序就對應著通過網路I/O分發資料,
假設在一場牌局中,有一位發牌員和兩位玩家,一般情況下,發牌員在開始游戲之前,需要將牌隨機打亂,再按序發給兩位玩家,接著兩位玩家開始處理牌,發牌員洗牌、發牌、玩家拿牌這三個程序中,洗牌和拿牌在Spark中分別對應Shuffle Write, Shuffle Read,文章關注的則是Shuffle Write,
Shuffle發生在哪些實體上?
在Spark中,driver是負責切分task并序列化task,協調資源,并調度派發到executor上,driver不負責處理資料,具體資料處理是由executor完成的,也就是說Shuffle是發生在executor上的,回到上邊的牌局,這時候executor兼具發牌員和玩家兩個職能,一般大家和朋友打撲克,總不能有人專門洗牌發牌但不玩吧~在Spark中稍復雜些,并不是一個executor在Shuffle Write,而是每個executor都在做,直到所有executor都完成了Shuffle Write,都通知driver已完成,才會進入到下一個Stage,進行Shuffle Read,
為什么需要Shuffle?
在資料量小時,一般一個單體行程即可完成加工處理,但面對海量資料處理,一臺單體行程是難以勝任的,隨著互聯網發展,許多分布式計算框架被提出,這些框架總的來說,都是在多個分布式行程中處理不同的資料,在對資料處理程序中,時常需要:
- group, join資料:比如根據相同key聚類,每個分布式資料處理行程處理后,將特定key的資料發往特定的分布式行程上進行聚類;
- 資料傾斜時重分布:資料傾斜在少數分布式行程,導致其他行程空跑等待,既是浪費資源,也會影響整體處理效率,因此需要將資料發往其他分布式行程進行處理,
這兩種場景就涉及到如何“洗牌”,將資料按某種規則分布到其他行程中,在Spark中有哪些操作會觸發到Shuffle呢?
有哪些對RDD/DF的操作會觸發到Shuffle呢?
主要是這四類:
- .*ByKey: groupByKey, countByKey, reduceByKey等聚類演算法
- .*By: distributeBy, clusterBy等聚類演算法
- repartition: round robin重分布資料
- join: 可能觸發,當需要連接的資料廣播到各個executor時,就不會觸發shuffle,直接在記憶體中進行join
Spark中對Shuffle實作的演進歷史
這部分我倒是沒有細看,我開始接觸時Spark就已經迭代到3.2的版本了,只是了解到在2.0之前,Shuffle的實作變化很多,主要是為了解決非功能問題,有興趣可以了解一下,對解決日常非功能問題也有一定啟發,在2.0及之后版本,Shuffle Write的實作就已經穩定了,只有以下三種:
- UnsafeShuffleWriter
- BypassMergeSortShuffleWriter
- SortShuffleWriter
那么Spark是如何決定使用哪種實作的呢?
使用各個Shuffle Writer的條件
原始碼分析
在Spark driver構建RDD之間的血緣依賴時,便根據以下條件選擇構建具體的Shuffle依賴:
// SortShuffleManager
override def registerShuffle[K, V, C](
shuffleId: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
new BypassMergeSortShuffleHandle[K, V](
shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
new SerializedShuffleHandle[K, V](
shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
new BaseShuffleHandle(shuffleId, dependency)
}
}
接著在getWriter中,根據上個方法確定的handle,選擇對應的shuffle writer:
//
override def getWriter[K, V](
handle: ShuffleHandle,
mapId: Long,
context: TaskContext,
metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
handle.shuffleId, _ => new OpenHashSet[Long](16))
mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
val env = SparkEnv.get
handle match {
case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
new UnsafeShuffleWriter(
env.blockManager,
context.taskMemoryManager(),
unsafeShuffleHandle,
mapId,
context,
env.conf,
metrics,
shuffleExecutorComponents)
case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
new BypassMergeSortShuffleWriter(
env.blockManager,
bypassMergeSortHandle,
mapId,
env.conf,
metrics,
shuffleExecutorComponents)
case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
new SortShuffleWriter(
shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
}
}
由此得到Handle與Shuffle Writer的關系:
| Handle | Writer |
|---|---|
| unsafeShuffleHandle | UnsafeShuffleWriter |
| bypassMergeSortHandle | BypassMergeSortShuffleHandle |
| BaseShuffleHandle | SortShuffleWriter |
需要進一步查看其中SortShuffleWriter#shouldBypassMergeSort, SortShuffleManager#canUseSerializedShuffle的實作,確認使用各個Handle的條件,
對于bypassMergeSortHandle 的使用條件有:
- 需要在父RDD沒有開啟mapSideCombine1
- 磁區數量 <= shuffle.sort.baypass.merge.threshold(默認200)的情況下才適用
// SortShuffleWriter
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
if (dep.mapSideCombine) {
false
} else {
val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
dep.partitioner.numPartitions <= bypassMergeThreshold
}
}
對于unsafeShuffleHandle的使用條件有:
- 父RDD使用的序列化器需要支持重排序序列化物件2
- 父RDD沒有開啟mapSideCombine
- 磁區數量 <= 16777216(一般情況不會出現大于這個數量的磁區,這是個magic number,不必關注為什么是這個值,只需要知道1600萬+的磁區數量本身就不合理)
// SortShuffleManager
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
val shufId = dependency.shuffleId
val numPartitions = dependency.partitioner.numPartitions
if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
s"${dependency.serializer.getClass.getName}, does not support object relocation")
false
} else if (dependency.mapSideCombine) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
s"map-side aggregation")
false
} else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
false
} else {
log.debug(s"Can use serialized shuffle for shuffle $shufId")
true
}
}
題外話,RDD支持的序列化器有兩種:
| 序列化器 | 支持重排序序列化物件 |
|---|---|
| UnsafeRowSerializer | true |
| KryoSerializer | 取決于是否開啟了auto-reset,默認開啟,則true,用戶顯式定義關閉,則false,當關閉時,Kryo可能會存盤重復物件的參考,而不是往序列化流中寫入物件的序列化位元組,這會破壞物件的重排序2 |
小結
由此得到了Handle與Shuffle Writer的關系,在后續篇章中,將會講解三種Shuffle Writer的實作,建議從最簡單的BypassMergeSortShuffleWriter開始讀起,
mapSideCombine: 即在map端對資料進行合并,減少shuffle的資料量,以及減少reducer端處理的資料量, ??
重排序序列化物件:對序列化物件排序的結果,與排序序列化前原物件的結果一致,基于shuffle資料時需要序列化資料物件的背景,這是一種避免排序時反序列化開銷的技術, ?? ??
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423759.html
標籤:其他
