# LEVSONGSW/DeepLearnLog - main.py
# https://gitee.com/levsongsw/deep-learn-log.git
# Last commit: LEVSONGSW, 2025-11-18 09:36 +08:00, "Ignore Warning"
# Note: the repository declares no open-source LICENSE file; check the project
# description and its upstream code dependencies before use.
# %%
import torch
from transformer_RoPE import transformer, LabelSmoothing
from datasetsTransformers import MyData
from datasets import load_dataset
from torch.utils.data import DataLoader
import logging
from buildVocabulary import TokenizerSRC, TokenizerTAG
from torch.optim import Adam
import warnings
warnings.filterwarnings('ignore')
# %%
LOGGINGFILENAME = './position_embedding.txt'
logger = logging.getLogger("Position Embedding")
console = logging.StreamHandler()
file = logging.FileHandler(LOGGINGFILENAME)
logger.setLevel(logging.INFO)
console.setLevel(logging.INFO)
file.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)-9s - %(filename)-8s : %(lineno)s line - %(message)s")
console.setFormatter(formatter)
file.setFormatter(formatter)
logger.addHandler(console)
logger.addHandler(file)
# %%
BATCHSIZE = 16
DATAPATH = './Multi30K'
DATASETS = load_dataset(DATAPATH)
DATALISTNAME = ['train', 'validation', 'test']
SRCVOCAB = 10063
TAGVOCAB = 19351
DMODEL = 512
NLAYER = 6
NHEAD = 8
DFFDIM = 2048
DROPOUT = 0.1
EPOCHS = 200
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# %%
train_dataset = MyData('train', DATASETS, DATALISTNAME)
val_dataset = MyData('validation', DATASETS, DATALISTNAME)
test_dataset = MyData('test', DATASETS, DATALISTNAME)
tokenizerSRC = TokenizerSRC("./TokenizerFile/en_word2idx.json", "./TokenizerFile/en_idx2word.json")
tokenizerTAG = TokenizerTAG("./TokenizerFile/de_word2idx.json", "./TokenizerFile/de_idx2word.json")
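# %%
# Illustrative check (added): encode one training sentence to confirm the
# vocabulary files loaded. The encode(text=..., max_length=...) signature
# mirrors its use in test_decoder() below, and the 'en' column is assumed to
# exist on the train split as it does on the test split.
_sample = DATASETS['train']['en'][0]
_ids = tokenizerSRC.encode(text=_sample, max_length=100)
print(_sample)
print(_ids[:10], f"... ({len(_ids)} ids)")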
# %%
def subsequent_mask(size):
    """Mask out subsequent positions (causal mask)."""
    attn_shape = (1, size, size)
    subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(
        torch.uint8
    )
    return subsequent_mask == 0
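# %%
# Illustrative sanity check (added, not part of the original script): for
# size=4 the mask is lower-triangular, so position i attends only to
# positions <= i.
print(subsequent_mask(4)[0])
# tensor([[ True, False, False, False],
#         [ True,  True, False, False],
#         [ True,  True,  True, False],
#         [ True,  True,  True,  True]])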
# %%
def get_mask(src, tgt=None, pad=0):
    src_mask = (src != pad).unsqueeze(-2)
    if tgt is not None:
        tgt_i = tgt[:, :-1]  # decoder input: drop the last token
        tgt_y = tgt[:, 1:]   # decoder target: shifted right by one
        tgt_mask = (tgt_i != pad).unsqueeze(-2)
        tgt_mask = tgt_mask & subsequent_mask(tgt_i.size(-1)).type_as(
            tgt_mask.data
        )
        # notoken = (tgt_y != pad).data.sum()
        return (src, src_mask, tgt_i, tgt_y, tgt_mask)
    return (src, src_mask)
def requireToken(data):
    """Collate function: tokenize a batch of (src, tgt) sentence pairs and build masks."""
    srcSeq = [i[0] for i in data]
    tagSeq = [i[1] for i in data]
    src_tokenizer = tokenizerSRC.encode_batch(texts=srcSeq, max_length=100)
    src_tokenizer = torch.tensor(src_tokenizer)
    tag_tokenizer = tokenizerTAG.encode_batch(tagSeq, max_length=100)
    tag_tokenizer = torch.tensor(tag_tokenizer)
    src, src_mask, tgt_i, tgt_y, tgt_mask = get_mask(src_tokenizer, tag_tokenizer)
    return src, src_mask, tgt_i, tgt_y, tgt_mask
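# %%
# Illustrative example (added): get_mask shifts the target by one position to
# build teacher-forcing inputs/targets, and ANDs the padding mask with the
# causal mask. Toy batch with pad id 0:
_src = torch.tensor([[5, 6, 7, 0]])
_tgt = torch.tensor([[1, 8, 9, 2, 0]])
_, _sm, _ti, _ty, _tm = get_mask(_src, _tgt)
print(_ti)        # tensor([[1, 8, 9, 2]])  decoder input (last token dropped)
print(_ty)        # tensor([[8, 9, 2, 0]])  decoder target (shifted by one)
print(_tm.shape)  # torch.Size([1, 4, 4])   padding mask AND causal mask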
# %%
train_dataload = DataLoader(train_dataset, batch_size=BATCHSIZE, num_workers=4, shuffle=True, drop_last=True, collate_fn=requireToken)
val_dataload = DataLoader(val_dataset, batch_size=BATCHSIZE, num_workers=4, shuffle=False, drop_last=False, collate_fn=requireToken)
test_dataload = DataLoader(test_dataset, batch_size=BATCHSIZE, num_workers=4, shuffle=False, drop_last=False, collate_fn=requireToken)
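# %%
# Smoke test (added): draw one batch from the validation loader to confirm the
# collate function works end to end. Shapes assume encode_batch pads every
# sequence to max_length=100.
_b_src, _b_src_mask, _b_tgt_i, _b_tgt_y, _b_tgt_mask = next(iter(val_dataload))
print(_b_src.shape, _b_tgt_i.shape, _b_tgt_y.shape)
# expected: torch.Size([16, 100]) torch.Size([16, 99]) torch.Size([16, 99])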
# %%
# src_vocab, tag_vocab, d_model=512, N=6, h=8, d_ff=2048, dropout=0.1
model = transformer(SRCVOCAB, TAGVOCAB, DMODEL, NLAYER, NHEAD, DFFDIM, DROPOUT)
model.to(DEVICE)
optimizer = Adam(model.parameters(), lr=0.0001)
loss_fun = LabelSmoothing(TAGVOCAB, 0)
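# %%
# Illustrative addition: report the model's parameter count as a quick sanity
# check that the configuration above was wired in as intended.
n_params = sum(p.numel() for p in model.parameters())
print(f"transformer parameters: {n_params:,}")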
# %%
def valModel(model, val_dataload, epoch):
    model.eval()
    loss_items = 0.0
    count = 0
    with torch.no_grad():  # no gradients needed during evaluation
        for ids, (src, src_mask, tgt_i, tgt_y, tgt_mask) in enumerate(val_dataload):
            src = src.to(DEVICE)
            tgt_i = tgt_i.to(DEVICE)
            src_mask = src_mask.to(DEVICE)
            tgt_mask = tgt_mask.to(DEVICE)
            # tgt_y stays on the CPU because output is moved there before the loss
            output = model(src, tgt_i, src_mask, tgt_mask)
            loss = loss_fun(output.to('cpu'), tgt_y)
            loss_items += loss.item()
            count += 1
            logger.info(f"Val-Epoch:{epoch} Steps:{ids} Loss:{loss_items/count}")
    return loss_items / count
# %%
# ! Training: GPU memory was insufficient to evaluate on the validation set,
# ! so checkpoints are selected by the training-set loss alone.
best_val = float('inf')  # initialised once, before the epoch loop, so "best" persists across epochs
for epoch in range(EPOCHS):
    loss_items = 0.0
    count = 0
    for ids, (src, src_mask, tgt_i, tgt_y, tgt_mask) in enumerate(train_dataload):
        model.train()
        optimizer.zero_grad()
        src = src.to(DEVICE)
        tgt_i = tgt_i.to(DEVICE)
        src_mask = src_mask.to(DEVICE)
        tgt_mask = tgt_mask.to(DEVICE)
        tgt_y = tgt_y.to(DEVICE)
        output = model(src, tgt_i, src_mask, tgt_mask)
        loss = loss_fun(output, tgt_y)
        loss.backward()
        optimizer.step()
        loss_items += loss.item()  # .item() detaches, so the graph is freed each step
        count += 1
        logger.info(f"Train-Epoch:{epoch} Steps:{ids} Loss:{loss_items/count}")
    # val_loss = valModel(model, val_dataload, epoch)
    if best_val > loss_items / count:
        best_val = loss_items / count
        torch.save(model.state_dict(), "./model/best.pth")
    torch.save(model.state_dict(), "./model/last.pth")
# %%
# ! Inference: the transformer decodes autoregressively, calling the decoder in a loop.
def test_decoder():
    model_state_dict = torch.load("./model/best.pth")
    model_eval = transformer(SRCVOCAB, TAGVOCAB, DMODEL, NLAYER, NHEAD, DFFDIM, DROPOUT)
    model_eval.load_state_dict(model_state_dict)
    model_eval.to(DEVICE)
    model_eval.eval()  # disable dropout for inference
    testData = DATASETS['test']
    srcSeq = testData['en'][0]
    tagSeq = testData['de'][0]
    src_tokenizer = tokenizerSRC.encode(text=srcSeq, max_length=100)
    src_tokenizer = torch.tensor(src_tokenizer).unsqueeze(0)
    tag_tokenizer = tokenizerTAG.encode(tagSeq, max_length=100)
    tag_tokenizer = torch.tensor(tag_tokenizer).unsqueeze(0)
    src, src_mask, tgt_i, _, tgt_mask = get_mask(src_tokenizer, tag_tokenizer)
    tgt_mask = None  # the first step decodes a single token, so no causal mask is needed
    tgt_i_input = tgt_i[:, 0].unsqueeze(0)  # start from the first (BOS) token only
    src = src.to(DEVICE)
    tgt_i_input = tgt_i_input.to(DEVICE)
    src_mask = src_mask.to(DEVICE)
    output = model_eval(src, tgt_i_input, src_mask, tgt_mask)
    tgt_i_input = torch.cat([tgt_i_input, torch.argmax(output, dim=2).long()], dim=-1)
    for i in range(100 - 1):
        # lower-triangular causal mask over all tokens decoded so far
        tgt_mask = (torch.triu(torch.ones((tgt_i_input.size(1), tgt_i_input.size(1)), device=DEVICE)) == 1).transpose(0, 1)
        tgt_mask = tgt_mask.unsqueeze(0)
        output = model_eval(src, tgt_i_input, src_mask, tgt_mask)
        output = torch.argmax(output, dim=2).long()[:, -1].unsqueeze(0)
        tgt_i_input = torch.cat([tgt_i_input, output], dim=-1)
        if output == 19350:  # 19350 is presumably the end-of-sequence id (TAGVOCAB - 1)
            break
    text = tokenizerTAG.decode(tgt_i_input[0].to('cpu').tolist())
    print(f"tagSeq:{tagSeq}")
    print(f"outText:{text}")
# %%
test_decoder()
# %%