# %%
import os
import torch
import torch.distributed
import torch.nn as nn
import torch.nn.functional as F
from torch.amp.autocast_mode import autocast
from torch.utils.data import Dataset, DataLoader
# %%
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
# %%
# ! Reference: https://github.com/chunhuizhang/pytorch_distribute_tutorials/blob/main/tutorials/ddp_gpus_torchrun.py
# %%
# ! NOTE
# ? What is torch.multiprocessing for? (see the mp.spawn sketch below)
import torch.multiprocessing as mp
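# %%
# * Answer (my addition, a minimal sketch rather than this script's launch path): torch.multiprocessing
# * wraps Python's multiprocessing module; mp.spawn() starts one worker process per GPU and passes each
# * worker its rank as the first argument. With torchrun (used below) it is not needed, because torchrun
# * spawns the workers and exports the rendezvous environment variables itself.
def _spawn_demo(rank: int, world_size: int):
    # Hypothetical worker body: provide the rendezvous env vars that torchrun would otherwise set.
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "29500")
    init_process_group(backend="nccl", rank=rank, world_size=world_size)
    # ... build the model / dataloader and train here ...
    destroy_process_group()
# Usage sketch (not executed): mp.spawn(_spawn_demo, args=(torch.cuda.device_count(),), nprocs=torch.cuda.device_count())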
# %%
def ddp_setup():
    # torchrun has already exported the rendezvous env vars, so env:// initialisation only needs the backend.
    init_process_group(backend="nccl")
    # Bind this process to its local GPU so every .to(local_rank) call targets the right device.
    torch.cuda.set_device(int(os.environ['LOCAL_RANK']))
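# %%
# * Sketch (my addition, never called): torchrun exports RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR and
# * MASTER_PORT for every worker process; ddp_setup() above relies on LOCAL_RANK.
def print_dist_env():
    for key in ("RANK", "LOCAL_RANK", "WORLD_SIZE", "MASTER_ADDR", "MASTER_PORT"):
        print(f"{key}={os.environ.get(key)}")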
# %%
class models(nn.Module):
def __init__(self, input_dim:int, hidden_dim:int, output_dim:int):
super(models, self).__init__()
self.input = nn.Linear(input_dim, hidden_dim)
self.output = nn.Linear(hidden_dim, output_dim)
def forward(self, x:torch.Tensor):
print(f"For Forward-Input Device:{x.device}")
print(f"For Forward-Input Size:{x.size()}")
return self.output(self.input(x))
# %%
class dataset(Dataset):
def __init__(self, rowdim:int, input_dim:int):
super(dataset, self).__init__()
self.data = torch.rand(rowdim, input_dim)
self.label = torch.randint(0, 2,(rowdim,))
def __getitem__(self, idx):
return self.data[idx], self.label[idx]
def __len__(self):
return len(self.data)
# %%
class Trainer():
def __init__(self, model:torch.nn.Module, trainDataloader: DataLoader, optimizer:torch.optim.Optimizer):
self.gpuId = int(os.environ['LOCAL_RANK'])
self.trainDataloader = trainDataloader
self.optimizer = optimizer
print(f"gpu_id:{self.gpuId}")
self.model = DDP(model.to(self.gpuId), device_ids=[self.gpuId])
def _run_batch(self, input: torch.Tensor, label: torch.Tensor):
self.optimizer.zero_grad()
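        # The mixed-precision block below runs without a GradScaler; see the hedged GradScaler sketch
        # after the Trainer class for the usual fp16 loss-scaling pattern.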
with autocast(input.device.type):
output = self.model(input)
loss = F.binary_cross_entropy_with_logits(output, label.unsqueeze(-1))
loss.backward()
self.optimizer.step()
return loss
def _run_epoch(self, epoch: int):
        # Peek at the first batch only to report the per-rank batch size (DistributedSampler gives each rank its own shard).
        batchSize = len(next(iter(self.trainDataloader))[0])
        print(f"GPU:{self.gpuId}, Epoch:{epoch} | Batchsize:{batchSize} | Steps:{len(self.trainDataloader)}")
item_loss = 0
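        # DistributedSampler shuffles by default; set_epoch(epoch) reseeds it so every epoch sees a
        # different ordering while all ranks stay consistent with each other.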
self.trainDataloader.sampler.set_epoch(epoch)
for ids, data in enumerate(iterable=self.trainDataloader):
input = data[0].to(self.gpuId)
label = data[1].to(torch.float16).to(self.gpuId)
loss = self._run_batch(input, label)
item_loss += loss.to("cpu").item()
            # This print shows that the model weights were broadcast from cuda:0 to the other GPUs when
            # DDP wrapped the model, and that gradient all-reduce keeps them identical afterwards.
print(f"GPU:{self.gpuId}, Epoch:{epoch} | Input Bias:{self.model.module.input.bias[0]}")
return item_loss / len(self.trainDataloader)
def train(self, countEpoch:int):
for epoch in range(countEpoch):
epochloss = self._run_epoch(epoch)
print(f"Epoch:{epoch} Loss Mean:{epochloss}")
# %%
# net = models(32, 32, 1)
# net.input.bias
# %%
def main(countEpoch: int, batchSize: int):
ddp_setup()
print(f"Get Rank:{torch.distributed.get_rank()}")
    # ! torch.distributed.barrier() is added so that rank 0 builds the dataset before the other ranks continue.
# if torch.distributed.get_rank() != 0:
if int(os.environ['LOCAL_RANK']) != 0:
print(f"Main-Rank:{os.environ['LOCAL_RANK']}")
torch.distributed.barrier()
trainDatasets = dataset(32, 32)
    # if torch.distributed.get_rank() == 0:
if int(os.environ['LOCAL_RANK']) == 0:
print(f"Main-Rank:{os.environ['LOCAL_RANK']}")
torch.distributed.barrier()
    # ? Do num_workers and pin_memory have any effect here?
    # * Answer: num_workers does not conflict with DistributedSampler. The sampler first assigns each
    # *         rank its own subset of indices, and the worker processes then load those samples in parallel.
    # * Answer: pin_memory=True collates batches into pinned (page-locked) CPU memory instead of pageable
    # *         memory; CUDA copies from pinned memory faster. See the non_blocking sketch after main().
trainDataloader = DataLoader(trainDatasets, batch_size=batchSize, num_workers=4, shuffle=False, sampler=DistributedSampler(trainDatasets), pin_memory=True)
    net = models(32, 32, 1)
    optimizer = torch.optim.Adam(net.parameters(), lr=0.001)
    trainer = Trainer(net, trainDataloader, optimizer)
    trainer.train(countEpoch)
    # ! What does this do? destroy_process_group() tears down this process's distributed process group
    # ! and releases the resources used for distributed communication.
print(f"END-GPU:{os.environ['LOCAL_RANK']}, Input Bias:{net.input.bias[0]}")
destroy_process_group()
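# %%
# * Sketch (my addition, never called): what pin_memory buys you. With pin_memory=True the DataLoader
# * collates batches into page-locked host memory, so the host-to-device copy can use non_blocking=True
# * and overlap with computation. Assumes CUDA is available and `device` is a GPU index.
def copy_with_pinned_memory(loader: DataLoader, device: int):
    for data, label in loader:
        # non_blocking=True only helps when the source tensors live in pinned memory (pin_memory=True)
        data = data.to(device, non_blocking=True)
        label = label.to(device, non_blocking=True)
        # ... forward/backward would go here ...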
# %%
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description="DDP Simple Example!")
    parser.add_argument('--max_epochs', type=int, required=True, help="Total number of epochs")
    parser.add_argument("--batch_size", default=32, type=int, help="Input batch size")
args = parser.parse_args()
world_size = torch.cuda.device_count()
print(f"world_size:{world_size}")
# mp.spawn(main, args=(world_size, args.max_epochs, args.batch_size), nprocs=world_size)
main(args.max_epochs, args.batch_size)
# torchrun --nproc-per-node=2 DDP_torchrun.py --max_epochs 5 --batch_size 32                                       # multi-GPU
# python -m torch.distributed.launch --use-env --nproc-per-node=2 DDP_torchrun.py --max_epochs 5 --batch_size 32   # multi-GPU, equivalent to torchrun
# torchrun DDP_torchrun.py --max_epochs 5 --batch_size 32                                                          # single GPU
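# * Additional example (my addition, assuming a recent torchrun): explicit single-node rendezvous flags
# torchrun --standalone --nnodes=1 --nproc-per-node=2 DDP_torchrun.py --max_epochs 5 --batch_size 32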