이번 포스팅은 지난번에 알아본 LSTM의 개념을 바탕으로 실습을 해보는 내용입니다.
1. LSTM을 활용한 회귀 분석
- LSTM은 RNN의 한 종류로, 시계열 데이터 분석에 효과적인 구조를 가지고 있습니다. LSTM은 시간 의존성이 긴 데이터에서도 중요한 패턴을 학습할 수 있도록 설계되었으며, 회귀 분석에서는 연속적인 값 예측, 주가 분석, 온도 예측 등 다양한 연속형 데이터 문제에 활용될 수 있습니다.
- LSTM을 시계열 데이터의 회귀 분석에 활용 시 다음과 같은 장점들이 있습니다.
1) 시간 의존성 학습 : 시계열 데이터에서 이전 시점의 정보를 사용해 현재 시점의 결과를 예측할 수 있습니다.
2) 장기 의존성 해결 : LSTM의 게이트 구조(입력, 망각, 출력 게이트)는 RNN의 단점인 장기 의존성 문제를 해결 할 수 있습니다.
- LSTM에 대한 자세한 내용은 아래 포스팅을 참조해보시면 좋을 것 같습니다.
[딥러닝 with Python] LSTM (Long Short Term Memory)
[딥러닝 with Python] LSTM (Long Short Term Memory)
[본 포스팅은 "만들면서 배우는 생성 AI 2판"을 참조로 작성했습니다] 이번에 알아볼 모형은 자기회귀 모델의 대표적인 모형인 LSTM입니다. LSTM은 Long Short Term Memory의 줄임말로 기존의 순환 신경
jaylala.tistory.com
2. Python을 활용한, LSTM 회귀 분석
먼저 분석에 필요한 라이브러리들을 불러옵니다.
import numpy as np
import torch
from torch import nn , optim
from sklearn . datasets import make_regression
from sklearn . preprocessing import MinMaxScaler
from sklearn . model_selection import train_test_split
import matplotlib . pyplot as plt
다음으로 예제 데이터를 만들어 주고 시각화해보겠습니다. x는 시간, y는 해당 시간에 대응되는 1차원 값입니다.
# 데이터 생성
X , y = make_regression ( n_samples = 1000 , n_features = 1 , noise = 0.1 , random_state = 42 )
# 시계열 데이터로 변환
def create_sequences ( data , target , sequence_length = 10 ):
X_seq , y_seq = [], []
for i in range ( len ( data ) - sequence_length ):
X_seq . append ( data [ i : i + sequence_length ])
y_seq . append ( target [ i + sequence_length ])
return np . array ( X_seq ), np . array ( y_seq )
# 시퀀스 데이터 생성
sequence_length = 10
X_seq , y_seq = create_sequences ( X , y , sequence_length )
# 데이터 분리
X_train , X_test , y_train , y_test = train_test_split ( X_seq , y_seq , test_size = 0.2 , random_state = 42 )
# 데이터 정규화
scaler_x = MinMaxScaler ()
scaler_y = MinMaxScaler ()
X_train = scaler_x . fit_transform ( X_train .reshape( - 1 , 1 )). reshape ( X_train .shape)
X_test = scaler_x . transform ( X_test .reshape( - 1 , 1 )). reshape ( X_test .shape)
y_train = scaler_y . fit_transform ( y_train .reshape( - 1 , 1 ))
y_test = scaler_y . transform ( y_test .reshape( - 1 , 1 ))
# PyTorch 텐서로 변환
X_train = torch . tensor ( X_train , dtype = torch . float32 )
X_test = torch . tensor ( X_test , dtype = torch . float32 )
y_train = torch . tensor ( y_train , dtype = torch . float32 ). view ( - 1 , 1 )
y_test = torch . tensor ( y_test , dtype = torch . float32 ). view ( - 1 , 1 )
# 원본 데이터 분포 시각화
plt . figure ( figsize = ( 10 , 5 ))
plt . plot ( y [: 200 ], label = "Original Data" )
plt . title ( "Original Data Distribution" )
plt . xlabel ( "Index" )
plt . ylabel ( "Target Value" )
plt . legend ()
plt . show ()
다음은 LSTM을 활용한 회귀모형을 정의해줍니다. 학습 데이터를 LSTM에 전달한 뒤 LSTM의 출력 중 [배치 크기, 시점 수, hidden_dim] 만을 out으로 정의하고, 마지막 은닉 상태 및 셀 상태는 _ 로 정의하여 사용하지 않도록 합니다.
이때 out은 [배치 크기, 시점 수, hidden_dim]으로 정의했는데, 마지막 시점만을 사용하기 위해 "시점 수" 에 -1 값을 넣었습니다. 이를 통해 최종 도출되는 결과는 [배치 크기, hidden_dim]이 되겠습니다.
그리고 이걸 Linear layer에 연결하여 가중합을 하고 결과를 도출하여 예측값을 만들어주었습니다.
class LSTMRegression ( nn . Module ):
def __init__ ( self , input_dim , hidden_dim , output_dim , num_layers ):
super ( LSTMRegression , self ). __init__ ()
self . lstm = nn . LSTM ( input_dim , hidden_dim , num_layers , batch_first = True )
self . fc = nn . Linear ( hidden_dim , output_dim )
def forward ( self , x ):
out , _ = self . lstm ( x ) # LSTM의 출력
out = self . fc ( out [:, - 1 , :]) # 마지막 시점의 출력만 사용
return out
이제 학습을 진행합니다. y값이 1개의 차원 이기에 input dim =1 이고, 회귀 문제이기에 output dim은 1입니다. hidden dim과 num_layers는 각각 lstm이 도출하는 hidden dimension의 크기와 lstm layer의 수를 의미하며 하이퍼파라미터가 되겠습니다. 이를 조절해서 모델을 튜닝할 수 있습니다.
loss는 MSE를, optimizer는 adam을 활용했습으며 100회 학습하고 한 번에 학습할 배치의 사이즈는 16으로 설정했습니다.
# 모델 초기화
input_dim = 1
hidden_dim = 50
output_dim = 1
num_layers = 2
model = LSTMRegression ( input_dim , hidden_dim , output_dim , num_layers )
criterion = nn . MSELoss ()
optimizer = optim . Adam ( model . parameters (), lr = 0.001 )
# 학습
epochs = 100
batch_size = 16
train_losses = []
for epoch in range ( epochs ):
model . train ()
epoch_loss = 0
for i in range ( 0 , X_train . size ( 0 ), batch_size ):
x_batch = X_train [ i : i + batch_size ]
y_batch = y_train [ i : i + batch_size ]
outputs = model ( x_batch )
loss = criterion ( outputs , y_batch )
optimizer . zero_grad ()
loss .backward()
optimizer . step ()
epoch_loss += loss .item()
train_losses . append ( epoch_loss / ( X_train . size ( 0 ) // batch_size ))
if ( epoch + 1 ) % 10 == 0 :
print ( f "Epoch [ { epoch + 1 } / { epochs } ], Loss: { epoch_loss :.4f} " )
training loss를 시각화 해보면 아래와 같습니다.
# 학습 손실 시각화
plt . figure ( figsize = ( 10 , 5 ))
plt . plot ( train_losses , label = "Train Loss" )
plt . title ( "Training Loss Over Epochs" )
plt . xlabel ( "Epoch" )
plt . ylabel ( "Loss" )
plt . legend ()
plt . show ()
이제 학습된 모델을 정의된 Test 데이터에 적용해본 결과는 아래와 같습니다.
# 평가
model . eval ()
with torch . no_grad ():
y_pred = model ( X_test )
# 스케일 복원
y_pred = scaler_y . inverse_transform ( y_pred .numpy())
y_actual = scaler_y . inverse_transform ( y_test . numpy ())
# 예측 결과 시각화
plt . figure ( figsize = ( 10 , 5 ))
plt . plot ( y_actual [: 100 ], label = "Actual" , linestyle = 'dotted' )
plt . plot ( y_pred [: 100 ], label = "Predicted" , linestyle = 'solid' )
plt . title ( "LSTM Regression: Actual vs Predicted" )
plt . xlabel ( "Sample Index" )
plt . ylabel ( "Target Value" )
plt . legend ()
plt . show ()
모델이 단순하기에 예측값과 실제값의 차이가 꽤 많이나고 있습니다. 보다 정교한 모델링이 필요하다는 것을 알 수 있습니다.
이번 시간에는 LSTM을 활용한 간단한 회귀 모형을 만들어 Test 하여 얼마나 예측력이 있는지 확인해보았습니다.
댓글