In [None]:
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns
import json
import math
import random
import numpy as np
import scipy as sp
import scipy.stats as st
import scipy.integrate as integrate
from scipy.stats import multivariate_normal
from sklearn import linear_model
from sklearn.utils.testing import ignore_warnings
from sklearn.exceptions import ConvergenceWarning
import statsmodels.api as sm
from matplotlib.colors import LogNorm
import pickle

from joblib import Parallel, delayed
import multiprocessing
from collections import namedtuple
from itertools import count

import cProfile
from datetime import datetime

# Global plotting style shared by every figure in the notebook.
sns.set_style("whitegrid")
sns.set_palette("colorblind")
palette = sns.color_palette()
figsize = (15,8)
legend_fontsize = 16

from matplotlib import rc
rc('font',**{'family':'sans-serif'})
rc('text', usetex=True)
# Both LaTeX packages go into ONE preamble string: every
# rc('text.latex', preamble=...) call replaces the previous value, so setting
# them in two separate calls would silently drop the inputenc package.
rc('text.latex', preamble='\n'.join([
    r'\usepackage[utf8]{inputenc}',
    r'\usepackage[russian]{babel}',
]))
rc('figure', **{'dpi': 300})

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.autograd import Variable
from torchvision.utils import save_image

import torchvision.datasets as datasets
from torchvision.utils import make_grid

## GAN: сначала наивно

In [None]:
## Load MNIST
image_size = 28
image_shape = (1, image_size, image_size)

# Pick the tensor constructor that matches the available device.
cuda = torch.cuda.is_available()
Tensor = torch.cuda.FloatTensor if cuda else torch.FloatTensor

batch_size = 64

# Training split of MNIST, downloaded on first use into data/mnist.
# Each image is resized to image_size, converted to a tensor and normalized
# with mean 0.5 and std 0.5.
dataloader = torch.utils.data.DataLoader(
    dataset=datasets.MNIST(
        root="data/mnist",
        train=True,
        download=True,
        transform=transforms.Compose([
            transforms.Resize(image_size),
            transforms.ToTensor(),
            transforms.Normalize([0.5], [0.5]),
        ]),
    ),
    batch_size=batch_size,
    shuffle=True,
)

In [None]:
class Generator(nn.Module):
    """Fully connected generator: maps a noise vector to an image of shape ``image_shape``.

    Args:
        noise_dim: size of the input noise vector.
    """

    def __init__(self, noise_dim=100):
        super().__init__()

        # Widths of the hidden stack: noise_dim -> 128 -> 256 -> 512 -> 1024.
        widths = [noise_dim, 128, 256, 512, 1024]
        layers = []
        for idx, (n_in, n_out) in enumerate(zip(widths[:-1], widths[1:])):
            layers.append(nn.Linear(n_in, n_out))
            if idx > 0:
                # The first block is the only one without batch normalization.
                # NOTE(review): the second positional argument of BatchNorm1d is
                # `eps`, so 0.8 here is the epsilon, not the momentum -- confirm
                # this is what was intended.
                layers.append(nn.BatchNorm1d(n_out, eps=0.8))
            layers.append(nn.LeakyReLU(0.2, inplace=True))

        # Output layer: one unit per pixel, squashed by Tanh.
        layers.append(nn.Linear(widths[-1], int(np.prod(image_shape))))
        layers.append(nn.Tanh())
        self.model = nn.Sequential(*layers)

    def forward(self, z):
        """Generate a batch of images from the batch of noise vectors ``z``."""
        flat = self.model(z)
        return flat.view(flat.size(0), *image_shape)

In [None]:
class Discriminator(nn.Module):
    """Fully connected discriminator with a sigmoid output, one score per image."""

    def __init__(self):
        super().__init__()

        n_pixels = int(np.prod(image_shape))
        stack = [
            nn.Linear(n_pixels, 512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(256, 1),
            nn.Sigmoid(),
        ]
        self.model = nn.Sequential(*stack)

    def forward(self, img):
        """Flatten each image in the batch and return its score, shape ``(batch, 1)``."""
        return self.model(img.view(img.size(0), -1))

In [None]:
noise_dim = 100

# Vanilla GAN: binary cross-entropy on the discriminator output.
adversarial_loss = nn.BCELoss()
generator = Generator(noise_dim=noise_dim)
discriminator = Discriminator()

# Move everything to the GPU when one is available (.cuda() returns the module itself).
if cuda:
    generator, discriminator, adversarial_loss = (
        m.cuda() for m in (generator, discriminator, adversarial_loss)
    )

In [None]:
## Optimizers and their hyperparameters
lr = 0.0002
beta1 = 0.5
beta2 = 0.999
optimizer_G = optim.Adam(generator.parameters(), lr=lr, betas=(beta1, beta2))
optimizer_D = optim.Adam(discriminator.parameters(), lr=lr, betas=(beta1, beta2))

In [None]:
def train(dataloader, do_epoch=None, num_epochs=10, sample_dir="data/images/gan", generate_every=100):
    """Run a generic GAN training loop and periodically save generated samples.

    Args:
        dataloader: sized iterable yielding ``(imgs, labels)`` batches; the labels
            are ignored.
        do_epoch: callable ``(imgs, batch_index) -> (gen_imgs, d_loss, g_loss)``
            that performs one training step on a single batch (despite its name).
            Either loss may be ``None`` for a step.
        num_epochs: number of passes over ``dataloader``.
        sample_dir: directory that receives the sample grids; created if missing.
        generate_every: progress is printed and a 5x5 grid of generated images is
            saved every ``generate_every`` batches (counted across epochs).

    Returns:
        ``(d_losses, g_losses)``: two lists of floats of equal length, one entry
        per step for which BOTH losses were returned.

    Raises:
        TypeError: if ``do_epoch`` is not callable.
    """
    import os  # local import keeps this cell independent of the import cell

    if not callable(do_epoch):
        raise TypeError("do_epoch must be a callable taking (imgs, batch_index)")

    # save_image does not create missing directories, so make sure it exists.
    os.makedirs(sample_dir, exist_ok=True)

    batches_per_epoch = len(dataloader)
    d_losses, g_losses = [], []
    for epoch in range(num_epochs):
        for i, (imgs, _) in enumerate(dataloader):
            gen_imgs, d_loss, g_loss = do_epoch(imgs, i)
            # A step may skip one of the networks (e.g. the generator is updated
            # only every n-th batch); record a pair only when both are present so
            # that the two lists stay aligned.
            if d_loss is not None and g_loss is not None:
                d_losses.append(d_loss.item())
                g_losses.append(g_loss.item())
            batches_done = epoch * batches_per_epoch + i
            if batches_done % generate_every == 0:
                # Nothing may have been recorded yet; report NaN instead of
                # failing on an empty list.
                last_d = d_losses[-1] if d_losses else float("nan")
                last_g = g_losses[-1] if g_losses else float("nan")
                print("\t...epoch %d/%d\tbatch %d/%d\tD loss: %.6f\tG loss: %.6f" % \
                      (epoch, num_epochs, i, batches_per_epoch, last_d, last_g))
                save_image(gen_imgs.data[:25], "%s/%05d.png" % (sample_dir, batches_done), nrow=5, normalize=True)
    return d_losses, g_losses

In [None]:
def do_epoch_gan(imgs, i):
    """Perform one vanilla-GAN training step on a single batch.

    The generator is updated first, then the discriminator. Works on the
    notebook-level ``generator``, ``discriminator``, ``adversarial_loss``,
    ``optimizer_G`` and ``optimizer_D``.

    Args:
        imgs: batch of real images.
        i: batch index (unused here; kept for the common step interface).

    Returns:
        ``(gen_imgs, d_loss, g_loss)``: the generated batch and both losses.
    """
    n_samples = imgs.size(0)

    # Input batch converted to the working tensor type.
    real_imgs = Variable(imgs.type(Tensor))

    # Targets: 1 for real images, 0 for fake ones.
    valid = Variable(Tensor(n_samples, 1).fill_(1.0), requires_grad=False)
    fake = Variable(Tensor(n_samples, 1).fill_(0.0), requires_grad=False)

    # ---- generator step ----
    optimizer_G.zero_grad()
    # Sample noise and turn it into fake images.
    z = Variable(Tensor(np.random.normal(0, 1, (imgs.shape[0], noise_dim))))
    gen_imgs = generator(z)
    # The generator wants its images to be classified as real.
    g_loss = adversarial_loss(discriminator(gen_imgs), valid)
    g_loss.backward()
    optimizer_G.step()

    # ---- discriminator step ----
    optimizer_D.zero_grad()
    loss_on_real = adversarial_loss(discriminator(real_imgs), valid)
    # detach(): no gradient should flow back into the generator here.
    loss_on_fake = adversarial_loss(discriminator(gen_imgs.detach()), fake)
    d_loss = (loss_on_real + loss_on_fake) / 2
    d_loss.backward()
    optimizer_D.step()

    return gen_imgs, d_loss, g_loss

In [None]:
# Train the fully connected GAN for 20 epochs; report and save samples every 500 batches.
d_losses, g_losses = train(
    dataloader,
    do_epoch=do_epoch_gan,
    num_epochs=20,
    sample_dir="data/images/test",
    generate_every=500,
)

In [None]:
def plot_losses(d_losses, g_losses):
    """Plot discriminator and generator losses on a shared set of axes.

    Args:
        d_losses: sequence of recorded discriminator losses.
        g_losses: sequence of recorded generator losses; it is plotted against
            ``len(d_losses)`` x positions, so it is expected to have the same length.
    """
    # x positions are simply the indices of the recorded loss values.
    last_idx = len(d_losses) - 1
    d_x = np.linspace(0, last_idx, len(d_losses))
    g_x = np.arange(last_idx + 1)

    fig, ax = plt.subplots(figsize=(12, 8))
    ax.plot(d_x, d_losses, label='Ошибка дискриминатора')
    ax.plot(g_x, g_losses, label='Ошибка генератора')
    ax.legend()
    ax.set_xlabel('Эпоха обучения')
    ax.set_ylabel('Ошибка')

In [None]:
# Loss curves of the vanilla GAN run.
plot_losses(d_losses=d_losses, g_losses=g_losses)

![GAN](results_gan.gif "GAN")

## LSGAN

In [None]:
class LSGANSimpleDiscriminator(nn.Module):
    """Fully connected discriminator with a raw (unbounded) linear output.

    Same layout as ``Discriminator`` but without the final sigmoid, which the
    least-squares loss does not need.
    """

    def __init__(self):
        super().__init__()

        n_pixels = int(np.prod(image_shape))
        stack = [
            nn.Linear(n_pixels, 512),
            nn.LeakyReLU(0.2, inplace=True),
            nn.Linear(512, 256),
            nn.LeakyReLU(0.2, inplace=True),
            # no sigmoid: least squares works on the raw score
            nn.Linear(256, 1),
        ]
        self.model = nn.Sequential(*stack)

    def forward(self, img):
        """Flatten each image in the batch and return its raw score, shape ``(batch, 1)``."""
        return self.model(img.view(img.size(0), -1))

In [None]:
noise_dim = 100

## Essentially the only difference from the vanilla GAN: a least-squares (MSE)
## loss; everything else can stay the same.
ls_advloss = nn.MSELoss()

ls_G = Generator(noise_dim=noise_dim)
ls_D = LSGANSimpleDiscriminator()

# Move everything to the GPU when one is available (.cuda() returns the module itself).
if cuda:
    ls_G, ls_D, ls_advloss = (m.cuda() for m in (ls_G, ls_D, ls_advloss))

## Optimizers and their hyperparameters
lr = 0.0002
beta1 = 0.5
beta2 = 0.999
ls_opt_G = optim.Adam(ls_G.parameters(), lr=lr, betas=(beta1, beta2))
ls_opt_D = optim.Adam(ls_D.parameters(), lr=lr, betas=(beta1, beta2))

In [None]:
def do_epoch_lsgan(imgs, i):
    """Perform one LSGAN training step on a single batch.

    The generator is updated first, then the discriminator. Works on whatever
    the notebook-level names ``ls_G``, ``ls_D``, ``ls_advloss``, ``ls_opt_G``
    and ``ls_opt_D`` are bound to at call time.

    Args:
        imgs: batch of real images.
        i: batch index (unused here; kept for the common step interface).

    Returns:
        ``(gen_imgs, d_loss, g_loss)``: the generated batch and both losses.
    """
    n_samples = imgs.shape[0]

    # Input batch converted to the working tensor type.
    real_imgs = Variable(imgs.type(Tensor))

    # Targets: 1 for real images, 0 for fake ones.
    valid = Variable(Tensor(n_samples, 1).fill_(1.0), requires_grad=False)
    fake = Variable(Tensor(n_samples, 1).fill_(0.0), requires_grad=False)

    # ---- generator step ----
    ls_opt_G.zero_grad()
    # Sample noise and turn it into fake images.
    z = Variable(Tensor(np.random.normal(0, 1, (imgs.shape[0], noise_dim))))
    gen_imgs = ls_G(z)
    # The generator wants its images to be scored as real.
    g_loss = ls_advloss(ls_D(gen_imgs), valid)
    g_loss.backward()
    ls_opt_G.step()

    # ---- discriminator step ----
    ls_opt_D.zero_grad()
    loss_on_real = ls_advloss(ls_D(real_imgs), valid)
    # detach(): no gradient should flow back into the generator here.
    loss_on_fake = ls_advloss(ls_D(gen_imgs.detach()), fake)
    d_loss = (loss_on_real + loss_on_fake) / 2
    d_loss.backward()
    ls_opt_D.step()

    return gen_imgs, d_loss, g_loss

In [None]:
# Train the fully connected LSGAN for 50 epochs; report and save samples every 500 batches.
d_losses, g_losses = train(
    dataloader,
    do_epoch=do_epoch_lsgan,
    num_epochs=50,
    sample_dir="data/images/test",
    generate_every=500,
)

In [None]:
# Loss curves of the fully connected LSGAN run.
plot_losses(d_losses=d_losses, g_losses=g_losses)

![LSGAN](results_lsgan_simple.gif "Simple LSGAN")

In [None]:
class LSGANGenerator(nn.Module):
    """Convolutional generator: a linear projection followed by two 2x upsampling stages.

    Args:
        noise_dim: size of the input noise vector.
        num_channels: number of channels of the generated image.
    """

    def __init__(self, noise_dim=100, num_channels=1):
        super().__init__()

        # Two 2x upsampling stages follow, so start from a quarter of the image side.
        self.init_size = image_size // 4
        self.l1 = nn.Sequential(nn.Linear(noise_dim, 128 * self.init_size ** 2))

        def upsample_stage(c_in, c_out):
            # NOTE(review): the second positional argument of BatchNorm2d is
            # `eps`, so 0.8 here is the epsilon, not the momentum -- confirm
            # this is what was intended.
            return [
                nn.Upsample(scale_factor=2),
                nn.Conv2d(c_in, c_out, 3, stride=1, padding=1),
                nn.BatchNorm2d(c_out, eps=0.8),
                nn.LeakyReLU(0.2, inplace=True),
            ]

        stages = upsample_stage(128, 128) + upsample_stage(128, 64)
        head = [nn.Conv2d(64, num_channels, 3, stride=1, padding=1), nn.Tanh()]
        self.conv_blocks = nn.Sequential(*stages, *head)

    def forward(self, z):
        """Generate a batch of images from the batch of noise vectors ``z``."""
        projected = self.l1(z)
        # Reshape the flat projection into a 128-channel feature map.
        feature_map = projected.view(projected.shape[0], 128, self.init_size, self.init_size)
        return self.conv_blocks(feature_map)

In [None]:
class LSGANDiscriminator(nn.Module):
    """Convolutional discriminator: four stride-2 conv blocks and a linear scoring layer.

    Args:
        num_channels: number of channels of the input image.
    """

    def __init__(self, num_channels=1):
        super().__init__()

        # (in_channels, out_channels, use_batchnorm) for every conv block.
        plan = [
            (num_channels, 16, False),
            (16, 32, True),
            (32, 64, True),
            (64, 128, True),
        ]

        layers = []
        # Width and height of the feature map, tracked block by block.
        ds_size = image_size
        for c_in, c_out, use_bn in plan:
            layers.append(nn.Conv2d(c_in, c_out, 3, 2, 1))
            layers.append(nn.LeakyReLU(0.2, inplace=True))
            layers.append(nn.Dropout2d(0.25))
            if use_bn:
                # NOTE(review): the second positional argument of BatchNorm2d is
                # `eps`, so 0.8 here is the epsilon, not the momentum -- confirm
                # this is what was intended.
                layers.append(nn.BatchNorm2d(c_out, eps=0.8))
            # Each block halves the side, rounding up.
            ds_size = math.ceil(ds_size / 2.)

        self.model = nn.Sequential(*layers)
        self.adv_layer = nn.Linear(128 * (ds_size ** 2), 1)

    def forward(self, img):
        """Return one raw (no sigmoid) score per image, shape ``(batch, 1)``."""
        features = self.model(img)
        return self.adv_layer(features.view(features.shape[0], -1))

In [None]:
# Rebind the LSGAN names to the convolutional generator/discriminator pair.
ls_advloss = nn.MSELoss()
ls_G = LSGANGenerator()
ls_D = LSGANDiscriminator()

# Move everything to the GPU when one is available (.cuda() returns the module itself).
if cuda:
    ls_G, ls_D, ls_advloss = (m.cuda() for m in (ls_G, ls_D, ls_advloss))

In [None]:
## Optimizers and their hyperparameters
lr = 0.0002
beta1 = 0.5
beta2 = 0.999
ls_opt_G = optim.Adam(ls_G.parameters(), lr=lr, betas=(beta1, beta2))
ls_opt_D = optim.Adam(ls_D.parameters(), lr=lr, betas=(beta1, beta2))

In [None]:
# do_epoch_lsgan reads ls_G / ls_D / ls_opt_* as globals, so this run trains
# whatever those names are bound to when the cell is executed.
d_losses, g_losses = train(
    dataloader,
    do_epoch=do_epoch_lsgan,
    num_epochs=50,
    sample_dir="data/images/test",
    generate_every=500,
)

In [None]:
# Loss curves of the convolutional LSGAN run.
plot_losses(d_losses=d_losses, g_losses=g_losses)

![LSGAN](results_lsgan.gif "Convolutional LSGAN")

## Wasserstein GAN

In [None]:
noise_dim = 100

w_G = Generator(noise_dim=noise_dim)
## The Wasserstein GAN critic does not need a sigmoid either, so the
## sigmoid-free discriminator is reused.
w_D = LSGANSimpleDiscriminator()

# Move both networks to the GPU when one is available (.cuda() returns the module itself).
if cuda:
    w_G, w_D = (m.cuda() for m in (w_G, w_D))

## Optimizers and their hyperparameters
lr = 0.0002
beta1 = 0.5
beta2 = 0.999
w_opt_G = optim.Adam(w_G.parameters(), lr=lr, betas=(beta1, beta2))
# The critic is trained with RMSprop; the Adam alternative with the settings
# above is kept here only as a reference:
#   optim.Adam(w_D.parameters(), lr=lr, betas=(beta1, beta2))
w_opt_D = optim.RMSprop(w_D.parameters(), lr=0.00005)

In [None]:
def do_epoch_wgan(imgs, i, n_critic=5, d_clip=0.01):
    """Perform one WGAN training step (with weight clipping) on a single batch.

    The critic ``w_D`` is updated on every call and its weights are then clipped
    to ``[-d_clip, d_clip]``; the generator ``w_G`` is updated only when
    ``i % n_critic == 0``. Works on the notebook-level ``w_G``, ``w_D``,
    ``w_opt_G`` and ``w_opt_D``.

    Args:
        imgs: batch of real images.
        i: batch index, used to schedule the generator updates.
        n_critic: the generator is trained once per ``n_critic`` batches.
        d_clip: clipping bound for the critic weights.

    Returns:
        ``(fake_imgs, loss_D, loss_G)``; ``loss_G`` is ``None`` on batches where
        the generator was not updated.
    """
    real_imgs = imgs.type(Tensor)

    # ---- critic step ----
    w_opt_D.zero_grad()

    # Sample noise as generator input.
    z = Tensor(np.random.normal(0, 1, (imgs.shape[0], noise_dim)))

    # detach(): the critic update must not back-propagate into the generator.
    fake_imgs = w_G(z).detach()

    # Wasserstein critic loss.
    loss_D = -torch.mean(w_D(real_imgs)) + torch.mean(w_D(fake_imgs))
    loss_D.backward()
    w_opt_D.step()

    # Clip the weights of the critic that was just updated (w_D), not of the
    # unrelated vanilla-GAN `discriminator`.
    for p in w_D.parameters():
        p.data.clamp_(-d_clip, d_clip)

    # ---- generator step, every n_critic batches ----
    loss_G = None
    if i % n_critic == 0:
        w_opt_G.zero_grad()

        # Regenerate from the same noise, this time keeping the graph.
        gen_imgs = w_G(z)
        loss_G = -torch.mean(w_D(gen_imgs))
        loss_G.backward()
        w_opt_G.step()

    return fake_imgs, loss_D, loss_G

In [None]:
# Train the weight-clipped WGAN for 50 epochs; report and save samples every 500 batches.
d_losses, g_losses = train(
    dataloader,
    do_epoch=do_epoch_wgan,
    num_epochs=50,
    sample_dir="data/images/test",
    generate_every=500,
)

![WGAN](results_wgan.gif "WGAN")

In [None]:
def compute_gradient_penalty(D, real_samples, fake_samples):
    """Compute the WGAN-GP gradient penalty for the critic ``D``.

    Random per-sample interpolations between real and fake samples are fed to
    ``D``; the penalty is the mean of ``(||dD/dx||_2 - 1) ** 2`` over the batch.

    Args:
        D: callable mapping a batch of samples to a batch of critic scores.
        real_samples: tensor of real samples, batch dimension first.
        fake_samples: tensor of generated samples, same shape as ``real_samples``.

    Returns:
        Scalar tensor with the gradient penalty. The graph is kept
        (``create_graph=True``) so the penalty can be back-propagated.
    """
    batch = real_samples.size(0)
    # One interpolation coefficient per sample, broadcast over all remaining
    # dimensions; drawn with torch on the samples' own device and dtype, so the
    # function does not depend on the sample rank or on a global tensor type.
    alpha = torch.rand(
        batch, *([1] * (real_samples.dim() - 1)),
        dtype=real_samples.dtype, device=real_samples.device,
    )
    # Random interpolations between real and fake samples.
    interpolates = (alpha * real_samples + (1 - alpha) * fake_samples).requires_grad_(True)
    d_interpolates = D(interpolates)

    # Gradient of the critic scores w.r.t. the interpolated inputs. A tensor of
    # ones shaped like the scores selects the plain sum of all scores.
    gradients = torch.autograd.grad(
        outputs=d_interpolates,
        inputs=interpolates,
        grad_outputs=torch.ones_like(d_interpolates),
        create_graph=True,
        retain_graph=True,
    )[0]
    gradients = gradients.reshape(gradients.size(0), -1)
    gradient_penalty = ((gradients.norm(2, dim=1) - 1) ** 2).mean()
    return gradient_penalty

In [None]:
noise_dim = 100

# Fresh generator/critic pair for the gradient-penalty run.
w_G = Generator(noise_dim=noise_dim)
## The Wasserstein GAN critic does not need a sigmoid either, so the
## sigmoid-free discriminator is reused.
w_D = LSGANSimpleDiscriminator()

# Move both networks to the GPU when one is available (.cuda() returns the module itself).
if cuda:
    w_G, w_D = (m.cuda() for m in (w_G, w_D))

## Optimizers and their hyperparameters
lr = 0.0002
beta1 = 0.5
beta2 = 0.999
w_opt_G = optim.Adam(w_G.parameters(), lr=lr, betas=(beta1, beta2))
# The critic is trained with RMSprop; the Adam alternative with the settings
# above is kept here only as a reference:
#   optim.Adam(w_D.parameters(), lr=lr, betas=(beta1, beta2))
w_opt_D = optim.RMSprop(w_D.parameters(), lr=0.00005)

In [None]:
def do_epoch_wgan_gp(imgs, i, n_critic=5, d_clip=0.01, lambda_gp=10):
    """Perform one WGAN-GP training step on a single batch.

    The critic ``w_D`` is updated on every call with the Wasserstein loss plus
    ``lambda_gp`` times the gradient penalty; the generator ``w_G`` is updated
    only when ``i % n_critic == 0``. Works on the notebook-level ``w_G``,
    ``w_D``, ``w_opt_G`` and ``w_opt_D``.

    Args:
        imgs: batch of real images.
        i: batch index, used to schedule the generator updates.
        n_critic: the generator is trained once per ``n_critic`` batches.
        d_clip: unused; no weight clipping is done here. Kept so the signature
            stays compatible with existing callers.
        lambda_gp: weight of the gradient penalty term.

    Returns:
        ``(fake_imgs, d_loss, g_loss)``; ``g_loss`` is ``None`` on batches where
        the generator was not updated.
    """
    real_imgs = imgs.type(Tensor)

    # ---- critic step ----
    w_opt_D.zero_grad()

    # Sample noise as generator input.
    z = Tensor(np.random.normal(0, 1, (imgs.shape[0], noise_dim)))

    # detach(): the critic update must not back-propagate into the generator.
    fake_imgs = w_G(z).detach()

    real_validity = w_D(real_imgs)
    fake_validity = w_D(fake_imgs)

    # Gradient penalty on interpolations between real and fake images.
    gradient_penalty = compute_gradient_penalty(w_D, real_imgs.data, fake_imgs.data)

    # Wasserstein critic loss with the penalty term.
    d_loss = -torch.mean(real_validity) + torch.mean(fake_validity) + lambda_gp * gradient_penalty
    d_loss.backward()
    w_opt_D.step()

    # ---- generator step, every n_critic batches ----
    # Must be bound on every path: it is returned even when the generator is skipped.
    g_loss = None
    if i % n_critic == 0:
        w_opt_G.zero_grad()

        # Regenerate from the same noise, this time keeping the graph.
        gen_imgs = w_G(z)
        g_loss = -torch.mean(w_D(gen_imgs))
        g_loss.backward()
        w_opt_G.step()

    return fake_imgs, d_loss, g_loss

In [None]:
# Train the WGAN with gradient penalty. The step function must be
# do_epoch_wgan_gp: do_epoch_wgan is the weight-clipping variant and never
# computes the gradient penalty.
d_losses, g_losses = train(
    dataloader,
    do_epoch=do_epoch_wgan_gp,
    num_epochs=50,
    sample_dir="data/images/test",
    generate_every=500,
)

![WGAN with gradient penalty](results_wgan_gp.gif "WGAN with gradient penalty")