0. 文章說明
首先需要指出的是,代碼是從李宏毅老師的課程中下載的,并不是我自己碼的,這篇文章主要是進行了部分演算法的原理說明,并在原代碼中加了一些講解和注釋,
1. 任務簡介
本次 Pytorch 實戰的目標是做影像的降維及聚類,所謂降維,就是將影像向一個低維空間去投影,比如將一個 28 × 28 28 \times 28 28×28 大小的影像投影到一個 2 2 2 維度的空間,這樣做的目的是去除一些多余的資訊,同時也方便向我們的客戶或者老板展示,在降維之后,我們可以對降維后的影像進行聚類,如: Kmeans演算法,或者簡單的計算各個影像之間的相似度(當兩個影像的相似度大于一定閾值的時候就將二者歸為一類),
2. 方法簡介
接下來我們簡單介紹一下本次實戰需要用到的方法:
- T-SNE
- Kernel-PCA
- Mini Batch KMeans
- Auto-Encoder
2.1 T-SNE
T-SNE 演算法的全名是 T-distributed Stochastic Neighbor Embedding,是一種在影像處理里面經常使用到的降維工具,T-SNE 的思想是將資料投影后,盡量保持原有資料之間的相似度,下圖是摘自李宏毅老師的 PPT ,可以看出,T-SNE 演算法構造了一個關于相似度的概率分布,同時要求投影后資料關于相似度的概率分布函式和原有的盡可能接近,接近程度的衡量指標使用的是KL散度,

T-SNE 演算法中相似度的具體構造如下圖所示,相比于 SNE 演算法,T-SNE 演算法改變了投影后資料的相似度計算方式,使得原資料集中相距較遠的資料點在投影后間隔更大,

2.2 Kernel-PCA
Kernel-PCA 就是核PCA,其實該方法的原理和我們在 SVM 中見到的非線性 SVM演算法里的核函式一致,一般來說,PCA 適用于資料的線性降維,而Kernel-PCA可實作資料的非線性降維,用于處理線性不可分的資料集,
Kernel-PCA的大致思路是:對于輸入空間 (Input space) 中的矩陣 X X X ,我們先用一個非線性映射把 X X X 中的所有樣本映射到一個高維甚至是無窮維的空間(稱為特征空間,Feature space),(使其線性可分),然后在這個高維空間進行PCA降維,Kernel-PCA 的具體推導可以參看這篇博客 核主成分分析(Kernel-PCA),
2.3 Mini Batch KMeans
Mini Batch KMeans 其實是為了減少計算量,每次只抽出一部分資料進行計算,并在計算的程序中不斷加入新的資料,具體的演算法步驟可以參考下面這個圖片以及這篇博客 【聚類演算法】MiniBatchKMeans演算法,

2.4 Auto-Encoder
其實我們觀察很多投影方法,其實都是在找到一個投影矩陣,然后將現有的資料投影到新的空間,那我們不免就會想:全連接的神經網路不就是產生一個投影矩陣,對原有的資料進行操作嗎?我們能不能用神經網路去將資料進行投影呢?答案是能的,這就是 Auto-Encoder 的思想,
我們從下面這種 PPT 來看一下,我們的目的是將一個圖片投影到一個更小的空間,但是神經網路需要一個輸出的真值來計算損失,進而通過后向傳播來不斷更新引數,可是我們沒有真值,這就造成網路沒法訓練了,那怎么辦呢?我們就想到類似于 Seq2seq 模型中的結構,先編碼,再解碼,最終的目標是使得最后輸出的影像和原有的影像盡可能相似,訓練結束后,中間的橙色的部分,就是我們投影后的結果,

那影像該怎么辦呢?中間的編碼層和解碼層該如何構造呢?這時我們通常是這么做的:編碼層主要是卷積層和池化層的堆疊;解碼層通常是 Upooling 層和 Deconvolution 層的堆疊,

Unpooling 是做什么呢?Pooling 是將每四個像素值中的最大值保存下來,進而將一個
4
×
4
4 \times 4
4×4 的影像壓縮到
2
×
2
2 \times 2
2×2 ,而 Unpooling 的一種做法是先將最大值的位置記錄下來,之后把
2
×
2
2 \times 2
2×2的影像擴展到
4
×
4
4 \times 4
4×4 ,再將最大值賦予到之前的位置,并將其余位置的值設為0,

Deconvolution 是做什么呢?如下圖所示,Convolution 是通過一個卷積核,每3個資料乘以對應的系數并相加,得到新的值,而 Deconvolution 是將每個資料乘以三個系數并保存到相應的位置,待所有的資料都乘完后,再將相應位置的資料相加得到最好的值,

3. 代碼實戰
首先下載資料
!gdown --id '1BZb2AqOHHaad7Mo82St1qTBaXo_xtcUc' --output trainX.npy
!gdown --id '152NKCpj8S_zuIx3bQy0NN5oqpvBjdPIq' --output valX.npy
!gdown --id '1_hRGsFtm5KEazUg2ZvPZcuNScGF-ANh4' --output valY.npy
!mkdir checkpoints
!ls
定義我們的 preprocess:將圖片的像素值從介于 0~255 的 int 線性轉化為 0~1 之間的 float,
import numpy as np
def preprocess(image_list):
""" Normalize Image and Permute (N,H,W,C) to (N,C,H,W)
Args:
image_list: List of images (9000, 32, 32, 3)
Returns:
image_list: List of images (9000, 3, 32, 32)
"""
image_list = np.array(image_list)
image_list = np.transpose(image_list, (0, 3, 1, 2))
image_list = (image_list / 255.0) * 2 - 1
image_list = image_list.astype(np.float32)
return image_list
定義 Dataset 類
from torch.utils.data import Dataset
class Image_Dataset(Dataset):
def __init__(self, image_list):
self.image_list = image_list
def __len__(self):
return len(self.image_list)
def __getitem__(self, idx):
images = self.image_list[idx]
return images
from torch.utils.data import DataLoader
trainX = np.load('trainX.npy')
trainX_preprocessed = preprocess(trainX)
img_dataset = Image_Dataset(trainX_preprocessed)
這邊提供一些有用的 functions, 一個是計算 model 引數量的(report 會用到),另一個是固定訓練用的亂數種子(以便 reproduce),
import random
import torch
def count_parameters(model, only_trainable=False):
if only_trainable:
return sum(p.numel() for p in model.parameters() if p.requires_grad)
else:
return sum(p.numel() for p in model.parameters())
def same_seeds(seed):
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed) # if you are using multi-GPU.
np.random.seed(seed) # Numpy module.
random.seed(seed) # Python random module.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
定義我們的 Auto-Encoder 模型
import torch.nn as nn
class AE(nn.Module):
def __init__(self):
super(AE, self).__init__()
self.encoder = nn.Sequential(
nn.Conv2d(3, 64, 3, stride=1, padding=1),
nn.ReLU(True),
nn.MaxPool2d(2),
nn.Conv2d(64, 128, 3, stride=1, padding=1),
nn.ReLU(True),
nn.MaxPool2d(2),
nn.Conv2d(128, 256, 3, stride=1, padding=1),
nn.ReLU(True),
nn.MaxPool2d(2)
)
self.decoder = nn.Sequential(
nn.ConvTranspose2d(256, 128, 5, stride=1),
nn.ReLU(True),
nn.ConvTranspose2d(128, 64, 9, stride=1),
nn.ReLU(True),
nn.ConvTranspose2d(64, 3, 17, stride=1),
nn.Tanh()
)
def forward(self, x):
x1 = self.encoder(x)
x = self.decoder(x1)
return x1, x
這個部分就是主要的訓練階段,我們先將準備好的 dataset 當做引數喂給dataloader, 將 dataloader、model、loss criterion、optimizer 都準備好之后,就可以開始訓練, 訓練完成后,我們會將 model 存下來,
import torch
from torch import optim
model = AE().cuda()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-5, weight_decay=1e-5)
model.train()
n_epoch = 100
same_seeds(0)
# 準備 dataloader, model, loss criterion 和 optimizer
img_dataloader = DataLoader(img_dataset, batch_size=64, shuffle=True)
# 主要的訓練過程
for epoch in range(n_epoch):
for data in img_dataloader:
img = data
img = img.cuda()
output1, output = model(img)
loss = criterion(output, img)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (epoch+1) % 10 == 0:
torch.save(model.state_dict(), './checkpoints/checkpoint_{}.pth'.format(epoch+1))
print('epoch [{}/{}], loss:{:.5f}'.format(epoch+1, n_epoch, loss.data))
# 訓練完成後儲存 model
torch.save(model.state_dict(), './checkpoints/last_checkpoint.pth')
計算準確率和畫圖的函式,以便于之后使用,
import numpy as np
def cal_acc(gt, pred):
""" Computes categorization accuracy of our task.
Args:
gt: Ground truth labels (9000, )
pred: Predicted labels (9000, )
Returns:
acc: Accuracy (0~1 scalar)
"""
# Calculate Correct predictions
correct = np.sum(gt == pred)
acc = correct / gt.shape[0]
# 因為是 binary unsupervised clustering,因此取 max(acc, 1-acc)
return max(acc, 1-acc)
import matplotlib.pyplot as plt
def plot_scatter(feat, label, savefig=None):
""" Plot Scatter Image.
Args:
feat: the (x, y) coordinate of clustering result, shape: (9000, 2)
label: ground truth label of image (0/1), shape: (9000,)
Returns:
None
"""
X = feat[:, 0]
Y = feat[:, 1]
plt.scatter(X, Y, c = label)
plt.legend(loc='best')
if savefig is not None:
plt.savefig(savefig)
plt.show()
return
- 接著我們使用訓練好的 model,來預測 testing data 的類別,
- 由于 testing data 跟 training data 一樣,因此我們使用同樣的 dataset 來操作 dataloader,與 training 不同的地方在于 shuffle 這個引數在這邊是 False,
- 當準備好 model 和 dataloader,我們就可以進行預測了,
- 我們只需要 encoder 的結果(latents),利用 latents 進行 clustering 之后,就可以分類了,
import torch
from sklearn.decomposition import KernelPCA
from sklearn.manifold import TSNE
from sklearn.cluster import MiniBatchKMeans
def inference(X, model, batch_size=256):
X = preprocess(X)
dataset = Image_Dataset(X)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)
latents = []
for i, x in enumerate(dataloader):
x = torch.FloatTensor(x)
vec, img = model(x.cuda())
if i == 0:
latents = vec.view(img.size()[0], -1).cpu().detach().numpy()
else:
latents = np.concatenate((latents, vec.view(img.size()[0], -1).cpu().detach().numpy()), axis = 0)
print('Latents Shape:', latents.shape)
return latents
def predict(latents):
# First Dimension Reduction
transformer = KernelPCA(n_components=200, kernel='rbf', n_jobs=-1)
kpca = transformer.fit_transform(latents)
print('First Reduction Shape:', kpca.shape)
# # Second Dimesnion Reduction
X_embedded = TSNE(n_components=2).fit_transform(kpca)
print('Second Reduction Shape:', X_embedded.shape)
# Clustering
pred = MiniBatchKMeans(n_clusters=2, random_state=0).fit(X_embedded)
pred = [int(i) for i in pred.labels_]
pred = np.array(pred)
return pred, X_embedded
def invert(pred):
return np.abs(1-pred)
def save_prediction(pred, out_csv='prediction.csv'):
with open(out_csv, 'w') as f:
f.write('id, label\n')
for i, p in enumerate(pred):
f.write(f'{i},{p}\n')
print(f'Save prediction to {out_csv}.')
# load model
model = AE().cuda()
model.load_state_dict(torch.load('./checkpoints/last_checkpoint.pth'))
model.eval()
# 準備 data
trainX = np.load('trainX.npy')
# 預測答案
latents = inference(X=trainX, model=model)
pred, X_embedded = predict(latents)
# 將預測結果存檔,上傳 kaggle
save_prediction(pred, 'prediction.csv')
# 由於是 unsupervised 的二分類問題,我們只在乎有沒有成功將圖片分成兩群
# 如果上面的檔案上傳 kaggle 後正確率不足 0.5,只要將 label 反過來就行了
save_prediction(invert(pred), 'prediction_invert.csv')
將 val data 的降維結果 (embedding) 與他們對應的 label 可視化一下,
valX = np.load('valX.npy')
valY = np.load('valY.npy')
# ==============================================
# 我們示範 basline model 的作圖,
# report 請同學另外還要再畫一張 improved model 的圖,
# ==============================================
model.load_state_dict(torch.load('./checkpoints/last_checkpoint.pth'))
model.eval()
latents = inference(valX, model)
pred_from_latent, emb_from_latent = predict(latents)
acc_latent = cal_acc(valY, pred_from_latent)
print('The clustering accuracy is:', acc_latent)
print('The clustering result:')
plot_scatter(emb_from_latent, valY, savefig='p1_baseline.png')
可以看出,降維效果蠻不錯的,

使用 test accuracy 最高的 autoencoder,從 trainX 中,取出 index 1, 2, 3, 6, 7, 9 這 6 張圖片,畫出他們的原圖以及 reconstruct 之后的圖片(即從解碼器輸出的圖片),
import matplotlib.pyplot as plt
import numpy as np
# 畫出原圖
plt.figure(figsize=(10,4))
indexes = [1,2,3,6,7,9]
imgs = trainX[indexes,]
for i, img in enumerate(imgs):
plt.subplot(2, 6, i+1, xticks=[], yticks=[])
plt.imshow(img)
# 畫出 reconstruct 的圖
inp = torch.Tensor(trainX_preprocessed[indexes,]).cuda()
latents, recs = model(inp)
recs = ((recs+1)/2 ).cpu().detach().numpy()
recs = recs.transpose(0, 2, 3, 1)
for i, img in enumerate(recs):
plt.subplot(2, 6, 6+i+1, xticks=[], yticks=[])
plt.imshow(img)
plt.tight_layout()
可以看出,還原度還可以,說明模型效果是不錯的,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423343.html
標籤:AI
上一篇:e代表的是什么
