이번에는 트랜스포머에 대해서 파이썬 코드로 하나씩 구현해가면서 결과를 확인해보겠습니다.
1. 트랜스포머(Transformer)
먼저 트랜스포머(Transformer)에 대해 간략하게 알아보겠습니다.
- 트랜스포머는 자연어 처리에서 혁신적인 변화를 준 모델로, "Attention is All you need"라는 논문에 소개된 딥러닝 구조를 말합니다.
- 특히, 번역, 요약, 문장 생성 등 다양한 태스크에서 굉장히 뛰어난 성능을 보였으며, 이를 활용해서 이미지처리 분야에서는 Vision Transformer라는 모델이 나와 기존의 CNN구조의 모델들의 성능을 압도할 정도로 중요한 역할을 해내고 있습니다.
- 특히, 트랜스포머는 Multi-head self attention을 활용해서 기존 RNN 기반의 순차적인 데이터 처리에서 벗어나 전체 문장을 한꺼번에 처리할 수 있는 모델을 구현하였습니다.
- 전체적인 트랜스포머의 구조는 아래와 같습니다.
- 트랜스포머 모델의 주요 구성요소와 이에 대한 설명은 아래와 같습니다.
1) 포지셔널 인코딩(Positional Encoding)
* 트랜스포머 모델은 순차적인 데이터를 직접적으로 처리하지 않기에, 입력 시퀀스의 각 단어에 위치 정보를 추가하기 위하여 포지셔널 인코딩을 사용하였습니다.
* 포지셔널 인코딩은 단어의 위치 정보를 제공하며, 이는 사인(Sine) 함수와 코사인(Cosine) 함수를 사용하여 계산됩니다.
** 짝수 인덱스 차원에 대해서는 사인(Sine) 함수를, 홀수 인덱스 차원에 대해서는 코사인(Cosine) 함수를 활용하여 위치정보를 인코딩하였습니다.
여기서
* 간단한 예시를 통해서 포지셔널 인코딩을 구현하는 코드와 그 결과를 확인해보겠습니다.
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
# 포지셔널 인코딩 함수 정의
def get_positional_encoding ( max_seq_length , d_model ) :
positional_encoding = np.zeros (( max_seq_length , d_model ))
position = np.arange ( 0 , max_seq_length ) .reshape ( -1 , 1 )
div_term = np.exp ( np.arange ( 0 , d_model , 2 ) * - ( np.log ( 10000.0 ) / d_model ))
positional_encoding [:, 0 :: 2 ] = np.sin ( position * div_term )
positional_encoding [:, 1 :: 2 ] = np.cos ( position * div_term )
return positional_encoding
# 포지셔널 인코딩 계산
max_seq_length = 50 # 최대 시퀀스 길이
d_model = 512 # 모델 차원 수
positional_encoding = get_positional_encoding ( max_seq_length , d_model )
# 포지셔널 인코딩 시각화
plt.figure ( figsize= ( 15 , 8 ))
sns.heatmap ( positional_encoding , cmap= 'coolwarm' , cbar= True )
plt.title ( 'Positional Encoding' )
plt.xlabel ( 'Embedding Dimensions' )
plt.ylabel ( 'Position' )
plt.show ()
* 이처럼 각 순서별로 고정된 값을 나타내게 하는 포지셔널 인코딩을 활용하여 트랜스포머 모델에서 위치 정보를 저장하도록 합니다.
2) 임베딩 레이어 (Embedding Layer)
* 입력 단어들을 고차원 벡터로 변환합니다. 입력과 출력 모두 임베딩 레이어를 통해 처리가 됩니다.
* 임베딩 레이어는 각 단어를 고정된 차원의 벡터로 변환하여 모델이 이해할 수 있도록 해줍니다.
3) 멀티 헤드 어텐션 (Multi Head Attention)
* 어텐션 메커니즘은 문장의 각 단어가 다른 단어들과 어떻게 관련되는지를 학습하게 해줍니다.
(주어진 단어가 문장 내 다른 단어들과 얼마나 관련이 있는지를 계산)
* 이때 멀티-헤드 어텐션은 여러 개의 어텐션 메커니즘을 병렬로 사용하여 다양한 표현을 학습할 수 있도록 해줍니다.
* 세 가지 입력인, 쿼리(Query), 키(Key), 값(Value)를 사용해 각 단어가 다른 단어들과 상호작용 하는 방법을 학습니다.
이미지 참조 : https://velog.io/@cha-suyeon/%EB%A9%80%ED%8B%B0-%ED%97%A4%EB%93%9C-%EC%96%B4%ED%85%90%EC%85%98Multi-head-Attention-%EA%B5%AC%ED%98%84%ED%95%98%EA%B8%B0
4) 피드포워드 네트워크(Feed Forward Network)
* 각 어텐션 출력 벡터에 대해 독립적으로 적용되는 두 개의 오나전 연결층으로 구성됩니다.
5) Add & Norm
* 각 서브 레이어의 출력을 입력(Addition)과 더한 후 정규화(Normalization) 해줍니다.
* 이는 스킵 커넥션(Skip Connection)을 활용하는 것으로 모델의 학습 간 파라미터 업데이트 간 안정성을 부여하고, 그레이디언트 소실 문제를 완화해줍니다.
위와 같은 모듈들이 전체 구조에서 보이는 것처럼 순서를 가지고 구성된 네트워크가 트랜스포머 입니다.
2. 파이썬 코드를 활용해서 구현한 트랜스포머(Transformer)
이번에는 파이썬 코드를 활용해서 트랜스포머를 구현해보겠습니다.
* 구현의 편의를 위해서 코드는 코랩 환경에서 구현할 수 있게 구성해보았으며
* 데이터는 Tateoba 데이터 셋을 활용해서 영어에서 프랑스어로 번역을 하는 모델을 구현해보았습니다.
https://paperswithcode.com/dataset/tatoeba
Papers with Code - Tatoeba Dataset
Tatoeba is a free collection of example sentences with translations geared towards foreign language learners. It is available in more than 400 languages. Its name comes from the Japanese phrase “tatoeba” (例えば), meaning “for example”. It is wr
paperswithcode.com
"Tatoeba는 외국어 학습자를 위한 번역 예문 모음집입니다. 400개 이상의 언어로 제공됩니다. 그 이름은 일본어 표현 "tatoeba" (例えば)에서 유래했으며, "예를 들어"라는 의미입니다. Tatoeba는 자원 봉사자 커뮤니티에 의해 작성되고 유지되며, 개방형 협업 모델을 통해 운영됩니다. 개인 기여자는 Tatoebans로 알려져 있습니다."
먼저 구현을 위해 필요한 라이브러리 들을 설치해줍니다.
! pip install torch torchtext spacy
! python -m spacy download en_core_web_sm
! python -m spacy download fr_core_news_sm
다음으로 데이터 셋을 다운로드 받아주고, 영어와 프랑스 중 문장으로 구성된 것들만을 추출해 줍니다.
import os
import urllib.request
# 데이터셋 다운로드
filename = "sentences.csv"
if not os.path.exists ( filename ):
print ( "Downloading dataset..." )
urllib.request.urlretrieve ( url , filename )
print ( "Download completed!" )
# 파일에서 필요한 부분만 추출
import pandas as pd
data = pd.read_csv ( filename , delimiter= '\t' , header= None , names= [ "id" , "lang" , "sentence" ])
en_sentences = data [ data [ "lang" ] == "eng" ][ "sentence" ] .tolist ()
fr_sentences = data [ data [ "lang" ] == "fra" ][ "sentence" ] .tolist ()
# 영어와 프랑스어 문장 쌍을 만듭니다.
en_fr_pairs = list ( zip ( en_sentences [: 10000 ], fr_sentences [: 10000 ])) # 10000개 문장 사용
# 데이터를 TSV 파일로 저장합니다.
with open ( "en_fr_data.tsv" , "w" ) as f :
f.write ( "src\ttrg\n" )
for en , fr in en_fr_pairs :
f.write ( f " { en } \t { fr } \n" )
이후 기본 전처리를 위한 spacy 라이브러리와 파이토치를 활용해서 전처리를 진행해줍니다.
(토크나이징, 필드 정의, 데이터 로더 생성)
import torch
import torch.nn as nn
import spacy
from torchtext.vocab import build_vocab_from_iterator
from torchtext.data.utils import get_tokenizer
from torch.utils.data import DataLoader
# spacy 모델 다운로드 및 로드
spacy_en = spacy.load ( "en_core_web_sm" )
spacy_fr = spacy.load ( "fr_core_news_sm" )
# 토크나이저 정의
def tokenize_en ( text ) :
return [ tok.text for tok in spacy_en.tokenizer ( text )]
def tokenize_fr ( text ) :
return [ tok.text for tok in spacy_fr.tokenizer ( text )]
# 필드 정의
SRC_TOKENIZER = get_tokenizer ( tokenize_en )
TRG_TOKENIZER = get_tokenizer ( tokenize_fr )
# 데이터셋 로드 및 토큰화
def yield_tokens ( data_iter , tokenizer ) :
for data_sample in data_iter :
yield tokenizer ( data_sample )
with open ( "en_fr_data.tsv" ) as f :
data = [ line.strip () .split ( "\t" ) for line in f.readlines ()[ 1 :]]
src_text = [ item [ 0 ] for item in data ]
trg_text = [ item [ 1 ] for item in data ]
SRC_VOCAB = build_vocab_from_iterator ( yield_tokens ( src_text , SRC_TOKENIZER ), specials= [ "<unk>" , "<pad>" , "<bos>" , "<eos>" ])
SRC_VOCAB.set_default_index ( SRC_VOCAB [ "<unk>" ])
TRG_VOCAB = build_vocab_from_iterator ( yield_tokens ( trg_text , TRG_TOKENIZER ), specials= [ "<unk>" , "<pad>" , "<bos>" , "<eos>" ])
TRG_VOCAB.set_default_index ( TRG_VOCAB [ "<unk>" ])
# 데이터셋 정의
def data_process ( src_text , trg_text , src_vocab , trg_vocab , src_tokenizer , trg_tokenizer ) :
data = []
for ( src_line , trg_line ) in zip ( src_text , trg_text ):
src_tensor = torch.tensor ([ src_vocab [ token ] for token in src_tokenizer ( src_line )], dtype=torch. long )
trg_tensor = torch.tensor ([ trg_vocab [ token ] for token in trg_tokenizer ( trg_line )], dtype=torch. long )
data.append (( src_tensor , trg_tensor ))
return data
train_data = data_process ( src_text , trg_text , SRC_VOCAB , TRG_VOCAB , SRC_TOKENIZER , TRG_TOKENIZER )
# 데이터 로더 생성
BATCH_SIZE = 64
PAD_IDX = SRC_VOCAB [ "<pad>" ]
def generate_batch ( data_batch ) :
src_batch , trg_batch = [], []
for ( src_item , trg_item ) in data_batch :
src_batch.append ( torch.cat ([ torch.tensor ([ SRC_VOCAB [ "<bos>" ]]), src_item , torch.tensor ([ SRC_VOCAB [ "<eos>" ]])], dim= 0 ))
trg_batch.append ( torch.cat ([ torch.tensor ([ TRG_VOCAB [ "<bos>" ]]), trg_item , torch.tensor ([ TRG_VOCAB [ "<eos>" ]])], dim= 0 ))
src_batch = nn.utils.rnn.pad_sequence ( src_batch , padding_value=PAD_IDX )
trg_batch = nn.utils.rnn.pad_sequence ( trg_batch , padding_value=PAD_IDX )
return src_batch , trg_batch
train_iter = DataLoader ( train_data , batch_size=BATCH_SIZE , shuffle= True , collate_fn=generate_batch )
학습 데이터가 크므로, GPU를 활용해줍니다. 코랩에서는 T4 GPU는 무료로도 활용하실 수 있습니다.
# GPU 사용 설정
device = torch.device ( "cuda" if torch.cuda.is_available () else "cpu" )
print ( f "Using device: { device } " )
이제 트랜스포머 모델을 정의해주고 학습을 진행해줍니다.
각 필요한 클래스들을 정의해주고, 학습에 필요한 하이퍼 파라미터들을 설정해줍니다.
# 트랜스포머 모델 정의
class PositionalEncoding ( nn . Module ) :
def __init__ ( self , d_model , max_seq_length ) :
super ( PositionalEncoding , self ) . __init__ ()
self .pe = self .create_positional_encoding ( d_model , max_seq_length )
def create_positional_encoding ( self , d_model , max_seq_length ) :
pe = torch.zeros ( max_seq_length , d_model )
position = torch.arange ( 0 , max_seq_length , dtype=torch. float ) .unsqueeze ( 1 )
div_term = torch.exp ( torch.arange ( 0 , d_model , 2 ) . float () * ( -np.log ( 10000.0 ) / d_model ))
pe [:, 0 :: 2 ] = torch.sin ( position * div_term )
pe [:, 1 :: 2 ] = torch.cos ( position * div_term )
pe = pe.unsqueeze ( 0 ) .transpose ( 0 , 1 )
return pe
def forward ( self , x ) :
x = x + self .pe [: x.size ( 0 ), :] .to ( x.device )
return x
class MultiHeadAttention ( nn . Module ) :
def __init__ ( self , d_model , nhead ) :
super ( MultiHeadAttention , self ) . __init__ ()
self .multihead_attn = nn.MultiheadAttention ( d_model , nhead )
def forward ( self , query , key , value , attn_mask = None ) :
attn_output , _ = self .multihead_attn ( query , key , value , attn_mask=attn_mask )
return attn_output
class FeedForward ( nn . Module ) :
def __init__ ( self , d_model , dim_feedforward ) :
super ( FeedForward , self ) . __init__ ()
self .fc1 = nn.Linear ( d_model , dim_feedforward )
self .fc2 = nn.Linear ( dim_feedforward , d_model )
self .relu = nn.ReLU ()
def forward ( self , x ) :
return self .fc2 ( self .relu ( self .fc1 ( x )))
class AddNorm ( nn . Module ) :
def __init__ ( self , d_model ) :
super ( AddNorm , self ) . __init__ ()
self .norm = nn.LayerNorm ( d_model )
def forward ( self , x , sublayer ) :
return self .norm ( x + sublayer )
class EncoderLayer ( nn . Module ) :
def __init__ ( self , d_model , nhead , dim_feedforward ) :
super ( EncoderLayer , self ) . __init__ ()
self .self_attn = MultiHeadAttention ( d_model , nhead )
self .feed_forward = FeedForward ( d_model , dim_feedforward )
self .add_norm_1 = AddNorm ( d_model )
self .add_norm_2 = AddNorm ( d_model )
def forward ( self , src ) :
src2 = self .add_norm_1 ( src , self .self_attn ( src , src , src ))
src = self .add_norm_2 ( src2 , self .feed_forward ( src2 ))
return src
class DecoderLayer ( nn . Module ) :
def __init__ ( self , d_model , nhead , dim_feedforward ) :
super ( DecoderLayer , self ) . __init__ ()
self .self_attn = MultiHeadAttention ( d_model , nhead )
self .multihead_attn = MultiHeadAttention ( d_model , nhead )
self .feed_forward = FeedForward ( d_model , dim_feedforward )
self .add_norm_1 = AddNorm ( d_model )
self .add_norm_2 = AddNorm ( d_model )
self .add_norm_3 = AddNorm ( d_model )
def forward ( self , tgt , memory , tgt_mask = None , memory_mask = None ) :
tgt2 = self .add_norm_1 ( tgt , self .self_attn ( tgt , tgt , tgt , attn_mask=tgt_mask ))
tgt2 = self .add_norm_2 ( tgt2 , self .multihead_attn ( tgt2 , memory , memory , attn_mask=memory_mask ))
tgt = self .add_norm_3 ( tgt2 , self .feed_forward ( tgt2 ))
return tgt
class Transformer ( nn . Module ) :
def __init__ ( self , d_model , nhead , num_encoder_layers , num_decoder_layers , dim_feedforward , input_vocab_size , target_vocab_size , max_seq_length ) :
super ( Transformer , self ) . __init__ ()
self .input_embedding = nn.Embedding ( input_vocab_size , d_model )
self .output_embedding = nn.Embedding ( target_vocab_size , d_model )
self .positional_encoding = PositionalEncoding ( d_model , max_seq_length )
self .encoder_layers = nn.ModuleList ([ EncoderLayer ( d_model , nhead , dim_feedforward ) for _ in range ( num_encoder_layers )])
self .decoder_layers = nn.ModuleList ([ DecoderLayer ( d_model , nhead , dim_feedforward ) for _ in range ( num_decoder_layers )])
self .linear = nn.Linear ( d_model , target_vocab_size )
def forward ( self , src , tgt , tgt_mask = None , memory_mask = None ) :
src = self .input_embedding ( src ) * np.sqrt ( d_model )
tgt = self .output_embedding ( tgt ) * np.sqrt ( d_model )
src = self .positional_encoding ( src )
tgt = self .positional_encoding ( tgt )
for layer in self .encoder_layers :
src = layer ( src )
memory = src
for layer in self .decoder_layers :
tgt = layer ( tgt , memory , tgt_mask=tgt_mask , memory_mask=memory_mask )
output = self .linear ( tgt )
return output
# Hyperparameters
d_model = 512
nhead = 8
num_encoder_layers = 6
num_decoder_layers = 6
dim_feedforward = 2048
input_vocab_size = len ( SRC_VOCAB )
target_vocab_size = len ( TRG_VOCAB )
max_seq_length = 100
# 모델 초기화
model = Transformer ( d_model , nhead , num_encoder_layers , num_decoder_layers , dim_feedforward , input_vocab_size , target_vocab_size , max_seq_length ) .to ( device )
criterion = nn.CrossEntropyLoss ( ignore_index=PAD_IDX )
optimizer = optim.Adam ( model.parameters (), lr= 0.0001 )
# 학습 루프
num_epochs = 10
for epoch in range ( num_epochs ):
model.train ()
total_loss = 0
for batch in train_iter :
src = batch [ 0 ] .to ( device )
tgt = batch [ 1 ] .to ( device )
optimizer.zero_grad ()
output = model ( src , tgt [: -1 , :])
loss = criterion ( output.view ( -1 , output.shape [ -1 ]), tgt [ 1 :, :] .reshape ( -1 ))
loss.backward ()
optimizer.step ()
total_loss += loss.item ()
print ( f "Epoch { epoch + 1 } , Loss: { total_loss / len ( train_iter )} " )
# 학습 완료 후 예시 출력
model. eval ()
학습 간 loss CrossEntropyLoss를 활용해줍니다.
이제 학습 후 도출된 텐서를 텍스트로 전환해주는 함수를 정의해주고, 일부 데이터의 샘플(영어 문장)을 넣어서 결과(프랑스어 문장)을 얼마나 잘 도출하는지 알아봅니다.
# 텍스트 변환 함수
def tensor_to_text ( tensor , vocab ) :
return ' ' .join ([ vocab.get_itos ()[ idx ] for idx in tensor ])
with torch.no_grad ():
for batch in train_iter :
src = batch [ 0 ] .to ( device )
tgt = batch [ 1 ] .to ( device )
output = model ( src , tgt [: -1 , :])
# 텐서를 텍스트로 변환
src_text = [ tensor_to_text ( seq , SRC_VOCAB ) for seq in src.T.cpu ()]
tgt_text = [ tensor_to_text ( seq , TRG_VOCAB ) for seq in tgt.T.cpu ()]
output_text = [ tensor_to_text ( torch.argmax ( seq , dim= 1 ), TRG_VOCAB ) for seq in output.cpu ()]
for i in range ( min ( len ( src_text ), len ( output_text ))): # Ensure the lengths match
print ( f "Input: { src_text [ i ]} " )
print ( f "Target: { tgt_text [ i ]} " )
print ( f "Output: { output_text [ i ]} " )
print ( "\n" )
break # Just run one batch for demonstration
결과를 확인해보니 번역결과가 별로 좋지 않네요.. 학습 epoch를 10번 설정했고, 계속 loss가 감소하는 것으로 보아 아직 under fitting 된것으로 확인됩니다. 이외에도 여러가지 이유가 있겠습니다.
이상 트랜스포머에 대해 처음부터 구현해보고 학습 및 검증을 해보는 코드를 작성해보았습니다.
댓글