# K-Means Clustering with Python and Spark

Imagine that you have a customer dataset, and you need to apply customer segmentation on this historical data.

Customer segmentation is the practice of partitioning a customer base into groups of individuals that have similar characteristics.

It is a significant strategy as a business can target these specific groups of customers and effectively allocate marketing resources.

For example, one group might contain customers who are high-profit and low-risk, that is, more likely to purchase products or subscribe to a service.

Instructions for installing Python and PySpark, and an introduction to K-Means, are given here.
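As a quick refresher before we start: k-means (Lloyd's algorithm) alternates between assigning each point to its nearest centroid and recomputing each centroid as the mean of its assigned points. Below is a minimal NumPy sketch of the idea, not the implementation we use later (scikit-learn and Spark handle this for us):

```python
import numpy as np

def kmeans_sketch(X, k, n_iter=100, seed=0):
    rng = np.random.default_rng(seed)
    # Start from k randomly chosen data points as initial centroids
    centroids = X[rng.choice(len(X), size=k, replace=False)]
    for _ in range(n_iter):
        # Assignment step: index of the nearest centroid for every point
        dists = np.linalg.norm(X[:, None, :] - centroids[None, :, :], axis=2)
        labels = dists.argmin(axis=1)
        # Update step: move each centroid to the mean of its assigned points
        # (no empty-cluster handling; fine for a sketch)
        new_centroids = np.array([X[labels == j].mean(axis=0) for j in range(k)])
        if np.allclose(new_centroids, centroids):
            break
        centroids = new_centroids
    return labels, centroids
```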

### K-Means Clustering with Python

```python
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
%matplotlib inline

# Load the customer dataset and peek at the first rows
# (the file name is an assumption; point this at your own copy)
cust_df = pd.read_csv("Cust_Segmentation.csv")
cust_df.head()
```

|   | Customer Id | Age | Edu | Years Employed | Income | Card Debt | Other Debt | Defaulted | Address | DebtIncomeRatio |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 41 | 2 | 6 | 19 | 0.124 | 1.073 | 0.0 | NBA001 | 6.3 |
| 1 | 2 | 47 | 1 | 26 | 100 | 4.582 | 8.218 | 0.0 | NBA021 | 12.8 |
| 2 | 3 | 33 | 2 | 10 | 57 | 6.111 | 5.802 | 1.0 | NBA013 | 20.9 |
| 3 | 4 | 29 | 2 | 4 | 19 | 0.681 | 0.516 | 0.0 | NBA009 | 6.3 |
| 4 | 5 | 47 | 1 | 31 | 253 | 9.308 | 8.908 | 0.0 | NBA008 | 7.2 |
`Address` is a categorical feature, and Euclidean distance isn't meaningful for it, so we drop it:

```python
df = cust_df.drop('Address', axis=1)
df.head()
```

|   | Customer Id | Age | Edu | Years Employed | Income | Card Debt | Other Debt | Defaulted | DebtIncomeRatio |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 41 | 2 | 6 | 19 | 0.124 | 1.073 | 0.0 | 6.3 |
| 1 | 2 | 47 | 1 | 26 | 100 | 4.582 | 8.218 | 0.0 | 12.8 |
| 2 | 3 | 33 | 2 | 10 | 57 | 6.111 | 5.802 | 1.0 | 20.9 |
| 3 | 4 | 29 | 2 | 4 | 19 | 0.681 | 0.516 | 0.0 | 6.3 |
| 4 | 5 | 47 | 1 | 31 | 253 | 9.308 | 8.908 | 0.0 | 7.2 |

#### Normalizing over the standard deviation

Now let's normalize the dataset. But why do we need normalization in the first place? Normalization is a statistical method that helps distance-based algorithms treat features with different magnitudes and distributions on an equal footing. We use `StandardScaler()` to standardize our dataset.

```python
from sklearn.preprocessing import StandardScaler

X = df.values[:, 1:]   # drop the Customer Id column
X = np.nan_to_num(X)   # replace NaNs (e.g., missing Defaulted values) with 0
Clus_dataSet = StandardScaler().fit_transform(X)
Clus_dataSet
```

```
array([[ 0.74291541,  0.31212243, -0.37878978, ..., -0.59048916,
        -0.52379654, -0.57652509],
       [ 1.48949049, -0.76634938,  2.5737211 , ...,  1.51296181,
        -0.52379654,  0.39138677],
       [-0.25251804,  0.31212243,  0.2117124 , ...,  0.80170393,
         1.90913822,  1.59755385],
       ...,
       [-1.24795149,  2.46906604, -1.26454304, ...,  0.03863257,
         1.90913822,  3.45892281],
       [-0.37694723, -0.76634938,  0.50696349, ..., -0.70147601,
        -0.52379654, -1.08281745],
       [ 2.1116364 , -0.76634938,  1.09746566, ...,  0.16463355,
        -0.52379654, -0.2340332 ]])
```
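As a side note, `StandardScaler` standardizes each column to zero mean and unit variance, so the array above can be reproduced by hand; a quick sanity check:

```python
# StandardScaler is equivalent to column-wise (x - mean) / std
manual = (X - X.mean(axis=0)) / X.std(axis=0)
print(np.allclose(manual, Clus_dataSet))  # True
```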

```python
clusterNum = 3
k_means = KMeans(init="k-means++", n_clusters=clusterNum, n_init=12)
# Note: we fit on the raw features X here; to cluster on the standardized
# data instead, pass Clus_dataSet (the labels below come from fitting X)
k_means.fit(X)
labels = k_means.labels_
#print(labels)
```


We assign the labels to each row in the dataframe:

df["Clus_km"] = labels

|   | Customer Id | Age | Edu | Years Employed | Income | Card Debt | Other Debt | Defaulted | DebtIncomeRatio | Clus_km |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 41 | 2 | 6 | 19 | 0.124 | 1.073 | 0.0 | 6.3 | 0 |
| 1 | 2 | 47 | 1 | 26 | 100 | 4.582 | 8.218 | 0.0 | 12.8 | 2 |
| 2 | 3 | 33 | 2 | 10 | 57 | 6.111 | 5.802 | 1.0 | 20.9 | 0 |
| 3 | 4 | 29 | 2 | 4 | 19 | 0.681 | 0.516 | 0.0 | 6.3 | 0 |
| 4 | 5 | 47 | 1 | 31 | 253 | 9.308 | 8.908 | 0.0 | 7.2 | 1 |

We can easily check the centroid values by averaging the features in each cluster.

```python
df.groupby('Clus_km').mean()
```

| Clus_km | Customer Id | Age | Edu | Years Employed | Income | Card Debt | Other Debt | Defaulted | DebtIncomeRatio |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 432.468413 | 32.964561 | 1.614792 | 6.374422 | 31.164869 | 1.032541 | 2.104133 | 0.285185 | 10.094761 |
| 1 | 410.166667 | 45.388889 | 2.666667 | 19.555556 | 227.166667 | 5.678444 | 10.907167 | 0.285714 | 7.322222 |
| 2 | 402.295082 | 41.333333 | 1.956284 | 15.256831 | 83.928962 | 3.103639 | 5.765279 | 0.171233 | 10.724590 |

Now let's look at the distribution of customers based on their age and income:

```python
area = np.pi * (X[:, 1]) ** 2   # marker size scaled by education level
plt.scatter(X[:, 0], X[:, 3], s=area, c=labels.astype(float), alpha=0.5)
plt.xlabel('Age', fontsize=18)
plt.ylabel('Income', fontsize=16)

plt.show()
```


K-means partitions your customers into mutually exclusive groups, for example, into 3 clusters. The customers in each cluster are similar to each other demographically.
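We fixed `clusterNum = 3` above. To choose k more systematically on the scikit-learn side, one common option is the elbow method: compare the model inertia (within-cluster sum of squared distances) across several values of k and look for the bend. A minimal sketch:

```python
# Elbow method: inertia always decreases with k; look for the "bend"
for k in range(2, 9):
    km = KMeans(init="k-means++", n_clusters=k, n_init=12).fit(X)
    print(k, km.inertia_)
```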

### K-Means Clustering with PySpark

The first thing to do is start a Spark session:

```python
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('customers').getOrCreate()

from pyspark.ml.clustering import KMeans

# Load the same customer dataset into a Spark DataFrame
# (the file name is an assumption; adjust the path to your copy)
dataset = spark.read.csv("Cust_Segmentation.csv", header=True, inferSchema=True)

dataset.head(1)
```

```
[Row(Customer Id=1, Age=41, Edu=2, Years Employed=6, Income=19, Card Debt=0.124, Other Debt=1.073, Defaulted=0, Address='NBA001', DebtIncomeRatio=6.3)]
```

```python
# dataset.describe().show(1)
dataset.printSchema()
```

```
root
 |-- Customer Id: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Edu: integer (nullable = true)
 |-- Years Employed: integer (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Card Debt: double (nullable = true)
 |-- Other Debt: double (nullable = true)
 |-- Defaulted: integer (nullable = true)
 |-- Address: string (nullable = true)
 |-- DebtIncomeRatio: double (nullable = true)
```


As you can see, `Address` in this dataset is a categorical variable. The k-means algorithm isn't directly applicable to categorical variables because the Euclidean distance function isn't meaningful for discrete values. So let's drop this feature and run the clustering.
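As an aside, if you did want to keep a categorical column, you could encode it numerically first. A hedged sketch, assuming Spark 3.x (in 2.x the encoder class is `OneHotEncoderEstimator`); note that `Address` is close to unique per customer here, so dropping it remains the sensible choice:

```python
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Map each Address string to a numeric index, then one-hot encode it
indexer = StringIndexer(inputCol="Address", outputCol="AddressIndex")
encoder = OneHotEncoder(inputCols=["AddressIndex"], outputCols=["AddressVec"])
indexed = indexer.fit(dataset).transform(dataset)
encoded = encoder.fit(indexed).transform(indexed)
```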

```python
columns_to_drop = ['Address']
dataset = dataset.drop(*columns_to_drop)
dataset.printSchema()
```

```
root
 |-- Customer Id: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Edu: integer (nullable = true)
 |-- Years Employed: integer (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Card Debt: double (nullable = true)
 |-- Other Debt: double (nullable = true)
 |-- Defaulted: integer (nullable = true)
 |-- DebtIncomeRatio: double (nullable = true)
```


```python
dataset.columns
```

```
['Customer Id',
 'Age',
 'Edu',
 'Years Employed',
 'Income',
 'Card Debt',
 'Other Debt',
 'Defaulted',
 'DebtIncomeRatio']
```

```python
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

from pyspark.sql.types import IntegerType
# Make sure Defaulted is an integer column (a no-op if it already is)
dataset = dataset.withColumn("Defaulted", dataset["Defaulted"].cast(IntegerType()))
```

```python
dataset.printSchema()
```

```
root
 |-- Customer Id: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Edu: integer (nullable = true)
 |-- Years Employed: integer (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Card Debt: double (nullable = true)
 |-- Other Debt: double (nullable = true)
 |-- Defaulted: integer (nullable = true)
 |-- DebtIncomeRatio: double (nullable = true)
```


```python
# Note: Customer Id and Defaulted are left out of the feature vector
feat_cols = ['Age', 'Edu', 'Years Employed', 'Income',
             'Card Debt', 'Other Debt', 'DebtIncomeRatio']

vec_assembler = VectorAssembler(inputCols=feat_cols, outputCol='features')

final_data = vec_assembler.transform(dataset)
```

```python
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

final_data
```

```
DataFrame[Customer Id: int, Age: int, Edu: int, Years Employed: int, Income: int, Card Debt: double, Other Debt: double, Defaulted: int, DebtIncomeRatio: double, features: vector]
```

```python
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(final_data)

# Normalize each feature to have unit standard deviation
cluster_final_data = scalerModel.transform(final_data)
```
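As an aside, the assembler, scaler, and k-means estimator can also be chained into a single `Pipeline`, so one `fit()` call runs all three stages; a sketch of the same steps packaged together:

```python
from pyspark.ml import Pipeline

# One fit() runs assembly, scaling, and clustering in sequence
pipeline = Pipeline(stages=[
    vec_assembler,
    scaler,
    KMeans(featuresCol='scaledFeatures', k=3),
])
pipeline_model = pipeline.fit(dataset)
clustered = pipeline_model.transform(dataset)
```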


### Train the Model and Evaluate

**Time to find out whether it's 2 or 3!**

```python
# Train k-means models with k=3 and k=2
kmeans3 = KMeans(featuresCol='scaledFeatures', k=3)
kmeans2 = KMeans(featuresCol='scaledFeatures', k=2)

model3 = kmeans3.fit(cluster_final_data)
model2 = kmeans2.fit(cluster_final_data)
```

```python
from pyspark.ml.evaluation import ClusteringEvaluator

# Make predictions
predictions3 = model3.transform(cluster_final_data)
predictions2 = model2.transform(cluster_final_data)

# Evaluate each clustering by computing the silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions3)
print("With k=3 Silhouette with squared euclidean distance = " + str(silhouette))
silhouette = evaluator.evaluate(predictions2)
print("With k=2 Silhouette with squared euclidean distance = " + str(silhouette))
```

```
With k=3 Silhouette with squared euclidean distance = 0.05288868077858952
With k=2 Silhouette with squared euclidean distance = 0.6581986063262965
```

```python
# Show the cluster centers for the k=3 model
centers = model3.clusterCenters()
print("Cluster Centers:")
for center in centers:
    print(center)
```

```
Cluster Centers:
[5.38544237 2.08381863 2.54750121 2.6356673  1.9964595  2.30786829
 2.04421758]
[3.71924395 2.29195271 0.55736835 0.78361889 0.68066746 0.87084406
 2.18192288]
[4.38305208 1.49522293 1.24517269 0.97828653 0.34932727 0.44759616
 0.93527274]
```

```python
# Evaluate clusterings across a range of k by computing the silhouette score
for k in range(2, 9):
    kmeans = KMeans(featuresCol='scaledFeatures', k=k)
    model = kmeans.fit(cluster_final_data)
    predictions = model.transform(cluster_final_data)
    evaluator = ClusteringEvaluator()
    silhouette = evaluator.evaluate(predictions)
    print("With K={}".format(k))
    print("Silhouette with squared euclidean distance = " + str(silhouette))
    print('--' * 30)
```

```
With K=2
Silhouette with squared euclidean distance = 0.6581986063262965
------------------------------------------------------------
With K=3
Silhouette with squared euclidean distance = 0.05288868077858952
------------------------------------------------------------
With K=4
Silhouette with squared euclidean distance = 0.23934815694787387
------------------------------------------------------------
With K=5
Silhouette with squared euclidean distance = 0.09503424227629984
------------------------------------------------------------
With K=6
Silhouette with squared euclidean distance = 0.03747870420927804
------------------------------------------------------------
With K=7
Silhouette with squared euclidean distance = 0.07544170107415611
------------------------------------------------------------
With K=8
Silhouette with squared euclidean distance = 0.07633841797382555
------------------------------------------------------------
```


The silhouette scores clearly favor k=2. Let's check the cluster sizes using the `prediction` column that results from `transform`. Congratulations if you made this connection; it was quite tricky given what we've covered!

```python
model3.transform(cluster_final_data).groupBy('prediction').count().show()
```

```
+----------+-----+
|prediction|count|
+----------+-----+
|         1|  261|
|         2|  439|
|         0|  150|
+----------+-----+
```


```python
model2.transform(cluster_final_data).groupBy('prediction').count().show()
```

```
+----------+-----+
|prediction|count|
+----------+-----+
|         1|  183|
|         0|  667|
+----------+-----+
```
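Finally, just as with `groupby` in the pandas section, you can profile the resulting segments by averaging features within each cluster; a sketch using the k=2 predictions:

```python
# Average a few raw features per cluster to characterize each segment
predictions2.groupBy('prediction') \
    .avg('Age', 'Income', 'DebtIncomeRatio') \
    .show()
```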