當一塊GPU不夠用時,我們就需要使用多卡進行并行訓練,其中多卡并行可分為資料并行和模型并行,具體區別如下圖所示:

由于模型并行比較少用,這里只對資料并行進行記錄,對于pytorch,有兩種方式可以進行資料并行:資料并行(DataParallel, DP)和分布式資料并行(DistributedDataParallel, DDP),
在多卡訓練的實作上,DP與DDP的思路是相似的:
1、每張卡都復制一個有相同引數的模型副本,
2、每次迭代,每張卡分別輸入不同批次資料,分別計算梯度,
3、DP與DDP的主要不同在于接下來的多卡通信:
DP的多卡互動實作在一個行程之中,它將一張卡視為主卡,維護單獨模型優化器,所有卡計算完梯度后,主卡匯聚其它卡的梯度進行平均并用優化器更新模型引數,再將模型引數更新至其它卡上,
DDP則分別為每張卡創建一個行程,每個行程相應的卡上都獨立維護模型和優化器,在每次每張卡計算完梯度之后,行程之間以NCLL(NVIDIA GPU通信)為通信后端,使各卡獲取其它卡的梯度,各卡對獲取的梯度進行平均,然后執行后續的引數更新,由于每張卡上的模型與優化器引數在初始化時就保持一致,而每次迭代的平均梯度也保持一致,那么即使沒有進行引數復制,所有卡的模型引數也是保持一致的,
Pytorch官方推薦我們使用DDP,DP經過我的實驗,兩塊GPU甚至比一塊還慢,當然不同模型可能有不同的結果,下面分別對DP和DDP進行記錄,
DP
Pytorch的DP實作多GPU訓練十分簡單,只需在單GPU的基礎上加一行代碼即可,以下是一個DEMO的代碼,
import torch from torch import nn from torch.optim import Adam from torch.nn.parallel import DataParallel class DEMO_model(nn.Module): def __init__(self, in_size, out_size): super().__init__() self.fc = nn.Linear(in_size, out_size) def forward(self, inp): outp = self.fc(inp) print(inp.shape, outp.device) return outp model = DEMO_model(10, 5).to('cuda') model = DataParallel(model, device_ids=[0, 1]) # 額外加這一行 adam = Adam(model.parameters()) # 進行訓練 for i in range(1): x = torch.rand([128, 10]) # 獲取訓練資料,無需指定設備 y = model(x) # 自動均勻劃分資料批量并分配至各GPU,輸出結果y會聚集到GPU0中 loss = torch.norm(y) loss.backward() adam.step()
其中model = DataParallel(model, device_ids=[0, 1])這行將模型復制到0,1號GPU上,輸入資料x無需指定設備,它將會被均勻分配至各塊GPU模型,進行前向傳播,之后各塊GPU的輸出再合并到GPU0中,得到輸出y,輸出y在GPU0中計算損失,并進行反向傳播計算梯度、優化器更新引數,
DDP
為了對分布式編程有基本概念,首先使用pytorch內部的方法實作一個多行程程式,再使用DDP模塊實作模型的分布式訓練,
Pytorch分布式基礎
首先使用pytorch內部的方法撰寫一個多行程程式作為撰寫分布式訓練的基礎,
import os, torch import torch.multiprocessing as mp import torch.distributed as dist def run(rank, size): tensor = torch.tensor([1,2,3,4], device='cuda:'+str(rank)) # ——1—— group = dist.new_group(range(size)) # ——2—— dist.all_reduce(tensor=tensor, group=group, op=dist.ReduceOp.SUM) # ——3—— print(str(rank)+ ': ' + str(tensor) + '\n') def ini_process(rank, size, fn, backend = 'nccl'): os.environ['MASTER_ADDR'] = '127.0.0.1' # ——4—— os.environ['MASTER_PORT'] = '1234' dist.init_process_group(backend, rank=rank, world_size=size) # ——5—— fn(rank, size) # ——6—— if __name__ == '__main__': # ——7—— mp.set_start_method('spawn') # ——8—— size = 2 # ——9—— ps = [] for rank in range(size): p = mp.Process(target=ini_process, args=(rank, size, run)) # ——10—— p.start() ps.append(p) for p in ps: # ——11—— p.join()
以上代碼主行程創建了兩個子行程,子行程之間使用NCCL后端進行通信,每個子行程各占用一個GPU資源,實作了所有GPU張量求和的功能,細節注釋如下:
1、為每個子行程定義相同名稱的張量,并分別分配至不同的GPU,從而能進行后續的GPU間通信,
2、定義一個通信組,用于后面的all_reduce通信操作,
3、all_reduce操作以及其它通信方式請看下圖:

4、定義編號(rank)為0的ip和埠地址,讓每個子行程都知道,ip和埠地址可以隨意定義,不沖突即可,如果不設定,子行程在涉及行程通信時會出錯,
5、初始化子行程組,定義行程間的通信后端(還有GLOO、MPI,只有NCCL支持GPU間通信)、子行程rank、子行程數量,只有當該函式在size個行程中被呼叫時,各行程才會繼續從這里執行下去,這個函式統一了各子行程后續代碼的開始時間,
6、執行子行程代碼,
7、由于創建子行程會執行本程式,因此主行程的執行需要放在__main__里,防止子行程執行,
8、開始創建子行程的方式:spawn、fork,windows默認spawn,linux默認fork,具體區別請百度,
9、由于是以NCCL為通信后端的分布式訓練,如果不同行程中相同名稱的張量在同一GPU上,當這個張量進行行程間通信時就會出錯,為了防止出錯,限制每張卡獨占一個行程,每個行程獨占一張卡,這里有兩張卡,所以最多只能創建兩個行程,
10、創建子行程,傳入子行程的初始化方法,及子行程呼叫該方法的引數,
11、等待子行程全部運行完畢后再退出主行程,
輸出結果如下:

正是各行程保存在不同GPU上的張量的廣播求和(all_reduce)的結果,
參考: https://pytorch.org/tutorials/intermediate/dist_tuto.html
Pytorch分布式訓練DEMO
我們實際上可以根據上面的分布式基礎寫一個分布式訓練,但由于不知道pytorch如何實作GPU間模型梯度的求和,即官方教程中所謂的ring_reduce(沒找到相關API),時間原因,就不再去搜索相關方法了,這里僅記錄pytorh內部的分布式模型訓練,即利用DDP模塊實作,Pytorch版本1.12.1,
import torch,os import torch.distributed as dist import torch.multiprocessing as mp import torch.optim as optim from torch.nn.parallel import DistributedDataParallel as DDP from torch import nn def example(rank, world_size): dist.init_process_group("nccl", rank=rank, world_size=world_size) # ——1—— model = nn.Linear(2, 1, False).to(rank) if rank == 0: # ——2—— model.load_state_dict(torch.load('model_weight')) # model_stat = torch.load('model_weight', {'cuda:0':'cuda:%d'%rank}) #這樣讀取保險一點 # model.load_state_dict(model_stat) opt = optim.Adam(model.parameters(), lr=0.0001) # ——3—— opt_stat = torch.load('opt_weight', {'cuda:0':'cuda:%d'%rank}) # ——4—— opt.load_state_dict(opt_stat) # ——5—— ddp_model = DDP(model, device_ids=[rank])# ——6 inp = torch.tensor([[1.,2]]).to(rank) # ——7—— labels = torch.tensor([[5.]]).to(rank) outp = ddp_model(inp) loss = torch.mean((outp - labels)**2) opt.zero_grad() loss.backward() # ——8—— opt.step() # ——9 if rank == 0:# ——10—— torch.save(model.state_dict(), 'model_weight') torch.save(opt.state_dict(), 'opt_weight') if __name__=="__main__": os.environ["MASTER_ADDR"] = "localhost"# ——11—— os.environ["MASTER_PORT"] = "29500" world_size = 2 mp.spawn(example, args=(world_size,), nprocs=world_size, join=True) # ——12——
以上代碼包含模型在多GPU上讀取權重、進行分布式訓練、保存權重等程序,細節注釋如下:
1、初始化行程組,由于使用GPU通信,后端應該寫為NCCL,不過經過實驗,即使錯寫為gloo,DDP內部也會自動使用NCCL作為通信模塊,
2、由于后面使用DDP包裹模型進行訓練,其內部會自動將所有rank的模型權重同步為rank 0的權重,因此我們只需在rank 0上讀取模型權重即可,這是基于Pytorch版本1.12.1,低級版本似乎沒有這個特性,需要在不同rank分別匯入權重,則load需要傳入map_location,如下面注釋的兩行代碼所示,
3、這里創建model的優化器,而不是創建用ddp包裹后的ddp_model的優化器,是為了兼容單GPU訓練,讀取優化器權重更方便,
4、將優化器權重讀取至該行程占用的GPU,如果沒有map_location引數,load會將權重讀取到原本保存它時的設備,
5、優化器獲取權重,經過實驗,即使權重不在優化器所在的GPU,權重也會遷移過去而不會報錯,當然load直接讀取到相應GPU會減少資料傳輸,
6、DDP包裹模型,為模型復制一個副本到相應GPU中,所有rank的模型副本會與rank 0保持一致,注意,DDP并不復制模型優化器的副本,因此各行程的優化器需要我們在初始化時保持一致,權重要么不讀取,要么都讀取,
7、這里開始模型的訓練,資料需轉移到相應的GPU設備,
8、在backward中,所有行程的模型計算梯度后,會進行平均(不是相加),也就是說,DDP在backward函式添加了hook,所有行程的模型梯度的ring_reduce將在這里執行,這個可以通過給各行程模型分別輸入不同的資料進行驗證,backward后這些模型有相同的梯度,且驗算的確是所有行程梯度的平均,此外,還可以驗證backward函式會阻斷(block)各行程使用梯度,只有當所有行程都完成backward之后,各行程才能讀取和使用梯度,這保證了所有行程在梯度上的一致性,
9、各行程優化器使用梯度更新其模型副本權重,由于初始化時各行程模型、優化器權重一致,每次反向傳播梯度也保持一致,則所有行程的模型在整個訓練程序中都能保持一致,
10、由于所有行程權重保持一致,我們只需通過一個行程保存即可,
11、定義rank 0的IP和埠,使用mp.spawn,只需在主行程中定義即可,無需分別在子行程中定義,
12、創建子行程,傳入:子行程呼叫的函式(該函式第一個引數必須是rank)、子行程函式的引數(除了rank引數外)、子行程數、是否等待所有子行程創建完畢再開始執行,
參考: https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/514065.html
標籤:Python
下一篇:關于scrapy的代理問題
