본문 바로가기
딥러닝 with Python

[생성 AI] 변이형 오토인코더(Variational Auto Encoder) (2/2)

by CodeCrafter 2024. 5. 29.
반응형

[해당 포스팅은 "만들면서 배우는 생성 AI 2탄"을 참조했습니다]

 

지난번에 알아본 개념을 바탕으로 이번에는 코드를 통해 실습을 해보겠습니다.

 

이번에 실습할 데이터는 패션 MNIST 데이터입니다.

 

 

 

Fashion MNIST 데이터셋은 운동화, 셔츠, 샌들과 같은 작은 이미지들의 모음을 말하며, 기본 MNIST 데이터셋과 같이 10개의 범주(Category)로 구성되어 있는 데이터셋입니다. 

 

데이터의 개수는 총 70,000개 이며, 그 예시는 아래와 같습니다.

 

 

 

이제 코랩 노트북을 활용해 Fashion MNIST 데이터를 활용한 Variational Auto Encoder를 구현해보겠습니다.

 

 

먼저, 깃허브에 저장된 utils 파일을 다운로드 하기 위해 다음과 같이 실행해줍니다.

 

깃허브 주소 : https://github.com/rickiepark/Generative_Deep_Learning_2nd_Edition (만들면서 배우는 생성 AI 2탄)

import sys

# 코랩의 경우 깃허브 저장소로부터 utils.py를 다운로드 합니다.
if 'google.colab' in sys.modules:
    !wget https://raw.githubusercontent.com/rickiepark/Generative_Deep_Learning_2nd_Edition/main/notebooks/utils.py
    !mkdir -p notebooks
    !mv utils.py notebooks

 

이제 필요한 라이브러리를 불러와줍니다. 이번에도 사용할 딥러닝 툴은 텐서플로우가 되겠습니다.

import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf
import tensorflow.keras.backend as K
from tensorflow.keras import (
    layers,
    models,
    datasets,
    callbacks,
    losses,
    optimizers,
    metrics,
)

from scipy.stats import norm

from notebooks.utils import display

 

 

학습 전에 사용할 하이퍼 파라미터에 대해서 다음과 같이 정의해줍니다.

IMAGE_SIZE = 32
BATCH_SIZE = 100
VALIDATION_SPLIT = 0.2
EMBEDDING_DIM = 2
EPOCHS = 5
BETA = 500

 

 

 

이제 학습에 사용할 Fashion MNIST 데이터를 다운로드 해줍니다.

# 데이터 로드
(x_train, y_train), (x_test, y_test) = datasets.fashion_mnist.load_data()

 

다음은 학습을 용이하게 위해 데이터를 전처리 해줍니다.

# 데이터 전처리
def preprocess(imgs):
    """
    이미지 정규화 및 크기 변경
    """
    imgs = imgs.astype("float32") / 255.0
    imgs = np.pad(imgs, ((0, 0), (2, 2), (2, 2)), constant_values=0.0)
    imgs = np.expand_dims(imgs, -1)
    return imgs


x_train = preprocess(x_train)
x_test = preprocess(x_test)

 

train 데이터의 일부를 시각화해주면 아래와 같습니다.

# 훈련 세트의 일부 의류 항목 표시
display(x_train)

 

 

이제 변이형 오토 인코더 모델을 코딩해보겠습니다.

 

먼저, 변이형 오토인코더가 이미지를 잠재 공간 상 포인트 주변의 다변량 정규 분포(Multivariate Normal Distribution)에 매핑하기 위해 사용할 z_mean과 z_sigma를 구하고 최종적으로

 

z ( = z_mean + z_sigma*epsilon)를 구하는 함수를 정의해보겠습니다.

* 이때, z_sigma = exp(z_log_var * 0.5) 이며, epsilon은 N(0,1) 분포를 따릅니다. 

*  z _sigma(σ)와 z_log_var(log( σ ^2))의 관계는,  σ = exp(log( σ)) = exp(2log(σ)/2) = exp(log( σ^2)/2) 입니다.

 

class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = K.random_normal(shape=(batch, dim))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

 

 

이제 인코더를 정의해보겠습니다.

# 인코더
encoder_input = layers.Input(
    shape=(IMAGE_SIZE, IMAGE_SIZE, 1), name="encoder_input"
)
x = layers.Conv2D(32, (3, 3), strides=2, activation="relu", padding="same")(
    encoder_input
)
x = layers.Conv2D(64, (3, 3), strides=2, activation="relu", padding="same")(x)
x = layers.Conv2D(128, (3, 3), strides=2, activation="relu", padding="same")(x)
shape_before_flattening = K.int_shape(x)[1:]  # 디코더에 필요합니다!

x = layers.Flatten()(x)
z_mean = layers.Dense(EMBEDDING_DIM, name="z_mean")(x)
z_log_var = layers.Dense(EMBEDDING_DIM, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])

encoder = models.Model(encoder_input, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

 

정의된 인코더의 구조는 아래와 같이  3개의 2d convolution kernel을 거쳐서 최종 2048개(= 4x4x128)의 노드로 flatten 되고, 이후 앞서 정의한 z_mean과 z_log_var로 구성된 2개의 노드를 구성하겠습니다.

 

이제 디코더를 정의해보겠습니다.

# 디코더
decoder_input = layers.Input(shape=(EMBEDDING_DIM,), name="decoder_input")
x = layers.Dense(np.prod(shape_before_flattening))(decoder_input)
x = layers.Reshape(shape_before_flattening)(x)
x = layers.Conv2DTranspose(
    128, (3, 3), strides=2, activation="relu", padding="same"
)(x)
x = layers.Conv2DTranspose(
    64, (3, 3), strides=2, activation="relu", padding="same"
)(x)
x = layers.Conv2DTranspose(
    32, (3, 3), strides=2, activation="relu", padding="same"
)(x)
decoder_output = layers.Conv2D(
    1,
    (3, 3),
    strides=1,
    activation="sigmoid",
    padding="same",
    name="decoder_output",
)(x)

decoder = models.Model(decoder_input, decoder_output)
decoder.summary()

 

정의된 디코더의 구조는 아래와 같으며 인코더의 구조를 반대로 만들었다고 보시면 되겠습니다.

크기를 줄이고 채널수를 늘렸던 conv2d를, 크기를 키우고 채널수를 줄이는 Transpose conv2d로 변형한 부분과 순서를 제외하고는 동일한 구조라고 보시면되겠습니다.

 

 

이제 최종적인 VAE 함수를 정의해보겠습니다.

class VAE(models.Model):
    def __init__(self, encoder, decoder, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = metrics.Mean(
            name="reconstruction_loss"
        )
        self.kl_loss_tracker = metrics.Mean(name="kl_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
        ]

    def call(self, inputs):
        """특정 입력에서 모델을 호출합니다."""
        z_mean, z_log_var, z = encoder(inputs)
        reconstruction = decoder(z)
        return z_mean, z_log_var, reconstruction

    def train_step(self, data):
        """훈련 스텝을 실행합니다."""
        with tf.GradientTape() as tape:
            z_mean, z_log_var, reconstruction = self(data)
            reconstruction_loss = tf.reduce_mean(
                BETA
                * losses.binary_crossentropy(
                    data, reconstruction, axis=(1, 2, 3)
                )
            )
            kl_loss = tf.reduce_mean(
                tf.reduce_sum(
                    -0.5
                    * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)),
                    axis=1,
                )
            )
            total_loss = reconstruction_loss + kl_loss

        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))

        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.kl_loss_tracker.update_state(kl_loss)

        return {m.name: m.result() for m in self.metrics}

    def test_step(self, data):
        """Step run during validation."""
        if isinstance(data, tuple):
            data = data[0]

        z_mean, z_log_var, reconstruction = self(data)
        reconstruction_loss = tf.reduce_mean(
            BETA
            * losses.binary_crossentropy(data, reconstruction, axis=(1, 2, 3))
        )
        kl_loss = tf.reduce_mean(
            tf.reduce_sum(
                -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var)),
                axis=1,
            )
        )
        total_loss = reconstruction_loss + kl_loss

        return {
            "loss": total_loss,
            "reconstruction_loss": reconstruction_loss,
            "kl_loss": kl_loss,
        }

    def get_config(self):
        return {}

1)  이때 재매개변수화 트릭(Reparameterization Trick)을 활용합니다.

 * 재매개변수화 트릭(Reparameterization Trick)이란, 파라미터 z_mean과 z_log_var로 정의된 정규본포에서 직접 샘플링하는 대신 표준 정규 분포를 따르는 epsilon을 샘플링한 다음 올바른 평균과 분산을 갖도록 샘플을 수동으로 조정하는 방법입니다. 

 * 이를 통해 그레이디언트를 역전파 시킬 수 있기에 중요하며, 층의 모든 무작위성을 변수 epsilon에 포함함으로써 층 입력에 대한 출력의 편도함수를 결정론적(즉, 랜덤한 epsilon과 무관함)으로 표시할 수 있습니다. 이는 이 층에서 역전파를 위해 필수적입니다.

 

2) 또한, 위 코드에서는 손실함수를 정의하고 있는데요.

 

- 먼저 재구성 손실(Reconstruction Loss)입니다. 

 * 이는 Auto Encoder에서 활용한 것으로, Binary CorssEntropy를 활용해 각 픽셀 단위로 원본과 재구서된 데이터 간의 차이를 측정하며, 이를 평균하여 최종 손실값을 얻는 논리를 가지고 있습니다.

 * 앞에 곱해진 Beta는 학습 가능한 파라미터로, 재구성 손실과 아래에서 설명한 KL Divergnece 손실간의 균형을 위함입니다. 재구성 손실이 너무 크면 모델이 원본 재현에만 가까워지게 학습할 것이고, 반대로 재구성 손실이 너무 작아지면 원본과는 너무 다른 데이터가 나오게 되기 때문입니다. 

 

- 다음은 Variational Auto Encoder에서 활용하는 KL Divergence Loss 입니다.

 * KL Divergence 는 Kullback - Leibler Divergence(쿨백-라이블러 발산)이라고 하며, 한 확률분포가 다른 분포와 얼마나 다른지를 측정하는 도구를 말합니다.

 *VAE에서 평균이 z_mean이고 분산이 z_log_var인 정규 분포가 표준 정규분포와 얼마나 다른지를 측정해야하 하기에 이를 활용하는데요. 그 식은 아래와 같습니다.

 * kl_loss = -0.5*sum(1+z_log_var - z_mean^2 - exp(z_log_var))

 * 즉, 정규분포 q(zㅣx)와 p(z) 사이의 KL 발산을 계산한 것으로, 여기서 q(zㅣx)는 평균이 z_mean 이고 분산이 exp(z_log_var)인 정규 분포를 / p(z)는 표준 정규 분포인 N(0,1)을 의미합니다.

* 위 KL Divergence 에   q(zㅣx)  (= N(z_mean, exp(z_log_var)) 과 p(z) (= N (0,1) ) 을 넣어서 계산하면 아래와 같습니다.

 (아래 식은 잠재 공간의 모든 차원에 대해서 수행 되며, z_mean = 0, z_log_var = 0일때 kl_loss가 최소가 됩니다.)

* 위 식의 앞 -0.5는 KL Divergence를 최소화하는 방향으로 손실을 조정하는 것을 의미합니다. 

* 이 KL Divergence는 샘플을 표준 정규분포(z_mean = 0, z_log_var=0)에서 크게 벗어난 z_mean과 z_log_var 변수로 인코딩하는 네트워크에 Penalty를 가하는 것입니다. 즉, 정규분포에 가까운 변수로 인코딩 되도록 강제하는 것입니다.

 

*이와 같은 KL Divergence 항을 추가하면 학습에 도움이 되는 이유는 아래와 같습니다.

1) 잠재공간에서 포인트를 선택할 때 사용할 수 있는 잘 정의된 분포(표준 정규 분포)를 가지게 되기 때문입니다.

2) 이 항이 모든 인코딩 된 분포를 표준 정규 분포에 가깝게 되도록 강제합니다. 이에 따라 포인트 군집 사이에 큰 간격이 생길 가능성이 적습니다. 대신 인코더는 원점 주변의 공간을 대칭적이고 효과적으로 사용하려고 합니다. 

 

 

 

이제 정의된 오토인코더 함수를 인스턴스화 시켜 주겠습니다.

이때, 위에서 정의한 인코더와 디코더를 해당 함수 내에서 작동하도록 연결해주겠습니다.

# 변이형 오토인코더 생성
vae = VAE(encoder, decoder)

 

 

이제 변이형 오토인코더를 훈련시켜보겠습니다.

 

먼저, 학습에 사용할 옵티마이저를 정의하겠습니다.

# 변이형 오토인코더 컴파일
optimizer = optimizers.Adam(learning_rate=0.0005)
vae.compile(optimizer=optimizer)

 

 다음은 모델의 학습 결과를 저장할 체크포인트를 만들어주겠습니다. 저장 로직은 validation loss가 최소가 되는 epoch의 weight를 저장하는 것입니다.

# 모델 저장 체크포인트 생성
model_checkpoint_callback = callbacks.ModelCheckpoint(
    filepath="./checkpoint",
    save_weights_only=False,
    save_freq="epoch",
    monitor="loss",
    mode="min",
    save_best_only=True,
    verbose=0,
)
tensorboard_callback = callbacks.TensorBoard(log_dir="./logs")

 

이제 인스턴스화 된 vae에 데이터 및 정의된 하이퍼 파라미터, callback 조건을 넣어서 학습시켜보겠습니다.

vae.fit(
    x_train,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    shuffle=True,
    validation_data=(x_test, x_test),
    callbacks=[model_checkpoint_callback, tensorboard_callback],
)

 

 

이제 학습된 최종 모델을 저장하고 테스트 데이터를 활용해 재구성해보겠습니다. 

# 최종 모델 저장
vae.save("./models/vae")
encoder.save("./models/encoder")
decoder.save("./models/decoder")
# 테스트셋의 일부를 선택합니다.
n_to_predict = 5000
example_images = x_test[:n_to_predict]
example_labels = y_test[:n_to_predict]
# 오토인코더 예측을 만들고 출력합니다.
z_mean, z_log_var, reconstructions = vae.predict(example_images)
print("실제 의류 아이템")
display(example_images)
print("재구성 이미지")
display(reconstructions)

 

* 위에 있는 그림이 테스트 데이터의 실제이미지이고, 그 아래 있는 그림이 Variational Auto Encoder를 활용해 재구성한 이미지입니다. 

 

다음은 임베딩 벡터들의 예시를 확인해보겠습니다. 먼저 수치적으로 확인해보면 아래와 같습니다.

# 임베딩의 몇 가지 예
print(z[:10])

 

이제 이 임베딩 벡터들으 2차원 공간상에서 시각화해보겠습니다.

# 2D 공간에서 인코딩된 포인트 표시
figsize = 8

plt.figure(figsize=(figsize, figsize))
plt.scatter(z[:, 0], z[:, 1], c="black", alpha=0.5, s=3)
plt.show()

 

 

 

이제 이렇게 학습된 디코더를 활용해서 임의 포인트를 선정해 생성을 해보겠습니다.

# 표준 정규 분포에서 잠재 공간의 일부 포인트를 샘플링합니다.
grid_width, grid_height = (6, 3)
z_sample = np.random.normal(size=(grid_width * grid_height, 2))
# 샘플링된 포인트 디코딩
reconstructions = decoder.predict(z_sample)
# 원본 임베딩과 샘플링된 임베딩을 p값으로 변환하기
p = norm.cdf(z)
p_sample = norm.cdf(z_sample)
# 그래프를 그립니다....
figsize = 8
plt.figure(figsize=(figsize, figsize))

# ... 원본 임베딩 ...
plt.scatter(z[:, 0], z[:, 1], c="black", alpha=0.5, s=2)

# ... 잠재 공간에 새로 생성된 포인트
plt.scatter(z_sample[:, 0], z_sample[:, 1], c="#00B0F0", alpha=1, s=40)
plt.show()

# 디코딩된 이미지 그리드를 추가합니다.
fig = plt.figure(figsize=(figsize, grid_height * 2))
fig.subplots_adjust(hspace=0.4, wspace=0.4)

for i in range(grid_width * grid_height):
    ax = fig.add_subplot(grid_height, grid_width, i + 1)
    ax.axis("off")
    ax.text(
        0.5,
        -0.35,
        str(np.round(z_sample[i, :], 1)),
        fontsize=10,
        ha="center",
        transform=ax.transAxes,
    )
    ax.imshow(reconstructions[i, :, :], cmap="Greys")

 

생성 결과 기존 데이터 (검은색 포인트)와는 다른 생성된 데이터(하늘색)의 데이터가 나왔음을 알 수 있습니다.

 

생성된 하늘색 데이터를 바탕으로 디코더를 활용해 이미지를 생성해보면 아래와 같습니다. 기존 데이터에는 없는 새로운 그림들이 생성되었음을 알 수 있습니다.

 

 

그렇다면, 레이블의 종류에 따라 임베딩 벡터들의 분포에 대해서 알아보겠습니다.

# 레이블(의류 종류)에 따라 임베딩에 색상을 지정합니다.
figsize = 8
fig = plt.figure(figsize=(figsize * 2, figsize))
ax = fig.add_subplot(1, 2, 1)
plot_1 = ax.scatter(
    z[:, 0], z[:, 1], cmap="rainbow", c=example_labels, alpha=0.8, s=3
)
plt.colorbar(plot_1)
ax = fig.add_subplot(1, 2, 2)
plot_2 = ax.scatter(
    p[:, 0], p[:, 1], cmap="rainbow", c=example_labels, alpha=0.8, s=3
)
plt.show()

* 왼쪽 결과는 의류 종류별로 잠재 공간의 포인트를 색으로 나타낸 것으로,이를 보시면 어느 한 종류의 데이터가 극단적으로 우세하게 나타나고 있지않습니다. 

* 오른쪽 결과는 잠재 공간을 p-값으로 변환한 것입니다. 이 그래프에서 색깔마다 거의 비슷한 영역을 차지하고 있는 것을 알 수 있습니다. 

 

# 레이블(의류 종류)에 따라 임베딩에 색상을 지정합니다.
figsize = 12
grid_size = 15
plt.figure(figsize=(figsize, figsize))
plt.scatter(
    p[:, 0], p[:, 1], cmap="rainbow", c=example_labels, alpha=0.8, s=300
)
plt.colorbar()

x = norm.ppf(np.linspace(0, 1, grid_size))
y = norm.ppf(np.linspace(1, 0, grid_size))
xv, yv = np.meshgrid(x, y)
xv = xv.flatten()
yv = yv.flatten()
grid = np.array(list(zip(xv, yv)))

reconstructions = decoder.predict(grid)
# plt.scatter(grid[:, 0], grid[:, 1], c="black", alpha=1, s=10)
plt.show()

fig = plt.figure(figsize=(figsize, figsize))
fig.subplots_adjust(hspace=0.4, wspace=0.4)
for i in range(grid_size**2):
    ax = fig.add_subplot(grid_size, grid_size, i + 1)
    ax.axis("off")
    ax.imshow(reconstructions[i, :, :], cmap="Greys")

반응형

댓글