[Recommender System (RS)] AutoEncoders Meet Collaborative Filtering

코딩걸음마 2022. 7. 21. 11:32

1. Data Loader

import os

import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
def read_data(data_path):
    # Use only the first 10,000 ratings to keep the example fast
    df = pd.read_csv(os.path.join(data_path, 'rates.csv'))[:10000]
    train_df, val_df = train_test_split(df, test_size=0.2, random_state=1234, shuffle=True)

    # Map raw user/movie ids to contiguous indices (built on the full df
    # so train and validation share the same index space)
    user_to_index = {original: idx for idx, original in enumerate(df.user.unique())}
    movie_to_index = {original: idx for idx, original in enumerate(df.movie.unique())}

    return train_df, val_df, user_to_index, movie_to_index
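
To make the remapping concrete, here is a minimal sketch with toy ids (hypothetical data, not the real rates.csv): raw user/movie ids become contiguous indices in order of first appearance.

# Toy illustration of the id -> index remapping (hypothetical data)
toy = pd.DataFrame({'user': [101, 205, 101], 'movie': [7, 7, 9], 'rate': [4, 5, 3]})
toy_user_to_index = {u: i for i, u in enumerate(toy.user.unique())}
toy_movie_to_index = {m: i for i, m in enumerate(toy.movie.unique())}
print(toy_user_to_index)   # {101: 0, 205: 1}
print(toy_movie_to_index)  # {7: 0, 9: 1}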
class KMRDdataset(Dataset):
    def __init__(self, df, user_to_index, movie_to_index, item_based=True):
        self.min_rating = min(df.rate)
        self.max_rating = max(df.rate)

        self.user = [user_to_index[u] for u in df.user.values]
        self.movie = [movie_to_index[m] for m in df.movie.values]
        self.rating = df.rate.values

        # Build the dense rating matrix from (row, column, rating) triples.
        # item_based=True  -> one row per movie (movie x user matrix)
        # item_based=False -> one row per user  (user x movie matrix)
        if item_based:
            input_tensor = torch.LongTensor([self.movie, self.user])
            self.data = torch.sparse_coo_tensor(input_tensor, torch.FloatTensor(self.rating),
                                                torch.Size([len(movie_to_index), len(user_to_index)])).to_dense()
        else:
            input_tensor = torch.LongTensor([self.user, self.movie])
            self.data = torch.sparse_coo_tensor(input_tensor, torch.FloatTensor(self.rating),
                                                torch.Size([len(user_to_index), len(movie_to_index)])).to_dense()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]
data_path = 'path/to/data'  # directory containing rates.csv
train_df, val_df, user_to_index, movie_to_index = read_data(data_path=data_path)
train_dataset = KMRDdataset(train_df, user_to_index, movie_to_index)
val_dataset = KMRDdataset(val_df, user_to_index, movie_to_index)
print(train_df.shape)
print(train_dataset.data[0].size())
print(val_df.shape)
print(val_dataset.data[0].size())
print(len(user_to_index))
print(train_dataset.data[0])
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=16, shuffle=True)
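
For intuition about the matrix the dataset builds, here is a minimal sketch (toy indices and ratings, not KMRD data) of how torch.sparse_coo_tensor turns (row, column, rating) triples into the dense matrix the autoencoder consumes; unrated cells come out as 0.

# Three toy ratings: (movie 0, user 1, 4.0), (movie 1, user 0, 3.0), (movie 1, user 2, 5.0)
indices = torch.LongTensor([[0, 1, 1],   # movie (row) indices
                            [1, 0, 2]])  # user (column) indices
values = torch.FloatTensor([4.0, 3.0, 5.0])
print(torch.sparse_coo_tensor(indices, values, torch.Size([2, 3])).to_dense())
# tensor([[0., 4., 0.],
#         [3., 0., 5.]])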

2. Define AutoEncoder

import torch
import torch.nn as nn
import torch.optim as optim
class SimpleAutoEncoder(nn.Module):
  def __init__(self, num_inputs, num_hiddens, kind='sigmoid', dropout=None):
    super(SimpleAutoEncoder, self).__init__()
    # input -> hidden : encoder
    # hidden -> output (= reconstructed input) : decoder
    self.encoder = nn.Sequential(nn.Linear(num_inputs, num_hiddens), self.activation(kind))
    self.decoder = nn.Sequential(nn.Linear(num_hiddens, num_inputs), self.activation(kind))

  def activation(self, kind):
    if kind == 'selu':
      return nn.SELU()
    elif kind == 'relu':
      return nn.ReLU()
    elif kind == 'relu6':
      return nn.ReLU6()
    elif kind == 'sigmoid':
      return nn.Sigmoid()
    elif kind == 'tanh':
      return nn.Tanh()
    elif kind == 'elu':
      return nn.ELU()
    elif kind == 'lrelu':
      return nn.LeakyReLU()
    elif kind == 'none':
      return nn.Identity()
    else:
      raise ValueError('Unknown non-linearity type')

  def forward(self, x):
    return self.decoder(self.encoder(x))
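
A quick smoke test of the shapes (the sizes here are illustrative, not the KMRD dimensions): the decoder maps the hidden code back to the input dimension, so the output shape equals the input shape.

# Smoke test with illustrative sizes: 4 item vectors over 300 users
toy_model = SimpleAutoEncoder(num_inputs=300, num_hiddens=100, kind='selu')
toy_batch = torch.rand(4, 300)
print(toy_model(toy_batch).shape)  # torch.Size([4, 300])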
class DeepAutoEncoder(nn.Module):
  def __init__(self, num_hiddens, num_layers, dropout=None, nn_type='diamond'):
    super(DeepAutoEncoder, self).__init__()
    # input -> hidden(...) -> ... -> hidden(...) -> output = input
    self.encoder, self.decoder = self.generate_layers(num_hiddens, num_layers, dropout, nn_type)

  def forward(self, x):
    return self.decoder(self.encoder(x))

  def generate_layers(self, num_hiddens, num_layers, dropout=None, nn_type='diamond'):
    encoder_modules = []
    decoder_modules = []

    # 'diamond': halve the width at every layer, e.g. 100 -> [100, 50, 25, 12]
    if nn_type == 'diamond':
      hidden_layers = []
      temp = num_hiddens
      for _ in range(num_layers):
        hidden_layers.append(int(temp))
        temp = temp / 2
      # drop layers that would become too narrow
      hidden_layers = [x for x in hidden_layers if x > 10]

      # encoder: shrink from the widest to the narrowest layer
      for idx in range(len(hidden_layers) - 1):
        encoder_modules.append(nn.Linear(hidden_layers[idx], hidden_layers[idx+1], bias=True))
        encoder_modules.append(nn.Sigmoid())

      # decoder: mirror the encoder back out
      hidden_layers = list(reversed(hidden_layers))
      for idx in range(len(hidden_layers) - 1):
        decoder_modules.append(nn.Linear(hidden_layers[idx], hidden_layers[idx+1], bias=True))
        decoder_modules.append(nn.Identity())

    # 'constant': num_layers layers of the same width, e.g. 50 -> [50, 50, 50]
    elif nn_type == 'constant':
      hidden_layers = [num_hiddens] * num_layers
      for idx in range(num_layers - 1):
        encoder_modules.append(nn.Linear(hidden_layers[idx], hidden_layers[idx+1], bias=True))
        encoder_modules.append(nn.Sigmoid())
        decoder_modules.append(nn.Linear(hidden_layers[idx], hidden_layers[idx+1], bias=True))
        decoder_modules.append(nn.Identity())

    # optionally interleave Dropout after each (Linear, activation) pair
    if dropout is not None:
      encoder_modules = [x for y in (encoder_modules[i:i+2] + [nn.Dropout(dropout)] * (i < len(encoder_modules) - 1)
                          for i in range(0, len(encoder_modules), 2)) for x in y]
      decoder_modules = [x for y in (decoder_modules[i:i+2] + [nn.Dropout(dropout)] * (i < len(decoder_modules) - 1)
                          for i in range(0, len(decoder_modules), 2)) for x in y]

    encoder = nn.Sequential(*encoder_modules)
    decoder = nn.Sequential(*decoder_modules)

    return encoder, decoder
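
To see what the two topologies produce, a short check with illustrative hyperparameters:

# 'diamond' halves the width per layer: 100 -> [100, 50, 25, 12]
print(DeepAutoEncoder(num_hiddens=100, num_layers=4, nn_type='diamond').encoder)
# Linear(100, 50), Sigmoid, Linear(50, 25), Sigmoid, Linear(25, 12), Sigmoid

# 'constant' keeps the width fixed: 50 -> [50, 50, 50]
print(DeepAutoEncoder(num_hiddens=50, num_layers=3, nn_type='constant').encoder)
# Linear(50, 50), Sigmoid, Linear(50, 50), Sigmoid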

3. Train

num_users = len(user_to_index)
num_movies = len(movie_to_index)
print(num_users, num_movies)
# Item-based: each input is a movie's vector of ratings over all users
model = SimpleAutoEncoder(num_inputs=num_users, num_hiddens=100, kind='selu')
print(model)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
def weights_init(m):
    # Xavier-initialize every Linear layer; biases start at zero
    if isinstance(m, nn.Linear):
        torch.nn.init.xavier_uniform_(m.weight)
        torch.nn.init.zeros_(m.bias)

model.apply(weights_init)
print(train_dataset.data[0].size())
# Masked MSE loss (reference: NVIDIA Recommender System)
def MSEloss(inputs, targets, size_average=False):
  # Only rated entries (non-zero targets) contribute to the loss
  mask = targets != 0
  num_ratings = torch.sum(mask.float())
  criterion = nn.MSELoss(reduction='sum' if not size_average else 'mean')
  return criterion(inputs * mask.float(), targets), (torch.tensor([1.0]) if size_average else num_ratings)
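
A quick sanity check of the masking (toy tensors): the prediction at an unrated (zero) position is ignored, and num_ratings counts only the rated cells.

# Toy check: only the two rated entries contribute
targets = torch.FloatTensor([[4.0, 0.0, 2.0]])  # 0.0 = not rated
inputs  = torch.FloatTensor([[3.0, 9.9, 2.0]])  # the 9.9 is masked out
loss, num_ratings = MSEloss(inputs, targets)
print(loss.item(), num_ratings.item())  # 1.0 2.0 -> RMSE = sqrt(1.0/2) ≈ 0.707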
model.train()
train_loss = 0
for idx, batch in enumerate(train_dataloader):
    optimizer.zero_grad()

    pred = model(batch)
    loss, num_ratings = MSEloss(pred, batch)
    loss = torch.sqrt(loss / num_ratings)  # RMSE over the rated entries
    loss.backward()
    train_loss += loss.item()
    optimizer.step()

    print(train_loss / (idx+1))  # running average train RMSE
model.eval()
val_loss = 0
with torch.no_grad():
  for idx, batch in enumerate(val_dataloader):
    pred = model(batch)
    loss, num_ratings = MSEloss(pred, batch)
    loss = torch.sqrt(loss / num_ratings)  # RMSE over the rated entries
    val_loss += loss.item()

    print(val_loss / (idx+1))  # running average validation RMSE
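
Once trained, the reconstruction itself is the prediction. A minimal inference sketch (the row index and k are illustrative): in the item-based setup, reconstructing a movie's row yields a predicted rating for every user, including those who have not rated it.

model.eval()
with torch.no_grad():
    movie_vector = train_dataset.data[0]  # observed ratings of movie 0 across all users
    predicted = model(movie_vector.unsqueeze(0)).squeeze(0)

# rank users who have not yet rated movie 0 by their predicted rating
candidate_scores = predicted.masked_fill(movie_vector != 0, float('-inf'))
print(torch.topk(candidate_scores, k=5).indices)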