이번 시간에는 이전 시간에 알아본 포스팅의 내용을 파이썬 코드로 구현해서 분류 결과를 도출해보겠습니다.
[개념정리] HYDRA(HYbrid Dicionary-Rocket Architecture)(1/2)
코드는 범용적인 재현성을 위해 구글 코랩환경에서 구현하였습니다.
1. HYDRA를 활용한 시계열 데이터 분류
- 이번 실험을 위해 사용할 데이터는 Cricket 데이터입니다.
[출처 : https://www.timeseriesclassification.com/description.php?Dataset=Cricket ]
- 해당 데이터에 대한 간략한 설명은 아래와 같습니다.
* 이 데이터는 크리켓 경기에서 심판이 점수 기록자에게 다양한 게임 이벤트를 신호로 전달하는 동작을 분석하는 과정에서 나온 데이터입니다. 4명의 심판이 12가지 신호를 10번씩 반복한 데이터가 포함되어 있으며, 가속도계가 양쪽 손목에 부착되어 X,Y,Z 축의 데이터를 기록하였습니다. 이때 데이터는 총 6차원(양손 가속도계의 3축 데이터)로 구성되어 있습니다.
먼저, 시계열 데이터를 불러와서 학습이 가능한 상태로 가공해줍니다.
from sklearn.linear_model import RidgeClassifierCV
from sklearn.metrics import accuracy_score
from tsai.all import *
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import RidgeClassifierCV
import numpy as np
import torch
# 데이터 로드
X_train , y_train , X_test , y_test = get_UCR_data ( 'Cricket' , split_data= True )
# 데이터를 Tensor로 변환 (num_examples, 1, length 형태로 변환)
X_train = torch.tensor ( X_train ) . float () # (n_samples, 1, length)
X_test = torch.tensor ( X_test ) . float ()
# y_train, y_test가 문자열 라벨일 경우, LabelEncoder로 변환
from sklearn.preprocessing import LabelEncoder
label_encoder = LabelEncoder ()
y_train = label_encoder.fit_transform ( y_train )
y_test = label_encoder.transform ( y_test )
# HydraMultivariate 사용 시, 시계열 길이 및 채널 수를 정확하게 전달해야 합니다.
input_length = X_train.shape [ -1 ] # 시계열 길이
num_channels = X_train.shape [ 1 ] # 채널 수 (1일 수 있음)
해당 데이터는 6개의 채널을 가지고 있으므로 HydraMultivariate을 사용하겠습니다. 아래 코드는 해당 논문의 github 에서 가져왔습니다.
# Angus Dempster, Daniel F. Schmidt, Geoffrey I. Webb
# HYDRA: Competing convolutional kernels for fast and accurate time series classification
# ** EXPERIMENTAL **
# This is an *untested*, *experimental* extension of Hydra to multivariate input.
# todo: cleanup, documentation
import numpy as np
import torch , torch.nn as nn , torch.nn.functional as F
class HydraMultivariate ( nn . Module ) :
def __init__ ( self , input_length , num_channels , k = 8 , g = 64 , max_num_channels = 8 ) :
super () . __init__ ()
self .k = k # num kernels per group
self .g = g # num groups
max_exponent = np.log2 (( input_length - 1 ) / ( 9 - 1 )) # kernel length = 9
self .dilations = 2 ** torch.arange ( int ( max_exponent ) + 1 )
self .num_dilations = len ( self .dilations )
self .paddings = torch.div (( 9 - 1 ) * self .dilations , 2 , rounding_mode = "floor" ) . int ()
# if g > 1, assign: half the groups to X, half the groups to diff(X)
divisor = 2 if self .g > 1 else 1
_g = g // divisor
self ._g = _g
self .W = [ self .normalize ( torch.randn ( divisor , k * _g , 1 , 9 )) for _ in range ( self .num_dilations )]
# combine num_channels // 2 channels (2 < n < max_num_channels)
num_channels_per = np.clip ( num_channels // 2 , 2 , max_num_channels )
self .I = [ torch.randint ( 0 , num_channels , ( divisor , _g , num_channels_per )) for _ in range ( self .num_dilations )]
@staticmethod
def normalize ( W ) :
W -= W.mean ( -1 , keepdims = True )
W /= W. abs () . sum ( -1 , keepdims = True )
return W
# transform in batches of *batch_size*
def batch ( self , X , batch_size = 256 ) :
num_examples = X.shape [ 0 ]
if num_examples <= batch_size :
return self ( X )
else :
Z = []
batches = torch.arange ( num_examples ) .split ( batch_size )
for i , batch in enumerate ( batches ):
Z.append ( self ( X [ batch ]))
return torch.cat ( Z )
def forward ( self , X ) :
num_examples = X.shape [ 0 ]
if self .g > 1 :
diff_X = torch.diff ( X )
Z = []
for dilation_index in range ( self .num_dilations ):
d = self .dilations [ dilation_index ] .item ()
p = self .paddings [ dilation_index ] .item ()
# diff_index == 0 -> X
# diff_index == 1 -> diff(X)
for diff_index in range ( min ( 2 , self .g )):
_Z = F.conv1d ( X [:, self .I [ dilation_index ][ diff_index ]] . sum ( 2 ) if diff_index == 0 else diff_X [:, self .I [ dilation_index ][ diff_index ]] . sum ( 2 ),
self .W [ dilation_index ][ diff_index ], dilation = d , padding = p ,
groups = self ._g ) \
.view ( num_examples , self ._g , self .k , -1 )
max_values , max_indices = _Z. max ( 2 )
count_max = torch.zeros ( num_examples , self ._g , self .k )
min_values , min_indices = _Z. min ( 2 )
count_min = torch.zeros ( num_examples , self ._g , self .k )
count_max.scatter_add_ ( -1 , max_indices , max_values )
count_min.scatter_add_ ( -1 , min_indices , torch.ones_like ( min_values ))
Z.append ( count_max )
Z.append ( count_min )
Z = torch.cat ( Z , 1 ) .view ( num_examples , -1 )
return Z
이제 이를 바탕으로 HYDRA를 활용해 특성을 추출합니다.
이때 고차원의 시계열 데이터를 처리하기 위해 Sparse Sclaer를 활용해줍니다. 이는 각 차원(채널)의 값들을 특정 범위로 변환하여 데이터의 스케일을 조정해줍니다. 이를 통해 각 차원의 데이터들이 모델에서 고르게 반영될 수 있게 해줍니다. 특히, 대부분의 값이 0이거나 매우 작은 값인 경우에도 유용하게 작용하며, 이를 통해 시계열 데이터의 Scale 불균형 문제를 해결해줍니다.
class SparseScaler () :
def __init__ ( self , mask = True , exponent = 4 ) :
self .mask = mask
self .exponent = exponent
self .fitted = False
def fit ( self , X ) :
assert not self .fitted , "Already fitted."
X = X.clamp ( 0 ) .sqrt ()
self .epsilon = ( X == 0 ) . float () .mean ( 0 ) ** self .exponent + 1e-8
self .mu = X.mean ( 0 )
self .sigma = X.std ( 0 ) + self .epsilon
self .fitted = True
def transform ( self , X ) :
assert self .fitted , "Not fitted."
X = X.clamp ( 0 ) .sqrt ()
if self .mask :
return (( X - self .mu ) * ( X != 0 )) / self .sigma
else :
return ( X - self .mu ) / self .sigma
def fit_transform ( self , X ) :
self .fit ( X )
return self .transform ( X )
# HydraMultivariate 사용 시, 시계열 길이 및 채널 수를 정확하게 전달해야 합니다.
input_length = X_train.shape [ -1 ] # 시계열 길이
num_channels = X_train.shape [ 1 ] # 채널 수 (1일 수 있음)
# HydraMultivariate 모델 초기화
model = HydraMultivariate ( input_length=input_length , num_channels=num_channels )
# 훈련 및 테스트 데이터 변환
X_train_transform = model.batch ( X_train )
X_test_transform = model.batch ( X_test )
# SparseScaler로 변환된 데이터를 스케일링
scaler = SparseScaler ()
X_train_transform = scaler.fit_transform ( X_train_transform )
X_test_transform = scaler.transform ( X_test_transform )
이제 이렇게 변화된 특성을 활용해서 Ridge Classifier로 분류를 해줍니다.
from sklearn.linear_model import RidgeClassifierCV
from sklearn.metrics import accuracy_score
# RidgeClassifierCV로 모델 학습
classifier = RidgeClassifierCV ( alphas=np.logspace ( -3 , 3 , 10 ))
classifier.fit ( X_train_transform , y_train )
# 테스트 데이터 예측
y_pred = classifier.predict ( X_test_transform )
# 정확도 출력
accuracy = accuracy_score ( y_test , y_pred )
print ( f 'Test Accuracy: { accuracy * 100:.2f } %' )
결과는 위와 같이 98.61%의 정확도를 얻을 수 있었습니다.
댓글