# Variational autoencoder (VAE) on Fashion-MNIST — exported from a Jupyter notebook.
# (Replaced hosting-site banner text that was scraped into the file.)
import os
import time
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras import layers
from IPython import display
import matplotlib.pyplot as plt
import numpy as np
# %matplotlib inline  # notebook magic — commented out: invalid syntax in a plain .py script
from tensorflow import keras
# Load Fashion-MNIST and build a shuffled, batched training pipeline.
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()
x_train = x_train.reshape(x_train.shape[0], 28, 28, 1).astype('float32')
# BUG FIX: x_test also needs a channel axis. The encoder's Conv2D layers expect
# rank-4 input (N, 28, 28, 1); later calls like enc.predict(example_images)
# would otherwise fail on rank-3 (N, 28, 28) input.
x_test = x_test.reshape(x_test.shape[0], 28, 28, 1).astype('float32')
#x_train = x_train / 255.  # training images are rescaled later by a Rescaling layer
x_test = x_test / 255.
# Batch and shuffle the data (still raw 0-255 floats; normalization happens in normalized_ds)
train_dataset = tf.data.Dataset.from_tensor_slices(x_train).shuffle(60000).batch(128)
# Preview a 3x3 grid of raw (un-normalized) training images from one batch.
plt.figure(figsize=(10, 10))
for batch in train_dataset.take(1):
    for idx in range(9):
        plt.subplot(3, 3, idx + 1)
        plt.imshow(batch[idx, :, :, 0].numpy().astype("uint8"), cmap='gray')
        plt.axis("off")
# Rescale pixel values to [0, 1] inside the tf.data pipeline.
normalization_layer = layers.experimental.preprocessing.Rescaling(scale=1./255)
normalized_ds = train_dataset.map(normalization_layer)

# Pull one batch to sanity-check the pipeline.
image_batch = next(iter(normalized_ds))
first_image = image_batch[0]

# Model input shapes: 28x28 grayscale images in, 2-D latent codes out.
input_encoder = (28, 28, 1)
input_decoder = (2,)
def sampling_model(distribution_params):
    """Reparameterization trick: z = mean + sigma * eps, eps ~ N(0, I).

    distribution_params: a (mean, log_var) pair of tensors with matching shape
        (batch, latent_dim) — assumed; confirm against the encoder outputs.
    Returns a sample from N(mean, exp(log_var)) with the same shape as `mean`.
    """
    mean, log_var = distribution_params
    epsilon = K.random_normal(shape=K.shape(mean), mean=0., stddev=1.)
    # FIX: removed a leftover debug print(epsilon) that spammed graph tensors.
    return mean + K.exp(log_var / 2) * epsilon
# (notebook REPL echo) inspecting the function object printed:
# <function __main__.sampling_model(distribution_params)>
def encoder(input_encoder):
    """Convolutional encoder: image -> (mean, log_var) of a 2-D Gaussian latent.

    Four Conv/BatchNorm/LeakyReLU blocks (strides 1,2,2,1 bring 28x28 down to
    7x7), then two Dense heads producing the latent distribution parameters.
    """
    inputs = keras.Input(shape=input_encoder, name='input_layer')

    # (filters, stride, name-suffix) for each conv block; layer names match
    # the original graph so saved weights stay compatible.
    conv_specs = [(32, 1, '1'), (64, 2, '2'), (64, 2, '3'), (64, 1, '4')]
    x = inputs
    for filters, stride, tag in conv_specs:
        x = layers.Conv2D(filters, 3, stride, padding='same', name='conv_' + tag)(x)
        x = layers.BatchNormalization(name='bn_' + tag)(x)
        x = layers.LeakyReLU(name='lrelu_' + tag)(x)

    flat = layers.Flatten()(x)
    mean = layers.Dense(2, name='mean')(flat)
    log_var = layers.Dense(2, name='log_var')(flat)
    return tf.keras.Model(inputs, (mean, log_var), name="Encoder")
# Build the encoder; its outputs are a (mean, log_var) pair of (None, 2) tensors.
enc = encoder(input_encoder)
enc.output  # notebook cell: inspect the symbolic outputs (no effect in a script)
# (notebook output) (<KerasTensor: shape=(None, 2) ... 'mean'>, <KerasTensor: shape=(None, 2) ... 'log_var'>)
# enc.save('vae-fashion-enc.h5')
# Input shapes for the sampling sub-model: the 2-D mean and log-variance.
input_1 = (2,)
input_2 = (2,)


def sampling(input_1, input_2):
    """Build a tiny Keras model that draws z ~ N(mean, exp(log_var)).

    Wraps `sampling_model` in a Lambda layer so the reparameterized draw is
    part of the computation graph.
    """
    mean = keras.Input(shape=input_1, name='input_layer1')
    log_var = keras.Input(shape=input_2, name='input_layer2')
    out = layers.Lambda(sampling_model, name='encoder_output')([mean, log_var])
    return tf.keras.Model([mean, log_var], out, name="Encoder_2")


final = sampling(input_1, input_2)
# (notebook output) Tensor("encoder_output/random_normal:0", shape=(None, 2), dtype=float32)
# final.save('sampling.h5')
def decoder(input_decoder):
    """Transposed-conv decoder: 2-D latent -> 28x28x1 image with sigmoid pixels.

    Dense + Reshape to 7x7x64, then three ConvTranspose/BatchNorm/LeakyReLU
    blocks (strides 1,2,2 upsample back to 28x28), and a final sigmoid
    ConvTranspose producing one channel in [0, 1].
    """
    inputs = keras.Input(shape=input_decoder, name='input_layer')
    x = layers.Dense(3136, name='dense_1')(inputs)
    x = layers.Reshape((7, 7, 64), name='Reshape_Layer')(x)

    # (filters, stride, name-suffix) per upsampling block; names match the
    # original graph so saved weights stay compatible.
    deconv_specs = [(64, 1, '1'), (64, 2, '2'), (32, 2, '3')]
    for filters, stride, tag in deconv_specs:
        x = layers.Conv2DTranspose(filters, 3, strides=stride, padding='same',
                                   name='conv_transpose_' + tag)(x)
        x = layers.BatchNormalization(name='bn_' + tag)(x)
        x = layers.LeakyReLU(name='lrelu_' + tag)(x)

    outputs = layers.Conv2DTranspose(1, 3, 1, padding='same',
                                     activation='sigmoid', name='conv_transpose_4')(x)
    return tf.keras.Model(inputs, outputs, name="Decoder")
# Build the decoder (2-D latent -> 28x28x1 image).
dec = decoder(input_decoder)
# dec.save('vae-fashion-dec.h5')
# Shared Adam optimizer for both encoder and decoder variables.
# FIX: `lr` is a long-deprecated alias (removed in newer Keras); use `learning_rate`.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0005)
def mse_loss(y_true, y_pred):
    """Per-sample reconstruction MSE over the (H, W, C) axes, scaled by 1000."""
    squared_error = K.square(y_true - y_pred)
    return 1000 * K.mean(squared_error, axis=[1, 2, 3])
def kl_loss(mean, log_var):
    """Per-sample KL divergence between N(mean, exp(log_var)) and N(0, I).

    BUG FIX: the original body referenced an undefined name `var` — the
    parameter is `log_var` — so any call raised NameError. The standard VAE
    closed form is -0.5 * sum(1 + log_var - mean^2 - exp(log_var)).
    """
    return -0.5 * K.sum(1 + log_var - K.square(mean) - K.exp(log_var), axis=1)
def vae_loss(y_true, y_pred, mean, log_var):
    """Total VAE loss: scaled reconstruction MSE plus the KL regularizer.

    BUG FIX: the original wrote `kl_loss = kl_loss(mean, log_var)`, which makes
    `kl_loss` a local variable and raises UnboundLocalError before the global
    function can be reached; the local is renamed to avoid the shadowing.
    """
    r_loss = mse_loss(y_true, y_pred)
    kl = kl_loss(mean, log_var)
    return r_loss + kl
# `tf.function` compiles this step into a TensorFlow graph for speed.
@tf.function
def train_step(images):
    """One optimization step: encode, sample a latent, decode, update both models.

    Returns the per-sample VAE loss tensor for the batch.
    """
    # Two independent tapes so each model's gradients are computed separately.
    # (Tape locals renamed from `encoder`/`decoder`, which shadowed the
    # model-builder functions of the same names.)
    with tf.GradientTape() as enc_tape, tf.GradientTape() as dec_tape:
        mean, log_var = enc(images, training=True)
        latent = final([mean, log_var])
        generated_images = dec(latent, training=True)
        loss = vae_loss(images, generated_images, mean, log_var)
    enc_grads = enc_tape.gradient(loss, enc.trainable_variables)
    dec_grads = dec_tape.gradient(loss, dec.trainable_variables)
    optimizer.apply_gradients(zip(enc_grads, enc.trainable_variables))
    optimizer.apply_gradients(zip(dec_grads, dec.trainable_variables))
    return loss
# Ensure output directories for per-epoch weights and sample images exist.
os.makedirs('tf_vae/fashion/training_weights', exist_ok=True)
os.makedirs('tf_vae/fashion/images', exist_ok=True)
def train(dataset, epochs):
    """Training loop: run `train_step` over every batch; after each epoch,
    render a reconstruction grid and checkpoint both models' weights.

    NOTE(review): indentation reconstructed from a flattened notebook export.
    The per-epoch visualization/saving is assumed to sit inside the epoch loop
    but after the batch loop — confirm against the original notebook.
    """
    for epoch in range(epochs):
        start = time.time()
        i = 0
        loss_ = []
        for image_batch in dataset:
            i += 1
            loss = train_step(image_batch)
            #loss_.append(loss)
            #print("Loss",np.mean(loss_))
            # Last batch's first 25 images seed the per-epoch preview grid.
            seed = image_batch[:25]
        display.clear_output(wait=True)
        generate_and_save_images([enc,final,dec],
                                 epoch + 1,
                                 seed)
        # Save the model every 15 epochs
        #if (epoch + 1) % 15 == 0:
        #checkpoint.save(file_prefix = checkpoint_prefix)
        enc.save_weights('tf_vae/fashion/training_weights/enc_'+ str(epoch)+'.h5')
        dec.save_weights('tf_vae/fashion/training_weights/dec_'+ str(epoch)+'.h5')
        print ('Time for epoch {} is {} sec'.format(epoch + 1, time.time()-start))
    # Generate after the final epoch
    display.clear_output(wait=True)
    generate_and_save_images([enc,final,dec],
                             epochs,
                             seed)
def generate_and_save_images(model, epoch, test_input):
    """Reconstruct `test_input` through the VAE and save a 5x5 image grid.

    model: [encoder, sampler, decoder] — FIX: the original ignored this
        parameter and reached for the globals `enc`/`final`/`dec` directly;
        callers already pass the same models, so unpacking is equivalent.
    epoch: integer used in the saved PNG's filename.
    test_input: batch of up to 25 images with shape (N, 28, 28, 1).

    `training=False` keeps layers such as BatchNormalization in inference mode.
    (A leftover debug print of the predictions' shape was removed.)
    """
    encoder_m, sampler_m, decoder_m = model
    m, v = encoder_m(test_input, training=False)
    latent = sampler_m([m, v])
    predictions = decoder_m(latent, training=False)
    fig = plt.figure(figsize=(4, 4))
    for i in range(predictions.shape[0]):
        plt.subplot(5, 5, i + 1)
        plt.imshow(predictions[i, :, :, 0] * 255, cmap='gray')
        plt.axis('off')
    plt.savefig('tf_vae/fashion/images/image_at_epoch_{:d}.png'.format(epoch))
    plt.show()
# Train the VAE for 100 epochs on the normalized dataset.
train(normalized_ds, 100)
# (notebook output) (25, 28, 28, 1)
# Reload the final-epoch weights and scatter-plot 5000 random test images
# projected into the 2-D latent space.
enc.load_weights('tf_vae/fashion/training_weights/enc_99.h5')
dec.load_weights('tf_vae/fashion/training_weights/dec_99.h5')
n_to_show = 5000
figsize = 12
# NOTE(review): np.random.choice samples with replacement here, so some test
# images may appear more than once — harmless for a scatter plot.
example_idx = np.random.choice(range(len(x_test)), n_to_show)
example_images = x_test[example_idx]
m, v = enc.predict(example_images)
# Sample z from the predicted (mean, log_var) rather than plotting the means.
embeddings = final([m,v])
plt.figure(figsize=(figsize, figsize))
plt.scatter(embeddings[:, 0] , embeddings[:, 1], alpha=0.5, s=2)
plt.xlabel("Dimension-1", size=20)
plt.ylabel("Dimension-2", size=20)
plt.xticks(size=20)
plt.yticks(size=20)
plt.title("Projection of 2D Latent-Space (Fashion-MNIST)", size=20)
plt.show()
# Human-readable names for the ten Fashion-MNIST classes, keyed by label id 0-9.
label_dict = dict(enumerate([
    'T-shirt/top',
    'Trouser',
    'Pullover',
    'Dress',
    'Coat',
    'Sandal',
    'Shirt',
    'Sneaker',
    'Bag',
    'Ankle boot',
]))
# Decode 10 latent points drawn from the standard-normal prior and show them.
figsize = 15
x = np.random.normal(size = (10,2))
#x = np.random.uniform(size = (10,200))
reconstruct = dec.predict(x)
fig = plt.figure(figsize=(figsize, 10))
for i in range(10):
    ax = fig.add_subplot(5, 5, i+1)
    ax.axis('off')
    # Sigmoid outputs are in [0, 1]; imshow rescales, so *255 only changes range.
    ax.imshow(reconstruct[i, :,:,0]*255, cmap = 'gray')
# Reconstruct the first 25 test images through encoder -> sampler -> decoder
# and display them with their true class labels.
figsize = 15
m, v = enc.predict(x_test[:25])
latent = final([m,v])
reconst = dec.predict(latent)
fig = plt.figure(figsize=(figsize, 10))
for i in range(25):
    ax = fig.add_subplot(5, 5, i+1)
    ax.axis('off')
    # Caption each reconstruction with the ground-truth label of x_test[i].
    ax.text(0.5, -0.15, str(label_dict[y_test[i]]), fontsize=10, ha='center', transform=ax.transAxes)
    ax.imshow(reconst[i, :,:,0]*255, cmap = 'gray')
# Sample 10 random points uniformly within the bounding box of the embedded
# test set and decode them, captioning each image with its latent coordinates.
# (Relies on `embeddings` computed in the projection cell above.)
figsize = 15
min_x = min(embeddings[:, 0])
max_x = max(embeddings[:, 0])
min_y = min(embeddings[:, 1])
max_y = max(embeddings[:, 1])
x = np.random.uniform(min_x,max_x, size = (10,1))
y = np.random.uniform(min_y,max_y, size = (10,1))
z_grid = np.concatenate((x,y), axis=1)
reconst = dec.predict(z_grid)
fig = plt.figure(figsize=(figsize, 10))
for i in range(10):
    ax = fig.add_subplot(5, 5, i+1)
    ax.axis('off')
    ax.text(0.5, -0.15, str(np.round(z_grid[i],1)), fontsize=10, ha='center', transform=ax.transAxes)
    ax.imshow(reconst[i, :,:,0]*255, cmap = 'gray')
# (Removed: a hosting-site content-moderation notice scraped into the file —
# not part of the code.)