본문 바로가기
딥러닝 with Python

[딥러닝 with Python] 조건부 GAN / CGAN / Conditional GAN

by CodeCrafter 2024. 6. 4.
반응형

[본 포스팅은 "만들면서 배우는 생성 AI 2탄" 을 참조했습니다]

 

이번에 알아볼 내용은 조건부 GAN (CGAN / Conditional GAN) 입니다.

 

 

 기존에 알아본 GAN 모형 (DC GAN, WGAN-GP)들은 훈련 세트와 유사한 사실적인 이미지를 재현해낼 수 있었지만, 생성하려는 이미지의 유형(남성 얼굴 또는 여성 얼굴, 벽돌의 크기 등)을 제어할 수는 없었습니다. 

 

 잠재 공간(Latent Space)에서 랜덤한 한 포인트를 선정해서 샘플링할 수는 있지만, 그렇다고 어떤 잠재 변수를 선택하면 어떤 특징을 변형할 수 있는 지를 알수는 없었습니다.

 

 이 문제를 해결한 것이 CGAN이 되겠습니다. 

 

CGAN의 구조는 아래와 같습니다.

 

 - 일반적인 GAN과 CGAN의 차이점은 레이블과 관련된 추가 저보를 생성자와 비평자 모두에게 추가적으로 전달한다는 것입니다. 생성자에서는 이 정보를 원핫 인코딩(One-Hot Encoding)된 벡터로 잠재 공간 샘플에 단순히 추가하고, 비평자에서는 레이블 정보를 RGB 이미지에 추가 채널로 추가합니다. 이를 정리해보면

 

  a) CGAN의 생성자

   * 입력 : 잠재 공간의 샘플(랜덤 노이즈) + 조건 정보(이미지의 클래스 레이블 등)

   * 이때 조건 정보는 원핫 인코딩된 벡터로 제공됩니다. 원핫 인코딩 된 벡터가 가지고 있는 클래스의 수 만큼 채널의 수가 늘어나게 됩니다.

   ex) 잠재공간 샘플의 차원 : 100, 레이블 개수 : 3  => 최종 입력 채널 수 : 103(=100+3)

  b) CGAN의 비평자

   * 입력 데이터(진짜 또는 가짜) + 조건 정보

   * 이때 조건 정보는 이미지에 추가 채널로 제공됩니다. 즉, 일반적인 RGB 이미지가 입력데이터라면, 여기에 조건으로 이루어진 n개의 채널(참 : 1, 거짓 : 0으로 이루어진 경우 n = 2)을 추가해 3 + n개의 채널로 만드는 겁니다. 추가되는 채널의 개수는 이미지 레이블의 클래스의 개수에 따라  정해집니다.

 ex) 입력 채널의 차원 : 3(RGB) , 레이블 개수 : 3 => 최종 입력 채널 수 6(=3+3)

 

이치럼 CGAN은 비평자가 이미지의 콘텐츠에 부과한 추가 정보를 참고할 수 있기 때문에 생성자는 비평자를 계속 속이려고 제공된 레이블과 출력이 일치하는지 확인해야 합니다.

 

 만약 생성자가 이미지 레이블과 일치하지 않는 완벽한 이미지를 생성했다면, 비평자는 간단하게 이미지와 레이블이 불일치하기 때문에 가짜임을 알아차릴 수 있는 것입니다.

 

이제 이 CGAN을 파이썬을 활용해서 구현해보겠습니다.

 

 

 

먼저 util 파일을 받아줍니다.

import sys

# 코랩의 경우 깃허브 저장소로부터 utils.py를 다운로드 합니다.
if 'google.colab' in sys.modules:
    !wget https://raw.githubusercontent.com/rickiepark/Generative_Deep_Learning_2nd_Edition/main/notebooks/utils.py
    !mkdir -p notebooks
    !mv utils.py notebooks

 

다음은 사용할 라이브러리를 로드해줍니다.

import numpy as np
import pandas as pd

import tensorflow as tf
from tensorflow.keras import (
    layers,
    models,
    callbacks,
    utils,
    metrics,
    optimizers,
)

from notebooks.utils import display, sample_batch

 

학습간 사용할 하이퍼 파라미터들을 정의해줍니다.

IMAGE_SIZE = 64
CHANNELS = 3
CLASSES = 2
BATCH_SIZE = 128
Z_DIM = 32
LEARNING_RATE = 0.00005
ADAM_BETA_1 = 0.5
ADAM_BETA_2 = 0.999
EPOCHS = 20
CRITIC_STEPS = 3
GP_WEIGHT = 10.0
LOAD_MODEL = False
ADAM_BETA_1 = 0.5
ADAM_BETA_2 = 0.9
LABEL = "Blond_Hair"

 

 

학습에 사용할 데이터를 준비해줍니다. 사용할 데이터는 CelebA 입니다.

# 코랩일 경우 노트북에서 celeba 데이터셋을 받습니다.
if 'google.colab' in sys.modules:
    # # 캐글-->Setttings-->API-->Create New Token에서
    # # kaggle.json 파일을 만들어 코랩에 업로드하세요.
    # from google.colab import files
    # files.upload()
    # !mkdir ~/.kaggle
    # !cp kaggle.json ~/.kaggle/
    # !chmod 600 ~/.kaggle/kaggle.json
    # # celeba 데이터셋을 다운로드하고 압축을 해제합니다.
    # !kaggle datasets download -d jessicali9530/celeba-dataset
    # 캐글에서 다운로드가 안 될 경우 역자의 드라이브에서 다운로드할 수 있습니다.
    import gdown
    gdown.download(id='15gJhiDBkltMQz3T97xG-fO4gXTKAWkSB')
    !unzip -q celeba-dataset.zip
    # output 디렉토리를 만듭니다.
    !mkdir output

 

 

해당 이미지가 가지고 있는 레이블의 종류를 확인해봅니다.

# 레이블 데이터셋 로드
attributes = pd.read_csv("./list_attr_celeba.csv")
print(attributes.columns)
attributes.head()

 

각 attributes는 1 또는 -1로 이진화 되어 있습니다. 

 

데이터를 활용가능하게 로드해줍니다.

# 데이터 로드
labels = attributes[LABEL].tolist()
int_labels = [x if x == 1 else 0 for x in labels]
train_data = utils.image_dataset_from_directory(
    "./img_align_celeba",
    labels=int_labels,
    color_mode="rgb",
    image_size=(IMAGE_SIZE, IMAGE_SIZE),
    batch_size=BATCH_SIZE,
    shuffle=True,
    seed=42,
    interpolation="bilinear",
)

 

 

데이터를 전처리하고, 훈련 세트의 몇개의 샘플을 출력해봅니다.

# 데이터 전처리
def preprocess(img):
    """
    이미지 정규화
    """
    img = (tf.cast(img, "float32") - 127.5) / 127.5
    return img


train = train_data.map(
    lambda x, y: (preprocess(x), tf.one_hot(y, depth=CLASSES))
)
# 훈련 세트에 있는 몇 개의 샘플 출력하기
train_sample = sample_batch(train)
display(train_sample, cmap=None)

 

 

이제 CGAN 모델을 구축해봅니다.

 

먼저 비평자(Crictic)입니다. WGAN-GP에서의 비평자 개념을 활용하여 더 안정적이고 잘 학습할 수 있도록 했습니다.

앞서 CGAN의 특징에 대해서 말씀드렸듯, 기존에 설명드린 GAN 모델과 비교했을때 label_input이 채널 단위로 추가된것을 확인할 수 있습니다. 

critic_input = layers.Input(shape=(IMAGE_SIZE, IMAGE_SIZE, CHANNELS))
label_input = layers.Input(shape=(IMAGE_SIZE, IMAGE_SIZE, CLASSES))
x = layers.Concatenate(axis=-1)([critic_input, label_input])
x = layers.Conv2D(64, kernel_size=4, strides=2, padding="same")(x)
x = layers.LeakyReLU(0.2)(x)
x = layers.Conv2D(128, kernel_size=4, strides=2, padding="same")(x)
x = layers.LeakyReLU()(x)
x = layers.Dropout(0.3)(x)
x = layers.Conv2D(128, kernel_size=4, strides=2, padding="same")(x)
x = layers.LeakyReLU(0.2)(x)
x = layers.Dropout(0.3)(x)
x = layers.Conv2D(128, kernel_size=4, strides=2, padding="same")(x)
x = layers.LeakyReLU(0.2)(x)
x = layers.Dropout(0.3)(x)
x = layers.Conv2D(1, kernel_size=4, strides=1, padding="valid")(x)
critic_output = layers.Flatten()(x)

critic = models.Model([critic_input, label_input], critic_output)
critic.summary()

 

 

다음은 생성자 입니다.

생성자에도 label_input이 들어가며, 원핫 인코딩 된 레이블이 채널 단위로 추가됩니다.

generator_input = layers.Input(shape=(Z_DIM,))
label_input = layers.Input(shape=(CLASSES,))
x = layers.Concatenate(axis=-1)([generator_input, label_input])
x = layers.Reshape((1, 1, Z_DIM + CLASSES))(x)
x = layers.Conv2DTranspose(
    128, kernel_size=4, strides=1, padding="valid", use_bias=False
)(x)
x = layers.BatchNormalization(momentum=0.9)(x)
x = layers.LeakyReLU(0.2)(x)
x = layers.Conv2DTranspose(
    128, kernel_size=4, strides=2, padding="same", use_bias=False
)(x)
x = layers.BatchNormalization(momentum=0.9)(x)
x = layers.LeakyReLU(0.2)(x)
x = layers.Conv2DTranspose(
    128, kernel_size=4, strides=2, padding="same", use_bias=False
)(x)
x = layers.BatchNormalization(momentum=0.9)(x)
x = layers.LeakyReLU(0.2)(x)
x = layers.Conv2DTranspose(
    64, kernel_size=4, strides=2, padding="same", use_bias=False
)(x)
x = layers.BatchNormalization(momentum=0.9)(x)
x = layers.LeakyReLU(0.2)(x)
generator_output = layers.Conv2DTranspose(
    CHANNELS, kernel_size=4, strides=2, padding="same", activation="tanh"
)(x)
generator = models.Model([generator_input, label_input], generator_output)
generator.summary()

 

이제 위에서 정의한 비평자와 생성자를 연결하여 최종적인 CGAN 모델을 정의하겠습니다.

이번에도 Gradient Penalty를 추가했으며, 이를 호출할 때 전달할 원핫 인코딩된 레이블 채널이 설정되도록 하였습니다.

class ConditionalWGAN(models.Model):
    def __init__(self, critic, generator, latent_dim, critic_steps, gp_weight):
        super(ConditionalWGAN, self).__init__()
        self.critic = critic
        self.generator = generator
        self.latent_dim = latent_dim
        self.critic_steps = critic_steps
        self.gp_weight = gp_weight

    def compile(self, c_optimizer, g_optimizer):
        super(ConditionalWGAN, self).compile(run_eagerly=True)
        self.c_optimizer = c_optimizer
        self.g_optimizer = g_optimizer
        self.c_wass_loss_metric = metrics.Mean(name="c_wass_loss")
        self.c_gp_metric = metrics.Mean(name="c_gp")
        self.c_loss_metric = metrics.Mean(name="c_loss")
        self.g_loss_metric = metrics.Mean(name="g_loss")

    @property
    def metrics(self):
        return [
            self.c_loss_metric,
            self.c_wass_loss_metric,
            self.c_gp_metric,
            self.g_loss_metric,
        ]

    def gradient_penalty(
        self, batch_size, real_images, fake_images, image_one_hot_labels
    ):
        alpha = tf.random.normal([batch_size, 1, 1, 1], 0.0, 1.0)
        diff = fake_images - real_images
        interpolated = real_images + alpha * diff

        with tf.GradientTape() as gp_tape:
            gp_tape.watch(interpolated)
            pred = self.critic(
                [interpolated, image_one_hot_labels], training=True
            )

        grads = gp_tape.gradient(pred, [interpolated])[0]
        norm = tf.sqrt(tf.reduce_sum(tf.square(grads), axis=[1, 2, 3]))
        gp = tf.reduce_mean((norm - 1.0) ** 2)
        return gp

    def train_step(self, data):
        real_images, one_hot_labels = data

        image_one_hot_labels = one_hot_labels[:, None, None, :]
        image_one_hot_labels = tf.repeat(
            image_one_hot_labels, repeats=IMAGE_SIZE, axis=1
        )
        image_one_hot_labels = tf.repeat(
            image_one_hot_labels, repeats=IMAGE_SIZE, axis=2
        )

        batch_size = tf.shape(real_images)[0]

        for i in range(self.critic_steps):
            random_latent_vectors = tf.random.normal(
                shape=(batch_size, self.latent_dim)
            )

            with tf.GradientTape() as tape:
                fake_images = self.generator(
                    [random_latent_vectors, one_hot_labels], training=True
                )

                fake_predictions = self.critic(
                    [fake_images, image_one_hot_labels], training=True
                )
                real_predictions = self.critic(
                    [real_images, image_one_hot_labels], training=True
                )

                c_wass_loss = tf.reduce_mean(fake_predictions) - tf.reduce_mean(
                    real_predictions
                )
                c_gp = self.gradient_penalty(
                    batch_size, real_images, fake_images, image_one_hot_labels
                )
                c_loss = c_wass_loss + c_gp * self.gp_weight

            c_gradient = tape.gradient(c_loss, self.critic.trainable_variables)
            self.c_optimizer.apply_gradients(
                zip(c_gradient, self.critic.trainable_variables)
            )

        random_latent_vectors = tf.random.normal(
            shape=(batch_size, self.latent_dim)
        )

        with tf.GradientTape() as tape:
            fake_images = self.generator(
                [random_latent_vectors, one_hot_labels], training=True
            )
            fake_predictions = self.critic(
                [fake_images, image_one_hot_labels], training=True
            )
            g_loss = -tf.reduce_mean(fake_predictions)

        gen_gradient = tape.gradient(g_loss, self.generator.trainable_variables)
        self.g_optimizer.apply_gradients(
            zip(gen_gradient, self.generator.trainable_variables)
        )

        self.c_loss_metric.update_state(c_loss)
        self.c_wass_loss_metric.update_state(c_wass_loss)
        self.c_gp_metric.update_state(c_gp)
        self.g_loss_metric.update_state(g_loss)

        return {m.name: m.result() for m in self.metrics}

 

이제 CGAN 모델을 인스턴스화 해줍니다.

# GAN 만들기
cgan = ConditionalWGAN(
    critic=critic,
    generator=generator,
    latent_dim=Z_DIM,
    critic_steps=CRITIC_STEPS,
    gp_weight=GP_WEIGHT,
)

 

 

이제 GAN 모델을 학습시켜줍니다.

# GAN 컴파일
cgan.compile(
    c_optimizer=optimizers.Adam(
        learning_rate=LEARNING_RATE, beta_1=ADAM_BETA_1, beta_2=ADAM_BETA_2
    ),
    g_optimizer=optimizers.Adam(
        learning_rate=LEARNING_RATE, beta_1=ADAM_BETA_1, beta_2=ADAM_BETA_2
    ),
)

 

이번에 활용할 레이블은 금발 여부 ([1,0] : 금발 아님 / [0,1] 금발 맞음) 입니다.

# 모델 저장 체크포인트 만들기
model_checkpoint_callback = callbacks.ModelCheckpoint(
    filepath="./checkpoint/checkpoint.ckpt",
    save_weights_only=True,
    save_freq="epoch",
    verbose=0,
)

tensorboard_callback = callbacks.TensorBoard(log_dir="./logs")


class ImageGenerator(callbacks.Callback):
    def __init__(self, num_img, latent_dim):
        self.num_img = num_img
        self.latent_dim = latent_dim

    def on_epoch_end(self, epoch, logs=None):
        random_latent_vectors = tf.random.normal(
            shape=(self.num_img, self.latent_dim)
        )
        # 0 레이블
        zero_label = np.repeat([[1, 0]], self.num_img, axis=0)
        generated_images = self.model.generator(
            [random_latent_vectors, zero_label]
        )
        generated_images = generated_images * 127.5 + 127.5
        generated_images = generated_images.numpy()
        if epoch % 100 == 0: # 출력 횟수를 줄이기 위해
            display(
                generated_images,
                save_to="./output/generated_img_%03d_label_0.png" % (epoch),
                cmap=None,
            )

        # 1 레이블
        one_label = np.repeat([[0, 1]], self.num_img, axis=0)
        generated_images = self.model.generator(
            [random_latent_vectors, one_label]
        )
        generated_images = generated_images * 127.5 + 127.5
        generated_images = generated_images.numpy()
        if epoch % 100 == 0: # 출력 횟수를 줄이기 위해
            display(
                generated_images,
                save_to="./output/generated_img_%03d_label_1.png" % (epoch),
                cmap=None,
            )

 

history = cgan.fit(
    train,
    epochs=EPOCHS * 100,
    steps_per_epoch=1,
    callbacks=[
        model_checkpoint_callback,
        tensorboard_callback,
        ImageGenerator(num_img=10, latent_dim=Z_DIM),
    ],
)

 

학습이 거듭될수록 점차 실제와 유사한 이미지가 나오게 됨을 알 수 있습니다.

 

 

이제 모델을 저장합니다.

# 최종 모델 저장
generator.save("./models/generator")
critic.save("./models/critic")

 

 

이를 활용해서 새로운 이미지를 생성해봅니다.

# 0 레이블
z_sample = np.random.normal(size=(10, Z_DIM))
class_label = np.repeat([[1, 0]], 10, axis=0)
imgs = cgan.generator.predict([z_sample, class_label])
display(imgs, cmap=None)

# 1 레이블
z_sample = np.random.normal(size=(10, Z_DIM))
class_label = np.repeat([[0, 1]], 10, axis=0)
imgs = cgan.generator.predict([z_sample, class_label])
display(imgs, cmap=None)

반응형

댓글