This notebook demonstrates building a simple classifier for text data.
We will use part of the Enron email SPAM dataset and build a SPAM/HAM classifier.
As the first step, we need to preprocess the data to convert it into a format suitable for distributed processing.
The original data comes in two directories, ham and spam, each containing many small files: one email per file.
We will convert these to Hadoop Sequence Files.
import zipfile
from os import path

def zip2text(filename):
    # returns a generator function that opens the zip file and yields (name, contents) pairs
    def do(zipinfoIterator):
        with zipfile.ZipFile(filename, 'r') as zf:
            for zi in zipinfoIterator:
                with zf.open(zi) as zfe:
                    yield (zi.filename, zfe.read())
    return do

# build an RDD of (filename, contents) pairs for each class
with zipfile.ZipFile('data/ham.zip', 'r') as zf:
    ham = sc.parallelize(zf.infolist()) \
        .mapPartitions(zip2text('data/ham.zip'))

with zipfile.ZipFile('data/spam.zip', 'r') as zf:
    spam = sc.parallelize(zf.infolist()) \
        .mapPartitions(zip2text('data/spam.zip'))
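Before writing the data out, it can be useful to peek at a single record from each RDD. This is a minimal sanity check, not part of the original pipeline; it assumes the zip files are in place under data/.
# look at the filename and the first 100 characters of one ham email
fname, text = ham.first()
print(fname, text[:100])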
%%sh
# remove any previous output so saveAsSequenceFile does not fail on an existing directory
rm -rf output/ham.seq output/spam.seq

ham.saveAsSequenceFile('output/ham.seq')
spam.saveAsSequenceFile('output/spam.seq')
%%sh
ls output/ham.seq
Now we can load the data into a DataFrame.
from pyspark.sql import *

# load the 'ham' data
df_ham = sc.sequenceFile('output/ham.seq') \
    .map(lambda ft: Row(label=0.0, filename=ft[0], text=ft[1])).toDF()

# load the 'spam' data
df_spam = sc.sequenceFile('output/spam.seq') \
    .map(lambda ft: Row(label=1.0, filename=ft[0], text=ft[1])).toDF()

# combine the two datasets
df_data = df_ham.union(df_spam).cache()
display(df_data.limit(5))
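As a quick extra check (not part of the original flow), we can verify how many emails fall into each class:
# count the number of emails per class (0.0 = ham, 1.0 = spam)
display(df_data.groupBy('label').count())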
Let's build our feature extraction pipeline, which involves tokenizing the text, hashing the tokens into term-frequency vectors, and re-weighting them with IDF:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
tokenizer = Tokenizer(inputCol='text', outputCol='tokens')
hashingTF = HashingTF(numFeatures=100, inputCol='tokens', outputCol='tf')
idf = IDF(minDocFreq=3, inputCol='tf', outputCol='idf')
# build the feature extraction pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])
pipelineModel = pipeline.fit(df_data)
df_data_tf = pipelineModel.transform(df_data)
display(df_data_tf.select('tokens', 'tf','idf').limit(5))
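The 'idf' column holds sparse vectors of size 100, because HashingTF was configured with numFeatures=100. As an optional check we can inspect one of them:
# peek at a single IDF-weighted feature vector (a SparseVector of length 100)
print(df_data_tf.select('idf').first()[0])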
We will use logistic regression to train the classification model.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# split the data into the training and testing sets
(trainData, testData) = df_data.randomSplit([0.7, 0.3])
# construct and train the logistic regression pipeline
lr = LogisticRegression(featuresCol='idf', labelCol='label')
lrPipeline = Pipeline(stages=[pipeline, lr])
lrPipelineModel = lrPipeline.fit(trainData)
# evaluate the logistic regression model (the default metric is 'AUC')
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Logistic regression AUC: %s" % evaluator.evaluate(lrPipelineModel.transform(testData)))
Let's see if a more complex model (random forest) does better:
from pyspark.ml.classification import RandomForestClassifier
# construct and train the random forest pipeline
rf = RandomForestClassifier(featuresCol='idf', labelCol='label')
rfPipeline = Pipeline(stages=[pipeline, rf])
rfPipelineModel = rfPipeline.fit(trainData)
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Random Forest AUC: %s" % evaluator.evaluate(rfPipelineModel.transform(testData)))