Collaborative Filtering with Python and Spark
Recommendation systems are a collection of algorithms that recommend items to users based on information about those users. These systems have become ubiquitous and are commonly seen in online stores, movie databases and job finders.
The installation of Python and PySpark and an introduction to the theory of recommendation systems are given here.
Understanding the Data
The dataset comes from GroupLens. Let’s download and extract it.
import requests
import zipfile
url = r'https://s3-api.us-geo.objectstorage.softlayer.net/cf-courses-data/CognitiveClass/ML0101ENv3/labs/moviedataset.zip'
output = 'moviedataset.zip'
r = requests.get(url)
with open(output, 'wb') as f:
    f.write(r.content)
with zipfile.ZipFile(output) as item:
    item.extractall()
#Dataframe manipulation library
import pandas as pd
#Math functions, we'll only need the sqrt function so let's import only that
from math import sqrt
import numpy as np
#Storing the movie information into a pandas dataframe
movies_df = pd.read_csv('./ml-latest/movies.csv')
#Storing the user information into a pandas dataframe
ratings_df = pd.read_csv('./ml-latest/ratings.csv')
#Head is a function that gets the first N rows of a dataframe. N's default is 5.
movies_df.head(1)
| | movieId | title | genres |
---|---|---|---|
0 | 1 | Toy Story (1995) | Adventure\|Animation\|Children\|Comedy\|Fantasy |
Let’s remove the year from the title column and place it into its own column using the handy str.extract method that pandas provides.
#Using regular expressions to find a year stored between parentheses
#We specify the parentheses so we don't conflict with movies that have years in their titles
movies_df['year'] = movies_df.title.str.extract(r'(\(\d\d\d\d\))', expand=False)
#Removing the parentheses
movies_df['year'] = movies_df.year.str.extract(r'(\d\d\d\d)', expand=False)
#Removing the years from the 'title' column (recent pandas versions need regex=True here)
movies_df['title'] = movies_df.title.str.replace(r'(\(\d\d\d\d\))', '', regex=True)
#Applying the strip function to get rid of any trailing whitespace characters that may have appeared
movies_df['title'] = movies_df['title'].apply(lambda x: x.strip())
movies_df.head(1)
| | movieId | title | genres | year |
---|---|---|---|---|
0 | 1 | Toy Story | Adventure\|Animation\|Children\|Comedy\|Fantasy | 1995 |
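The two-step extraction above can also be done with a single capture group. The following is a minimal sketch on a toy dataframe; the three titles are hypothetical test cases (including one with a year inside the title and one with no year at all), not rows checked against the dataset.

```python
import pandas as pd

# Hypothetical titles covering the tricky cases
toy = pd.DataFrame({
    "title": ["Toy Story (1995)", "2001: A Space Odyssey (1968)", "Untitled Film"]
})

# Capture only the four digits that sit inside parentheses
toy["year"] = toy["title"].str.extract(r"\((\d{4})\)", expand=False)

# Remove the parenthesised year, then trim surrounding whitespace
toy["title"] = toy["title"].str.replace(r"\(\d{4}\)", "", regex=True).str.strip()

print(toy)
```

Titles without a parenthesised year end up with a missing value in the year column, so it is worth checking for those before using the column.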
#Dropping the genres column
movies_df = movies_df.drop('genres', axis=1)
movies_df.head(2)
| | movieId | title | year |
---|---|---|---|
0 | 1 | Toy Story | 1995 |
1 | 2 | Jumanji | 1995 |
ratings_df.head(2)
| | userId | movieId | rating | timestamp |
---|---|---|---|---|
0 | 1 | 169 | 2.5 | 1204927694 |
1 | 1 | 2471 | 3.0 | 1204927438 |
Every row in the ratings dataframe has a user ID, a movie ID, a rating and a timestamp showing when the user reviewed the movie. We won’t be needing the timestamp column, so let’s drop it to save memory.
#Drop removes a specified row or column from a dataframe
ratings_df = ratings_df.drop('timestamp', axis=1)
ratings_df.to_csv("ratings.csv", encoding='utf-8', index=False)
Collaborative Filtering with Python
#Dataframe manipulation library
import pandas as pd
#Math functions, we'll only need the sqrt function so let's import only that
from math import sqrt
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
The first technique we’re going to take a look at is called Collaborative Filtering, which is also known as User-User Filtering.
It attempts to find users that have similar preferences and opinions to the input user and then recommends items that they have liked to the input user.
The process for creating a User Based recommendation system is as follows:
- Select a user with the movies the user has watched
- Based on the user’s ratings of movies, find the top X neighbours
- Get the watched movie record of the user for each neighbour.
- Calculate a similarity score using some formula
- Recommend the items with the highest score
Let’s begin by creating an input user to recommend movies to:
Notice: To add more movies, simply increase the number of elements in userInput.
userInput = [
    {'title':'Breakfast Club, The', 'rating':5},
    {'title':'Toy Story', 'rating':3.5},
    {'title':'Jumanji', 'rating':2},
    {'title':"Pulp Fiction", 'rating':5},
    {'title':'Akira', 'rating':4.5}
]
inputMovies = pd.DataFrame(userInput)
inputMovies
| | title | rating |
---|---|---|
0 | Breakfast Club, The | 5.0 |
1 | Toy Story | 3.5 |
2 | Jumanji | 2.0 |
3 | Pulp Fiction | 5.0 |
4 | Akira | 4.5 |
Add movieId to input user
With the input complete, let’s extract the input movies’ IDs from the movies dataframe and add them to the input.
We can achieve this by first selecting the rows whose title matches one of the input movies and then merging this subset with the input dataframe. We also drop unnecessary columns from the input to save memory.
#Selecting the movies whose titles appear in the input
inputId = movies_df[movies_df['title'].isin(inputMovies['title'].tolist())]
#Then merging it so we can get the movieId. It's implicitly merging it by title.
inputMovies = pd.merge(inputId, inputMovies)
#Dropping information we won't use from the input dataframe
inputMovies = inputMovies.drop('year', axis=1)
#Final input dataframe
#If a movie you added in above isn't here, then it might not be in the original
#dataframe or it might be spelled differently, please check capitalisation.
inputMovies
| | movieId | title | rating |
---|---|---|---|
0 | 1 | Toy Story | 3.5 |
1 | 2 | Jumanji | 2.0 |
2 | 296 | Pulp Fiction | 5.0 |
3 | 1274 | Akira | 4.5 |
4 | 1968 | Breakfast Club, The | 5.0 |
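Because the merge above silently drops input titles that have no match, it can help to make the join key explicit and list the titles that were not found. Here is a minimal sketch using small hypothetical stand-ins for movies_df and the user input; the misspelled title "Akirra" is deliberate.

```python
import pandas as pd

# Hypothetical miniature versions of the movie catalogue and the user input
movies = pd.DataFrame({
    "movieId": [1, 2, 296],
    "title": ["Toy Story", "Jumanji", "Pulp Fiction"],
})
user_input = pd.DataFrame({
    "title": ["Toy Story", "Pulp Fiction", "Akirra"],  # last title misspelled on purpose
    "rating": [3.5, 5.0, 4.5],
})

# Left merge on an explicit key; the _merge column records where each row was found
merged = user_input.merge(movies, on="title", how="left", indicator=True)

# Titles that found no match in the catalogue
missing = merged.loc[merged["_merge"] == "left_only", "title"].tolist()

# Keep only matched rows and restore an integer movieId
matched = merged[merged["_merge"] == "both"].drop(columns="_merge")
matched = matched.astype({"movieId": "int64"})

print(missing)
print(matched)
```

If several catalogue entries share a title, a merge on title alone returns one row per match, so checking for duplicated titles in the result is a sensible extra step.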
The users who have seen the same movies
With the movie IDs in our input, we can now get the subset of users that have watched and reviewed the movies in our input.
#Selecting the ratings of users who have watched movies that the input has watched and storing them
userSubset = ratings_df[ratings_df['movieId'].isin(inputMovies['movieId'].tolist())]
userSubset.head()
| | userId | movieId | rating |
---|---|---|---|
19 | 4 | 296 | 4.0 |
441 | 12 | 1968 | 3.0 |
479 | 13 | 2 | 2.0 |
531 | 13 | 1274 | 5.0 |
681 | 14 | 296 | 2.0 |
#Groupby creates several sub dataframes where they all have the same value in the column specified as the parameter
userSubsetGroup = userSubset.groupby('userId')
Let’s look at one of the users, e.g. the one with userId=1130
userSubsetGroup.get_group(1130)
| | userId | movieId | rating |
---|---|---|---|
104167 | 1130 | 1 | 0.5 |
104168 | 1130 | 2 | 4.0 |
104214 | 1130 | 296 | 4.0 |
104363 | 1130 | 1274 | 4.5 |
104443 | 1130 | 1968 | 4.5 |
#Sorting it so users with the most movies in common with the input will have priority
userSubsetGroup = sorted(userSubsetGroup, key=lambda x: len(x[1]), reverse=True)
Now let’s look at the first three users
userSubsetGroup[0:3]
[(75,
userId movieId rating
7507 75 1 5.0
7508 75 2 3.5
7540 75 296 5.0
7633 75 1274 4.5
7673 75 1968 5.0),
(106,
userId movieId rating
9083 106 1 2.5
9084 106 2 3.0
9115 106 296 3.5
9198 106 1274 3.0
9238 106 1968 3.5),
(686,
userId movieId rating
61336 686 1 4.0
61337 686 2 3.0
61377 686 296 4.0
61478 686 1274 4.0
61569 686 1968 5.0)]
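To make the grouping and sorting step easier to follow, here is a minimal sketch on a hypothetical six-row ratings table. It shows that iterating over a grouped dataframe yields (key, sub-dataframe) pairs, which is what the sort key above relies on, and one alternative way to rank users by overlap without building the list of groups.

```python
import pandas as pd

# Hypothetical ratings, already restricted to the input user's movies
subset = pd.DataFrame({
    "userId":  [10, 10, 10, 20, 30, 30],
    "movieId": [1, 2, 296, 1, 2, 296],
    "rating":  [4.0, 3.0, 5.0, 2.5, 3.5, 4.5],
})

# Each element is a (userId, sub-dataframe) pair; sort by the number of shared movies
groups = sorted(subset.groupby("userId"), key=lambda pair: len(pair[1]), reverse=True)

# The same ranking as a Series of counts, without materialising every group
overlap = subset.groupby("userId").size().sort_values(ascending=False)

print([user for user, _ in groups])
print(overlap)
```

Grouping by the column name as a plain string keeps each key a scalar user ID. In recent pandas versions, grouping by a one-element list yields one-element tuples as keys instead, which would carry through to any dictionary keyed by them.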
We will select a subset of users to iterate through. This limit is imposed because we don’t want to waste too much time going through every single user.
userSubsetGroup = userSubsetGroup[0:100]
Next, we are going to compare all users to our specified user and find the ones that are most similar.
We’re going to find out how similar each user is to the input through the Pearson Correlation Coefficient. It is used to measure the strength of a linear association between two variables. Pearson correlation is invariant to scaling and shifting, i.e. multiplying all elements by a positive constant or adding any constant to all elements leaves it unchanged.
The values given by the formula vary from r = -1 to r = 1, where 1 means a perfect positive correlation between the two entities and -1 means a perfect negative correlation. In our case, a 1 means that the two users have similar tastes while a -1 means the opposite.
Now, we calculate the Pearson Correlation between the input user and each user in the subset group, and store it in a dictionary, where the key is the user ID and the value is the coefficient.
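One common computational form of the coefficient is written out here with the same quantities the code in this section computes. The symbols are chosen for illustration: x holds the input user's ratings, y holds the other user's ratings for the same movies, and n is the number of movies they share.

```latex
S_{xx} = \sum_{i=1}^{n} x_i^2 - \frac{\left(\sum_{i=1}^{n} x_i\right)^2}{n}, \qquad
S_{yy} = \sum_{i=1}^{n} y_i^2 - \frac{\left(\sum_{i=1}^{n} y_i\right)^2}{n}

S_{xy} = \sum_{i=1}^{n} x_i y_i - \frac{\left(\sum_{i=1}^{n} x_i\right)\left(\sum_{i=1}^{n} y_i\right)}{n}

r = \frac{S_{xy}}{\sqrt{S_{xx}\, S_{yy}}}
```

When either S_xx or S_yy is zero (a user gave every shared movie the same rating), r is undefined, which is why the code falls back to 0 in that case.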
#Store the Pearson Correlation in a dictionary, where the key is the user Id and the value is the coefficient
pearsonCorrelationDict = {}
#For every user group in our subset
for name, group in userSubsetGroup:
    #Let's start by sorting the input and current user group so the values aren't mixed up later on
    group = group.sort_values(by='movieId')
    inputMovies = inputMovies.sort_values(by='movieId')
    #Get the N for the formula
    nRatings = len(group)
    #Get the review scores for the movies that they both have in common
    temp_df = inputMovies[inputMovies['movieId'].isin(group['movieId'].tolist())]
    #And then store them in a temporary buffer variable in a list format to facilitate future calculations
    tempRatingList = temp_df['rating'].tolist()
    #Let's also put the current user group reviews in a list format
    tempGroupList = group['rating'].tolist()
    #Now let's calculate the pearson correlation between two users, so called, x and y
    Sxx = sum([i**2 for i in tempRatingList]) - pow(sum(tempRatingList),2)/float(nRatings)
    Syy = sum([i**2 for i in tempGroupList]) - pow(sum(tempGroupList),2)/float(nRatings)
    Sxy = sum( i*j for i, j in zip(tempRatingList, tempGroupList)) - sum(tempRatingList)*sum(tempGroupList)/float(nRatings)
    #If the denominator is different than zero, then divide, else, 0 correlation.
    if Sxx != 0 and Syy != 0:
        pearsonCorrelationDict[name] = Sxy/sqrt(Sxx*Syy)
    else:
        pearsonCorrelationDict[name] = 0
pearsonDF = pd.DataFrame.from_dict(pearsonCorrelationDict, orient='index')
pearsonDF.columns = ['similarityIndex']
pearsonDF['userId'] = pearsonDF.index
pearsonDF.index = range(len(pearsonDF))
pearsonDF.head()
| | similarityIndex | userId |
---|---|---|
0 | 0.827278 | 75 |
1 | 0.586009 | 106 |
2 | 0.832050 | 686 |
3 | 0.576557 | 815 |
4 | 0.943456 | 1040 |
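As a sanity check, the same calculation can be wrapped in a small function and compared with NumPy. This is a sketch, and the function name pearson is our own. It uses the input user's ratings and the ratings of user 75 printed earlier, both ordered by movieId, and it should reproduce the 0.827278 listed for user 75 in the table above.

```python
from math import sqrt

import numpy as np


def pearson(x, y):
    """Pearson correlation of two equal-length rating lists; 0 if either is constant."""
    n = len(x)
    sxx = sum(i ** 2 for i in x) - sum(x) ** 2 / n
    syy = sum(j ** 2 for j in y) - sum(y) ** 2 / n
    sxy = sum(i * j for i, j in zip(x, y)) - sum(x) * sum(y) / n
    if sxx == 0 or syy == 0:
        return 0.0
    return sxy / sqrt(sxx * syy)


# Ratings for movieId 1, 2, 296, 1274, 1968 in that order
input_ratings = [3.5, 2.0, 5.0, 4.5, 5.0]
user_75 = [5.0, 3.5, 5.0, 4.5, 5.0]

r = pearson(input_ratings, user_75)
print(round(r, 6))

# Cross-check against NumPy's correlation matrix
assert np.isclose(r, np.corrcoef(input_ratings, user_75)[0, 1])

# Shifting or positively rescaling one user's ratings leaves r unchanged
rescaled = [2 * v + 1 for v in user_75]
assert np.isclose(r, pearson(input_ratings, rescaled))
```

The last check illustrates the invariance property: a user who rates everything one point higher, or on a stretched scale, is still considered just as similar.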
The top X users most similar to the input user
Now let’s get the top 50 users that are most similar to the input.
topUsers=pearsonDF.sort_values(by='similarityIndex', ascending=False)[0:50]
topUsers.head()
| | similarityIndex | userId |
---|---|---|
64 | 0.961678 | 12325 |
34 | 0.961538 | 6207 |
55 | 0.961538 | 10707 |
67 | 0.960769 | 13053 |
4 | 0.943456 | 1040 |
Ratings of the selected users for all movies
We’re going to recommend movies by taking the weighted average of the movie ratings, using the Pearson Correlation as the weight. To do this, we first need to get the movies watched by the users in topUsers from the ratings dataframe, keeping each user’s correlation in the similarityIndex column. This is achieved below by merging these two tables.
topUsersRating = topUsers.merge(ratings_df, left_on='userId', right_on='userId', how='inner')
topUsersRating.head()
| | similarityIndex | userId | movieId | rating |
---|---|---|---|---|
0 | 0.961678 | 12325 | 1 | 3.5 |
1 | 0.961678 | 12325 | 2 | 1.5 |
2 | 0.961678 | 12325 | 3 | 3.0 |
3 | 0.961678 | 12325 | 5 | 0.5 |
4 | 0.961678 | 12325 | 6 | 2.5 |
Now all we need to do is multiply each movie rating by its weight (the similarity index), sum up the weighted ratings for each movie and divide that by the sum of the weights.
We can do this by multiplying two columns, grouping the dataframe by movieId and then dividing two columns:
#Multiplies the similarity by the user's ratings
topUsersRating['weightedRating'] = topUsersRating['similarityIndex']*topUsersRating['rating']
topUsersRating.head()
| | similarityIndex | userId | movieId | rating | weightedRating |
---|---|---|---|---|---|
0 | 0.961678 | 12325 | 1 | 3.5 | 3.365874 |
1 | 0.961678 | 12325 | 2 | 1.5 | 1.442517 |
2 | 0.961678 | 12325 | 3 | 3.0 | 2.885035 |
3 | 0.961678 | 12325 | 5 | 0.5 | 0.480839 |
4 | 0.961678 | 12325 | 6 | 2.5 | 2.404196 |
#Applies a sum to topUsersRating after grouping it by movieId
tempTopUsersRating = topUsersRating.groupby('movieId').sum()[['similarityIndex','weightedRating']]
tempTopUsersRating.columns = ['sum_similarityIndex','sum_weightedRating']
tempTopUsersRating.head()
movieId | sum_similarityIndex | sum_weightedRating |
---|---|---|
1 | 38.376281 | 140.800834 |
2 | 38.376281 | 96.656745 |
3 | 10.253981 | 27.254477 |
4 | 0.929294 | 2.787882 |
5 | 11.723262 | 27.151751 |
#Creates an empty dataframe
recommendation_df = pd.DataFrame()
#Now we take the weighted average
recommendation_df['weighted average recommendation score'] = tempTopUsersRating['sum_weightedRating']/tempTopUsersRating['sum_similarityIndex']
recommendation_df['movieId'] = tempTopUsersRating.index
recommendation_df.head()
movieId | weighted average recommendation score | movieId |
---|---|---|
1 | 3.668955 | 1 |
2 | 2.518658 | 2 |
3 | 2.657941 | 3 |
4 | 3.000000 | 4 |
5 | 2.316058 | 5 |
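The weighted average is easier to check by hand on a tiny example. The sketch below uses two hypothetical neighbours, A and B, with made-up similarities and ratings, and follows the same multiply, group and divide steps as the code above.

```python
import pandas as pd

# Hypothetical neighbours: user A is more similar to the input user than user B
ratings = pd.DataFrame({
    "userId":          ["A", "A", "B", "B"],
    "movieId":         [10, 20, 10, 30],
    "similarityIndex": [0.9, 0.9, 0.5, 0.5],
    "rating":          [4.0, 2.0, 1.0, 5.0],
})

# Weight each rating by how similar its author is to the input user
ratings["weightedRating"] = ratings["similarityIndex"] * ratings["rating"]

# Sum the weights and the weighted ratings per movie, then divide
sums = ratings.groupby("movieId")[["similarityIndex", "weightedRating"]].sum()
score = sums["weightedRating"] / sums["similarityIndex"]

print(score)
```

For movie 10 the score is (0.9 × 4.0 + 0.5 × 1.0) / (0.9 + 0.5), so the more similar user pulls the result towards their rating. Movie 30 gets a score of 5.0 from a single neighbour, which shows how a movie rated by very few neighbours can reach the maximum score.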
Now let’s sort it and see the top 10 movies that the algorithm recommended!
recommendation_df = recommendation_df.sort_values(by='weighted average recommendation score', ascending=False)
recommendation_df.head(10)
movieId | weighted average recommendation score | movieId |
---|---|---|
5073 | 5.0 | 5073 |
3329 | 5.0 | 3329 |
2284 | 5.0 | 2284 |
26801 | 5.0 | 26801 |
6776 | 5.0 | 6776 |
6672 | 5.0 | 6672 |
3759 | 5.0 | 3759 |
3769 | 5.0 | 3769 |
3775 | 5.0 | 3775 |
90531 | 5.0 | 90531 |
movies_df.loc[movies_df['movieId'].isin(recommendation_df.head(10)['movieId'].tolist())]
| | movieId | title | year |
---|---|---|---|
2200 | 2284 | Bandit Queen | 1994 |
3243 | 3329 | Year My Voice Broke, The | 1987 |
3669 | 3759 | Fun and Fancy Free | 1947 |
3679 | 3769 | Thunderbolt and Lightfoot | 1974 |
3685 | 3775 | Make Mine Music | 1946 |
4978 | 5073 | Son's Room, The (Stanza del figlio, La) | 2001 |
6563 | 6672 | War Photographer | 2001 |
6667 | 6776 | Lagaan: Once Upon a Time in India | 2001 |
9064 | 26801 | Dragon Inn (Sun lung moon hak chan) | 1992 |
18106 | 90531 | Shame | 2011 |
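Note that the title lookup above returns the movies in catalogue order rather than in ranking order. One way to keep the ranking is a left merge, sketched below on hypothetical data. The n_raters column and the min_raters threshold are assumptions added for illustration (the walkthrough does not compute them); they show how scores backed by very few neighbours could be set aside.

```python
import pandas as pd

# Hypothetical scores, already sorted from best to worst
recommendation = pd.DataFrame({
    "movieId":  [30, 10, 20],
    "score":    [5.0, 4.2, 3.1],
    "n_raters": [1, 12, 8],  # assumed count of neighbours who rated each movie
})
catalogue = pd.DataFrame({
    "movieId": [10, 20, 30],
    "title":   ["Movie Ten", "Movie Twenty", "Movie Thirty"],
})

# A left merge keeps the left frame's row order, so titles stay ranked by score
ranked = recommendation.merge(catalogue, on="movieId", how="left")

# Optional filter: require a minimum number of neighbours behind each score
min_raters = 5  # assumed threshold
trusted = ranked[ranked["n_raters"] >= min_raters]

print(ranked[["title", "score"]])
print(trusted[["title", "score", "n_raters"]])
```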
Collaborative Filtering with PySpark
The first thing to do is start a Spark session.
import findspark
findspark.init()
from pyspark.sql import SparkSession
If your computer has less than 5 GB of RAM, lower the spark.driver.memory value below.
spark = SparkSession.builder \
.master('local[*]') \
.config("spark.driver.memory", "5g") \
.appName('rec') \
.getOrCreate()
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
data = spark.read.csv('ratings.csv',inferSchema=True,header=True)
data.head()
Row(userId=1, movieId=169, rating=2.5)
data.describe().show()
+-------+------------------+------------------+------------------+
|summary| userId| movieId| rating|
+-------+------------------+------------------+------------------+
| count| 22884377| 22884377| 22884377|
| mean|123545.22803517876|11408.161728851084|3.5260770044122243|
| stddev| 71474.6902962076| 24136.87588274057|1.0611734340135037|
| min| 1| 1| 0.5|
| max| 247753| 151711| 5.0|
+-------+------------------+------------------+------------------+
# Split the data 80/20 into training and test sets
(training, test) = data.randomSplit([0.8, 0.2])
# Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
Attention: fitting the model requires about 5 GB of free RAM.
model = als.fit(training)
Now let’s see how the model performed!
# Predict ratings for the test data (the basis for computing the RMSE)
predictions = model.transform(test)
predictions.show()
+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|178254| 148| 3.0| 4.6890635|
| 90446| 148| 2.0| 2.8395023|
|134189| 148| 5.0| 3.7372081|
|158304| 148| 3.0| 3.2317648|
|236731| 148| 3.0| 2.6928616|
|233017| 148| 2.0| 2.8125327|
|119850| 148| 4.0| 3.2640007|
|219880| 148| 5.0| 5.158979|
|108678| 148| 4.0| 2.820398|
| 48620| 148| 2.0| 3.032034|
|112136| 148| 2.0| 3.3153908|
|147802| 148| 5.0| 3.659751|
| 33400| 148| 3.0| 3.9928482|
| 88163| 148| 3.0| 2.1727314|
|191207| 148| 3.0| 3.5695121|
| 37586| 148| 3.0|0.97570467|
|142515| 148| 3.0| 2.9032598|
|184545| 148| 1.0| 2.0106587|
|169266| 148| 1.0|0.44079798|
| 86384| 148| 3.0| 3.259998|
+------+-------+------+----------+
only showing top 20 rows
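The RMSE itself is the square root of the mean squared difference between the actual rating and the prediction. As a rough illustration, the sketch below computes it in plain Python on the first five (rating, prediction) pairs from the output above; the helper name rmse is our own. In Spark, the RegressionEvaluator imported earlier can report the same metric over the full predictions dataframe when configured with metricName="rmse", labelCol="rating" and predictionCol="prediction".

```python
from math import sqrt

# (rating, prediction) pairs copied from the first five rows shown above
pairs = [
    (3.0, 4.6890635),
    (2.0, 2.8395023),
    (5.0, 3.7372081),
    (3.0, 3.2317648),
    (3.0, 2.6928616),
]


def rmse(pairs):
    """Root of the mean squared difference between actual and predicted ratings."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return sqrt(sum(squared_errors) / len(squared_errors))


print(rmse(pairs))
```

Five rows are far too few to judge the model; the value only becomes meaningful when computed over the whole test set.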
So now that we have the model, how would you actually supply a recommendation to a user?
The same way we did with the test data! For example:
single_user = test.filter(test['userId']==11).select(['movieId','userId'])
# User had 13 ratings in the test data set
single_user.show()
+-------+------+
|movieId|userId|
+-------+------+
| 3| 11|
| 186| 11|
| 908| 11|
| 1220| 11|
| 1225| 11|
| 1266| 11|
| 1275| 11|
| 2599| 11|
| 2641| 11|
| 2687| 11|
| 2993| 11|
| 3686| 11|
| 3826| 11|
+-------+------+
recommendations = model.transform(single_user)
recommendations.orderBy('prediction',ascending=False).show()
+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
| 1275| 11| 3.4204795|
| 1225| 11| 3.4091446|
| 2687| 11| 3.3668394|
| 1266| 11| 3.320037|
| 908| 11| 3.2962897|
| 3686| 11| 3.268262|
| 1220| 11| 3.22914|
| 2993| 11| 3.0513|
| 2599| 11| 3.0509036|
| 2641| 11| 2.8271532|
| 3| 11| 2.8164563|
| 186| 11| 2.8024044|
| 3826| 11| 2.7049482|
+-------+------+----------+
You can download the notebook here
Congratulations! We have practiced Collaborative Filtering with Python and Spark.