[본 포스팅은 Medium의 "Semantic Segmentation in Self-driving Cars" 포스팅과 아래 블로그를 참조하여 작성하였습니다]
https://blog.jovian.com/semantic-segmentation-in-self-driving-cars-3cb89aa08e9b
https://velog.io/@jarvis_geun/U-Net-%EC%8B%A4%EC%8A%B5
이번에는 지난 시간에 알아본 U-Net의 코드를 파이토치를 활용해 구현해보겠습니다.
[딥러닝 with 파이썬] (논문리뷰)U-Net이란? U-Net: Convolutional Networks for Biomedical Image Segmentation
U-Net은 Contracting Path를 통해 입력 이미지 데이터를 Down Sampling하고 이 과정에서 얻어지는 Context 정보, 그리고 Expansive Path를 통해 Down sampled된 데이터를 Up sampling함으로써 이 과정에서 얻어지는 Localization한 정보들을 얻고, 이 둘을 Skip Architecture를 통해 연결해줌으로써 Segmentation 작업을 효과적으로 해줄 수 있는 모델입니다.
논문이 2015년도에 발표되었음에도 불구하고, 현재도 해당 모델은 Segmentation 작업간 Baseline 모델로서 많이 활용되고 있습니다.
이러한 U-Net에 대한 이론을 지난 포스팅에서 알아보았으니, 이번에는 파이토치를 통한 코딩을 통해 직접 구현해보도록 하겠습니다.
코드 활용의 편의를 위해 구글 코랩(Colab)에서 작성하였으며, 사용된 데이터는 Segmentation 데이터로서 만이 활용되는 Cityscape Dataset 입니다.
1. Cityscapes Dataset
- 해당 데이터는 CityScapes Dataset이라는 사이트에서 발췌한 데이터입니다.
https://www.cityscapes-dataset.com/
- 해당 사이트에서 직접 데이터를 다운로드 받을수도 있겠지만, 가입 간 근무하는 학술기관 또는 기업의 이메일을 적어야하는 부분에서 막혀 Medium 포스팅처럼 Kaggle에 올라와있는 Cityscapes dataset 중 하나를 활용하였습니다.
https://www.kaggle.com/datasets/dansbecker/cityscapes-image-pairs
- 이 데이터는 독일에서 운전한 차량에서 촬영하였으며 레이블이 지정된 비디오 셋입니다. 이 데이터셋은 원본 비디오로부터의 이미지와 Semantic segmentation 레이블 또한 함께 표시된 데이터입니다. 아래는 데이터셋 중 예시 입니다
* 왼쪽은 이미지 데이터의 원본이며, 오른쪽은 Semantic Segmentation이 된 결과물로 이는 Ground Truth로 사용되어 이후 만들 U-Net 모델의 예측값의 평가 기준으로서 사용되겠습니다.
2. 파이토치를 활용한 U-Net 모델 설계
- 먼저 모델 구축 및 실행 간 사용할 라이브러리들을 불러옵니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import os
from PIL import Image
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from tqdm.notebook import tqdm
|
cs |
* 대부분 일반적인 데이터처리에 활용되는 라이브러리(numpy, pandas)와 이미지 또는 그림 출력(PIL, pyplot), 그리고 Pytorch를 활용하여 딥러닝 모델을 구현하기 위해 사용되는 라이브러리(torch, nn, functional, optim. Dataset, DataLoader, transforms), 그리고 모델 실행 간 소요시간 등을 확인하기 위한 tqdm이 있습니다.
* 이 중 일반적이지 않은 라이브러리는 Kmeasn로, 이는 scikit learn에서 K-means 클러스터링을 하기 위해 사용되는 라이브러리 입니다. 이는 색상 공간에서 데이터 포인트를 그룹화하여 색상의 수를 줄이고, 이를 통해 이미지의 색상 팔레트를 단순화하기 위함입니다. (너무 많은 Semantic 요소들은 Segmentation의 품질을 저하시킬 수 있습니다)
- 다음은 학습간 GPU 활용을 위해 설정하겠습니다.
1
2
3
4
|
# GPU 사용이 가능할 경우, GPU를 사용할 수 있게 함.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
device = torch.device(device)
print(device)
|
cs |
* 저는 코랩 환경에서 T4 GPU 사용을 설정하였기 때문에 위처럼 "cuda:0"이 출력되었으며 이는 GPU 사용이 가능함을 의미합니다.
- 다운로드 받은 파일을 불러옵니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
# 자신의 폴더 경로에 맞게 재지정해주세요.
root_path = '/content/drive/MyDrive/cityscapes_data'
data_dir = root_path
# data_dir의 경로(문자열)와 train(문자열)을 결합해서 train_dir(train 폴더의 경로)에 저장합니다.
train_dir = os.path.join(data_dir, "train")
# data_dir의 경로(문자열)와 val(문자열)을 결합해서 val_dir(val 폴더의 경로)에 저장합니다.
val_dir = os.path.join(data_dir, "val")
# train_dir 경로에 있는 모든 파일을 리스트의 형태로 불러와서 train_fns에 저장합니다.
train_fns = os.listdir(train_dir)
# val_dir 경로에 있는 모든 파일을 리스트의 형태로 불러와서 val_fns에 저장합니다.
val_fns = os.listdir(val_dir)
print(len(train_fns), len(val_fns))
|
cs |
* train 데이터는 2975개의 이미지, validation 데이터는 500개의 이미지 임을 확인할 수 있습니다.
- PIL 라이브러리의 Image 모듈을 활용해 학습용 이미지를 불러오고 이를 출력해봅니다.
1
2
3
4
5
6
7
8
|
# train_dir(문자열)와 train_fns[0](문자열)의 경로를 결합하여 sample_image_fp(샘플 이미지의 경로)에 저장합니다.
sample_image_fp = os.path.join(train_dir, train_fns[1])
# PIL 라이브러리의 Image 모듈을 사용하여, sample_image_fp를 불러옵니다.
sample_image = Image.open(sample_image_fp)
plt.imshow(sample_image)
plt.show()
|
cs |
* 위 코드에서 train_fns[숫자] <-- 안의 숫자를 변경해서 train 데이터에 있는 다른 이미지를 볼 수 있습니다.
- 이제 Output label을 정의해줍니다.
1
2
3
4
5
6
7
|
#Output label 정의하기
num_items = 1000
# 0~255 사이의 숫자를 3*num_items번 랜덤하게 뽑기
color_array = np.random.choice(range(256), 3*num_items).reshape(-1, 3)
print(color_array.shape)
|
cs |
* 1000개의 색상 데이터 포인트를 생성한 뒤, 0~255 사이의 값을 가진 3x1000 크기의 배열을 color_array로 만들어주며 이는 RGB 색상 공간에서 랜덤한 색을 나타내도록 해줍니다.
- K-Means 클러스터링을 활용해, 10개의 클러스터로 color_array에 대해 군집화를 준비합니다.
1
2
3
4
5
|
num_classes = 10
# K-means clustering 알고리즘을 사용하여 label_model에 저장합니다.
label_model = KMeans(n_clusters = num_classes)
label_model.fit(color_array)
|
cs |
* 이는 Output label에 활용할 클래스를 10개로 한정하고, 각 클래스에 대한 색을 부여하기 위함입니다.
- Original image와 Labeld image를 분리해주고, K-means 클러스터링을 사용해 레이블 이미지의 색상을 분류한 뒤 결과를 시각화 해줍니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# 이전에 샘플이미지에서 볼 수 있듯이, original image와 labeled image가 연결되어 있는데 이를 분리해줍니다.
def split_image(image) :
image = np.array(image)
# 이미지의 크기가 256 x 512 였는데 이를 original image와 labeled image로 분리하기 위해 리스트로 슬라이싱 합니다.
# 그리고 분리된 이미지를 각각 cityscape(= original image)와 label(= labeled image)에 저장합니다.
cityscape, label = image[:, :256, :], image[:, 256:, :]
return cityscape, label
# 바로 이전 코드에서 정의한 split_image() 함수를 이용하여 sample_image를 분리한 후, cityscape과 label에 각각 저장합니다.
cityscape, label = split_image(sample_image)
label_class = label_model.predict(label.reshape(-1, 3)).reshape(256, 256)
fig, axes = plt.subplots(1, 3, figsize = (15, 5))
axes[0].imshow(cityscape)
axes[1].imshow(label)
axes[2].imshow(label_class)
plt.show()
|
cs |
* 원본 데이터는 cityscape에, 레이블된 이미지는 label에 저장하고, 이전에 지정 한 label_model.predict를 사용해서 레이블 이미지의 각 픽셀에 대한 클러스터 레이블을 예측해줍니다.
* 좌측은 원본 이미지, 중간은 원본에서 labeled 이미지, 우측은 이번에 10개의 종류로 K-Means 클러스터링한 결과를 중간에 위치한 labeled 된 이미지에 적용한 결과입니다.
- 데이터셋 정의
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
class CityscapeDataset(Dataset):
def __init__(self, image_dir, label_model):
self.image_dir = image_dir
self.image_fns = os.listdir(image_dir)
self.label_model = label_model
def __len__(self) :
return len(self.image_fns)
def __getitem__(self, index) :
image_fn = self.image_fns[index]
image_fp = os.path.join(self.image_dir, image_fn)
image = Image.open(image_fp)
image = np.array(image)
cityscape, label = self.split_image(image)
label_class = self.label_model.predict(label.reshape(-1, 3)).reshape(256, 256)
label_class = torch.Tensor(label_class).long()
cityscape = self.transform(cityscape)
return cityscape, label_class
def split_image(self, image) :
image = np.array(image)
cityscape, label = image[ : , :256, : ], image[ : , 256: , : ]
return cityscape, label
def transform(self, image) :
transform_ops = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(mean = (0.485, 0.56, 0.406), std = (0.229, 0.224, 0.225))
])
return transform_ops(image)
|
cs |
* 이 코드는 파이토치의 DataLoader와 함께 사용되어 배치 단위로 이미지를 로드하고, 세그멘테이션 모델을 훈련하거나 평가하기 위해 구성되어 있습니다.
* __init__ 메소드는 이미지 파일불러오고 / __len__ 메소드는 데이터셋에 있는 총 이미지수를 반화하며 / __getitem__ 메소드는 이미지를 불러오고 전처리하며 분리하고, K-means 클러스터링을 사용해 세그멘테이션된 레이블로 변환하는 작업을 하고 / __split_image__ 메소드는 입력 이미지와 원본 이미지, 레이블 이미지로 분리하며 / transform 메소드는 이와 같이 처리된 이미지 데이터를 텐서의 형태로 변환하고 정규화를 진행해줍니다.
- 데이터셋을 불러오고 텐서 형태로 변환된 것을 확인해줍니다.
1
2
3
4
5
6
|
dataset = CityscapeDataset(train_dir, label_model)
print(len(dataset))
cityscape, label_class = dataset[0]
print(cityscape.shape)
print(label_class.shape)
|
cs |
- 이전 포스팅에서 다루었던 논문 형태 그대로의 U-Net 모델을 정의하겠습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
class UNet(nn.Module):
def __init__(self, num_classes):
super(UNet, self).__init__()
self.num_classes = num_classes
self.contracting_11 = self.conv_block(in_channels=3, out_channels=64)
self.contracting_12 = nn.MaxPool2d(kernel_size=2, stride=2)
self.contracting_21 = self.conv_block(in_channels=64, out_channels=128)
self.contracting_22 = nn.MaxPool2d(kernel_size=2, stride=2)
self.contracting_31 = self.conv_block(in_channels=128, out_channels=256)
self.contracting_32 = nn.MaxPool2d(kernel_size=2, stride=2)
self.contracting_41 = self.conv_block(in_channels=256, out_channels=512)
self.contracting_42 = nn.MaxPool2d(kernel_size=2, stride=2)
self.middle = self.conv_block(in_channels=512, out_channels=1024)
self.expansive_11 = nn.ConvTranspose2d(in_channels=1024, out_channels=512, kernel_size=3, stride=2, padding=1, output_padding=1)
self.expansive_12 = self.conv_block(in_channels=1024, out_channels=512)
self.expansive_21 = nn.ConvTranspose2d(in_channels=512, out_channels=256, kernel_size=3, stride=2, padding=1, output_padding=1)
self.expansive_22 = self.conv_block(in_channels=512, out_channels=256)
self.expansive_31 = nn.ConvTranspose2d(in_channels=256, out_channels=128, kernel_size=3, stride=2, padding=1, output_padding=1)
self.expansive_32 = self.conv_block(in_channels=256, out_channels=128)
self.expansive_41 = nn.ConvTranspose2d(in_channels=128, out_channels=64, kernel_size=3, stride=2, padding=1, output_padding=1)
self.expansive_42 = self.conv_block(in_channels=128, out_channels=64)
self.output = nn.Conv2d(in_channels=64, out_channels=num_classes, kernel_size=3, stride=1, padding=1)
def conv_block(self, in_channels, out_channels):
block = nn.Sequential(nn.Conv2d(in_channels=in_channels, out_channels=out_channels, kernel_size=3, stride=1, padding=1),
nn.ReLU(),
nn.BatchNorm2d(num_features=out_channels),
nn.Conv2d(in_channels=out_channels, out_channels=out_channels, kernel_size=3, stride=1, padding=1),
nn.ReLU(),
nn.BatchNorm2d(num_features=out_channels))
return block
def forward(self, X):
contracting_11_out = self.contracting_11(X) # [-1, 64, 256, 256]
contracting_12_out = self.contracting_12(contracting_11_out) # [-1, 64, 128, 128]
contracting_21_out = self.contracting_21(contracting_12_out) # [-1, 128, 128, 128]
contracting_22_out = self.contracting_22(contracting_21_out) # [-1, 128, 64, 64]
contracting_31_out = self.contracting_31(contracting_22_out) # [-1, 256, 64, 64]
contracting_32_out = self.contracting_32(contracting_31_out) # [-1, 256, 32, 32]
contracting_41_out = self.contracting_41(contracting_32_out) # [-1, 512, 32, 32]
contracting_42_out = self.contracting_42(contracting_41_out) # [-1, 512, 16, 16]
middle_out = self.middle(contracting_42_out) # [-1, 1024, 16, 16]
expansive_11_out = self.expansive_11(middle_out) # [-1, 512, 32, 32]
expansive_12_out = self.expansive_12(torch.cat((expansive_11_out, contracting_41_out), dim=1)) # [-1, 1024, 32, 32] -> [-1, 512, 32, 32]
expansive_21_out = self.expansive_21(expansive_12_out) # [-1, 256, 64, 64]
expansive_22_out = self.expansive_22(torch.cat((expansive_21_out, contracting_31_out), dim=1)) # [-1, 512, 64, 64] -> [-1, 256, 64, 64]
expansive_31_out = self.expansive_31(expansive_22_out) # [-1, 128, 128, 128]
expansive_32_out = self.expansive_32(torch.cat((expansive_31_out, contracting_21_out), dim=1)) # [-1, 256, 128, 128] -> [-1, 128, 128, 128]
expansive_41_out = self.expansive_41(expansive_32_out) # [-1, 64, 256, 256]
expansive_42_out = self.expansive_42(torch.cat((expansive_41_out, contracting_11_out), dim=1)) # [-1, 128, 256, 256] -> [-1, 64, 256, 256]
output_out = self.output(expansive_42_out) # [-1, num_classes, 256, 256]
return output_out
|
cs |
* UNet 논문에서는 3x3covolutional block을 거칠때 마다 feature map의 크기가 2씩 감소하는데, 위 코드에서는 convolutional block에 크기 1의 패딩을 넣어줌으로써 feature map의 크기가 감소하는 것을 방지해줍니다
(maxpooling에 의해서 줄어드는 것은 동일합니다)
- 모델에 10개의 클래스가 도출되는 것을 정의하고, batch size를 4로 정의한뒤 학습 데이터의 형태를 확인해줍니다.
1
2
3
4
5
6
7
8
|
model = UNet(num_classes=num_classes)
data_loader = DataLoader(dataset, batch_size = 4)
print(len(dataset), len(data_loader))
X, Y = next(iter(data_loader))
print(X.shape)
print(Y.shape)
|
cs |
* 총 학습 데이터의 개수가 2975개이고, 1개의 배치사이즈가 4이므로 2975개를 4등분한 것 중 하나의 데이터로더 크기가 744개임을 확인할 수 있습니다
* X 값은 4개의 배치사이즈 / 3개의 채널 / 높이 256 / 너비 256
* Y 값은 4개의 배차시이즈 / 높이 256 / 너비 256
임을 알 수 있습니다.
- X값을 넣어서 도출된 Y 예측값의 형태를 확인해봅니다
1
2
|
Y_pred = model(X)
print(Y_pred.shape)
|
cs |
*모델에서 정의했듯, 10개의 클래스가 반영되어
4개의 배치사이즈 / 10개의 클래스 / 높이 256 / 너비 256 이 예측값의 형태로 도출되었습니다.
- 모델을 구동합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
batch_size = 4
epochs = 10
lr = 0.01
dataset = CityscapeDataset(train_dir, label_model)
data_loader = DataLoader(dataset, batch_size = batch_size)
model = UNet(num_classes = num_classes).to(device)
# 손실함수 정의
criterion = nn.CrossEntropyLoss()
# Optimizer 정의
optimizer = optim.Adam(model.parameters(), lr = lr)
step_losses = []
epoch_losses = []
for epoch in tqdm(range(epochs)) :
epoch_loss = 0
for X, Y in tqdm(data_loader, total = len(data_loader), leave = False) :
X, Y = X.to(device), Y.to(device)
optimizer.zero_grad()
Y_pred = model(X)
loss = criterion(Y_pred, Y)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
step_losses.append(loss.item())
epoch_losses.append(epoch_loss/len(data_loader))
|
cs |
* 약 40분의 시간이 소요되었습니다. (1번의 에포크당 약 238초 소요 / 10번 반복)
- Step loss와 Epoch loss를 그래프로 가시화합니다.
1
2
3
4
5
|
fig, axes = plt.subplots(1, 2, figsize=(10, 5))
axes[0].plot(step_losses)
axes[1].plot(epoch_losses)
plt.show()
|
cs |
* Step loss는 1배치당 발생하는 loss이며, Epoch loss는 1Epoch의 모든 배치의 loss를 평균한 값을 의미합니다.
- 학습된 모델을 저장해줍니다
1
2
|
model_name = "UNet.pth"
torch.save(model.state_dict(), root_path + model_name)
|
cs |
- 저장된 모델을 불러와서 테스트 데이터 셋을 세그멘테이션 할 준비를 합니다.
1
2
3
|
model_path = root_path + model_name
model_ = UNet(num_classes = num_classes).to(device)
model_.load_state_dict(torch.load(model_path))
|
cs |
- 테스트 배치 사이즈를 8로 설정하고, X와 Y로 구분하고 모델에 넣어줍니다.
1
2
3
4
5
6
7
8
9
10
|
test_batch_size = 8
dataset = CityscapeDataset(val_dir, label_model)
data_loader = DataLoader(dataset, batch_size = test_batch_size)
X,Y = next(iter(data_loader))
X,Y = X.to(device), Y.to(device)
Y_pred = model_(X)
print(Y_pred.shape)
Y_pred = torch.argmax(Y_pred, dim=1)
print(Y_pred.shape)
|
cs |
-
1
2
3
|
inverse_transform = transforms.Compose([
transforms.Normalize((-0.485/0.229, -0.456/0.224, -0.406/0.225), (1/0.229, 1/0.224, 1/0.225))
])
|
cs |
- 테스트 데이터를 모델에 넣고 epoch에 따라 학습을 진행해줍니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
fig, axes = plt.subplots(test_batch_size, 3, figsize=(3*5, test_batch_size*5))
iou_scores = []
for i in range(test_batch_size):
landscape = inverse_transform(X[i]).permute(1, 2, 0).cpu().detach().numpy()
label_class = Y[i].cpu().detach().numpy()
label_class_predicted = Y_pred[i].cpu().detach().numpy()
# IOU score
intersection = np.logical_and(label_class, label_class_predicted)
union = np.logical_or(label_class, label_class_predicted)
iou_score = np.sum(intersection) / np.sum(union)
iou_scores.append(iou_score)
axes[i, 0].imshow(landscape)
axes[i, 0].set_title("Landscape")
axes[i, 1].imshow(label_class)
axes[i, 1].set_title("Label Class")
axes[i, 2].imshow(label_class_predicted)
axes[i, 2].set_title("Label Class - Predicted")
plt.show()
|
cs |
* Label 클래스에 근접하게 학습 결과물이 나오는 것을 확인할 수 있습니다.
- IOU Score를 계산해봅니다.
1
|
print(sum(iou_scores) / len(iou_scores))
|
cs |
약 98%가 되는 IOU 정확도를 보여줍니다.
댓글