Outlier Detection in Pyspark

21 minute read

Hello today we are going to discuss how to perform data analysis of one dataset by using pyspark.

When you are dealing with BigData and you want to perform data analysis, the first step to do is understand the data and identify anomalies in the raw data.

In ordering to work with BigData we can use different tecchnologies such as:

  • Spark. Fastest Batch processor or the most voluminous stream processor - Spark is an open-source unified analytics engine for large-scale data processing. Spark provides an interface for programming clusters with implicit data parallelism and fault tolerance
  • Hadoop. - Hadoop is a collection of open-source software utilities that facilitates using a network of many computers to solve problems involving massive amounts of data and computation. It provides a software framework for distributed storage and processing of big data using the MapReduce programming mode
  • MapReduce is a programming model and an associated implementation for processing and generating big data sets with a parallel, distributed algorithm on a cluster. A MapReduce program is composed of a map procedure, which performs filtering and sorting, and a reduce method, which performs a summary operation
  • Hive. Big data analytics framework. Hive is a data warehouse software project built on top of Apache Hadoop for providing data query and analysis. Hive gives an SQL-like interface to query data stored in various databases and file systems that integrate with Hadoop
  • Samza. Streaming processor made for Kafka - Is an open-source, near-realtime, asynchronous computational framework for stream processing developed by the Apache Software Foundation in Scala and Java.
  • Flink. A true hybrid Big data processor . The core of Apache Flink is a distributed streaming data-flow engine written in Java and Scala. Flink executes arbitrary dataflow programs in a data-parallel and pipelined manner

In this blog post we will focus on Pyspark based in Spark.

Outlier Detection

There are several methods for identifing the outiers into a big dataframe. The method that we are going to use is the Interquartile Range method, also known as IQR, was developed by John Widler Turky, an American mathematician best known for development of the FFT algorithm and box plot. IQR is a measure of statistical dispersion, which is equal to the difference between the 75th percentile and the 25th percentile. In other words:

Representation of the Interquartile Range - WikipediaRepresentation of the Interquartile Range - Wikipedia

IQR is a fairly interpretable method, often used to draw Box Plots and display the distribution of a dataset.

First we are going to install the environment and then we proceed with the data analysis.

Step 1. Installation of Conda

First you need to install anaconda at this link

png

in this location C:\Anaconda3 , then you, check that your terminal , recognize conda

C:\conda --version
conda 23.1.0

Step 2. Environment creation

The environments supported that I will consider is Python 3.10,

I will create an environment called analysis, but you can put the name that you like.

conda create -n analysis python==3.10

then we activate

conda activate analysis

then in your terminal type the following commands:

conda install ipykernel notebook

then

python -m ipykernel install --user --name analysis --display-name "Python (Analysis)"

then we install the following libraries used to the data analysis

pip install wget seaborn matplotlib requests tqdm

If you are in Google Colab pip install pyspark and go to the step 4.

Step 3. Pyspark Installation

If you have installed pyspark before you can skip this step.

# Import the os module
import os
import requests
import wget
from tqdm import tqdm
def download(url: str, fname: str):
    resp = requests.get(url, stream=True)
    total = int(resp.headers.get('content-length', 0))
    # Can also replace 'file' with a io.BytesIO object
    with open(fname, 'wb') as file, tqdm(
        desc=fname,
        total=total,
        unit='iB',
        unit_scale=True,
        unit_divisor=1024,
    ) as bar:
        for data in resp.iter_content(chunk_size=1024):
            size = file.write(data)
            bar.update(size)
#import zipfile module
from zipfile import ZipFile
def extract_zip(filename):
    with ZipFile(filename, 'r') as f:
        #extract in current directory
        f.extractall()
# importing the "tarfile" module
import tarfile
def extract_tar(filename):
    # open file
    file = tarfile.open(filename)
    # extracting file
    file.extractall()
    file.close()

Spark Installation

SPARK_FILE="spark-3.3.2-bin-hadoop3.tgz"
URL = "https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz"
#response = wget.download(URL, SPARK_FILE)
download(URL,SPARK_FILE)
spark-3.3.2-bin-hadoop3.tgz: 100%|█████████████████████████████████████████████████| 285M/285M [00:13<00:00, 22.9MiB/s]
# Path
home = os.getcwd()
# Print the current working directory
#print("Current working directory: {0}".format(home))
#set filename
fpath = os.path.join(home, SPARK_FILE)
extract_tar(fpath)
if os.name == 'nt':

    !del spark-3.3.2-bin-hadoop3.tgz
else:

    !rm spark-3.3.2-bin-hadoop3.tgz

Java Installation

if os.name == 'nt':
    url_java='https://builds.openlogic.com/downloadJDK/openlogic-openjdk/11.0.18+10/openlogic-openjdk-11.0.18+10-windows-x64.zip'
    JAVA_FILE='openjdk-11.zip'
else:
    JAVA_FILE='openjdk-11.tar.gz'
    url_java='https://builds.openlogic.com/downloadJDK/openlogic-openjdk/11.0.18+10/openlogic-openjdk-11.0.18+10-linux-x64.tar.gz'
download(url_java,JAVA_FILE)
openjdk-11.zip: 100%|██████████████████████████████████████████████████████████████| 214M/214M [00:07<00:00, 31.6MiB/s]
try:
    if os.name == 'nt':
        extract_zip(JAVA_FILE)
        !del openjdk-11.zip
    else:
        extract_tar(JAVA_FILE)
        !rm openjdk-11.tar.gz
except:
    print("")

Environment setup of Spark

import os
# Path
home = os.getcwd()
JAVA_FOLDER='openlogic-openjdk-11.0.18+10-windows-x64'
#set filename
JAVA_HOME = os.path.join(home, JAVA_FOLDER)
SPARK_FOLDER='spark-3.3.2-bin-hadoop3'
#set filename
SPARK_HOME = os.path.join(home, SPARK_FOLDER)
import os
os.environ['JAVA_HOME']=JAVA_HOME
os.environ['SPARK_HOME']=SPARK_HOME

Step 4 Importing libraries

import pyspark
import seaborn as sns
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession

Creating a Spark Session

spark = SparkSession.builder.appName('Data Analysis').getOrCreate()

Step 5. Begin Analysis

The sample dataset that we are going to analyze is here.

Reading the dataset using spark session

df = spark.read.csv('customer_data.csv',header = True)
df.show()
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|Lower class large...|               1|                 3|30-40 years|Family with grown...|     44905|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     37575|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     27915|    0|
|Modern, complete ...|               1|                 3|40-50 years|      Average Family|     19504|    0|
|  Large family farms|               1|                 4|30-40 years|             Farmers|     34943|    0|
|    Young and rising|               1|                 2|20-30 years|         Living well|     13064|    0|
|Large religious f...|               2|                 3|30-40 years|Conservative fami...|     29090|    0|
|Lower class large...|               1|                 2|40-50 years|Family with grown...|      6895|    0|
|Lower class large...|               1|                 2|50-60 years|Family with grown...|     35497|    0|
|     Family starters|               2|                 3|40-50 years|      Average Family|     30800|    0|
|       Stable family|               1|                 4|40-50 years|      Average Family|     39157|    0|
|Modern, complete ...|               1|                 3|40-50 years|      Average Family|     40839|    0|
|Lower class large...|               1|                 2|40-50 years|Family with grown...|     30008|    0|
|        Mixed rurals|               1|                 3|40-50 years|             Farmers|     37209|    0|
|    Young and rising|               1|                 1|30-40 years|         Living well|     45361|    0|
|Lower class large...|               1|                 2|40-50 years|Family with grown...|     45650|    0|
|Traditional families|               1|                 2|40-50 years|Conservative fami...|     18982|    0|
|Mixed apartment d...|               2|                 3|40-50 years|         Living well|     30093|    0|
|Young all america...|               1|                 4|30-40 years|      Average Family|     27097|    0|
|Low income catholics|               1|                 2|50-60 years|Retired and Relig...|     23511|    0|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
only showing top 20 rows

Get DataFrame Schema

Printing the schema, to check the datatypes

df.printSchema()
root
 |-- Customer_subtype: string (nullable = true)
 |-- Number_of_houses: string (nullable = true)
 |-- Avg_size_household: string (nullable = true)
 |-- Avg_age: string (nullable = true)
 |-- Customer_main_type: string (nullable = true)
 |-- Avg_Salary: string (nullable = true)
 |-- label: string (nullable = true)

First at all found the following issues:

  1. All the columns are defined as strings datatypes
  2. There are categorical fields that should be indentifed
  3. There are columns that have to be converted into numerical data types

Counting the number of records

df.count()
2000

Checking the number of columns

len(df.columns)
7

Identifying Data types columns

#Get All column names and it's types
for col in df.dtypes:
    print(col[0]+" , "+col[1])
Customer_subtype , string
Number_of_houses , string
Avg_size_household , string
Avg_age , string
Customer_main_type , string
Avg_Salary , string
label , string
#store all column names in a list
allCols = [item[0] for item in df.dtypes]
print(allCols)
['Customer_subtype', 'Number_of_houses', 'Avg_size_household', 'Avg_age', 'Customer_main_type', 'Avg_Salary', 'label']
#store all column names that are categorical in a list
categoricalCols = [item[0] for item in df.dtypes if item[1].startswith('string')]
print(categoricalCols)
['Customer_subtype', 'Number_of_houses', 'Avg_size_household', 'Avg_age', 'Customer_main_type', 'Avg_Salary', 'label']
#store all column names that are continous in a list
continuousCols =[item[0] for item in df.dtypes if item[1].startswith('bigint')]
print(continuousCols)
[]

Segregating the desired columns to convert the data type into IntegerType

numeric_index = [1,2,5,6]
numeric_cols = [allCols[val] for val in numeric_index]
numeric_cols
['Number_of_houses', 'Avg_size_household', 'Avg_Salary', 'label']
from pyspark.sql import functions as f
from pyspark.sql.types import IntegerType

The first option you have when it comes to converting data types is pyspark.sql.Column.cast() function that converts the input column to the specified data type.

for column in numeric_cols:
    df = df.withColumn(column,f.col(column).cast(IntegerType()))

Checking the schema after converting the numerical columns into IntegerType

df.printSchema()
root
 |-- Customer_subtype: string (nullable = true)
 |-- Number_of_houses: integer (nullable = true)
 |-- Avg_size_household: integer (nullable = true)
 |-- Avg_age: string (nullable = true)
 |-- Customer_main_type: string (nullable = true)
 |-- Avg_Salary: integer (nullable = true)
 |-- label: integer (nullable = true)

Complementary information

Alternatively, you can use pyspark.sql.DataFrame.selectExpr function by specifying the corresponding SQL expressions that can cast the data type of desired columns, as shown below.

dfa = df.selectExpr(
    'cast(Number_of_houses as int) Number_of_houses',
)
dfa.printSchema()
root
 |-- Number_of_houses: integer (nullable = true)

Sometimes we need add date type

'to_date(colA, \'dd-MM-yyyy\') colA',
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType, IntegerType, DateType
dfa = df \
    .withColumn('Number_of_houses', col('Number_of_houses').cast(IntegerType()))

and the datatype equivalent, first we define dhe universal defined function

# UDF to process the date column
func = udf(lambda x: datetime.strptime(x, '%d-%m-%Y'), DateType())

with 

.withColumn('colA', func(col('colA'))) \
dfa.printSchema()
root
 |-- Customer_subtype: string (nullable = true)
 |-- Number_of_houses: integer (nullable = true)
 |-- Avg_size_household: integer (nullable = true)
 |-- Avg_age: string (nullable = true)
 |-- Customer_main_type: string (nullable = true)
 |-- Avg_Salary: integer (nullable = true)
 |-- label: integer (nullable = true)

from pyspark.sql import SparkSession
# Create an instance of spark session
spark_session = SparkSession.builder \
    .master('local[1]') \
    .appName('Example') \
    .getOrCreate()
# Removing Global views
spark.catalog.dropGlobalTempView("dfa")
False
# First we need to register the DF as a global temporary view
df.createGlobalTempView("dfa")
dfa = spark_session.sql(
    """
    SELECT 
        cast(Number_of_houses as int) Number_of_houses
    FROM global_temp.dfa
    """
)
dfa.printSchema()
root
 |-- Number_of_houses: integer (nullable = true)

Creating a variable to store all the numerical columns into a list

numeric_columns = [column[0] for column in df.dtypes if column[1]=='int']
numeric_columns
['Number_of_houses', 'Avg_size_household', 'Avg_Salary', 'label']
df.show()
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|Lower class large...|               1|                 3|30-40 years|Family with grown...|     44905|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     37575|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     27915|    0|
|Modern, complete ...|               1|                 3|40-50 years|      Average Family|     19504|    0|
|  Large family farms|               1|                 4|30-40 years|             Farmers|     34943|    0|
|    Young and rising|               1|                 2|20-30 years|         Living well|     13064|    0|
|Large religious f...|               2|                 3|30-40 years|Conservative fami...|     29090|    0|
|Lower class large...|               1|                 2|40-50 years|Family with grown...|      6895|    0|
|Lower class large...|               1|                 2|50-60 years|Family with grown...|     35497|    0|
|     Family starters|               2|                 3|40-50 years|      Average Family|     30800|    0|
|       Stable family|               1|                 4|40-50 years|      Average Family|     39157|    0|
|Modern, complete ...|               1|                 3|40-50 years|      Average Family|     40839|    0|
|Lower class large...|               1|                 2|40-50 years|Family with grown...|     30008|    0|
|        Mixed rurals|               1|                 3|40-50 years|             Farmers|     37209|    0|
|    Young and rising|               1|                 1|30-40 years|         Living well|     45361|    0|
|Lower class large...|               1|                 2|40-50 years|Family with grown...|     45650|    0|
|Traditional families|               1|                 2|40-50 years|Conservative fami...|     18982|    0|
|Mixed apartment d...|               2|                 3|40-50 years|         Living well|     30093|    0|
|Young all america...|               1|                 4|30-40 years|      Average Family|     27097|    0|
|Low income catholics|               1|                 2|50-60 years|Retired and Relig...|     23511|    0|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
only showing top 20 rows

Case 1 - Working with only integers

dfa=df.select(*numeric_columns)
dfa.printSchema()
root
 |-- Number_of_houses: integer (nullable = true)
 |-- Avg_size_household: integer (nullable = true)
 |-- Avg_Salary: integer (nullable = true)
 |-- label: integer (nullable = true)

Creating a customized function

def find_outliers(df,numerical_columns=None):
    if numerical_columns==None:
        # Identifying the numerical columns in a spark dataframe
        numeric_columns = [column[0] for column in df.dtypes if column[1]=='int'or column[1]=='double']
    else:
        #Custom numerical columns
        numeric_columns =numerical_columns

    # Using the `for` loop to create new columns by identifying the outliers for each feature
    for column in numeric_columns:

        less_Q1 = 'less_Q1_{}'.format(column)
        more_Q3 = 'more_Q3_{}'.format(column)
        Q1 = 'Q1_{}'.format(column)
        Q3 = 'Q3_{}'.format(column)

        # Q1 : First Quartile ., Q3 : Third Quartile
        Q1 = df.approxQuantile(column,[0.25],relativeError=0)
        Q3 = df.approxQuantile(column,[0.75],relativeError=0)
        
        # IQR : Inter Quantile Range
        # We need to define the index [0], as Q1 & Q3 are a set of lists., to perform a mathematical operation
        # Q1 & Q3 are defined seperately so as to have a clear indication on First Quantile & 3rd Quantile
        IQR = Q3[0] - Q1[0]
        
        #selecting the data, with -1.5*IQR to + 1.5*IQR., where param = 1.5 default value
        less_Q1 =  Q1[0] - 1.5*IQR
        more_Q3 =  Q3[0] + 1.5*IQR
        
        isOutlierCol = 'is_outlier_{}'.format(column)
        
        df = df.withColumn(isOutlierCol,f.when((df[column] > more_Q3) | (df[column] < less_Q1), 1).otherwise(0))
    

    # Selecting the specific columns which we have added above, to check if there are any outliers
    selected_columns = [column for column in df.columns if column.startswith("is_outlier")]

    # Adding all the outlier columns into a new colum "total_outliers", to see the total number of outliers
    df = df.withColumn('total_outliers',sum(df[column] for column in selected_columns))

    # Dropping the extra columns created above, just to create nice dataframe., without extra columns
    df = df.drop(*[column for column in df.columns if column.startswith("is_outlier")])

    return df

Using the customized Outlier function and applying it to a spark dataframe

new_df = find_outliers(dfa)
new_df.show()
+----------------+------------------+----------+-----+--------------+
|Number_of_houses|Avg_size_household|Avg_Salary|label|total_outliers|
+----------------+------------------+----------+-----+--------------+
|               1|                 3|     44905|    0|             0|
|               1|                 2|     37575|    0|             0|
|               1|                 2|     27915|    0|             0|
|               1|                 3|     19504|    0|             0|
|               1|                 4|     34943|    0|             0|
|               1|                 2|     13064|    0|             0|
|               2|                 3|     29090|    0|             1|
|               1|                 2|      6895|    0|             0|
|               1|                 2|     35497|    0|             0|
|               2|                 3|     30800|    0|             1|
|               1|                 4|     39157|    0|             0|
|               1|                 3|     40839|    0|             0|
|               1|                 2|     30008|    0|             0|
|               1|                 3|     37209|    0|             0|
|               1|                 1|     45361|    0|             0|
|               1|                 2|     45650|    0|             0|
|               1|                 2|     18982|    0|             0|
|               2|                 3|     30093|    0|             1|
|               1|                 4|     27097|    0|             0|
|               1|                 2|     23511|    0|             0|
+----------------+------------------+----------+-----+--------------+
only showing top 20 rows

Fitering the above dataframe, to select only those records where the outlier count is < = 1

new_df_with_no_outliers = new_df.filter(new_df['total_Outliers']<=1)
new_df_with_no_outliers = new_df_with_no_outliers.select(*dfa.columns)

new_df_with_no_outliers.show()
+----------------+------------------+----------+-----+
|Number_of_houses|Avg_size_household|Avg_Salary|label|
+----------------+------------------+----------+-----+
|               1|                 3|     44905|    0|
|               1|                 2|     37575|    0|
|               1|                 2|     27915|    0|
|               1|                 3|     19504|    0|
|               1|                 4|     34943|    0|
|               1|                 2|     13064|    0|
|               2|                 3|     29090|    0|
|               1|                 2|      6895|    0|
|               1|                 2|     35497|    0|
|               2|                 3|     30800|    0|
|               1|                 4|     39157|    0|
|               1|                 3|     40839|    0|
|               1|                 2|     30008|    0|
|               1|                 3|     37209|    0|
|               1|                 1|     45361|    0|
|               1|                 2|     45650|    0|
|               1|                 2|     18982|    0|
|               2|                 3|     30093|    0|
|               1|                 4|     27097|    0|
|               1|                 2|     23511|    0|
+----------------+------------------+----------+-----+
only showing top 20 rows

The count of the dataframe, after removing the outliers

new_df_with_no_outliers.count()
1938

The dataset, which contains 2 or more outliers in each record

data_with_outliers = new_df.filter(new_df['total_Outliers']>=2)
data_with_outliers.show()
+----------------+------------------+----------+-----+--------------+
|Number_of_houses|Avg_size_household|Avg_Salary|label|total_outliers|
+----------------+------------------+----------+-----+--------------+
|               2|                 5|     40832|    0|             2|
|               1|                 3|    762769|    1|             2|
|               2|                 3|   6138618|    0|             2|
|               1|                 2|  12545905|    1|             2|
|               2|                 3|     30117|    1|             2|
|               2|                 2|    690080|    0|             2|
|               2|                 5|     37394|    0|             2|
|               1|                 4|  35032441|    1|             2|
|               1|                 4|    682348|    1|             2|
|               2|                 3|    762483|    0|             2|
|               2|                 4|     20503|    1|             2|
|               2|                 2|    315815|    0|             2|
|               2|                 3|     14798|    1|             2|
|               2|                 4|    681714|    0|             2|
|               1|                 3|    903291|    1|             2|
|               1|                 1|  17462103|    1|             2|
|               1|                 5|  18657560|    0|             2|
|               2|                 5|     26224|    0|             2|
|               3|                 2|    596723|    0|             2|
|               1|                 2|  48069548|    1|             2|
+----------------+------------------+----------+-----+--------------+
only showing top 20 rows
# Selecting the numerical columns from the original dataframe and converting into pandas

numeric_columns
['Number_of_houses', 'Avg_size_household', 'Avg_Salary', 'label']

Converting a spark dataframe into pandas dataframe, which enables us to plot the graphs using seaborn and matplotlib

If we can get an small subset of dataset we can use pandas. It is not recommendable use pandas if you are working with real big data. Because you will have problems of memory. Just for illustrative purposes , I will show the pandas dataframes.

original_numerical_df = dfa.select(*numeric_columns).toPandas()

original_numerical_df.head(10)
Number_of_houses Avg_size_household Avg_Salary label
0 1 3 44905 0
1 1 2 37575 0
2 1 2 27915 0
3 1 3 19504 0
4 1 4 34943 0
5 1 2 13064 0
6 2 3 29090 0
7 1 2 6895 0
8 1 2 35497 0
9 2 3 30800 0
# Plotting the box for the dataset after removing the outliers

dataset_after_removing_outliers = new_df_with_no_outliers.toPandas()
dataset_after_removing_outliers.head(10)
Number_of_houses Avg_size_household Avg_Salary label
0 1 3 44905 0
1 1 2 37575 0
2 1 2 27915 0
3 1 3 19504 0
4 1 4 34943 0
5 1 2 13064 0
6 2 3 29090 0
7 1 2 6895 0
8 1 2 35497 0
9 2 3 30800 0
numeric_columns
['Number_of_houses', 'Avg_size_household', 'Avg_Salary', 'label']

Plotting the box plot, to check the outliers in the original dataframe., and comaring it with the new dataframe after removing outliers

original_numerical_df
Number_of_houses Avg_size_household Avg_Salary label
0 1 3 44905 0
1 1 2 37575 0
2 1 2 27915 0
3 1 3 19504 0
4 1 4 34943 0
... ... ... ... ...
1995 1 2 45857 0
1996 1 4 45665 0
1997 1 2 32903 0
1998 1 3 46911924 0
1999 1 3 45401 0

2000 rows × 4 columns

size_columns=len(original_numerical_df.columns)
fig,ax = plt.subplots(2,size_columns,figsize=(15,8))
for i,dfa in enumerate([original_numerical_df,dataset_after_removing_outliers]):
  
    for j, col in enumerate(numeric_columns):
        sns.boxplot(data = dfa, y=col,ax=ax[i][j])


png

Case 2 - Working with strings

df.show(5)
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
|Lower class large...|               1|                 3|30-40 years|Family with grown...|     44905|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     37575|    0|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     27915|    0|
|Modern, complete ...|               1|                 3|40-50 years|      Average Family|     19504|    0|
|  Large family farms|               1|                 4|30-40 years|             Farmers|     34943|    0|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+
only showing top 5 rows
df.printSchema()
root
 |-- Customer_subtype: string (nullable = true)
 |-- Number_of_houses: integer (nullable = true)
 |-- Avg_size_household: integer (nullable = true)
 |-- Avg_age: string (nullable = true)
 |-- Customer_main_type: string (nullable = true)
 |-- Avg_Salary: integer (nullable = true)
 |-- label: integer (nullable = true)

Encoding string values into numeric values in Spark DataFrame

Let us assume that we are interested to encoding the columns that are strings.

StringIndexer encodes a string column of labels to a column of label indices. StringIndexer can encode multiple columns.

Single Column Encoding

from pyspark.ml.feature import StringIndexer
label_col ='Customer_subtype'
indexer = StringIndexer(inputCol=label_col, outputCol="idx_{0}".format(label_col))
indexed = indexer.fit(df).transform(df)
indexed.printSchema()
root
 |-- Customer_subtype: string (nullable = true)
 |-- Number_of_houses: integer (nullable = true)
 |-- Avg_size_household: integer (nullable = true)
 |-- Avg_age: string (nullable = true)
 |-- Customer_main_type: string (nullable = true)
 |-- Avg_Salary: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- idx_Customer_subtype: double (nullable = false)

Multiple Columns Encodings

We assume that we want to encode all string columns

from pyspark.ml.feature import StringIndexer
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import VectorAssembler
#Get data type of a specific column from dtypes
#store all column names that are categorical in a list
categoricalCols = [item[0] for item in df.dtypes if item[1].startswith('string')]
print(categoricalCols)
['Customer_subtype', 'Avg_age', 'Customer_main_type']
label_col =categoricalCols  # List of string columns to encode
label_col
['Customer_subtype', 'Avg_age', 'Customer_main_type']

For classifications problems, if you want to use ML you should index label as well if you want to use MLlib it is not necessary For regression problems you should omit label in the indexing as shown below

# Indexers encode strings with doubles
string_indexers = [
   StringIndexer(inputCol=x, outputCol="idx_{0}".format(x))

   for x in df.columns if x in label_col # Exclude other columns if needed
]
df.printSchema()
root
 |-- Customer_subtype: string (nullable = true)
 |-- Number_of_houses: integer (nullable = true)
 |-- Avg_size_household: integer (nullable = true)
 |-- Avg_age: string (nullable = true)
 |-- Customer_main_type: string (nullable = true)
 |-- Avg_Salary: integer (nullable = true)
 |-- label: integer (nullable = true)

inputCols=["idx_{0}".format(x) for x in label_col]
inputCols
['idx_Customer_subtype', 'idx_Avg_age', 'idx_Customer_main_type']
# Assembles multiple columns into a single vector
assembler = VectorAssembler(
    inputCols=inputCols,
    outputCol="features"
)
pipeline = Pipeline(stages=string_indexers + [assembler])
#pipeline = Pipeline(stages=string_indexers )
model = pipeline.fit(df)
new_df = model.transform(df)
new_df.printSchema()
root
 |-- Customer_subtype: string (nullable = true)
 |-- Number_of_houses: integer (nullable = true)
 |-- Avg_size_household: integer (nullable = true)
 |-- Avg_age: string (nullable = true)
 |-- Customer_main_type: string (nullable = true)
 |-- Avg_Salary: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- idx_Customer_subtype: double (nullable = false)
 |-- idx_Avg_age: double (nullable = false)
 |-- idx_Customer_main_type: double (nullable = false)
 |-- features: vector (nullable = true)

new_df.show(3)
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------------------+-----------+----------------------+--------------+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|idx_Customer_subtype|idx_Avg_age|idx_Customer_main_type|      features|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------------------+-----------+----------------------+--------------+
|Lower class large...|               1|                 3|30-40 years|Family with grown...|     44905|    0|                 0.0|        1.0|                   0.0| [0.0,1.0,0.0]|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     37575|    0|                18.0|        1.0|                   0.0|[18.0,1.0,0.0]|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     27915|    0|                18.0|        1.0|                   0.0|[18.0,1.0,0.0]|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------------------+-----------+----------------------+--------------+
only showing top 3 rows

#new_df.show(5)
pandasDF = new_df.toPandas()
pandasDF.head()
Customer_subtype Number_of_houses Avg_size_household Avg_age Customer_main_type Avg_Salary label idx_Customer_subtype idx_Avg_age idx_Customer_main_type features
0 Lower class large families 1 3 30-40 years Family with grown ups 44905 0 0.0 1.0 0.0 [0.0, 1.0, 0.0]
1 Mixed small town dwellers 1 2 30-40 years Family with grown ups 37575 0 18.0 1.0 0.0 [18.0, 1.0, 0.0]
2 Mixed small town dwellers 1 2 30-40 years Family with grown ups 27915 0 18.0 1.0 0.0 [18.0, 1.0, 0.0]
3 Modern, complete families 1 3 40-50 years Average Family 19504 0 4.0 0.0 1.0 [4.0, 0.0, 1.0]
4 Large family farms 1 4 30-40 years Farmers 34943 0 25.0 1.0 7.0 [25.0, 1.0, 7.0]
dfb=new_df
dfb.show(5)
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------------------+-----------+----------------------+--------------+
|    Customer_subtype|Number_of_houses|Avg_size_household|    Avg_age|  Customer_main_type|Avg_Salary|label|idx_Customer_subtype|idx_Avg_age|idx_Customer_main_type|      features|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------------------+-----------+----------------------+--------------+
|Lower class large...|               1|                 3|30-40 years|Family with grown...|     44905|    0|                 0.0|        1.0|                   0.0| [0.0,1.0,0.0]|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     37575|    0|                18.0|        1.0|                   0.0|[18.0,1.0,0.0]|
|Mixed small town ...|               1|                 2|30-40 years|Family with grown...|     27915|    0|                18.0|        1.0|                   0.0|[18.0,1.0,0.0]|
|Modern, complete ...|               1|                 3|40-50 years|      Average Family|     19504|    0|                 4.0|        0.0|                   1.0| [4.0,0.0,1.0]|
|  Large family farms|               1|                 4|30-40 years|             Farmers|     34943|    0|                25.0|        1.0|                   7.0|[25.0,1.0,7.0]|
+--------------------+----------------+------------------+-----------+--------------------+----------+-----+--------------------+-----------+----------------------+--------------+
only showing top 5 rows

new_df_b = find_outliers(dfb,inputCols)
new_df_b.printSchema()
root
 |-- Customer_subtype: string (nullable = true)
 |-- Number_of_houses: integer (nullable = true)
 |-- Avg_size_household: integer (nullable = true)
 |-- Avg_age: string (nullable = true)
 |-- Customer_main_type: string (nullable = true)
 |-- Avg_Salary: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- idx_Customer_subtype: double (nullable = false)
 |-- idx_Avg_age: double (nullable = false)
 |-- idx_Customer_main_type: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- total_outliers: integer (nullable = false)

pandasDFb = new_df_b.toPandas()
pandasDFb.head()
Customer_subtype Number_of_houses Avg_size_household Avg_age Customer_main_type Avg_Salary label idx_Customer_subtype idx_Avg_age idx_Customer_main_type features total_outliers
0 Lower class large families 1 3 30-40 years Family with grown ups 44905 0 0.0 1.0 0.0 [0.0, 1.0, 0.0] 0
1 Mixed small town dwellers 1 2 30-40 years Family with grown ups 37575 0 18.0 1.0 0.0 [18.0, 1.0, 0.0] 0
2 Mixed small town dwellers 1 2 30-40 years Family with grown ups 27915 0 18.0 1.0 0.0 [18.0, 1.0, 0.0] 0
3 Modern, complete families 1 3 40-50 years Average Family 19504 0 4.0 0.0 1.0 [4.0, 0.0, 1.0] 0
4 Large family farms 1 4 30-40 years Farmers 34943 0 25.0 1.0 7.0 [25.0, 1.0, 7.0] 0
pandasDFb[pandasDFb["total_outliers"] > 0]
Customer_subtype Number_of_houses Avg_size_household Avg_age Customer_main_type Avg_Salary label idx_Customer_subtype idx_Avg_age idx_Customer_main_type features total_outliers
5 Young and rising 1 2 20-30 years Living well 13064 0 6.0 4.0 5.0 [6.0, 4.0, 5.0] 1
63 Young and rising 2 1 20-30 years Living well 28875 0 6.0 4.0 5.0 [6.0, 4.0, 5.0] 1
69 Porchless seniors: no front yard 1 2 60-70 years Retired and Religious 31553 0 24.0 3.0 3.0 [24.0, 3.0, 3.0] 1
71 Young seniors in the city 1 2 60-70 years Cruising Seniors 10511 0 26.0 3.0 8.0 [26.0, 3.0, 8.0] 1
84 Young and rising 1 2 20-30 years Living well 37780 0 6.0 4.0 5.0 [6.0, 4.0, 5.0] 1
... ... ... ... ... ... ... ... ... ... ... ... ...
1888 Young and rising 1 2 20-30 years Living well 13647 0 6.0 4.0 5.0 [6.0, 4.0, 5.0] 1
1899 Own home elderly 1 1 60-70 years Cruising Seniors 22087 0 30.0 3.0 8.0 [30.0, 3.0, 8.0] 1
1925 Own home elderly 1 2 60-70 years Cruising Seniors 21040 0 30.0 3.0 8.0 [30.0, 3.0, 8.0] 1
1953 Mixed seniors 1 1 70-80 years Successful hedonists 15518 0 9.0 5.0 4.0 [9.0, 5.0, 4.0] 1
1956 Mixed seniors 1 2 60-70 years Retired and Religious 36748 0 9.0 3.0 3.0 [9.0, 3.0, 3.0] 1

103 rows × 12 columns

pandasDFb.head()
Customer_subtype Number_of_houses Avg_size_household Avg_age Customer_main_type Avg_Salary label idx_Customer_subtype idx_Avg_age idx_Customer_main_type features total_outliers
0 Lower class large families 1 3 30-40 years Family with grown ups 44905 0 0.0 1.0 0.0 [0.0, 1.0, 0.0] 0
1 Mixed small town dwellers 1 2 30-40 years Family with grown ups 37575 0 18.0 1.0 0.0 [18.0, 1.0, 0.0] 0
2 Mixed small town dwellers 1 2 30-40 years Family with grown ups 27915 0 18.0 1.0 0.0 [18.0, 1.0, 0.0] 0
3 Modern, complete families 1 3 40-50 years Average Family 19504 0 4.0 0.0 1.0 [4.0, 0.0, 1.0] 0
4 Large family farms 1 4 30-40 years Farmers 34943 0 25.0 1.0 7.0 [25.0, 1.0, 7.0] 0

</div>

data=pandasDFb
size_columns=len(inputCols)
fig,ax = plt.subplots(1,size_columns,figsize=(8,5))
for i in enumerate(inputCols):
    print(i)
    sns.boxplot(data=data, y=i[1],ax=ax[i[0]])
(0, 'idx_Customer_subtype')
(1, 'idx_Avg_age')
(2, 'idx_Customer_main_type')

png

inputCols
['idx_Customer_subtype', 'idx_Avg_age', 'idx_Customer_main_type']
def calculate_bounds(df):
    bounds = {
        c: dict(
            zip(["q1", "q3"], df.approxQuantile(c, [0.25, 0.75], 0))
        )
        for c,d in zip(df.columns, df.dtypes) if d[1] == "int" or d[1]=='double'
    }

    for c in bounds:
        iqr = bounds[c]['q3'] - bounds[c]['q1']
        bounds[c]['min'] = bounds[c]['q1'] - (iqr * 1.5)
        bounds[c]['max'] = bounds[c]['q3'] + (iqr * 1.5)

    return bounds
calculate_bounds(df)
{'Number_of_houses': {'q1': 1.0, 'q3': 1.0, 'min': 1.0, 'max': 1.0},
 'Avg_size_household': {'q1': 2.0, 'q3': 3.0, 'min': 0.5, 'max': 4.5},
 'Avg_Salary': {'q1': 20315.0, 'q3': 42949.0, 'min': -13636.0, 'max': 76900.0},
 'label': {'q1': 0.0, 'q3': 0.0, 'min': 0.0, 'max': 0.0}}
dfc=dfb.select(inputCols)
calculate_bounds(dfc)
{'idx_Customer_subtype': {'q1': 2.0, 'q3': 16.0, 'min': -19.0, 'max': 37.0},
 'idx_Avg_age': {'q1': 0.0, 'q3': 1.0, 'min': -1.5, 'max': 2.5},
 'idx_Customer_main_type': {'q1': 0.0, 'q3': 5.0, 'min': -7.5, 'max': 12.5}}
def single_flag_outliers(df, singleCol):
    c=singleCol
    dfc=df.select(c)
    bounds = calculate_bounds(dfc)
    outliers = {}
    
    return df.select(*df.columns,
        *[f.when(~f.col(c).between(bounds[c]['min'], bounds[c]['max']),"yes").otherwise("no").alias(c+'_outlier')]
    )
inputCols[0]
'idx_Customer_subtype'
res=single_flag_outliers(dfc,inputCols[0])
res.show(5)
+--------------------+-----------+----------------------+----------------------------+
|idx_Customer_subtype|idx_Avg_age|idx_Customer_main_type|idx_Customer_subtype_outlier|
+--------------------+-----------+----------------------+----------------------------+
|                 0.0|        1.0|                   0.0|                          no|
|                18.0|        1.0|                   0.0|                          no|
|                18.0|        1.0|                   0.0|                          no|
|                 4.0|        0.0|                   1.0|                          no|
|                25.0|        1.0|                   7.0|                          no|
+--------------------+-----------+----------------------+----------------------------+
only showing top 5 rows

def multiple_flag_outliers(df, columns):
    dfc=df.select(*columns)
    bounds = calculate_bounds(dfc)
    outliers = {}
    
    return df.select(*df.columns,*[
        f.when(~f.col(c).between(bounds[c]['min'], bounds[c]['max']),"yes").otherwise("no").alias(c+'_outlier') for c in columns
    ]
    )
res=multiple_flag_outliers(dfc,inputCols)
res.show(5)
+--------------------+-----------+----------------------+----------------------------+-------------------+------------------------------+
|idx_Customer_subtype|idx_Avg_age|idx_Customer_main_type|idx_Customer_subtype_outlier|idx_Avg_age_outlier|idx_Customer_main_type_outlier|
+--------------------+-----------+----------------------+----------------------------+-------------------+------------------------------+
|                 0.0|        1.0|                   0.0|                          no|                 no|                            no|
|                18.0|        1.0|                   0.0|                          no|                 no|                            no|
|                18.0|        1.0|                   0.0|                          no|                 no|                            no|
|                 4.0|        0.0|                   1.0|                          no|                 no|                            no|
|                25.0|        1.0|                   7.0|                          no|                 no|                            no|
+--------------------+-----------+----------------------+----------------------------+-------------------+------------------------------+
only showing top 5 rows

# specify column names
columns = inputCols
  
# creating a dataframe from the lists of data
dataframe = res
  
# select ID where ID less than 3
dataframe.select(*res.columns,'idx_Avg_age_outlier').where(dataframe.idx_Avg_age_outlier == 'yes').show()
+--------------------+-----------+----------------------+----------------------------+-------------------+------------------------------+-------------------+
|idx_Customer_subtype|idx_Avg_age|idx_Customer_main_type|idx_Customer_subtype_outlier|idx_Avg_age_outlier|idx_Customer_main_type_outlier|idx_Avg_age_outlier|
+--------------------+-----------+----------------------+----------------------------+-------------------+------------------------------+-------------------+
|                 6.0|        4.0|                   5.0|                          no|                yes|                            no|                yes|
|                 6.0|        4.0|                   5.0|                          no|                yes|                            no|                yes|
|                24.0|        3.0|                   3.0|                          no|                yes|                            no|                yes|
|                26.0|        3.0|                   8.0|                          no|                yes|                            no|                yes|
|                 6.0|        4.0|                   5.0|                          no|                yes|                            no|                yes|
|                30.0|        3.0|                   8.0|                          no|                yes|                            no|                yes|
|                16.0|        3.0|                   1.0|                          no|                yes|                            no|                yes|
|                 6.0|        4.0|                   5.0|                          no|                yes|                            no|                yes|
|                 5.0|        3.0|                   0.0|                          no|                yes|                            no|                yes|
|                30.0|        3.0|                   8.0|                          no|                yes|                            no|                yes|
|                16.0|        3.0|                   1.0|                          no|                yes|                            no|                yes|
|                 5.0|        3.0|                   0.0|                          no|                yes|                            no|                yes|
|                 1.0|        4.0|                   2.0|                          no|                yes|                            no|                yes|
|                 6.0|        4.0|                   5.0|                          no|                yes|                            no|                yes|
|                 7.0|        3.0|                   4.0|                          no|                yes|                            no|                yes|
|                 9.0|        5.0|                   3.0|                          no|                yes|                            no|                yes|
|                24.0|        4.0|                   3.0|                          no|                yes|                            no|                yes|
|                31.0|        4.0|                   9.0|                          no|                yes|                            no|                yes|
|                 9.0|        3.0|                   3.0|                          no|                yes|                            no|                yes|
|                19.0|        3.0|                   3.0|                          no|                yes|                            no|                yes|
+--------------------+-----------+----------------------+----------------------------+-------------------+------------------------------+-------------------+
only showing top 20 rows
pandasDFc = res.toPandas()
pandasDFc.head()
idx_Customer_subtype idx_Avg_age idx_Customer_main_type idx_Customer_subtype_outlier idx_Avg_age_outlier idx_Customer_main_type_outlier
0 0.0 1.0 0.0 no no no
1 18.0 1.0 0.0 no no no
2 18.0 1.0 0.0 no no no
3 4.0 0.0 1.0 no no no
4 25.0 1.0 7.0 no no no
dfd=pandasDFc
dfd[(dfd.idx_Customer_subtype_outlier == 'yes') | (dfd.idx_Avg_age_outlier == 'yes') | (dfd.idx_Customer_main_type_outlier == 'yes')]

idx_Customer_subtype idx_Avg_age idx_Customer_main_type idx_Customer_subtype_outlier idx_Avg_age_outlier idx_Customer_main_type_outlier
5 6.0 4.0 5.0 no yes no
63 6.0 4.0 5.0 no yes no
69 24.0 3.0 3.0 no yes no
71 26.0 3.0 8.0 no yes no
84 6.0 4.0 5.0 no yes no
... ... ... ... ... ... ...
1888 6.0 4.0 5.0 no yes no
1899 30.0 3.0 8.0 no yes no
1925 30.0 3.0 8.0 no yes no
1953 9.0 5.0 4.0 no yes no
1956 9.0 3.0 3.0 no yes no

103 rows × 6 columns

If you are interested to the removal outliers part you can see the whole notebook here, in the optional part.

Congratulations! We have identified the outliers in Pyspark.

Posted:

Leave a comment