Collaborative Filtering with Python and Spark

10 minute read

Recommendation systems are a collection of algorithms used to recommend items to users based on information taken from the user. These systems have become ubiquitous and can be commonly seen in online stores, movie databases and job finders.

The installation of Python and PySpark, and an introduction to the theory of recommendation systems, is given here.

Understanding the Data

The dataset is acquired from GroupLens. Let's download and extract it.

import requests
import zipfile
url = r'https://s3-api.us-geo.objectstorage.softlayer.net/cf-courses-data/CognitiveClass/ML0101ENv3/labs/moviedataset.zip'
output ='moviedataset.zip'
r = requests.get(url)
with open(output, 'wb') as f:
    f.write(r.content)
with zipfile.ZipFile(output) as item:
    item.extractall()
#Dataframe manipulation library
import pandas as pd
#Math functions, we'll only need the sqrt function so let's import only that
from math import sqrt
import numpy as np
#Storing the movie information into a pandas dataframe
movies_df = pd.read_csv('./ml-latest/movies.csv')
#Storing the user information into a pandas dataframe
ratings_df = pd.read_csv('./ml-latest/ratings.csv')
#Head is a function that gets the first N rows of a dataframe. N's default is 5.
movies_df.head(1)
   movieId  title             genres
0        1  Toy Story (1995)  Adventure|Animation|Children|Comedy|Fantasy

Let’s remove the year from the title column and place it into its own column, using pandas' handy string-handling functions with regular expressions.

#Using regular expressions to find a year stored between parentheses
#We specify the parentheses so we don't conflict with movies that have years in their titles
movies_df['year'] = movies_df.title.str.extract(r'(\(\d\d\d\d\))', expand=False)
#Removing the parentheses
movies_df['year'] = movies_df.year.str.extract(r'(\d\d\d\d)', expand=False)
#Removing the years from the 'title' column
movies_df['title'] = movies_df.title.str.replace(r'(\(\d\d\d\d\))', '', regex=True)
#Applying the strip function to get rid of any trailing whitespace characters that may have appeared
movies_df['title'] = movies_df['title'].apply(lambda x: x.strip())
movies_df.head(1)
   movieId  title      genres                                        year
0        1  Toy Story  Adventure|Animation|Children|Comedy|Fantasy  1995
#Dropping the genres column
movies_df = movies_df.drop('genres', axis=1)
movies_df.head(2)
   movieId  title      year
0        1  Toy Story  1995
1        2  Jumanji    1995
ratings_df.head(2)
   userId  movieId  rating   timestamp
0       1      169     2.5  1204927694
1       1     2471     3.0  1204927438

Every row in the ratings dataframe pairs a user id with a movie id, a rating, and a timestamp showing when the user reviewed the movie. We won't be needing the timestamp column, so let's drop it to save memory.

#Drop removes a specified row or column from a dataframe
ratings_df = ratings_df.drop('timestamp', axis=1)
ratings_df.to_csv("ratings.csv", encoding='utf-8', index=False)


Collaborative Filtering with Python

#Dataframe manipulation library
import pandas as pd
#Math functions; we'll only need the sqrt function so let's import only that
from math import sqrt
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

The first technique we’re going to take a look at is called Collaborative Filtering, which is also known as User-User Filtering.

It attempts to find users that have similar preferences and opinions as the input and then recommends items that they have liked to the input.

The process for creating a User Based recommendation system is as follows:

  • Select a user and the movies the user has watched
  • Based on the user's ratings, find the top X neighbours
  • Get the watched-movie record of the user for each neighbour
  • Calculate a similarity score using some formula
  • Recommend the items with the highest score

Let’s begin by creating an input user to recommend movies to:

Notice: to add more movies, simply increase the number of elements in userInput.
userInput = [
            {'title':'Breakfast Club, The', 'rating':5},
            {'title':'Toy Story', 'rating':3.5},
            {'title':'Jumanji', 'rating':2},
            {'title':"Pulp Fiction", 'rating':5},
            {'title':'Akira', 'rating':4.5}
         ] 
inputMovies = pd.DataFrame(userInput)
inputMovies
   title                rating
0  Breakfast Club, The     5.0
1  Toy Story               3.5
2  Jumanji                 2.0
3  Pulp Fiction            5.0
4  Akira                   4.5

Add movieId to input user

With the input complete, let’s extract the input movies' IDs from the movies dataframe and add them to the input.

We can achieve this by first selecting the rows whose titles match the input movies and then merging this subset with the input dataframe. We also drop the columns we don't need to save memory.

#Filtering out the movies by title
inputId = movies_df[movies_df['title'].isin(inputMovies['title'].tolist())]

#Then merging it so we can get the movieId. It's implicitly merging it by title.
inputMovies = pd.merge(inputId, inputMovies)
#Dropping information we won't use from the input dataframe
inputMovies = inputMovies.drop('year', axis=1)
#Final input dataframe
#If a movie you added above isn't here, it might not be in the original
#dataframe or it might be spelled differently; please check the capitalisation.
inputMovies
   movieId  title                rating
0        1  Toy Story               3.5
1        2  Jumanji                 2.0
2      296  Pulp Fiction            5.0
3     1274  Akira                   4.5
4     1968  Breakfast Club, The     5.0

The users who have seen the same movies

Now that we have the movie IDs in our input, we can get the subset of users that have watched and reviewed those movies.

#Filtering out users that have watched movies that the input has watched and storing it
userSubset = ratings_df[ratings_df['movieId'].isin(inputMovies['movieId'].tolist())]
userSubset.head()
     userId  movieId  rating
19        4      296     4.0
441      12     1968     3.0
479      13        2     2.0
531      13     1274     5.0
681      14      296     2.0
#Groupby splits the dataframe into several sub-dataframes, one for each distinct value of the specified column
userSubsetGroup = userSubset.groupby(['userId'])

Let's look at one of the users, e.g. the one with userId=1130:

userSubsetGroup.get_group(1130)
        userId  movieId  rating
104167    1130        1     0.5
104168    1130        2     4.0
104214    1130      296     4.0
104363    1130     1274     4.5
104443    1130     1968     4.5
#Sorting it so users with the most movies in common with the input have priority
userSubsetGroup = sorted(userSubsetGroup,  key=lambda x: len(x[1]), reverse=True)

Now let's look at the first three of these users:

userSubsetGroup[0:3]
[(75,
        userId  movieId  rating
  7507      75        1     5.0
  7508      75        2     3.5
  7540      75      296     5.0
  7633      75     1274     4.5
  7673      75     1968     5.0),
 (106,
        userId  movieId  rating
  9083     106        1     2.5
  9084     106        2     3.0
  9115     106      296     3.5
  9198     106     1274     3.0
  9238     106     1968     3.5),
 (686,
         userId  movieId  rating
  61336     686        1     4.0
  61337     686        2     3.0
  61377     686      296     4.0
  61478     686     1274     4.0
  61569     686     1968     5.0)]

We will select a subset of users to iterate through. This limit is imposed because we don’t want to waste too much time going through every single user.

userSubsetGroup = userSubsetGroup[0:100]

Next, we are going to compare all users to our specified user and find the ones that are most similar.
We're going to find out how similar each user is to the input through the Pearson correlation coefficient. It is used to measure the strength of a linear association between two variables. Pearson correlation is invariant to scaling, i.e. to multiplying all elements by a nonzero constant or adding any constant to all elements.
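Concretely, for the $n$ movies that two users have both rated, with ratings $x_i$ (input user) and $y_i$ (candidate user), the quantities computed in the code below are

$$
r=\frac{S_{xy}}{\sqrt{S_{xx}\,S_{yy}}},\qquad
S_{xx}=\sum_i x_i^2-\frac{\bigl(\sum_i x_i\bigr)^2}{n},\qquad
S_{yy}=\sum_i y_i^2-\frac{\bigl(\sum_i y_i\bigr)^2}{n},\qquad
S_{xy}=\sum_i x_i y_i-\frac{\sum_i x_i\sum_i y_i}{n}
$$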

The values given by the formula range from r = -1 to r = 1, where 1 indicates a perfect positive correlation between the two entities and -1 a perfect negative correlation. In our case, a 1 means that the two users have similar tastes, while a -1 means the opposite.

Now we calculate the Pearson correlation between the input user and each user in the subset group, and store it in a dictionary, where the key is the user id and the value is the coefficient.

#Store the Pearson Correlation in a dictionary, where the key is the user Id and the value is the coefficient
pearsonCorrelationDict = {}

#For every user group in our subset
for name, group in userSubsetGroup:
    #Let's start by sorting the input and current user group so the values aren't mixed up later on
    group = group.sort_values(by='movieId')
    inputMovies = inputMovies.sort_values(by='movieId')
    #Get the N for the formula
    nRatings = len(group)
    #Get the review scores for the movies that they both have in common
    temp_df = inputMovies[inputMovies['movieId'].isin(group['movieId'].tolist())]
    #And then store them in a temporary buffer variable in a list format to facilitate future calculations
    tempRatingList = temp_df['rating'].tolist()
    #Let's also put the current user group reviews in a list format
    tempGroupList = group['rating'].tolist()
    #Now let's calculate the Pearson correlation between the two users, call them x and y
    Sxx = sum([i**2 for i in tempRatingList]) - pow(sum(tempRatingList),2)/float(nRatings)
    Syy = sum([i**2 for i in tempGroupList]) - pow(sum(tempGroupList),2)/float(nRatings)
    Sxy = sum( i*j for i, j in zip(tempRatingList, tempGroupList)) - sum(tempRatingList)*sum(tempGroupList)/float(nRatings)
    
    #If the denominator is nonzero, divide; otherwise the correlation is 0.
    if Sxx != 0 and Syy != 0:
        pearsonCorrelationDict[name] = Sxy/sqrt(Sxx*Syy)
    else:
        pearsonCorrelationDict[name] = 0
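For reference, the same coefficient can be computed with SciPy, assuming SciPy is installed (note that scipy.stats.pearsonr raises an error on constant inputs, a case the Sxx/Syy check above instead maps to 0):

from scipy.stats import pearsonr

# Equivalent to the manual Sxx/Syy/Sxy computation for non-constant inputs
r, p_value = pearsonr(tempRatingList, tempGroupList)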

pearsonDF = pd.DataFrame.from_dict(pearsonCorrelationDict, orient='index')
pearsonDF.columns = ['similarityIndex']
pearsonDF['userId'] = pearsonDF.index
pearsonDF.index = range(len(pearsonDF))
pearsonDF.head()
   similarityIndex  userId
0         0.827278      75
1         0.586009     106
2         0.832050     686
3         0.576557     815
4         0.943456    1040

The top X similar users to the input user

Now let’s get the top 50 users that are most similar to the input.

topUsers=pearsonDF.sort_values(by='similarityIndex', ascending=False)[0:50]
topUsers.head()
    similarityIndex  userId
64         0.961678   12325
34         0.961538    6207
55         0.961538   10707
67         0.960769   13053
4          0.943456    1040

Ratings of the selected users for all movies

We’re going to do this by taking the weighted average of the ratings of the movies, using the Pearson correlation as the weight. To do this, we first need to get the movies watched by the users in our pearsonDF from the ratings dataframe, and then store their correlation in a new column called similarityIndex. This is achieved below by merging the two tables.

topUsersRating=topUsers.merge(ratings_df, left_on='userId', right_on='userId', how='inner')
topUsersRating.head()
   similarityIndex  userId  movieId  rating
0         0.961678   12325        1     3.5
1         0.961678   12325        2     1.5
2         0.961678   12325        3     3.0
3         0.961678   12325        5     0.5
4         0.961678   12325        6     2.5

Now all we need to do is multiply each movie rating by its weight (the similarity index), sum up the new ratings, and divide by the sum of the weights.

We can easily do this by multiplying two columns, then grouping the dataframe by movieId and dividing two columns. The result is a recommendation score for every candidate movie, aggregated over all the users similar to the input user:
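In formula form, the recommendation score of a movie $m$ for the input user is the similarity-weighted average over the top users $u$ who rated it:

$$
\operatorname{score}(m)=\frac{\sum_{u}\operatorname{sim}(u)\,r_{u,m}}{\sum_{u}\operatorname{sim}(u)}
$$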

#Multiplies the similarity by the user's ratings
topUsersRating['weightedRating'] = topUsersRating['similarityIndex']*topUsersRating['rating']
topUsersRating.head()
   similarityIndex  userId  movieId  rating  weightedRating
0         0.961678   12325        1     3.5        3.365874
1         0.961678   12325        2     1.5        1.442517
2         0.961678   12325        3     3.0        2.885035
3         0.961678   12325        5     0.5        0.480839
4         0.961678   12325        6     2.5        2.404196
#Applies a sum to topUsersRating after grouping it up by movieId
tempTopUsersRating = topUsersRating.groupby('movieId').sum()[['similarityIndex','weightedRating']]
tempTopUsersRating.columns = ['sum_similarityIndex','sum_weightedRating']
tempTopUsersRating.head()
         sum_similarityIndex  sum_weightedRating
movieId
1                  38.376281          140.800834
2                  38.376281           96.656745
3                  10.253981           27.254477
4                   0.929294            2.787882
5                  11.723262           27.151751
#Creates an empty dataframe
recommendation_df = pd.DataFrame()
#Now we take the weighted average
recommendation_df['weighted average recommendation score'] = tempTopUsersRating['sum_weightedRating']/tempTopUsersRating['sum_similarityIndex']
recommendation_df['movieId'] = tempTopUsersRating.index
recommendation_df.head()
         weighted average recommendation score  movieId
movieId
1                                      3.668955        1
2                                      2.518658        2
3                                      2.657941        3
4                                      3.000000        4
5                                      2.316058        5

Now let’s sort it and see the top 10 movies that the algorithm recommended!

recommendation_df = recommendation_df.sort_values(by='weighted average recommendation score', ascending=False)
recommendation_df.head(10)
         weighted average recommendation score  movieId
movieId
5073                                        5.0     5073
3329                                        5.0     3329
2284                                        5.0     2284
26801                                       5.0    26801
6776                                        5.0     6776
6672                                        5.0     6672
3759                                        5.0     3759
3769                                        5.0     3769
3775                                        5.0     3775
90531                                       5.0    90531
movies_df.loc[movies_df['movieId'].isin(recommendation_df.head(10)['movieId'].tolist())]
       movieId  title                                    year
2200      2284  Bandit Queen                             1994
3243      3329  Year My Voice Broke, The                 1987
3669      3759  Fun and Fancy Free                       1947
3679      3769  Thunderbolt and Lightfoot                1974
3685      3775  Make Mine Music                          1946
4978      5073  Son's Room, The (Stanza del figlio, La)  2001
6563      6672  War Photographer                         2001
6667      6776  Lagaan: Once Upon a Time in India        2001
9064     26801  Dragon Inn (Sun lung moon hak chan)      1992
18106    90531  Shame                                    2011

Collaborative Filtering with PySpark

The first thing to do is start a Spark session:

import findspark
findspark.init()
from pyspark.sql import SparkSession

If your computer has less than 5 GB of RAM, change the Spark driver memory in the configuration below:

spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "5g") \
    .appName('rec') \
    .getOrCreate()
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
data = spark.read.csv('ratings.csv',inferSchema=True,header=True)
data.head()
Row(userId=1, movieId=169, rating=2.5)
data.describe().show()
+-------+------------------+------------------+------------------+
|summary|            userId|           movieId|            rating|
+-------+------------------+------------------+------------------+
|  count|          22884377|          22884377|          22884377|
|   mean|123545.22803517876|11408.161728851084|3.5260770044122243|
| stddev|  71474.6902962076| 24136.87588274057|1.0611734340135037|
|    min|                 1|                 1|               0.5|
|    max|            247753|            151711|               5.0|
+-------+------------------+------------------+------------------+

# Split the data into training and test sets (0.8 / 0.2)
(training, test) = data.randomSplit([0.8, 0.2])
# Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
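For context, ALS (alternating least squares) learns a low-rank factorization of the ratings matrix: each user u gets a factor vector $x_u$ and each movie i a factor vector $y_i$, and training alternates between solving least-squares problems for the user factors and the item factors while holding the other side fixed. regParam is the regularization weight $\lambda$ in the objective:

$$
\min_{x_*,\,y_*}\ \sum_{(u,i)\,\in\,\text{observed}}\bigl(r_{ui}-x_u^{\top}y_i\bigr)^2+\lambda\Bigl(\sum_u\lVert x_u\rVert^2+\sum_i\lVert y_i\rVert^2\Bigr)
$$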

Attention: this step requires about 5 GB of free RAM.

model = als.fit(training)

Now let’s see how the model performed!

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|178254|    148|   3.0| 4.6890635|
| 90446|    148|   2.0| 2.8395023|
|134189|    148|   5.0| 3.7372081|
|158304|    148|   3.0| 3.2317648|
|236731|    148|   3.0| 2.6928616|
|233017|    148|   2.0| 2.8125327|
|119850|    148|   4.0| 3.2640007|
|219880|    148|   5.0|  5.158979|
|108678|    148|   4.0|  2.820398|
| 48620|    148|   2.0|  3.032034|
|112136|    148|   2.0| 3.3153908|
|147802|    148|   5.0|  3.659751|
| 33400|    148|   3.0| 3.9928482|
| 88163|    148|   3.0| 2.1727314|
|191207|    148|   3.0| 3.5695121|
| 37586|    148|   3.0|0.97570467|
|142515|    148|   3.0| 2.9032598|
|184545|    148|   1.0| 2.0106587|
|169266|    148|   1.0|0.44079798|
| 86384|    148|   3.0|  3.259998|
+------+-------+------+----------+
only showing top 20 rows
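The comment above mentions RMSE, so let's actually compute it. Here is a minimal sketch using RegressionEvaluator (note that ALS can produce NaN predictions for users or items that never appeared in the training split, so we drop those rows first; setting coldStartStrategy="drop" on the ALS estimator achieves the same effect):

# Compute the root-mean-square error between the actual ratings and the predictions
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions.na.drop())
print("RMSE on the test data: {}".format(rmse))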

So now that we have the model, how would you actually supply a recommendation to a user?

The same way we did with the test data! For example:

single_user = test.filter(test['userId']==11).select(['movieId','userId'])
# The movies this user rated in the test data set
single_user.show()
+-------+------+
|movieId|userId|
+-------+------+
|      3|    11|
|    186|    11|
|    908|    11|
|   1220|    11|
|   1225|    11|
|   1266|    11|
|   1275|    11|
|   2599|    11|
|   2641|    11|
|   2687|    11|
|   2993|    11|
|   3686|    11|
|   3826|    11|
+-------+------+

recommendations = model.transform(single_user)
recommendations.orderBy('prediction',ascending=False).show()
+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|   1275|    11| 3.4204795|
|   1225|    11| 3.4091446|
|   2687|    11| 3.3668394|
|   1266|    11|  3.320037|
|    908|    11| 3.2962897|
|   3686|    11|  3.268262|
|   1220|    11|   3.22914|
|   2993|    11|    3.0513|
|   2599|    11| 3.0509036|
|   2641|    11| 2.8271532|
|      3|    11| 2.8164563|
|    186|    11| 2.8024044|
|   3826|    11| 2.7049482|
+-------+------+----------+
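The fitted ALS model also ships with helpers for producing top-N recommendations directly, without assembling the candidate movies yourself. A quick sketch (the choice of 10 items here is arbitrary):

# Top 10 recommendations for user 11, scored over the full movie catalogue
user_df = spark.createDataFrame([(11,)], ['userId'])
model.recommendForUserSubset(user_df, 10).show(truncate=False)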

You can download the notebook here

Congratulations! We have practiced Collaborative Filtering with Python and Spark.
