Likelihood-Lab/GAN-IndexFuture

This repository does not declare an open-source license file (LICENSE); before using it, check the project description and the code's upstream dependencies.
rdcgan.py 8.41 KB
Angela Luo, committed 2019-07-31 11:38: res-generator dc-discriminator
import numpy as np
from pandas import DataFrame
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
from data_loader import load_data
from laplotter import LossAccPlotter
from visualization import plot
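
# data_loader, laplotter, and visualization appear to be modules shipped with
# this repository rather than pip packages: load_data yields batches of real
# OHLC windows, LossAccPlotter draws the loss curves, and plot renders samples.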

class ResBlock(nn.Module):
    """Residual block: transposed convs when gen=True (generator),
    plain convs with LeakyReLU otherwise (discriminator)."""

    def __init__(self, inplanes, planes, downsample=None, upsample=None, gen=True):
        super(ResBlock, self).__init__()
        conv = nn.ConvTranspose1d if gen else nn.Conv1d
        self.conv1 = conv(inplanes, planes, 3, 1, 1, bias=False)
        self.conv2 = conv(planes, planes, 3, 1, 1, bias=False)
        self.relu = nn.ReLU(inplace=True) if gen else nn.LeakyReLU(0.1, inplace=True)
        # Two separate BatchNorm layers; the original reused a single instance
        # at both positions, sharing affine weights and running statistics.
        self.bn1 = nn.BatchNorm1d(planes)
        self.bn2 = nn.BatchNorm1d(planes)
        self.downsample = downsample
        self.upsample = upsample

    def forward(self, x):
        res = x
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        # Project the shortcut when the main path changes shape.
        if self.downsample is not None:
            res = self.downsample(res)
        if self.upsample is not None:
            res = self.upsample(res)
        x += res
        return self.relu(x)
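
# Illustrative shape check (not from the original file), assuming the
# (batch, channels, length) layout used throughout this repo: a block with
# matching channels and no shortcut projection preserves its input shape.
#   >>> blk = ResBlock(32, 32)
#   >>> blk(torch.randn(8, 32, 4)).shape
#   torch.Size([8, 32, 4])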

class Generator(nn.Module):
    """Residual generator: expands latent noise of shape (N, latent_dim, 1)
    into a (N, 240, 4) sample squashed to [-1, 1] by tanh."""

    def __init__(self, latent_dim, block, layers, ngf=32):
        super(Generator, self).__init__()
        self.inplanes = ngf * 8
        # Length 1 -> 4: (1 - 1) * stride - 2 * padding + kernel = 4.
        self.conv1 = nn.ConvTranspose1d(latent_dim, ngf * 8, 4, 1, 0, bias=False)
        self.bn = nn.BatchNorm1d(ngf * 8)
        self.layer1 = self._make_layer(block, ngf * 8, layers[0])
        self.layer2 = self._make_layer(block, ngf * 4, layers[1])
        self.layer3 = self._make_layer(block, ngf * 2, layers[2])
        self.layer4 = self._make_layer(block, ngf, layers[3])
        self.conv2 = nn.ConvTranspose1d(ngf, 240, 3, 1, 1, bias=False)
        for m in self.modules():
            # The original isinstance check matched only nn.Conv1d, so the
            # generator's own transposed convs were never initialized here.
            if isinstance(m, (nn.Conv1d, nn.ConvTranspose1d)):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm1d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def _make_layer(self, block, planes, blocks):
        downsample = None
        if self.inplanes != planes:
            # Shortcut projection to match channels; with kernel 1, stride 2,
            # padding 2 a length-4 feature map keeps its length of 4.
            downsample = nn.Sequential(
                nn.Conv1d(self.inplanes, planes, 1, 2, 2, bias=False),
                nn.BatchNorm1d(planes)
            )
        layers = [block(self.inplanes, planes, downsample=downsample)]
        self.inplanes = planes
        # Subsequent blocks already have matching channels; the original passed
        # the stale `downsample` here, which would fail for blocks > 1.
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn(x)
        x = F.relu(x, inplace=True)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.conv2(x)
        return torch.tanh(x)
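
# Illustrative usage (not from the original file): latent noise of shape
# (batch, latent_dim, 1) comes out as (batch, 240, 4), which visualize() below
# reads as 240 time steps of the four OHLC features.
#   >>> g = Generator(100, ResBlock, [1, 1, 1, 1])
#   >>> g(torch.randn(2, 100, 1)).shape
#   torch.Size([2, 240, 4])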

class Discriminator(nn.Module):
    def __init__(self, block, layers, ndf=32):
        super(Discriminator, self).__init__()
        self.model = nn.Sequential(
            # input is 240 x 4
            nn.Conv1d(240, ndf, 3, 1, 1, bias=False),
            nn.LeakyReLU(0.1, inplace=True),
            # state size: (ndf) x 4
            nn.Conv1d(ndf, ndf * 2, 3, 1, 1, bias=False),
            nn.BatchNorm1d(ndf * 2),
            nn.LeakyReLU(0.1, inplace=True),
            # state size: (ndf*2) x 4
            nn.Conv1d(ndf * 2, ndf * 4, 3, 1, 1, bias=False),
            nn.BatchNorm1d(ndf * 4),
            nn.LeakyReLU(0.1, inplace=True),
            # state size: (ndf*4) x 4
            nn.Conv1d(ndf * 4, 1, 4, 1, 0, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.model(x)
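
# Illustrative usage (not from the original file): block and layers are
# accepted for symmetry with Generator but unused. The discriminator maps a
# (batch, 240, 4) sample to a (batch, 1, 1) probability, matching the
# torch.ones(size, 1, 1) / torch.zeros(size, 1, 1) labels used in train().
#   >>> d = Discriminator(ResBlock, [1, 1, 1, 1])
#   >>> d(torch.randn(2, 240, 4)).shape
#   torch.Size([2, 1, 1])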

def visualize(instrument, tensor_data, epoch):
    # Plot the first sample of the batch as a 240-row OHLC frame.
    dimage = tensor_data.detach()[0].numpy()
    df = DataFrame(dimage, columns=['high', 'low', 'open', 'close'])
    plot(df, epoch, instrument=instrument, generate=True, v=False)

def train(instrument, batch_size, latent_dim, epochs, mode=None):
    data_loader = load_data(instrument, batch_size)
    generator = Generator(latent_dim, ResBlock, [1, 1, 1, 1])
    discriminator = Discriminator(ResBlock, [1, 1, 1, 1])
    g_optimizer = optim.Adam(generator.parameters(), lr=0.002, betas=(0.5, 0.999))
    d_optimizer = optim.Adam(discriminator.parameters(), lr=0.002, betas=(0.5, 0.999))
    loss = nn.BCELoss()
    if not os.path.exists("./plots/" + instrument):
        os.makedirs("./plots/" + instrument)
    if not os.path.exists("./model/" + instrument):
        os.makedirs("./model/" + instrument)
    # Create the loss directory before the plotter that writes into it.
    if not os.path.exists("./loss/" + instrument):
        os.makedirs("./loss/" + instrument)
    plotter = LossAccPlotter(
        save_to_filepath="./loss/" + instrument + "/loss.png",
        show_regressions=False,
        show_acc_plot=False,
        show_averages=False,
        show_plot_window=True,
        x_label="Epoch")
    epoch = 0
    d_loss_list = []
    g_loss_list = []
    # Trains indefinitely; only mode == "test" stops after `epochs` passes.
    while True:
        for num_batch, real_data in enumerate(data_loader):
            ############################
            # (1) Update D network: maximize log(D(x)) + log(1 - D(G(z)))
            ############################
            # Two discriminator steps per generator step; each draws a fresh
            # batch (iter() restarts the loader, so shuffling decides the batch).
            for _ in range(2):
                real_data_d = next(iter(data_loader))
                size = real_data_d.size(0)
                y_real = Variable(torch.ones(size, 1, 1))
                y_fake = Variable(torch.zeros(size, 1, 1))
                real_data = Variable(real_data_d.float())
                fake_data = Variable(torch.from_numpy(
                    np.random.normal(0, 0.2, (size, latent_dim, 1))).float())
                # detach() keeps generator gradients out of the D update.
                fake_gen = generator(fake_data).detach()
                prediction_real = discriminator(real_data)
                loss_real = loss(prediction_real, y_real)
                prediction_fake = discriminator(fake_gen)
                loss_fake = loss(prediction_fake, y_fake)
                d_loss = loss_real + loss_fake
                d_optimizer.zero_grad()
                d_loss.backward()
                d_optimizer.step()
            ############################
            # (2) Update G network: maximize log(D(G(z)))
            ############################
            size = real_data.size(0)
            y_real = Variable(torch.ones(size, 1, 1))
            fake_data = Variable(torch.from_numpy(
                np.random.normal(0, 0.2, (size, latent_dim, 1))).float())
            fake_gen = generator(fake_data)
            prediction = discriminator(fake_gen)
            g_loss = loss(prediction, y_real)
            g_optimizer.zero_grad()
            g_loss.backward()
            g_optimizer.step()
            if num_batch % 20 == 0:
                print("epoch: %d, num_batch: %d, d-loss: %.4f, g-loss: %.4f"
                      % (epoch, num_batch, d_loss.item(), g_loss.item()))
                visualize(instrument, fake_gen, epoch)
        # Per-epoch bookkeeping: loss curves, checkpoints, and loss arrays.
        plotter.add_values(epoch,
                           loss_train=g_loss.item(),
                           loss_val=d_loss.item())
        d_loss_list.append(d_loss.item())
        g_loss_list.append(g_loss.item())
        if epoch % 20 == 0:
            torch.save(generator, "./model/" + instrument + "/generator_epoch_" + str(epoch) + ".model")
            torch.save(discriminator, "./model/" + instrument + "/discriminator_epoch_" + str(epoch) + ".model")
        if epoch % 1000 == 0:
            np.save("./loss/" + instrument + "/d_loss_epoch_" + str(epoch) + ".npy", np.array(d_loss_list))
            np.save("./loss/" + instrument + "/g_loss_epoch_" + str(epoch) + ".npy", np.array(g_loss_list))
        epoch += 1
        if mode == "test" and epoch == epochs:
            break

if __name__ == "__main__":
    instrument = 'IF'
    batch_size = 64
    epochs = 10
    latent_dim = 100
    mode = "train"  # "train" loops indefinitely; only "test" honors `epochs`
    train(instrument, batch_size, latent_dim, epochs, mode=mode)
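
# Example invocation (not from the original file): with mode="train" the loop
# above never breaks, so training runs until interrupted, saving checkpoints to
# ./model/<instrument>/ every 20 epochs; mode="test" stops after `epochs` passes.
#   train('IF', batch_size=64, latent_dim=100, epochs=10, mode="test")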

https://gitee.com/likelihoodlab/GAN-IndexFuture.git