Based on the Spark 3.2 branch.
BypassMergeSortShuffleWriter
Simplified flow diagram

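Example
In other words, each ShuffleMapTask writes one FileSegment per partition, and each FileSegment can be regarded as a temporary file. The files behind these FileSegments are then merged into a single DataFile, and an IndexFile records the starting offset of each partition in the DataFile.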

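This way of writing shuffle files avoids the pressure that a large number of small files puts on the file system. Each ShuffleMapTask first writes its temporary files, then merges them into a single data file, so every task ends up with one data file. The index file records the offset of each partition in the data file, which makes it possible to randomly access the data of a given partition.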
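The offset bookkeeping described above can be sketched in a few lines. This is a minimal illustration, not Spark's code: the class ShuffleIndexSketch and its methods buildOffsets and locate are hypothetical names, and the layout (one leading 0 followed by cumulative end offsets) is an assumption chosen so that both the start and the length of a partition can be read from two adjacent entries.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Spark: turns per-partition lengths into index offsets.
final class ShuffleIndexSketch {

    // Builds numPartitions + 1 cumulative offsets:
    // offsets[i] is where partition i starts, offsets[i + 1] is where it ends.
    static long[] buildOffsets(long[] partitionLengths) {
        long[] offsets = new long[partitionLengths.length + 1];
        for (int i = 0; i < partitionLengths.length; i++) {
            offsets[i + 1] = offsets[i] + partitionLengths[i];
        }
        return offsets;
    }

    // Returns {startOffset, length} of one partition inside the merged data file.
    static long[] locate(long[] offsets, int partitionId) {
        long start = offsets[partitionId];
        return new long[] {start, offsets[partitionId + 1] - start};
    }

    public static void main(String[] args) {
        long[] offsets = buildOffsets(new long[] {100L, 0L, 250L});
        System.out.println(Arrays.toString(offsets));            // [0, 100, 100, 350]
        System.out.println(Arrays.toString(locate(offsets, 2))); // [100, 250]
    }
}
```

With such an index, a reader that wants partition 2 only needs to seek to byte 100 of the data file and read 250 bytes, without scanning the other partitions.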
If you are interested, read the implementation of BypassMergeSortShuffleWriter#write in detail. Here it is with my comments on the key steps:
// BypassMergeSortShuffleWriter.java
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
    .createMapOutputWriter(shuffleId, mapId, numPartitions);
  try {
    // No records to write: commit directly and return
    if (!records.hasNext()) {
      partitionLengths = mapOutputWriter.commitAllPartitions(
        ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE).getPartitionLengths();
      mapStatus = MapStatus$.MODULE$.apply(
        blockManager.shuffleServerId(), partitionLengths, mapId);
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      // Create a temporary shuffle block for each partition. It consists of a
      // blockId and a File handle; the file name is derived from the blockId
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
        blockManager.diskBlockManager().createTempShuffleBlock();
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      // Create one writer per partition; getDiskWriter constructs a new writer
      DiskBlockObjectWriter writer =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
      if (partitionChecksums.length > 0) {
        writer.setChecksum(partitionChecksums[i]);
      }
      partitionWriters[i] = writer;
    }
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
    while (records.hasNext()) {
      // Compute the partition each record belongs to and hand the record
      // to that partition's writer
      final Product2<K, V> record = records.next();
      final K key = record._1();
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }
    for (int i = 0; i < numPartitions; i++) {
      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
        // Commit and get the corresponding FileSegment.
        // Committing forces a flush and records the start position and the
        // length of the data just written in the FileSegment
        partitionWriterSegments[i] = writer.commitAndGet();
      }
    }
    // Merge the files behind all the FileSegments above into a single file,
    // and generate an index file for it that records the starting offset of
    // each partition in the merged file
    partitionLengths = writePartitionedData(mapOutputWriter);
    mapStatus = MapStatus$.MODULE$.apply(
      blockManager.shuffleServerId(), partitionLengths, mapId);
  } catch (Exception e) {
    // ...
  }
}
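To make the three phases of the method above easier to follow (one writer per partition, routing by partition with no sorting, then concatenation plus an index), here is a heavily simplified, standalone sketch. It is not Spark's implementation. BypassWriteSketch, partitionOf, and the "key=value" text encoding are assumptions made for illustration, and plain java.nio files stand in for DiskBlockObjectWriter, the serializer, and ShuffleMapOutputWriter. Checksums, metrics, and error cleanup are left out.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical miniature of the bypass write path; all names are illustrative.
final class BypassWriteSketch {

    // Stand-in for partitioner.getPartition(key): non-negative hash modulo.
    static int partitionOf(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Writes the records and returns the byte length of each partition in dataFile.
    static long[] write(List<Map.Entry<String, String>> records, int numPartitions,
                        Path dataFile, Path indexFile) throws IOException {
        // Phase 1: one temporary file and one open stream per partition.
        Path[] tmpFiles = new Path[numPartitions];
        OutputStream[] writers = new OutputStream[numPartitions];
        for (int i = 0; i < numPartitions; i++) {
            tmpFiles[i] = Files.createTempFile("temp_shuffle_", ".part");
            writers[i] = Files.newOutputStream(tmpFiles[i]);
        }
        // Phase 2: route every record to its partition's stream, with no sorting.
        for (Map.Entry<String, String> r : records) {
            byte[] bytes = (r.getKey() + "=" + r.getValue() + "\n")
                .getBytes(StandardCharsets.UTF_8);
            writers[partitionOf(r.getKey(), numPartitions)].write(bytes);
        }
        for (OutputStream w : writers) {
            w.close(); // flushes, so the temp file now holds the whole partition
        }
        // Phase 3: concatenate the temp files in partition order and write the
        // cumulative offsets (a leading 0, then one end offset per partition).
        long[] lengths = new long[numPartitions];
        try (OutputStream data = Files.newOutputStream(dataFile);
             DataOutputStream index = new DataOutputStream(Files.newOutputStream(indexFile))) {
            long offset = 0L;
            index.writeLong(offset);
            for (int i = 0; i < numPartitions; i++) {
                lengths[i] = Files.copy(tmpFiles[i], data); // returns bytes copied
                Files.delete(tmpFiles[i]);
                offset += lengths[i];
                index.writeLong(offset);
            }
        }
        return lengths;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("bypass_sketch");
        List<Map.Entry<String, String>> records =
            List.of(Map.entry("a", "1"), Map.entry("b", "2"), Map.entry("c", "3"));
        long[] lengths = write(records, 2,
            dir.resolve("shuffle.data"), dir.resolve("shuffle.index"));
        System.out.println(Arrays.toString(lengths)); // [4, 8]
    }
}
```

In this run "b" lands in partition 0 and "a" and "c" land in partition 1, so the data file holds 4 bytes followed by 8 bytes, and the index file holds the three offsets 0, 4, and 12. The number of files open at once equals the number of partitions, which is the cost this approach pays for skipping the sort.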
