# %%
import torch
import torch.distributed
import torch.nn as nn
import torch.nn.functional as F
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.checkpoint.state_dict import get_state_dict, StateDictOptions, set_state_dict
from torch.distributed.fsdp import CPUOffload, ShardingStrategy
from torch.distributed.fsdp import StateDictType, FullStateDictConfig, FullOptimStateDictConfig
from torch.distributed.fsdp.wrap import size_based_auto_wrap_policy
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data import Dataset, DataLoader
import numpy as np
import random
import os
# %%
def set_seed(seed: int):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
# %%
def set_init():
    torch.distributed.init_process_group('nccl')
    torch.cuda.set_device(int(os.environ['LOCAL_RANK']))
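# This script assumes it is launched with torchrun, which sets LOCAL_RANK (along
# with RANK and WORLD_SIZE) in the environment, e.g. for the 2-GPU run shown below:
#   torchrun --nproc_per_node=2 <this_script>.py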
# %%
class net(nn.Module):
    def __init__(self, input_dim: int, hidden_dim: int, output_dim: int):
        super(net, self).__init__()
        self.input = nn.Linear(input_dim, hidden_dim)
        self.output = nn.Linear(hidden_dim, output_dim)

    def forward(self, x: torch.Tensor):
        return self.output(self.input(x))
# %%
class datasets(Dataset):
    # A toy dataset of random features and binary labels; "batch_size" here is
    # really the total number of samples.
    def __init__(self, batch_size: int, input_dim: int):
        self.data = torch.randn(batch_size, input_dim)
        self.label = torch.randint(0, 2, (batch_size,))

    def __getitem__(self, idx):
        return self.data[idx], self.label[idx]

    def __len__(self):
        return len(self.label)
# %%
set_seed(42)
set_init()
net_model = net(16, 32, 1)
fsdp_model = FSDP(net_model,
                  # auto_wrap_policy=size_based_auto_wrap_policy(min_num_params=1e6),  # wrap a submodule into its own FSDP unit once its parameter count exceeds 1e6
                  sharding_strategy=ShardingStrategy.FULL_SHARD,
                  cpu_offload=CPUOffload(offload_params=True),
                  device_id=torch.cuda.current_device())
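# Note on the commented-out auto_wrap_policy above: size_based_auto_wrap_policy is
# meant to be handed to FSDP as a callable (typically via functools.partial), not
# called directly. A minimal sketch of that variant, not used by this script:
#
#   import functools
#   wrap_policy = functools.partial(size_based_auto_wrap_policy, min_num_params=int(1e6))
#   fsdp_model = FSDP(net_model,
#                     auto_wrap_policy=wrap_policy,
#                     sharding_strategy=ShardingStrategy.FULL_SHARD,
#                     cpu_offload=CPUOffload(offload_params=True),
#                     device_id=torch.cuda.current_device())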
print(f"#######################{os.environ['LOCAL_RANK']}#######################")
# * Check whether FSDP has sharded the model's layers into flat parameters (one per FSDP unit). Running this prints the following:
#######################1#######################
# _fsdp_wrapped_module._flat_param torch.Size([289])
#######################0#######################
# _fsdp_wrapped_module._flat_param torch.Size([289])
# * Parameter count: 16 * 32 (weight) + 32 (bias) + 32 * 1 (weight) + 1 (bias) = 577. Because 577 is odd, one GPU's shard gets one element of padding, so the total becomes 577 + 1 = 578 and each of the 2 GPUs holds 578 / 2 = 289 parameters.
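# Sanity-check that arithmetic in code (a small sketch using a fresh, unwrapped copy
# of the network, since the FSDP-wrapped model only exposes its flattened shard):
if torch.distributed.get_rank() == 0:
    total_params = sum(p.numel() for p in net(16, 32, 1).parameters())
    world_size = torch.distributed.get_world_size()
    padded = total_params + (-total_params) % world_size  # pad up to a multiple of world_size
    print(f"total={total_params}, padded={padded}, per_rank={padded // world_size}")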
for name, param in fsdp_model.named_parameters():
    print(name, param.shape)
optimizer = torch.optim.Adam(fsdp_model.parameters(), lr=0.0001)
loss_fun = torch.nn.BCEWithLogitsLoss()
dataset_train = datasets(16, 16)
dataloader_train = DataLoader(dataset_train, 16, sampler=DistributedSampler(dataset_train, shuffle=False), pin_memory=True)
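# With the DistributedSampler each rank sees a disjoint slice of the 16 samples
# (8 per rank when launched on 2 GPUs), so the per-rank batch printed inside the
# training loop is smaller than the DataLoader's batch_size of 16.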
if torch.distributed.get_rank() == 0:
    os.makedirs("/hy-tmp/deep-learn-log/DistrubuteParallel/model_ckpt", exist_ok=True)
torch.distributed.barrier()
for i in range(3):
    loss_item = 0
    for idx, (data, label) in enumerate(dataloader_train):
        optimizer.zero_grad()
        data = data.to(int(os.environ['LOCAL_RANK']))
        # labels must be floating point for BCEWithLogitsLoss; use float32 to match the model's float32 logits
        label = label.to(torch.float32).to(int(os.environ['LOCAL_RANK'])).unsqueeze(-1)
        print(f"Cuda:{os.environ['LOCAL_RANK']}, Data Batch Size:{data.shape}")
        pre_label = fsdp_model(data)
        loss = loss_fun(pre_label, label)
        loss.backward()
        optimizer.step()
        loss_item += loss.to('cpu').item()
    print(f"Cuda:{os.environ['LOCAL_RANK']}, Epoch:{i}, Loss:{loss_item}")
FSDP.set_state_dict_type(
    fsdp_model,
    StateDictType.FULL_STATE_DICT,
    FullStateDictConfig(rank0_only=False),
    FullOptimStateDictConfig(rank0_only=False)
)
state_dict = fsdp_model.state_dict()
original_osd = optimizer.state_dict()
optimizer_state_dict = FSDP.optim_state_dict(
    fsdp_model,
    optimizer,
    optim_state_dict=original_osd
)
checkpoint = {
    "model": state_dict,
    "optimizer": optimizer_state_dict
}
torch.save(checkpoint, f"/hy-tmp/deep-learn-log/DistrubuteParallel/model_ckpt/fsdp_test_{int(os.environ['LOCAL_RANK'])}.pt")
checkpoint_load = torch.load(f"/hy-tmp/deep-learn-log/DistrubuteParallel/model_ckpt/fsdp_test_{int(os.environ['LOCAL_RANK'])}.pt", map_location="cpu")
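# A common alternative (a sketch, not what this script does): gather the full state
# dict on rank 0 only and write a single checkpoint file.
#
#   FSDP.set_state_dict_type(fsdp_model, StateDictType.FULL_STATE_DICT,
#                            FullStateDictConfig(offload_to_cpu=True, rank0_only=True),
#                            FullOptimStateDictConfig(offload_to_cpu=True, rank0_only=True))
#   full_sd = fsdp_model.state_dict()       # collective: must be called on every rank
#   if torch.distributed.get_rank() == 0:   # but only rank 0 receives the full copy
#       torch.save({"model": full_sd}, "fsdp_test_rank0.pt")  # hypothetical filename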
net_model = net(16, 32, 1)
fsdp_model = FSDP(net_model,
                  # auto_wrap_policy=size_based_auto_wrap_policy(min_num_params=1e6),  # wrap a submodule into its own FSDP unit once its parameter count exceeds 1e6
                  sharding_strategy=ShardingStrategy.FULL_SHARD,
                  cpu_offload=CPUOffload(offload_params=True),
                  device_id=torch.cuda.current_device())
for name, param in fsdp_model.named_parameters():
    print(name, param.shape)
# optimizer = torch.optim.Adam(fsdp_model.parameters(), lr=0.0001)
# loss_fun = torch.nn.BCEWithLogitsLoss()
# dataset_train = datasets(16, 16)
# dataloader_train = DataLoader(dataset_train,16, sampler=DistributedSampler(dataset_train, shuffle=False), pin_memory=True)
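# Before loading, set FULL_STATE_DICT on the freshly wrapped model as well, so that
# load_state_dict() accepts the unsharded checkpoint and re-shards it across ranks.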
FSDP.set_state_dict_type(
    fsdp_model,
    StateDictType.FULL_STATE_DICT,
    FullStateDictConfig(rank0_only=False),
    FullOptimStateDictConfig(rank0_only=False)
)
fsdp_model.load_state_dict(checkpoint_load['model'])
# optim_state_dict = FSDP.optim_state_dict_to_load(fsdp_model, optimizer, checkpoint_load['optimizer'])
# optimizer.load_state_dict(optim_state_dict)
fsdp_model.eval()
torch.distributed.barrier()
with torch.no_grad():  # evaluation pass only; no gradients are needed
    for i in range(3):
        loss_item = 0
        for idx, (data, label) in enumerate(dataloader_train):
            data = data.to(int(os.environ['LOCAL_RANK']))
            label = label.to(torch.float32).to(int(os.environ['LOCAL_RANK'])).unsqueeze(-1)
            # print(f"Cuda:{os.environ['LOCAL_RANK']}, Data Batch Size:{data.shape}")
            pre_label = fsdp_model(data)
            loss = loss_fun(pre_label, label)
            loss_item += loss.to('cpu').item()
        print(f"Cuda:{os.environ['LOCAL_RANK']}, Epoch:{i}, Loss:{loss_item}")