# K-Means Clustering with Python and Spark

Imagine that you have a customer dataset, and you need to apply customer segmentation on this historical data.

Customer segmentation is the practice of partitioning a customer base into groups of individuals that have similar characteristics.

It is a significant strategy as a business can target these specific groups of customers and effectively allocate marketing resources.

For example, one group might contain customers who are high-profit and low-risk, that is, more likely to purchase products or subscribe to a service.

Instructions for installing Python and PySpark, and an introduction to K-Means, are given here.
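As a quick refresher before we start: k-means (Lloyd's algorithm) alternates between assigning each point to its nearest centroid and recomputing each centroid as the mean of its assigned points. Below is a minimal NumPy sketch of the idea, not the implementation we use later (scikit-learn and Spark handle this for us):

```python
import numpy as np

def kmeans_sketch(X, k, n_iter=100, seed=0):
    rng = np.random.default_rng(seed)
    # Start from k randomly chosen data points as initial centroids
    centroids = X[rng.choice(len(X), size=k, replace=False)]
    for _ in range(n_iter):
        # Assignment step: index of the nearest centroid for every point
        dists = np.linalg.norm(X[:, None, :] - centroids[None, :, :], axis=2)
        labels = dists.argmin(axis=1)
        # Update step: move each centroid to the mean of its assigned points
        # (no empty-cluster handling; fine for a sketch)
        new_centroids = np.array([X[labels == j].mean(axis=0) for j in range(k)])
        if np.allclose(new_centroids, centroids):
            break
        centroids = new_centroids
    return labels, centroids
```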

### K-Means Clustering with Python

```python
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
%matplotlib inline

# Load the customer dataset and peek at the first rows
# (the file name is an assumption; point this at your own copy)
cust_df = pd.read_csv("Cust_Segmentation.csv")
cust_df.head()
```

|   | Customer Id | Age | Edu | Years Employed | Income | Card Debt | Other Debt | Defaulted | Address | DebtIncomeRatio |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 41 | 2 | 6 | 19 | 0.124 | 1.073 | 0.0 | NBA001 | 6.3 |
| 1 | 2 | 47 | 1 | 26 | 100 | 4.582 | 8.218 | 0.0 | NBA021 | 12.8 |
| 2 | 3 | 33 | 2 | 10 | 57 | 6.111 | 5.802 | 1.0 | NBA013 | 20.9 |
| 3 | 4 | 29 | 2 | 4 | 19 | 0.681 | 0.516 | 0.0 | NBA009 | 6.3 |
| 4 | 5 | 47 | 1 | 31 | 253 | 9.308 | 8.908 | 0.0 | NBA008 | 7.2 |
`Address` is a categorical feature, and Euclidean distance isn't meaningful for it, so we drop it:

```python
df = cust_df.drop('Address', axis=1)
df.head()
```

|   | Customer Id | Age | Edu | Years Employed | Income | Card Debt | Other Debt | Defaulted | DebtIncomeRatio |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 41 | 2 | 6 | 19 | 0.124 | 1.073 | 0.0 | 6.3 |
| 1 | 2 | 47 | 1 | 26 | 100 | 4.582 | 8.218 | 0.0 | 12.8 |
| 2 | 3 | 33 | 2 | 10 | 57 | 6.111 | 5.802 | 1.0 | 20.9 |
| 3 | 4 | 29 | 2 | 4 | 19 | 0.681 | 0.516 | 0.0 | 6.3 |
| 4 | 5 | 47 | 1 | 31 | 253 | 9.308 | 8.908 | 0.0 | 7.2 |

#### Normalizing over the standard deviation

Now let's normalize the dataset. But why do we need normalization in the first place? Normalization is a statistical method that helps distance-based algorithms treat features with different magnitudes and distributions on an equal footing. We use `StandardScaler()` to standardize our dataset.

```python
from sklearn.preprocessing import StandardScaler

X = df.values[:, 1:]   # drop the Customer Id column
X = np.nan_to_num(X)   # replace NaNs (e.g., missing Defaulted values) with 0
Clus_dataSet = StandardScaler().fit_transform(X)
Clus_dataSet
```

```
array([[ 0.74291541,  0.31212243, -0.37878978, ..., -0.59048916,
        -0.52379654, -0.57652509],
       [ 1.48949049, -0.76634938,  2.5737211 , ...,  1.51296181,
        -0.52379654,  0.39138677],
       [-0.25251804,  0.31212243,  0.2117124 , ...,  0.80170393,
         1.90913822,  1.59755385],
       ...,
       [-1.24795149,  2.46906604, -1.26454304, ...,  0.03863257,
         1.90913822,  3.45892281],
       [-0.37694723, -0.76634938,  0.50696349, ..., -0.70147601,
        -0.52379654, -1.08281745],
       [ 2.1116364 , -0.76634938,  1.09746566, ...,  0.16463355,
        -0.52379654, -0.2340332 ]])
```
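As a side note, `StandardScaler` standardizes each column to zero mean and unit variance, so the array above can be reproduced by hand; a quick sanity check:

```python
# StandardScaler is equivalent to column-wise (x - mean) / std
manual = (X - X.mean(axis=0)) / X.std(axis=0)
print(np.allclose(manual, Clus_dataSet))  # True
```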

```python
clusterNum = 3
k_means = KMeans(init="k-means++", n_clusters=clusterNum, n_init=12)
# Note: we fit on the raw features X here; to cluster on the standardized
# data instead, pass Clus_dataSet (the labels below come from fitting X)
k_means.fit(X)
labels = k_means.labels_
#print(labels)
```


We assign the labels to each row in the dataframe:

df["Clus_km"] = labels

|   | Customer Id | Age | Edu | Years Employed | Income | Card Debt | Other Debt | Defaulted | DebtIncomeRatio | Clus_km |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 41 | 2 | 6 | 19 | 0.124 | 1.073 | 0.0 | 6.3 | 0 |
| 1 | 2 | 47 | 1 | 26 | 100 | 4.582 | 8.218 | 0.0 | 12.8 | 2 |
| 2 | 3 | 33 | 2 | 10 | 57 | 6.111 | 5.802 | 1.0 | 20.9 | 0 |
| 3 | 4 | 29 | 2 | 4 | 19 | 0.681 | 0.516 | 0.0 | 6.3 | 0 |
| 4 | 5 | 47 | 1 | 31 | 253 | 9.308 | 8.908 | 0.0 | 7.2 | 1 |

We can easily check the centroid values by averaging the features in each cluster.

```python
df.groupby('Clus_km').mean()
```

| Clus_km | Customer Id | Age | Edu | Years Employed | Income | Card Debt | Other Debt | Defaulted | DebtIncomeRatio |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 432.468413 | 32.964561 | 1.614792 | 6.374422 | 31.164869 | 1.032541 | 2.104133 | 0.285185 | 10.094761 |
| 1 | 410.166667 | 45.388889 | 2.666667 | 19.555556 | 227.166667 | 5.678444 | 10.907167 | 0.285714 | 7.322222 |
| 2 | 402.295082 | 41.333333 | 1.956284 | 15.256831 | 83.928962 | 3.103639 | 5.765279 | 0.171233 | 10.724590 |

Now let's look at the distribution of customers based on their age and income:

```python
area = np.pi * (X[:, 1]) ** 2   # marker size scaled by education level
plt.scatter(X[:, 0], X[:, 3], s=area, c=labels.astype(float), alpha=0.5)
plt.xlabel('Age', fontsize=18)
plt.ylabel('Income', fontsize=16)

plt.show()
```


K-means partitions your customers into mutually exclusive groups, for example, into 3 clusters. The customers in each cluster are similar to each other demographically.
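We fixed `clusterNum = 3` above. To choose k more systematically on the scikit-learn side, one common option is the elbow method: compare the model inertia (within-cluster sum of squared distances) across several values of k and look for the bend. A minimal sketch:

```python
# Elbow method: inertia always decreases with k; look for the "bend"
for k in range(2, 9):
    km = KMeans(init="k-means++", n_clusters=k, n_init=12).fit(X)
    print(k, km.inertia_)
```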

### K-Means Clustering with PySpark

The first thing to do is start a Spark session:

```python
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('customers').getOrCreate()

from pyspark.ml.clustering import KMeans

# Load the same customer dataset into a Spark DataFrame
# (the file name is an assumption; adjust the path to your copy)
dataset = spark.read.csv("Cust_Segmentation.csv", header=True, inferSchema=True)

dataset.head(1)
```

```
[Row(Customer Id=1, Age=41, Edu=2, Years Employed=6, Income=19, Card Debt=0.124, Other Debt=1.073, Defaulted=0, Address='NBA001', DebtIncomeRatio=6.3)]
```

```python
# dataset.describe().show(1)
dataset.printSchema()
```

```
root
 |-- Customer Id: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Edu: integer (nullable = true)
 |-- Years Employed: integer (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Card Debt: double (nullable = true)
 |-- Other Debt: double (nullable = true)
 |-- Defaulted: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- DebtIncomeRatio: double (nullable = true)
```


As you can see, `Address` in this dataset is a categorical variable. The k-means algorithm isn't directly applicable to categorical variables because the Euclidean distance function isn't meaningful for discrete values. So let's drop this feature and run the clustering.
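As an aside, if you did want to keep a categorical column, you could encode it numerically first. A hedged sketch, assuming Spark 3.x (in 2.x the encoder class is `OneHotEncoderEstimator`); note that `Address` is close to unique per customer here, so dropping it remains the sensible choice:

```python
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Map each Address string to a numeric index, then one-hot encode it
indexer = StringIndexer(inputCol="Address", outputCol="AddressIndex")
encoder = OneHotEncoder(inputCols=["AddressIndex"], outputCols=["AddressVec"])
indexed = indexer.fit(dataset).transform(dataset)
encoded = encoder.fit(indexed).transform(indexed)
```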

```python
columns_to_drop = ['Address']
dataset = dataset.drop(*columns_to_drop)
dataset.printSchema()
```

```
root
 |-- Customer Id: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Edu: integer (nullable = true)
 |-- Years Employed: integer (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Card Debt: double (nullable = true)
 |-- Other Debt: double (nullable = true)
 |-- Defaulted: integer (nullable = true)
 |-- DebtIncomeRatio: double (nullable = true)
```


```python
dataset.columns
```

```
['Customer Id',
 'Age',
 'Edu',
 'Years Employed',
 'Income',
 'Card Debt',
 'Other Debt',
 'Defaulted',
 'DebtIncomeRatio']
```

```python
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

from pyspark.sql.types import IntegerType
# Make sure Defaulted is an integer column (a no-op if it already is)
dataset = dataset.withColumn("Defaulted", dataset["Defaulted"].cast(IntegerType()))
```

```python
dataset.printSchema()
```

```
root
 |-- Customer Id: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Edu: integer (nullable = true)
 |-- Years Employed: integer (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Card Debt: double (nullable = true)
 |-- Other Debt: double (nullable = true)
 |-- Defaulted: integer (nullable = true)
 |-- DebtIncomeRatio: double (nullable = true)
```


```python
# Note: Customer Id and Defaulted are left out of the feature vector
feat_cols = ['Age', 'Edu', 'Years Employed', 'Income',
             'Card Debt', 'Other Debt', 'DebtIncomeRatio']

vec_assembler = VectorAssembler(inputCols=feat_cols, outputCol='features')

final_data = vec_assembler.transform(dataset)
```

```python
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

final_data
```

```
DataFrame[Customer Id: int, Age: int, Edu: int, Years Employed: int, Income: int, Card Debt: double, Other Debt: double, Defaulted: int, DebtIncomeRatio: double, features: vector]
```

```python
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(final_data)

# Normalize each feature to have unit standard deviation
cluster_final_data = scalerModel.transform(final_data)
```
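As an aside, the assembler, scaler, and k-means estimator can also be chained into a single `Pipeline`, so one `fit()` call runs all three stages; a sketch of the same steps packaged together:

```python
from pyspark.ml import Pipeline

# One fit() runs assembly, scaling, and clustering in sequence
pipeline = Pipeline(stages=[
    vec_assembler,
    scaler,
    KMeans(featuresCol='scaledFeatures', k=3),
])
pipeline_model = pipeline.fit(dataset)
clustered = pipeline_model.transform(dataset)
```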


### Train the Model and Evaluate

**Time to find out whether it's 2 or 3!**

```python
# Train k-means models with k=3 and k=2
kmeans3 = KMeans(featuresCol='scaledFeatures', k=3)
kmeans2 = KMeans(featuresCol='scaledFeatures', k=2)

model3 = kmeans3.fit(cluster_final_data)
model2 = kmeans2.fit(cluster_final_data)
```

```python
from pyspark.ml.evaluation import ClusteringEvaluator

# Make predictions
predictions3 = model3.transform(cluster_final_data)
predictions2 = model2.transform(cluster_final_data)

# Evaluate each clustering by computing the silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions3)
print("With k=3 Silhouette with squared euclidean distance = " + str(silhouette))
silhouette = evaluator.evaluate(predictions2)
print("With k=2 Silhouette with squared euclidean distance = " + str(silhouette))
```

```
With k=3 Silhouette with squared euclidean distance = 0.05288868077858952
With k=2 Silhouette with squared euclidean distance = 0.6581986063262965
```

```python
# Show the cluster centers for the k=3 model
centers = model3.clusterCenters()
print("Cluster Centers:")
for center in centers:
    print(center)
```

```
Cluster Centers:
[5.38544237 2.08381863 2.54750121 2.6356673  1.9964595  2.30786829
 2.04421758]
[3.71924395 2.29195271 0.55736835 0.78361889 0.68066746 0.87084406
 2.18192288]
[4.38305208 1.49522293 1.24517269 0.97828653 0.34932727 0.44759616
 0.93527274]
```

```python
# Evaluate clusterings across a range of k by computing the silhouette score
for k in range(2, 9):
    kmeans = KMeans(featuresCol='scaledFeatures', k=k)
    model = kmeans.fit(cluster_final_data)
    predictions = model.transform(cluster_final_data)
    evaluator = ClusteringEvaluator()
    silhouette = evaluator.evaluate(predictions)
    print("With K={}".format(k))
    print("Silhouette with squared euclidean distance = " + str(silhouette))
    print('--' * 30)
```

```
With K=2
Silhouette with squared euclidean distance = 0.6581986063262965
------------------------------------------------------------
With K=3
Silhouette with squared euclidean distance = 0.05288868077858952
------------------------------------------------------------
With K=4
Silhouette with squared euclidean distance = 0.23934815694787387
------------------------------------------------------------
With K=5
Silhouette with squared euclidean distance = 0.09503424227629984
------------------------------------------------------------
With K=6
Silhouette with squared euclidean distance = 0.03747870420927804
------------------------------------------------------------
With K=7
Silhouette with squared euclidean distance = 0.07544170107415611
------------------------------------------------------------
With K=8
Silhouette with squared euclidean distance = 0.07633841797382555
------------------------------------------------------------
```


The silhouette scores clearly favor k=2. Let's check the cluster sizes using the `prediction` column that results from `transform`. Congratulations if you made this connection; it was quite tricky given what we've covered!

```python
model3.transform(cluster_final_data).groupBy('prediction').count().show()
```

```
+----------+-----+
|prediction|count|
+----------+-----+
|         1|  261|
|         2|  439|
|         0|  150|
+----------+-----+
```


```python
model2.transform(cluster_final_data).groupBy('prediction').count().show()
```

```
+----------+-----+
|prediction|count|
+----------+-----+
|         1|  183|
|         0|  667|
+----------+-----+
```
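Finally, just as with `groupby` in the pandas section, you can profile the resulting segments by averaging features within each cluster; a sketch using the k=2 predictions:

```python
# Average a few raw features per cluster to characterize each segment
predictions2.groupBy('prediction') \
    .avg('Age', 'Income', 'DebtIncomeRatio') \
    .show()
```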