This notebook demonstrates how to work with complex data sets that include both numerical and categorical features.
We will use the Adult dataset (data/adult.data.bz2), which describes adults with a mix of numerical and categorical features, and our task is to predict whether their income is above $50K.
# let's parse the data set description to obtain the names of the features
import re
with open('data/adult.names.txt', 'r') as f:
featureNames = [ line.split(':')[0] for line in f.readlines() if re.match(r'^[a-z\-]+:.*', line)]
print(featureNames)
columnNames = featureNames + ['income']
# load the data into a DataFrame
inputDF = spark.createDataFrame(spark.read.csv('data/adult.data.bz2', inferSchema=True).rdd, columnNames)
inputDF.printSchema()
display(inputDF.limit(3))
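An equivalent way to attach the column names, if you prefer a one-liner, is to rename the inferred columns with toDF (a sketch of the same load, assuming the same header-less CSV):
# equivalent: infer the schema and rename the _c0.._cN columns in place
inputDF = spark.read.csv('data/adult.data.bz2', inferSchema=True).toDF(*columnNames)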
from pyspark.sql.functions import trim
# create the numerical label column (1.0 if income > $50K)
dataDF = inputDF.withColumn('label',trim(inputDF.income).startswith('>').cast('double'))
display(dataDF.limit(8))
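As a quick sanity check on the new column (a small sketch; the Adult data is imbalanced, with roughly a quarter of rows earning above $50K):
# count rows per label value to confirm the cast worked and see the class imbalance
dataDF.groupBy('label').count().show()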
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.types import StringType
# detect the categorical features from the schema
catrgoricalFeatures = [ f.name for f in dataDF.schema.fields if f.dataType == StringType() and f.name!='income']
print(catrgoricalFeatures)
# index each categorical column into a numeric "cat_*" column
indexerPipeline = Pipeline(stages=[
    StringIndexer(inputCol=feature, outputCol="cat_%s" % feature, handleInvalid='skip')
    for feature in catrgoricalFeatures
])
pipelineModel = indexerPipeline.fit(dataDF)
indexed_df = pipelineModel.transform(dataDF)
indexed_df.printSchema()
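As a spot check of what the indexers produced (workclass is one of the detected string columns), the mapping from string to index can be listed directly:
# each distinct string maps to a double index; the most frequent category gets 0.0
indexed_df.select('workclass', 'cat_workclass').distinct().orderBy('cat_workclass').show(truncate=False)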
from pyspark.ml.feature import VectorAssembler, VectorIndexer
# assemble the indexed columns into one vector and mark which slots are categorical
categorical_assembler = Pipeline(stages=[
    VectorAssembler(inputCols=["cat_%s" % c for c in catrgoricalFeatures], outputCol='cat_vector'),
    # raise maxCategories above the default 20 so high-cardinality columns
    # such as native-country are still flagged as categorical
    VectorIndexer(inputCol='cat_vector', outputCol='cat_features', maxCategories=50)
])
categorical_assembler_model = categorical_assembler.fit(indexed_df)
cat_df = categorical_assembler_model.transform(indexed_df)
cat_df.printSchema()
cat_df.select('cat_features').show(5)
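The categorical structure detected by VectorIndexer is stored as ML attribute metadata on the output column; Spark keeps these attributes under the 'ml_attr' metadata key, so a quick way to peek at them is:
# nominal entries list the category values VectorIndexer found for each slot
print(cat_df.schema['cat_features'].metadata['ml_attr'])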
# assemble both numerical and categorical features
feature_assembler = VectorAssembler(inputCols = ['age', 'fnlwgt', 'hours-per-week', 'cat_features'] , outputCol='features')
feature_df = feature_assembler.transform(cat_df)
feature_df.printSchema()
feature_df.select('features').show(5)
# split the data into training and test sets with stratified sampling by label
trainingDF = dataDF.sampleBy('label', fractions={0.0: 0.7, 1.0: 0.7}).cache()
# note: subtract is a set difference (EXCEPT DISTINCT), so duplicate rows are dropped from the test set
testDF = dataDF.subtract(trainingDF).cache()
from pyspark.sql.functions import col
# just check that it worked
print(trainingDF.count())
print(trainingDF.where(col('label') == 1.0).count())
print(testDF.count())
print(testDF.where(col('label') == 1.0).count())
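The absolute counts are easier to compare as positive-label fractions; they should be close if the stratified sampling worked:
# fraction of >50K rows in each split
print(trainingDF.where(col('label') == 1.0).count() / trainingDF.count())
print(testDF.where(col('label') == 1.0).count() / testDF.count())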
from pyspark.ml.classification import RandomForestClassifier
# build and train random forest classifier
rfClassifier = RandomForestClassifier(featuresCol='features', labelCol='label', maxBins=50)
rfPipeline = Pipeline(stages = [indexerPipeline, categorical_assembler, feature_assembler, rfClassifier])
rfPipelineModel = rfPipeline.fit(trainingDF)
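The fitted forest is the last stage of the pipeline model, so its feature importances can be pulled out directly (a sketch; the vector is aligned with the assembled 'features' column):
# relative importance of the 3 numeric features followed by the indexed categorical slots
rfModel = rfPipelineModel.stages[-1]
print(rfModel.featureImportances)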
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# evaluate random forest AUC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Random forest AUC: %s" % evaluator.evaluate(rfPipelineModel.transform(testDF)))
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# evaluate random forest accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='accuracy')
print("Random forest accuracy: %s" % evaluator.evaluate(rfPipelineModel.transform(testDF)))
predictionDF = rfPipelineModel.transform(testDF).select('label','prediction').cache()
# compute the confusion matrix (these are raw counts, not rates)
tp = predictionDF.where((col('label') == 1.0) & (col('prediction') == 1.0)).count()
tn = predictionDF.where((col('label') == 0.0) & (col('prediction') == 0.0)).count()
fp = predictionDF.where((col('label') == 0.0) & (col('prediction') == 1.0)).count()
fn = predictionDF.where((col('label') == 1.0) & (col('prediction') == 0.0)).count()
print(tn, fp)   # actual negatives: correctly rejected, false alarms
print(fn, tp)   # actual positives: missed, correctly detected
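From these counts the usual derived metrics follow directly (a small sketch; guards against empty denominators are omitted):
# precision, recall and F1 for the positive (>50K) class
precision = tp / (tp + fp)
recall = tp / (tp + fn)
print("precision: %.3f  recall: %.3f  F1: %.3f" % (precision, recall, 2 * precision * recall / (precision + recall)))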
To use logistic regression we need to encode the categorical features differently, using one-hot encoding; otherwise the indexed category ids would be treated as ordered numerical values.
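To illustrate what that means, here is a minimal, self-contained sketch on a toy column (hypothetical values, not from the Adult data): each category index becomes a sparse indicator vector instead of a single ordinal number.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

toyDF = spark.createDataFrame([('red',), ('green',), ('blue',), ('green',)], ['color'])
toyPipeline = Pipeline(stages=[
    StringIndexer(inputCol='color', outputCol='color_idx'),
    OneHotEncoder(inputCol='color_idx', outputCol='color_vec')
])
toyPipeline.fit(toyDF).transform(toyDF).show()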
from pyspark.ml.feature import OneHotEncoder
# one-hot encode each indexed categorical column into a sparse "enc_*" vector
categorical_assembler = Pipeline(stages=[
    OneHotEncoder(inputCol="cat_%s" % feature, outputCol="enc_%s" % feature)
    for feature in catrgoricalFeatures
])
categorical_assembler_model = categorical_assembler.fit(indexed_df)
cat_df = categorical_assembler_model.transform(indexed_df)
cat_df.printSchema()
from pyspark.ml.classification import LogisticRegression
# train the logistic regression classifier on the numeric + one-hot encoded features
lrClassifier = LogisticRegression(featuresCol='features', labelCol='label')
encFeatureAssembler = VectorAssembler(
    inputCols=['age', 'fnlwgt', 'hours-per-week'] + ["enc_%s" % feature for feature in catrgoricalFeatures],
    outputCol='features')
lrPipeline = Pipeline(stages=[indexerPipeline, categorical_assembler, encFeatureAssembler, lrClassifier])
lrPipelineModel = lrPipeline.fit(trainingDF)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Logisting regression AUC: %s" % evaluator.evaluate(lrPipeline.transform(testDF)))