250x250
Notice
Recent Posts
Recent Comments
Link
코딩걸음마
[추천 시스템(RS)] AutoEncoder Meet Collaborative Filtering 본문
728x90
1. Data Loader
data_path = '파일경로'  # placeholder ("file path"): set to the directory containing rates.csv
import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import os
from sklearn.model_selection import train_test_split
import numpy as np
def read_data(data_path, max_rows=10000):
    """Load rates.csv, split into train/val sets, and build id-to-index maps.

    Parameters
    ----------
    data_path : str
        Directory containing ``rates.csv`` (expected columns: user, movie, rate).
    max_rows : int, optional
        Cap on the number of rows read. Was hard-coded to 10000; now a
        backward-compatible parameter with the same default.

    Returns
    -------
    tuple
        (train_df, val_df, user_to_index, movie_to_index)
    """
    df = pd.read_csv(os.path.join(data_path, 'rates.csv'))[:max_rows]
    # 80/20 split with a fixed seed so the split is reproducible across runs.
    train_df, val_df = train_test_split(df, test_size=0.2, random_state=1234, shuffle=True)
    # Index maps are built from the FULL frame so both splits share one vocabulary.
    user_to_index = {original: idx for idx, original in enumerate(df.user.unique())}
    movie_to_index = {original: idx for idx, original in enumerate(df.movie.unique())}
    return train_df, val_df, user_to_index, movie_to_index
class KMRDdataset(Dataset):
    """Dense rating-matrix dataset built from a ratings DataFrame.

    Each row of ``data`` is either one movie's rating vector over all users
    (item_based=True, the default) or one user's vector over all movies.
    Unrated entries are 0; duplicate (user, movie) pairs are summed by the
    sparse -> dense conversion.
    """

    def __init__(self, df, user_to_index, movie_to_index, item_based=True):
        # Observed rating range, useful for clipping/normalizing predictions.
        self.min_rating = min(df.rate)
        self.max_rating = max(df.rate)
        # Remap raw ids to contiguous matrix indices.
        self.user = [user_to_index[u] for u in df.user.values]
        self.movie = [movie_to_index[m] for m in df.movie.values]
        self.rating = df.rate.values
        if item_based:
            indices = [self.movie, self.user]
            shape = (len(movie_to_index), len(user_to_index))
        else:
            indices = [self.user, self.movie]
            shape = (len(user_to_index), len(movie_to_index))
        # torch.sparse.FloatTensor is deprecated; torch.sparse_coo_tensor is
        # the supported constructor and behaves identically here.
        self.data = torch.sparse_coo_tensor(
            torch.LongTensor(indices),
            torch.FloatTensor(self.rating),
            torch.Size(shape),
        ).to_dense()

    def __len__(self):
        # Number of rows in the dense matrix (items or users).
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]
# Placeholder path ("file path"): point this at the directory holding rates.csv.
data_path = '파일경로'
train_df, val_df, user_to_index, movie_to_index = read_data(data_path=data_path)
# item_based=True (default): each dataset row is one movie's rating vector over all users.
train_dataset = KMRDdataset(train_df, user_to_index, movie_to_index)
val_dataset = KMRDdataset(val_df, user_to_index, movie_to_index)
# Sanity checks: raw frame shapes vs. one dense row's size (= number of users).
print(train_df.shape)
print(train_dataset.data[0].size())
print(val_df.shape)
print(val_dataset.data[0].size())
print(len(list(user_to_index.keys())))
# Notebook-style inspection of the first row; a no-op when run as a script.
train_dataset.data[0]
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=16, shuffle=True)
2. Define AutoEncoder
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.parallel
import torch.optim as optim
import torch.utils.data
from torch.autograd import Variable
import torch.nn.init as weight_init
class SimpleAutoEncoder(nn.Module):
    """Single-hidden-layer autoencoder: input -> hidden (encoder) -> input (decoder)."""

    def __init__(self, num_inputs, num_hiddens, kind='sigmoid', dropout=None):
        """Build the encoder/decoder pair.

        Parameters
        ----------
        num_inputs : int
            Width of the input (and reconstructed output) vector.
        num_hiddens : int
            Width of the hidden bottleneck layer.
        kind : str
            Activation name understood by :meth:`activation`.
        dropout : float or None
            If given, dropout probability applied to the encoder output.
        """
        super().__init__()
        # encoder: input -> hidden; decoder: hidden -> output (= input dim)
        encoder_layers = [nn.Linear(num_inputs, num_hiddens), self.activation(kind)]
        # BUG FIX: the original accepted `dropout` but silently ignored it.
        # Dropout is applied to the encoder output, as in DeepRecommender.
        if dropout is not None:
            encoder_layers.append(nn.Dropout(dropout))
        self.encoder = nn.Sequential(*encoder_layers)
        self.decoder = nn.Sequential(nn.Linear(num_hiddens, num_inputs), self.activation(kind))

    def activation(self, kind):
        """Return a fresh activation module for `kind`.

        Raises ValueError for unknown names. BUG FIX: kind='none' used to
        return the *builtin* ``input`` function, which nn.Sequential rejects;
        it now returns nn.Identity().
        """
        factories = {
            'selu': nn.SELU,
            'relu': nn.ReLU,
            'relu6': nn.ReLU6,
            'sigmoid': nn.Sigmoid,
            'tanh': nn.Tanh,
            'elu': nn.ELU,
            'lrelu': nn.LeakyReLU,
            'none': nn.Identity,
        }
        try:
            return factories[kind]()
        except KeyError:
            raise ValueError('Unknown non-linearity type') from None

    def forward(self, x):
        """Reconstruct x through the bottleneck."""
        return self.decoder(self.encoder(x))
class DeepAutoEncoder(nn.Module):
    """Multi-layer autoencoder for rating reconstruction.

    nn_type='diamond': hidden widths halve at each layer (e.g. 100 -> 50 -> 25),
    widths <= 10 are dropped, and the decoder mirrors the encoder back up.
    nn_type='constant': (num_layers - 1) layers, all of width num_hiddens.

    NOTE(review): the first encoder layer takes num_hiddens inputs, so callers
    must feed vectors of dimension num_hiddens — confirm against usage.
    """

    def __init__(self, num_hiddens, num_layers, dropout=None, nn_type='diamond'):
        # BUG FIX: the original called super(AutoEncoder, self).__init__(),
        # but no name `AutoEncoder` exists -> NameError on construction.
        super().__init__()
        self.encoder, self.decoder = self.generate_layers(num_hiddens, num_layers, dropout, nn_type)

    def forward(self, x):
        """Encode then decode; output width equals input width."""
        return self.decoder(self.encoder(x))

    def generate_layers(self, num_hiddens, num_layers, dropout=None, nn_type='diamond'):
        """Build the (encoder, decoder) nn.Sequential pair for the topology.

        Raises ValueError for an unknown nn_type (the original raised a
        confusing NameError instead).
        """
        # BUG FIX: the original initialized these lists only inside the
        # 'diamond' branch, so nn_type='constant' raised NameError.
        encoder_modules = []
        decoder_modules = []
        if nn_type == 'diamond':
            # Successively halve the width, keeping only widths > 10,
            # e.g. num_hiddens=100, num_layers=3 -> [100, 50, 25].
            widths = [num_hiddens]
            for _ in range(num_layers - 1):
                widths.append(widths[-1] // 2)
            widths = [w for w in widths if w > 10]
            for in_w, out_w in zip(widths, widths[1:]):
                encoder_modules.append(nn.Linear(in_w, out_w, bias=True))
                encoder_modules.append(nn.Sigmoid())
            # Decoder mirrors the encoder widths in reverse; its "activation"
            # slots are Identity, matching the original design.
            mirrored = list(reversed(widths))
            for in_w, out_w in zip(mirrored, mirrored[1:]):
                decoder_modules.append(nn.Linear(in_w, out_w, bias=True))
                decoder_modules.append(nn.Identity())
        elif nn_type == 'constant':
            # num_layers - 1 equal-width layers in both encoder and decoder,
            # e.g. num_hiddens=50, num_layers=3 -> two Linear(50, 50) each.
            for _ in range(num_layers - 1):
                encoder_modules.append(nn.Linear(num_hiddens, num_hiddens, bias=True))
                encoder_modules.append(nn.Sigmoid())
                decoder_modules.append(nn.Linear(num_hiddens, num_hiddens, bias=True))
                decoder_modules.append(nn.Identity())
        else:
            raise ValueError(f'Unknown nn_type: {nn_type!r}')
        if dropout is not None:
            encoder_modules = self._interleave_dropout(encoder_modules, dropout)
            decoder_modules = self._interleave_dropout(decoder_modules, dropout)
        return nn.Sequential(*encoder_modules), nn.Sequential(*decoder_modules)

    @staticmethod
    def _interleave_dropout(modules, p):
        """Insert nn.Dropout(p) after each (Linear, activation) pair.

        Rewrites the original's opaque nested comprehension; the insertion
        rule (including a trailing Dropout after the last pair) is preserved.
        """
        with_dropout = []
        for i in range(0, len(modules), 2):
            with_dropout.extend(modules[i:i + 2])
            if i < len(modules) - 1:  # same condition as the original
                with_dropout.append(nn.Dropout(p))
        return with_dropout
3. Train
# Vocabulary sizes from the index maps built by read_data.
num_users = len(user_to_index.keys())
num_movies = len(movie_to_index.keys())
print(num_users, num_movies)
# Item-based data: each input row has num_users entries, so the AE input dim is num_users.
model = SimpleAutoEncoder(num_inputs=num_users, num_hiddens=100, kind='selu')
# Notebook-style echo of the model architecture; a no-op in a plain script.
model
optimizer = optim.Adam(model.parameters(), lr=1e-3)
def weights_init(m):
    """Xavier-uniform-initialize the weight of an nn.Linear and zero its bias.

    Intended for use with ``model.apply``; non-Linear modules pass through
    untouched.
    """
    if not isinstance(m, nn.Linear):
        return
    nn.init.xavier_uniform_(m.weight)
    nn.init.zeros_(m.bias)
# Re-initialize every Linear layer (Xavier-uniform weights, zero biases).
model.apply(weights_init)
# Notebook-style inspection of one input row's size; a no-op in a plain script.
train_dataset.data[0].size()
# Masked loss adapted from NVIDIA's DeepRecommender (original comment,
# translated: "see NVIDIA Recommender System").
def MSEloss(inputs, targets, size_average=False):
    """Mean-squared error over observed (nonzero) ratings only.

    Entries where targets == 0 are treated as unobserved and masked out of
    the prediction before the loss is computed.

    Returns
    -------
    (loss, normalizer) : tuple of tensors
        With size_average=False (default), ``loss`` is the summed squared
        error and ``normalizer`` is the count of observed ratings, so callers
        compute RMSE as sqrt(loss / normalizer). With size_average=True,
        ``loss`` is already the mean and ``normalizer`` is 1.0.
    """
    mask = (targets != 0).float()
    num_ratings = torch.sum(mask)
    criterion = nn.MSELoss(reduction='mean' if size_average else 'sum')
    # Masking inputs suffices: unobserved targets are already zero.
    loss = criterion(inputs * mask, targets)
    # Deprecated torch.autograd.Variable wrapper removed; the original's
    # one-line return also hid the tuple/ternary precedence — spelled out here.
    normalizer = torch.tensor([1.0]) if size_average else num_ratings
    return loss, normalizer
# ---- one training epoch over the dense rating rows ----
model.train()
train_loss = 0
for batch_idx, inputs in enumerate(train_dataloader):
    optimizer.zero_grad()
    reconstruction = model(inputs)
    sum_sq, observed = MSEloss(reconstruction, inputs)
    # RMSE over the observed (nonzero) ratings in this batch.
    rmse = torch.sqrt(sum_sq / observed)
    rmse.backward()
    train_loss += rmse.item()
    optimizer.step()
# Mean per-batch RMSE for the epoch.
print(train_loss / (batch_idx + 1))
# ---- validation: same masked RMSE, no gradient tracking ----
model.eval()
val_loss = 0
with torch.no_grad():
    for batch_idx, inputs in enumerate(val_dataloader):
        reconstruction = model(inputs)
        sum_sq, observed = MSEloss(reconstruction, inputs)
        val_loss += torch.sqrt(sum_sq / observed).item()
# Mean per-batch validation RMSE.
print(val_loss / (batch_idx + 1))
728x90
'딥러닝 템플릿 > 추천시스템(RS) 코드' 카테고리의 다른 글
[추천 시스템(RS)] DeepFM Frame (0) | 2022.07.21 |
---|---|
[추천 시스템(RS)] Wide & Deep Learning for Recommender System (0) | 2022.07.20 |
[추천 시스템(RS)] Factorization Machine (0) | 2022.07.19 |
[추천 시스템(RS)] Neural Collaborative Filtering (0) | 2022.07.19 |
[추천 시스템(RS)] Matrix Factorization (0) | 2022.07.19 |
Comments