3.3 Machine Learning - Complex Features

This notebook demonstrates how to work with complex data sets that include both numerical and categorical features.

We will use the Adult dataset (data/adult.data.bz2), which includes numerical and categorical attributes of adults; our task is to predict whether a person's income exceeds $50K.

In [1]:
# let's parse the data set description to obtain the names of the features

import re
with open('data/adult.names.txt', 'r') as f:
    featureNames = [line.split(':')[0] for line in f.readlines() if re.match(r'^[a-z\-]+:', line)]

print(featureNames)
columnNames = featureNames + ['income']
['age', 'workclass', 'fnlwgt', 'education', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'capital-gain', 'capital-loss', 'hours-per-week', 'native-country']
In [2]:
# load the data into a DataFrame; going through the RDD lets us attach the column names
inputDF = spark.createDataFrame(spark.read.csv('data/adult.data.bz2', inferSchema=True).rdd, columnNames)
inputDF.printSchema()
root
 |-- age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)

In [3]:
display(inputDF.limit(3))
age workclass fnlwgt education education-num marital-status occupation relationship race sex capital-gain capital-loss hours-per-week native-country income
0 39 State-gov 77516.0 Bachelors 13.0 Never-married Adm-clerical Not-in-family White Male 2174.0 0.0 40.0 United-States <=50K
1 50 Self-emp-not-inc 83311.0 Bachelors 13.0 Married-civ-spouse Exec-managerial Husband White Male 0.0 0.0 13.0 United-States <=50K
2 38 Private 215646.0 HS-grad 9.0 Divorced Handlers-cleaners Not-in-family White Male 0.0 0.0 40.0 United-States <=50K
In [4]:
from pyspark.sql.functions import trim

# create the numerical label column (1.0 if income > 50K)
dataDF = inputDF.withColumn('label', trim(inputDF.income).startswith('>').cast('double'))
display(dataDF.limit(8))
age workclass fnlwgt education education-num marital-status occupation relationship race sex capital-gain capital-loss hours-per-week native-country income label
0 39 State-gov 77516.0 Bachelors 13.0 Never-married Adm-clerical Not-in-family White Male 2174.0 0.0 40.0 United-States <=50K 0.0
1 50 Self-emp-not-inc 83311.0 Bachelors 13.0 Married-civ-spouse Exec-managerial Husband White Male 0.0 0.0 13.0 United-States <=50K 0.0
2 38 Private 215646.0 HS-grad 9.0 Divorced Handlers-cleaners Not-in-family White Male 0.0 0.0 40.0 United-States <=50K 0.0
3 53 Private 234721.0 11th 7.0 Married-civ-spouse Handlers-cleaners Husband Black Male 0.0 0.0 40.0 United-States <=50K 0.0
4 28 Private 338409.0 Bachelors 13.0 Married-civ-spouse Prof-specialty Wife Black Female 0.0 0.0 40.0 Cuba <=50K 0.0
5 37 Private 284582.0 Masters 14.0 Married-civ-spouse Exec-managerial Wife White Female 0.0 0.0 40.0 United-States <=50K 0.0
6 49 Private 160187.0 9th 5.0 Married-spouse-absent Other-service Not-in-family Black Female 0.0 0.0 16.0 Jamaica <=50K 0.0
7 52 Self-emp-not-inc 209642.0 HS-grad 9.0 Married-civ-spouse Exec-managerial Husband White Male 0.0 0.0 45.0 United-States >50K 1.0
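
The boolean-to-double cast above is a compact trick; an equivalent, more explicit formulation uses when/otherwise (a sketch, not executed in this notebook; labeledDF is just an illustrative name):

from pyspark.sql.functions import when, trim

# equivalent label construction: 1.0 when the trimmed income string starts with '>'
labeledDF = inputDF.withColumn(
    'label', when(trim(inputDF.income).startswith('>'), 1.0).otherwise(0.0))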
In [5]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.types import StringType

# detect the categorical features from the schema
categoricalFeatures = [f.name for f in dataDF.schema.fields if f.dataType == StringType() and f.name != 'income']
print(categoricalFeatures)

# create the categorical value indexers
indexerPipeline = Pipeline(stages=[StringIndexer(inputCol=feature,
                                                 outputCol="cat_%s" % feature,
                                                 handleInvalid='skip')
                                   for feature in categoricalFeatures])

pipelineModel = indexerPipeline.fit(dataDF)
indexed_df = pipelineModel.transform(dataDF)
indexed_df.printSchema()
['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country']
root
 |-- age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- label: double (nullable = true)
 |-- cat_workclass: double (nullable = true)
 |-- cat_education: double (nullable = true)
 |-- cat_marital-status: double (nullable = true)
 |-- cat_occupation: double (nullable = true)
 |-- cat_relationship: double (nullable = true)
 |-- cat_race: double (nullable = true)
 |-- cat_sex: double (nullable = true)
 |-- cat_native-country: double (nullable = true)
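
Each fitted StringIndexerModel exposes the mapping it learned: index i in a cat_ column corresponds to labels[i], with labels ordered by descending frequency. A quick inspection sketch, not executed here:

# stages[0] is the fitted indexer for 'workclass'; index 0.0 is its most frequent value
print(pipelineModel.stages[0].labels)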

In [6]:
from pyspark.ml.feature import VectorAssembler, VectorIndexer

# assemble and mark the categorical features

categorical_assembler = Pipeline(stages=[
    VectorAssembler(inputCols=["cat_%s" % c for c in categoricalFeatures], outputCol='cat_vector'),
    VectorIndexer(inputCol='cat_vector', outputCol='cat_features')
])

categorical_assembler_model = categorical_assembler.fit(indexed_df)
cat_df = categorical_assembler_model.transform(indexed_df)
cat_df.printSchema()
cat_df.select('cat_features').show(5)
root
 |-- age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- label: double (nullable = true)
 |-- cat_workclass: double (nullable = true)
 |-- cat_education: double (nullable = true)
 |-- cat_marital-status: double (nullable = true)
 |-- cat_occupation: double (nullable = true)
 |-- cat_relationship: double (nullable = true)
 |-- cat_race: double (nullable = true)
 |-- cat_sex: double (nullable = true)
 |-- cat_native-country: double (nullable = true)
 |-- cat_vector: vector (nullable = true)
 |-- cat_features: vector (nullable = true)

+--------------------+
|        cat_features|
+--------------------+
|[4.0,2.0,1.0,3.0,...|
|(8,[0,1,3],[1.0,2...|
|(8,[2,3,4],[2.0,9...|
|(8,[1,3,5],[5.0,9...|
|[0.0,2.0,0.0,0.0,...|
+--------------------+
only showing top 5 rows
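
VectorIndexer only marks a slot as categorical if it has at most maxCategories distinct values (20 by default), so a high-cardinality slot such as native-country is likely left continuous here. The fitted model records the value maps for the slots it did mark; an inspection sketch, not executed here:

# {vector position: {original value: category index}} for the slots
# VectorIndexer decided are categorical
print(categorical_assembler_model.stages[1].categoryMaps)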

In [7]:
# assemble the selected numerical columns (age, fnlwgt, hours-per-week) together with the indexed categorical features
feature_assembler = VectorAssembler(inputCols=['age', 'fnlwgt', 'hours-per-week', 'cat_features'], outputCol='features')
feature_df = feature_assembler.transform(cat_df)

feature_df.printSchema()
feature_df.select('features').show(5)
root
 |-- age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- label: double (nullable = true)
 |-- cat_workclass: double (nullable = true)
 |-- cat_education: double (nullable = true)
 |-- cat_marital-status: double (nullable = true)
 |-- cat_occupation: double (nullable = true)
 |-- cat_relationship: double (nullable = true)
 |-- cat_race: double (nullable = true)
 |-- cat_sex: double (nullable = true)
 |-- cat_native-country: double (nullable = true)
 |-- cat_vector: vector (nullable = true)
 |-- cat_features: vector (nullable = true)
 |-- features: vector (nullable = true)

+--------------------+
|            features|
+--------------------+
|[39.0,77516.0,40....|
|(11,[0,1,2,3,4,6]...|
|(11,[0,1,2,5,6,7]...|
|(11,[0,1,2,4,6,8]...|
|[28.0,338409.0,40...|
+--------------------+
only showing top 5 rows
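
The assembled vector has 11 slots: the 3 numerical columns followed by the 8 slots of cat_features, matching the (11, ...) sparse vectors above. VectorAssembler also attaches per-slot ML attribute metadata to the output column, which can be inspected directly (a sketch, not executed here):

# per-slot attribute metadata attached to the 'features' column by the assembler
print(feature_df.schema['features'].metadata)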

In [8]:
# split the data into training and test sets with stratified sampling by label
trainingDF = dataDF.sampleBy('label', fractions={0.0: 0.7, 1.0: 0.7}).cache()
# note: subtract is a set difference, so any duplicate rows are dropped from the test set
testDF = dataDF.subtract(trainingDF).cache()
In [9]:
from pyspark.sql.functions import col

# sanity-check the split sizes and the positive-class counts
print(trainingDF.count())
print(trainingDF.where(col('label') == 1.0).count())
print(testDF.count())
print(testDF.where(col('label') == 1.0).count())
22658
5414
9890
2425
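
As a sanity check on the stratification, the positive-class fraction should be roughly the same in both splits; using the counts above:

# positive-class fraction in each split, both close to the overall rate (~24%)
print(5414 / 22658.0)  # training: ~0.239
print(2425 / 9890.0)   # test:     ~0.245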
In [10]:
from pyspark.ml.classification import RandomForestClassifier

# build and train a random forest classifier; maxBins must be at least the number
# of categories of any categorical feature, so it is raised from the default 32 to 50
rfClassifier = RandomForestClassifier(featuresCol='features', labelCol='label', maxBins=50)
rfPipeline = Pipeline(stages = [indexerPipeline, categorical_assembler, feature_assembler, rfClassifier])
rfPipelineModel = rfPipeline.fit(trainingDF)
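
The fitted forest is the last stage of the pipeline model, and its featureImportances vector hints at which of the eleven assembled slots drive the predictions; an inspection sketch, not executed here:

# the final pipeline stage is the fitted RandomForestClassificationModel
rfModel = rfPipelineModel.stages[-1]
print(rfModel.featureImportances)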
In [11]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# evaluate the random forest AUC (BinaryClassificationEvaluator defaults to areaUnderROC)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Random forest AUC: %s" % evaluator.evaluate(rfPipelineModel.transform(testDF)))
Random forest AUC: 0.886826830643
In [12]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# evaluate random forest accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='accuracy')
print("Random forest accuracy: %s" % evaluator.evaluate(rfPipelineModel.transform(testDF)))
Random forest accuracy: 0.824350288199
In [13]:
predictionDF = rfPipelineModel.transform(testDF).select('label','prediction').cache()
In [14]:
# compute the confusion matrix (these are raw counts tp/tn/fp/fn, not rates)
tp = predictionDF.where((col('label') == 1.0) & (col('prediction') == 1.0)).count()
tn = predictionDF.where((col('label') == 0.0) & (col('prediction') == 0.0)).count()
fp = predictionDF.where((col('label') == 0.0) & (col('prediction') == 1.0)).count()
fn = predictionDF.where((col('label') == 1.0) & (col('prediction') == 0.0)).count()

print(tn, fp)
print(fn, tp)
(7104, 360)
(1377, 1048)
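
From these counts we can derive precision and recall for the >50K class:

# precision = tp / (tp + fp), recall = tp / (tp + fn)
print(tp / float(tp + fp))  # ~0.74
print(tp / float(tp + fn))  # ~0.43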

To use logistic regression we need to encode the categorical features differently: a linear model would treat raw category indices as ordered magnitudes, so we use one-hot encoding instead.
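
To see what the encoder produces, here is a toy sketch using the same pre-Spark-3 OneHotEncoder Transformer API as the cell below (toyDF is an illustrative name; not executed here):

from pyspark.ml.feature import OneHotEncoder

# three categories: with the default dropLast=True each index becomes a
# 2-element sparse vector, and the last category (2.0) is the all-zeros vector
toyDF = spark.createDataFrame([(0.0,), (1.0,), (2.0,)], ['idx'])
OneHotEncoder(inputCol='idx', outputCol='vec').transform(toyDF).show(truncate=False)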

In [15]:
from pyspark.ml.feature import OneHotEncoder

# create and test the OneHotEncoder for the categorical features
categorical_assembler = Pipeline(stages=[OneHotEncoder(inputCol="cat_%s" % feature,
                                                       outputCol="enc_%s" % feature)
                                         for feature in categoricalFeatures])

categorical_assembler_model = categorical_assembler.fit(indexed_df)
cat_df = categorical_assembler_model.transform(indexed_df)
cat_df.printSchema()
root
 |-- age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- education-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- label: double (nullable = true)
 |-- cat_workclass: double (nullable = true)
 |-- cat_education: double (nullable = true)
 |-- cat_marital-status: double (nullable = true)
 |-- cat_occupation: double (nullable = true)
 |-- cat_relationship: double (nullable = true)
 |-- cat_race: double (nullable = true)
 |-- cat_sex: double (nullable = true)
 |-- cat_native-country: double (nullable = true)
 |-- enc_workclass: vector (nullable = true)
 |-- enc_education: vector (nullable = true)
 |-- enc_marital-status: vector (nullable = true)
 |-- enc_occupation: vector (nullable = true)
 |-- enc_relationship: vector (nullable = true)
 |-- enc_race: vector (nullable = true)
 |-- enc_sex: vector (nullable = true)
 |-- enc_native-country: vector (nullable = true)

In [16]:
from pyspark.ml.classification import LogisticRegression


# train the logistic regression classifier on the one-hot encoded features
lrClassifier = LogisticRegression(featuresCol='features', labelCol='label')
encFeatureAssembler = VectorAssembler(inputCols=['age', 'fnlwgt', 'hours-per-week'] +
                                      ["enc_%s" % feature for feature in categoricalFeatures],
                                      outputCol='features')
lrPipeline = Pipeline(stages=[indexerPipeline, categorical_assembler, encFeatureAssembler, lrClassifier])
lrPipelineModel = lrPipeline.fit(trainingDF)
In [17]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Logistic regression AUC: %s" % evaluator.evaluate(lrPipelineModel.transform(testDF)))
Logistic regression AUC: 0.891875117402