Multithreaded data loading inside the Dataset, rather than in the DataLoader, during PyTorch training
Background and motivation
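More and more deep learning practitioners use PyTorch. It is easy to pick up, and its API is considerably friendlier than TensorFlow's. Here are a few things I learned recently about PyTorch training.
Anyone doing machine learning or deep learning wants as many GPUs as possible so that training runs faster. In practice, though, training speed is often limited by I/O: CPU utilization stays low, and even with 8 GPUs the GPU utilization sits at a low level for long stretches, so the hardware never reaches its potential. So I kept wondering how to speed up data reading. The most direct fix is switching to an SSD, which raises throughput immediately. But is there nothing we can do on an existing server or an ordinary PC?
Frequent PyTorch users will also have noticed that with a large num_workers in the DataLoader, training takes a long time to start, and switching between epochs (replacing workers and reloading data) is slow as well.
A programmer will naturally think of multithreading or multiprocessing. Can that speed up training I/O? The answer is yes.
This post presents a working example of multithreaded data reading. The test does not include training; it only measures data loading through the Dataset and DataLoader.
The PyTorch DataLoader generates an index and the Dataset then reads the corresponding sample. With batch_size=128, that means 128 separate calls into the Dataset, each followed by a read.
My idea is simple: build the batches directly in the Dataset. With batch_size=1 in the DataLoader, each item then still corresponds to one full batch, and inside the Dataset I can load that batch's samples with threads, which should improve read efficiency.
With the idea in hand, time to implement it.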
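To make the idea concrete before touching PyTorch, here is a minimal standard-library sketch of loading one batch with a thread pool. The function `fake_load` and the file names are hypothetical stand-ins (a sleep simulates I/O latency); it is not the author's loader, only an illustration of why threads help when the work is I/O-bound.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_load(path):
    """Hypothetical stand-in for reading and decoding one image."""
    time.sleep(0.02)  # simulate I/O latency
    return f"decoded:{path}"


def load_batch_sequential(paths):
    # One item after another, as a single-threaded Dataset would do
    return [fake_load(p) for p in paths]


def load_batch_threaded(paths, num_workers=8):
    # executor.map yields results in the same order as the inputs
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        return list(executor.map(fake_load, paths))


paths = [f"img_{i:03d}.png" for i in range(16)]  # placeholder file names

start = time.perf_counter()
sequential = load_batch_sequential(paths)
t_sequential = time.perf_counter() - start

start = time.perf_counter()
threaded = load_batch_threaded(paths)
t_threaded = time.perf_counter() - start

print(f"sequential: {t_sequential:.3f}s, threaded: {t_threaded:.3f}s")
```

Both functions return the same list in the same order; only the waiting time overlaps in the threaded version.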
The Dataset we usually write looks like the following. It uses albumentations for data processing instead of torchvision's transforms; otherwise there is no difference.
import numpy as np
from PIL import Image
from torch.utils.data import Dataset


def default_loader(path):
    # Open the image and force 3-channel RGB
    return Image.open(path).convert('RGB')


class AlbumentationsDatasetList(Dataset):
    """
    Data processing using albumentations, analogous to torchvision transforms
    """

    def __init__(self, imgs, transform=None, loader=default_loader, percentage=1):
        # percentage controls what fraction of the dataset is used
        img_num = int(len(imgs) * percentage)
        self.imgs = imgs[:img_num]
        self.transform = transform
        self.loader = loader

    def __getitem__(self, index):
        fn = self.imgs[index]
        img = self.loader(fn)
        if self.transform is not None:
            # albumentations works on numpy arrays and returns a dict
            image_np = np.array(img)
            augmented = self.transform(image=image_np)
            img = augmented['image']
        return img

    def __len__(self):
        return len(self.imgs)
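Implementation
Let's get to it and rework the Dataset with multithreading. The code follows. It includes some extra details, so it is longer, but the structure is the same as above. The difference is that the Dataset now prepares the batches itself, and after loading the images it stacks them into a tensor of shape [N, C, H, W].
Note: with drop_last=False, the last batch will usually not contain batch_size items, so batch_size in the DataLoader must be set to 1. With batch_size=1 the DataLoader returns data of shape [1, N, C, H, W], so squeeze it before use.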
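As a quick illustration of the drop_last note, the following pure-Python sketch splits 5000 placeholder file names into batches of 128. The helper `make_batches` is a hypothetical simplification written for this example, not the class shown in this post.

```python
def make_batches(items, batch_size, drop_last=False):
    """Split items into consecutive batches of batch_size."""
    batches = [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
    # Optionally discard a trailing batch that is smaller than batch_size
    if drop_last and batches and len(batches[-1]) < batch_size:
        batches.pop()
    return batches


paths = [f"img_{i:04d}.png" for i in range(5000)]  # placeholder file names

kept = make_batches(paths, 128, drop_last=False)
dropped = make_batches(paths, 128, drop_last=True)

print(len(kept), len(kept[-1]))        # 40 batches, the last one holds 8 items
print(len(dropped), len(dropped[-1]))  # 39 batches, all of size 128
```

Since 5000 = 39 x 128 + 8, keeping the last batch yields one short batch of 8 images, which is why the batches cannot be collated again by the DataLoader.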
import random
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import torch
from torch.utils.data import Dataset


class AlbumentationsDatasetList(Dataset):
    def __init__(self,
                 images,
                 batch_num,
                 percentage=1,
                 transform=None,
                 multi_load=True,
                 shuffle=True,
                 seed=None,
                 drop_last=False,
                 num_workers=4,
                 loader=default_loader) -> None:
        #==============================================
        # Set seed
        #==============================================
        if seed is None:
            # draw a random integer seed (the upper bound must be 1e6, not 1e-6)
            self.seed = int(np.random.randint(0, 10**6))
        else:
            self.seed = seed
        random.seed(self.seed)

        self.images = list(images)  # copy, so shuffling does not reorder the caller's list
        self.batch_num = batch_num  # use batch_num instead of batch_size, same thing
        self.percentage = percentage
        self.transform = transform
        self.multi_load = multi_load
        self.shuffle = shuffle
        self.drop_last = drop_last
        self.num_workers = num_workers  # Dataset num_workers (threads per batch)
        self.loader = loader

        self.batches = self._create_batches()
        self.batches = self._get_len_batches(self.percentage)

    def _get_len_batches(self, percentage):
        """
        Description:
            - controls how many batches are used for training or validating;
              the sampled indices are sorted, so the kept batches stay in their original order
        Parameters:
            - percentage: float, range [0, 1]
        Return:
            - list of the selected batches
        """
        batch_num = int(len(self.batches) * percentage)
        indices = random.sample(range(len(self.batches)), batch_num)
        indices.sort()
        return [self.batches[i] for i in indices]

    def _create_batches(self):
        if self.shuffle:
            random.shuffle(self.images)

        batches = []
        ranges = list(range(0, len(self.images), self.batch_num))
        for i in ranges[:-1]:
            batch = self.images[i:i + self.batch_num]
            batches.append(batch)

        #== Drop last ===============================================
        # keep the last batch if it is full, or if drop_last is False
        last_batch = self.images[ranges[-1]:]
        if len(last_batch) == self.batch_num or not self.drop_last:
            batches.append(last_batch)

        return batches

    def __getitem__(self, index):
        batch = self.batches[index]

        #== Stack all images, become a 4 dimensional tensor ===============
        if self.multi_load:
            batch_images = self._multi_loader(batch)
        else:
            batch_images = []
            for image in batch:
                img = self._load_transform(image)
                batch_images.append(img)

        # torch.stack needs tensors, so the transform must end with ToTensorV2
        batch_images_tensor = torch.stack(batch_images, dim=0)
        return batch_images_tensor

    def _load_transform(self, tile):
        img = self.loader(tile)
        if self.transform is not None:
            image_np = np.array(img)
            augmented = self.transform(image=image_np)
            img = augmented['image']
        return img

    def _multi_loader(self, tiles):
        # load and transform all images of one batch in parallel threads;
        # executor.map returns the results in input order
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            images = list(executor.map(self._load_transform, tiles))
        return images

    def __len__(self):
        return len(self.batches)
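The squeeze step mentioned in the note above can be checked with a toy Dataset that returns ready-made batches. `PreBatchedToy` is a hypothetical stand-in that returns zero tensors instead of images. The second loader uses `batch_size=None`, which disables the DataLoader's automatic batching; the original post does not use this option, and it is shown only as a possible alternative to squeezing.

```python
import torch
from torch.utils.data import DataLoader, Dataset


class PreBatchedToy(Dataset):
    """Toy stand-in: every item is already a whole batch of shape [N, C, H, W]."""

    def __init__(self, batch_sizes):
        self.batch_sizes = batch_sizes

    def __len__(self):
        return len(self.batch_sizes)

    def __getitem__(self, index):
        return torch.zeros(self.batch_sizes[index], 3, 8, 8)


dataset = PreBatchedToy([4, 4, 2])  # the last batch is smaller, as with drop_last=False

# batch_size=1 adds a leading dimension of size 1, so squeeze it away.
loader = DataLoader(dataset, batch_size=1, shuffle=False, num_workers=0)
shapes_squeezed = [tuple(batch.squeeze(0).shape) for batch in loader]

# batch_size=None disables automatic batching, so no squeeze is needed.
loader_no_collate = DataLoader(dataset, batch_size=None, shuffle=False, num_workers=0)
shapes_direct = [tuple(batch.shape) for batch in loader_no_collate]

print(shapes_squeezed)
print(shapes_direct)
```

Both loaders deliver the same batch shapes, including the short final batch.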
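Benchmark code and data
Next comes the test on real data. The multi_load parameter makes it easy to switch multithreading on or off, so we can compare, on the same machine and with the same data, whether multithreaded loading is faster than single-threaded loading.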
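- Test goals:
  - 1. Is multithreaded loading faster than single-threaded loading?
  - 2. How much faster is it?
  - 3. Find the fastest (or at least a suitable) set of hyperparameters for this machine, as a reference for other machines.
- Test platform: Windows 10
- CPU: Intel Core i7-9850H @ 2.60GHz
- RAM: 32 GB
- Test data: 5000 images, all 3-channel RGB, 8-bit, 512x512 pixels, in .PNG format.
- Test method:
  - Hyperparameters (the search space has 2 x 8 x 8 x 8 = 1024 combinations):
    - multi_loads = [True, False]
    - prefetch_factors = list(range(0, 17, 2))[1:]  # [2, 4, 6, 8, 10, 12, 14, 16]
    - dataset_workers = list(range(0, 17, 2))[1:]
    - dataloader_workers = list(range(0, 17, 2))[1:]
  - Grid search: every point in the search space configures the Dataset and DataLoader with different parameters, and every run reads and processes all 5000 images, with drop_last=False.
  - Data augmentation: resize and normalize only.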
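The size of the search space can be verified by enumerating the grid with `itertools.product`. This is only a counting sketch using the four value lists from the test setup; it does not run any loading.

```python
from itertools import product

multi_loads = [True, False]
prefetch_factors = list(range(2, 17, 2))    # [2, 4, 6, 8, 10, 12, 14, 16]
dataset_workers = list(range(2, 17, 2))
dataloader_workers = list(range(2, 17, 2))

# Every combination of the four hyperparameters
grid = list(product(multi_loads, prefetch_factors, dataset_workers, dataloader_workers))
print(len(grid))  # 2 * 8 * 8 * 8 = 1024
```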
The full test code follows.
import time

import albumentations as album
from albumentations.pytorch import ToTensorV2
from torch.utils.data import DataLoader

from utils import get_specified_files  # the author's own helper, similar to glob.glob

albumentations_valid = album.Compose([
    album.Resize(480, 480),
    album.Normalize(mean=[0.7347, 0.4894, 0.6820], std=[0.1747, 0.2223, 0.1535]),
    ToTensorV2(),
])

path = r"xxxxx"
images = get_specified_files(path, suffixes=[".png"], recursive=True)
images = images[:5000]
print(len(images))

multi_loads = [True, False]
prefetch_factors = list(range(0, 17, 2))[1:]  # [2, 4, 6, 8, 10, 12, 14, 16]
dataset_workers = list(range(0, 17, 2))[1:]
dataloader_workers = list(range(0, 17, 2))[1:]

if __name__ == "__main__":  # needed when run as a script on Windows with num_workers > 0
    with open(r"grid_search_log.txt", mode='a', encoding='utf-8') as log_file:
        for multi_load in multi_loads:
            for prefetch_factor in prefetch_factors:
                for dataset_worker in dataset_workers:
                    for dataloader_worker in dataloader_workers:
                        # the Dataset class defined above (the original listing called it AEDataset here)
                        train_dataset = AlbumentationsDatasetList(images,
                                                                  batch_num=128,
                                                                  percentage=1,
                                                                  transform=albumentations_valid,
                                                                  multi_load=multi_load,
                                                                  shuffle=True,
                                                                  seed=0,
                                                                  drop_last=False,
                                                                  num_workers=dataset_worker,
                                                                  )
                        train_loader = DataLoader(dataset=train_dataset,
                                                  batch_size=1,
                                                  shuffle=False,
                                                  num_workers=dataloader_worker,
                                                  pin_memory=True,
                                                  prefetch_factor=prefetch_factor,
                                                  persistent_workers=False)

                        print("Start loading")
                        start_time = time.time()
                        for batches in train_loader:
                            pass  # only the loading is timed, nothing is trained
                        elapse = time.time() - start_time

                        message = (f"multi_load: {multi_load}, prefetch_factors: {prefetch_factor}, "
                                   f"dataset_workers: {dataset_worker}, "
                                   f"data_loader_workers: {dataloader_worker}, elapse: {elapse:.4f}")
                        print(message)
                        log_file.write(message + "\n")
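The benchmark writes plain-text log lines, while the analysis that follows reads a CSV file; the conversion step is not shown in the post. One possible way to bridge the two, assuming the log line format printed above, is sketched here with the standard library. The helper names `parse_log_line` and `log_to_csv` are hypothetical, and sorting by elapsed time is an assumption.

```python
import csv
import io
import re

LINE_PATTERN = re.compile(r"(\w+): ([^,\n]+)")


def parse_log_line(line):
    """Turn one 'key: value, key: value' log line into a typed dict."""
    row = dict(LINE_PATTERN.findall(line))
    return {
        "multi_load": row["multi_load"] == "True",
        "prefetch_factors": int(row["prefetch_factors"]),
        "dataset_workers": int(row["dataset_workers"]),
        "data_loader_workers": int(row["data_loader_workers"]),
        "elapse": float(row["elapse"]),
    }


def log_to_csv(lines, out):
    """Write parsed rows to a file-like object, sorted by elapsed time (an assumption)."""
    rows = sorted((parse_log_line(l) for l in lines if l.strip()),
                  key=lambda r: r["elapse"])
    writer = csv.DictWriter(out, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
    return rows


sample = [
    "multi_load: True, prefetch_factors: 2, dataset_workers: 2, data_loader_workers: 2, elapse: 28.6076",
    "multi_load: True, prefetch_factors: 14, dataset_workers: 14, data_loader_workers: 2, elapse: 19.9746",
]
buffer = io.StringIO()
rows = log_to_csv(sample, buffer)
print(buffer.getvalue())
```

In practice the lines would come from the log file and `out` would be an opened CSV file instead of the in-memory buffer.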
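Results
Back to the test goals stated above:
Test goals:
- 1. Is multithreaded loading faster than single-threaded loading?
- 2. How much faster is it?
- 3. Find the fastest (or at least a suitable) set of hyperparameters for this machine, as a reference for other machines.
With these three questions in mind, let's look at the results: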
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
path = "C:/Users/jasne/Desktop/grid_search_multi_load.csv"
df = pd.read_csv(path)
df.head()
| | multi_load | prefetch_factors | dataset_workers | data_loader_workers | elapse |
|---|---|---|---|---|---|
| 0 | True | 14 | 14 | 2 | 19.9746 |
| 1 | True | 14 | 10 | 2 | 19.9816 |
| 2 | True | 14 | 12 | 2 | 20.0205 |
| 3 | True | 8 | 10 | 2 | 20.0514 |
| 4 | True | 14 | 16 | 2 | 20.0943 |
Max elapse
This is the ordinary loading method we normally use; it took 72.29 seconds.
df[df["elapse"]==df["elapse"].max()]
| | multi_load | prefetch_factors | dataset_workers | data_loader_workers | elapse |
|---|---|---|---|---|---|
| 1024 | False | 1 | 1 | 1 | 72.2857 |
Multi Load Max elapse
The slowest multithreaded run
multi_load = df[df["multi_load"]==True]
multi_load[multi_load["elapse"]==multi_load["elapse"].max()]
| | multi_load | prefetch_factors | dataset_workers | data_loader_workers | elapse |
|---|---|---|---|---|---|
| 1023 | True | 6 | 14 | 16 | 48.3309 |
Min elapse
The relative difference is computed as
$$(\text{max} - \text{min}) / \text{min}$$
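The fastest run took 19.97 seconds, 52.31 seconds less than the slowest run, which is 2.6 times faster by this formula. So multi_load is clearly faster than single load.
The multithreaded time also depends on prefetch_factors, dataset_workers and dataloader_workers, and their effect is substantial.
Within the multithreaded runs, the fastest and the slowest differ by a factor of 1.42.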
df[df["elapse"]==df["elapse"].min()]
| | multi_load | prefetch_factors | dataset_workers | data_loader_workers | elapse |
|---|---|---|---|---|---|
| 0 | True | 14 | 14 | 2 | 19.9746 |
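The ratios quoted for the fastest and slowest runs can be reproduced from the three timings in the tables. The helper name `relative_gap` is hypothetical; it simply applies the (max - min) / min formula.

```python
def relative_gap(slow, fast):
    """(max - min) / min, the ratio used in the text."""
    return (slow - fast) / fast


single_thread = 72.2857   # slowest run overall (multi_load=False)
multi_slowest = 48.3309   # slowest multithreaded run
multi_fastest = 19.9746   # fastest run overall

print(round(single_thread - multi_fastest, 2))               # 52.31
print(round(relative_gap(single_thread, multi_fastest), 2))  # 2.62
print(round(relative_gap(multi_slowest, multi_fastest), 2))  # 1.42
```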
Next, is a larger data_loader_workers always better?
dataloader_workers = multi_load[(multi_load["prefetch_factors"]==2) & (multi_load["dataset_workers"]==2)]
dataloader_workers = dataloader_workers.sort_values("data_loader_workers")  # avoids an in-place sort on a slice
dataloader_workers
| | multi_load | prefetch_factors | dataset_workers | data_loader_workers | elapse |
|---|---|---|---|---|---|
| 376 | True | 2 | 2 | 2 | 28.6076 |
| 102 | True | 2 | 2 | 4 | 24.4866 |
| 144 | True | 2 | 2 | 6 | 26.3106 |
| 410 | True | 2 | 2 | 8 | 30.3909 |
| 536 | True | 2 | 2 | 10 | 33.2621 |
| 724 | True | 2 | 2 | 12 | 36.9114 |
| 946 | True | 2 | 2 | 14 | 41.3437 |
| 986 | True | 2 | 2 | 16 | 44.4443 |
plt.figure(figsize=(8, 5))
plt.scatter(dataloader_workers["data_loader_workers"], dataloader_workers["elapse"])
plt.show()

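The plot shows that a larger dataloader_workers is not better. Within the range 2 to 8, dataloader_workers=4 is the best choice, and beyond that the time grows roughly linearly as dataloader_workers increases.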
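To put a number on the roughly linear growth, a least-squares line can be fitted to the table rows from data_loader_workers=4 upward. This is a plain-Python sketch over the values listed in the table; restricting the fit to 4 and above is a choice made for this example.

```python
workers = [4, 6, 8, 10, 12, 14, 16]
elapsed = [24.4866, 26.3106, 30.3909, 33.2621, 36.9114, 41.3437, 44.4443]

mean_x = sum(workers) / len(workers)
mean_y = sum(elapsed) / len(elapsed)

# Ordinary least-squares slope and intercept
slope = sum((x - mean_x) * (y - mean_y) for x, y in zip(workers, elapsed)) / sum(
    (x - mean_x) ** 2 for x in workers
)
intercept = mean_y - slope * mean_x

print(f"about {slope:.2f} s per additional DataLoader worker")
```

On these numbers the fit comes out at roughly 1.7 seconds of extra loading time per additional DataLoader worker.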
Next, is a larger dataset_workers always better?
dataset_workers = multi_load[(multi_load["prefetch_factors"]==2) & (multi_load["data_loader_workers"]==2)]
dataset_workers = dataset_workers.sort_values("dataset_workers")  # avoids an in-place sort on a slice
dataset_workers
| | multi_load | prefetch_factors | dataset_workers | data_loader_workers | elapse |
|---|---|---|---|---|---|
| 376 | True | 2 | 2 | 2 | 28.6076 |
| 75 | True | 2 | 4 | 2 | 23.5092 |
| 52 | True | 2 | 6 | 2 | 22.4270 |
| 49 | True | 2 | 8 | 2 | 22.2465 |
| 26 | True | 2 | 10 | 2 | 21.7578 |
| 37 | True | 2 | 12 | 2 | 22.0112 |
| 46 | True | 2 | 14 | 2 | 22.1947 |
| 35 | True | 2 | 16 | 2 | 21.9832 |
plt.figure(figsize=(8, 5))
plt.scatter(dataset_workers["dataset_workers"], dataset_workers["elapse"])
plt.show()

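The plot shows that increasing dataset_workers clearly reduces the loading time, but above 10 the downward trend stops, and at 12 and 14 the time even rises slightly. Because the test platform is limited, it remains to be tested whether much larger values of dataset_workers, such as 128 or more, would reduce the loading time further.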
Next, is a larger prefetch_factors always better?
prefetch_factors = multi_load[(multi_load["dataset_workers"]==2) & (multi_load["data_loader_workers"]==2)]
prefetch_factors = prefetch_factors.sort_values("prefetch_factors")  # avoids an in-place sort on a slice
prefetch_factors
| | multi_load | prefetch_factors | dataset_workers | data_loader_workers | elapse |
|---|---|---|---|---|---|
| 376 | True | 2 | 2 | 2 | 28.6076 |
| 289 | True | 4 | 2 | 2 | 27.7318 |
| 309 | True | 6 | 2 | 2 | 28.0899 |
| 141 | True | 8 | 2 | 2 | 26.2518 |
| 378 | True | 10 | 2 | 2 | 28.6515 |
| 332 | True | 12 | 2 | 2 | 28.2445 |
| 135 | True | 14 | 2 | 2 | 26.0284 |
| 134 | True | 16 | 2 | 2 | 26.0025 |
plt.figure(figsize=(8, 5))
plt.scatter(prefetch_factors["prefetch_factors"], prefetch_factors["elapse"])
plt.show()

The plot suggests that a larger prefetch_factors gives a slightly shorter loading time, but the difference is small: the slowest and fastest runs differ by only about 2.6 seconds.
Another filter condition for prefetch_factors
prefetch_factors = multi_load[(multi_load["dataset_workers"]==10) & (multi_load["data_loader_workers"]==4)]
prefetch_factors = prefetch_factors.sort_values("prefetch_factors")  # avoids an in-place sort on a slice
prefetch_factors
| | multi_load | prefetch_factors | dataset_workers | data_loader_workers | elapse |
|---|---|---|---|---|---|
| 70 | True | 2 | 10 | 4 | 23.3808 |
| 103 | True | 4 | 10 | 4 | 24.4975 |
| 108 | True | 6 | 10 | 4 | 24.6660 |
| 53 | True | 8 | 10 | 4 | 22.5058 |
| 90 | True | 10 | 10 | 4 | 24.1555 |
| 92 | True | 12 | 10 | 4 | 24.1825 |
| 39 | True | 14 | 10 | 4 | 22.0710 |
| 120 | True | 16 | 10 | 4 | 25.0829 |
plt.figure(figsize=(8, 5))
plt.scatter(prefetch_factors["prefetch_factors"], prefetch_factors["elapse"])
plt.show()

The plot suggests that prefetch_factors has no clear effect on the loading time; with this filter the slowest and fastest runs differ by only about 3.0 seconds.
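The spread between the slowest and fastest run in each of the two prefetch_factors tables can be recomputed directly from the listed values. The helper `spread` is a hypothetical name used only in this sketch.

```python
def spread(values):
    """Difference between the slowest and the fastest time."""
    return max(values) - min(values)


# dataset_workers == 2, data_loader_workers == 2
first_filter = [28.6076, 27.7318, 28.0899, 26.2518, 28.6515, 28.2445, 26.0284, 26.0025]
# dataset_workers == 10, data_loader_workers == 4
second_filter = [23.3808, 24.4975, 24.6660, 22.5058, 24.1555, 24.1825, 22.0710, 25.0829]

print(f"{spread(first_filter):.2f}")   # 2.65
print(f"{spread(second_filter):.2f}")  # 3.01
```

In both cases the spread is small compared with the effect of dataset_workers or data_loader_workers.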
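Conclusions
- Is multithreaded loading really faster than single-threaded loading?
  - There is no doubt about it. It follows from how computers work and is exactly the advantage of parallelism, and the measurements above confirm it.
- How much faster is multithreaded loading?
  - With suitable hyperparameters, multithreaded loading of the same data with the same processing took 52.31 seconds less than single-threaded loading, a bit over 2.6 times faster. Even with the worst parameters, the longest multithreaded loading time was 48.33 seconds, still about 0.5 times faster than the single-threaded 72.29 seconds.
- Find the fastest (or at least a suitable) set of hyperparameters for this machine, as a reference for other machines
  - dataset_workers: larger is better, but beyond a threshold there is no further gain. On this test platform the threshold is 10.
  - data_loader_workers: larger is not better. The best value on this platform is 4, and values around 4 are good reference points. Beyond that, the time grows linearly with this parameter, which also explains why PyTorch takes longer to start when data_loader_workers is large.
  - prefetch_factors: appears to have little effect on the loading time, but it is best not to set it to 1.
This test did not monitor memory or CPU usage, though I watched during the runs and CPU usage generally reached 100%. These metrics could also be recorded alongside the hyperparameters for further reference.
Note: training itself also needs the CPU, so avoid setting dataset_workers too high and avoid driving the CPU to 100%, which can freeze the machine.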
Please credit the source when reposting. Link to this article: https://www.uj5u.com/qita/320975.html
