3.2 Machine Learning - Text Classification

This notebook demonstrates building a simple classifier for text data.

We will use part of the Enron spam email dataset to build a spam/ham classifier.

As the first step, we need to preprocess the data to convert it into a format suitable for distributed processing.

The original data comes in two directories, ham and spam, each containing many small files - one email per file.

We will convert these to Hadoop Sequence Files.

In [1]:
import zipfile
from os import path
In [2]:
def zip2text(filename):
    """Return a function that reads the given zip archive and yields
    (member filename, contents) pairs for an iterator of ZipInfo entries."""
    def do(zipinfoIterator):
        # open the archive once per partition and stream its members
        with zipfile.ZipFile(filename, 'r') as zf:
            for zi in zipinfoIterator:
                with zf.open(zi) as zfe:
                    yield (zi.filename, zfe.read())
    return do
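Before handing zip2text to Spark, we can sanity-check it locally on a couple of entries (a minimal sketch, assuming data/ham.zip is present; the slice size is arbitrary):

# local sanity check (no Spark involved): read the first two archive entries
with zipfile.ZipFile('data/ham.zip', 'r') as zf:
    sample = zf.infolist()[:2]

for name, content in zip2text('data/ham.zip')(iter(sample)):
    print(name, len(content))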
In [3]:
# parallelize the archive's entries and read each email's text per partition
with zipfile.ZipFile('data/ham.zip', 'r') as zf:
    ham = sc.parallelize(zf.infolist()) \
        .mapPartitions(zip2text('data/ham.zip'))

with zipfile.ZipFile('data/spam.zip', 'r') as zf:
    spam = sc.parallelize(zf.infolist()) \
        .mapPartitions(zip2text('data/spam.zip'))
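As a quick check, the RDDs report how many partitions the work will be split across (the exact number depends on your Spark configuration):

print(ham.getNumPartitions(), spam.getNumPartitions())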
In [4]:
%%sh
rm -rf output/ham.seq output/spam.seq
In [5]:
ham.saveAsSequenceFile('output/ham.seq')
spam.saveAsSequenceFile('output/spam.seq')
In [6]:
%%sh 
ls output/ham.seq
_SUCCESS
part-00000
part-00001
part-00002
part-00003

Now we can load the data into a DataFrame.

In [7]:
from pyspark.sql import *

# load the 'ham' data
df_ham = sc.sequenceFile('output/ham.seq') \
    .map(lambda ft: Row(label=0.0, filename=ft[0], text=ft[1])).toDF()

# load the 'spam' data
df_spam = sc.sequenceFile('output/spam.seq') \
    .map(lambda ft: Row(label=1.0, filename=ft[0], text=ft[1])).toDF()

# combine the two datasets    
df_data = df_ham.union(df_spam).cache()
display(df_data.limit(5))
filename label text
0 0001.1999-12-10.farmer.ham.txt 0.0 Subject: christmas tree farm pictures\r\n
1 0002.1999-12-13.farmer.ham.txt 0.0 Subject: vastar resources , inc .\r\ngary , pr...
2 0003.1999-12-14.farmer.ham.txt 0.0 Subject: calpine daily gas nomination\r\n- cal...
3 0004.1999-12-14.farmer.ham.txt 0.0 Subject: re : issue\r\nfyi - see note below - ...
4 0005.1999-12-14.farmer.ham.txt 0.0 Subject: meter 7268 nov allocation\r\nfyi .\r\...
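
Before modelling, it is worth checking the class balance of the combined dataset (a quick sketch; the counts depend on which subset of the Enron data you downloaded):

# count emails per class (0.0 = ham, 1.0 = spam)
df_data.groupBy('label').count().show()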

Let's build our feature extraction pipeline, which involves:

  • tokenizing each email
  • creating a token frequency vector for each email
  • applying IDF (inverse document frequency) weighting
In [8]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol='text', outputCol='tokens')
hashingTF = HashingTF(numFeatures=100, inputCol='tokens', outputCol='tf')
idf = IDF(minDocFreq=3, inputCol='tf', outputCol='idf')

# build the feature extraction pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])

pipelineModel = pipeline.fit(df_data)
df_data_tf = pipelineModel.transform(df_data)
display(df_data_tf.select('tokens', 'tf','idf').limit(5))
tokens tf idf
0 [subject:, christmas, tree, farm, pictures] (0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 0.0, 0.0, 0.85227187019, 0.0, 0.0, 0.0, ...
1 [subject:, vastar, resources, ,, inc, ., , gar... (0.0, 0.0, 6.0, 0.0, 5.0, 12.0, 3.0, 33.0, 0.0... (0.0, 0.0, 4.14382370679, 0.0, 3.33880857933, ...
2 [subject:, calpine, daily, gas, nomination, , ... (0.0, 2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ... (0.0, 1.74576342699, 0.0, 0.0, 0.0, 0.0, 0.0, ...
3 [subject:, re, :, issue, , fyi, -, see, note, ... (6.0, 3.0, 2.0, 0.0, 0.0, 6.0, 3.0, 34.0, 0.0,... (4.65055555743, 2.61864514049, 1.38127456893, ...
4 [subject:, meter, 7268, nov, allocation, , fyi... (0.0, 0.0, 2.0, 1.0, 3.0, 4.0, 3.0, 26.0, 0.0,... (0.0, 0.0, 1.38127456893, 0.85227187019, 2.003...
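
Note that with only numFeatures=100 hash buckets, distinct tokens can end up sharing a bucket; the hashing trick trades exactness for a fixed-size vector. A minimal toy sketch of the effect, assuming a SparkSession is available as spark (the sentence is made up for illustration):

# assumes a SparkSession named `spark`; the sentence is purely illustrative
toy = spark.createDataFrame([('free money , claim your free prize now',)], ['text'])
toy_tokens = Tokenizer(inputCol='text', outputCol='tokens').transform(toy)
# with a small numFeatures some of these tokens may land in the same bucket
HashingTF(numFeatures=100, inputCol='tokens', outputCol='tf') \
    .transform(toy_tokens).select('tokens', 'tf').show(truncate=False)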

We will use logistic regression to train the classification model.

In [9]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# split the data into training and testing sets
(trainData, testData) = df_data.randomSplit([0.7, 0.3])
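If you need the split to be reproducible across runs, randomSplit also accepts a seed; a quick check of the resulting sizes:

# e.g. (trainData, testData) = df_data.randomSplit([0.7, 0.3], seed=42)
print(trainData.count(), testData.count())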
In [10]:
# construct and train the logistic regression pipeline

lr = LogisticRegression(featuresCol='idf', labelCol='label')
lrPipeline = Pipeline(stages=[pipeline, lr])
lrPipelineModel = lrPipeline.fit(trainData)
In [11]:
# evaluate the logistic regression model (the default metric is 'AUC')
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Logistic regression AUC: %s" % evaluator.evaluate(lrPipelineModel.transform(testData)))
Logistic regression AUC: 0.922094149048
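
To see what the model actually predicts per email, we can inspect a few rows of the transformed test set (probability and prediction are the default output columns of LogisticRegression):

# look at a handful of per-email predictions from the logistic regression model
lr_predictions = lrPipelineModel.transform(testData)
display(lr_predictions.select('filename', 'label', 'probability', 'prediction').limit(5))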

Let's see if a more complex model (random forest) does better:

In [12]:
from pyspark.ml.classification import RandomForestClassifier

# construct and train the random forest pipeline
rf = RandomForestClassifier(featuresCol='idf', labelCol='label')
rfPipeline = Pipeline(stages=[pipeline, rf])
rfPipelineModel = rfPipeline.fit(trainData)
In [13]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Random Forest AUC: %s" % evaluator.evaluate(rfPipelineModel.transform(testData)))
Random Forest AUC: 0.918341357895
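
A natural next step would be to tune the feature dimensionality and regularization with cross-validation rather than comparing single fits. A minimal sketch (not run here; the grid values are illustrative only):

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# illustrative grid over the hashing dimensionality and LR regularization
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [100, 1000, 10000]) \
    .addGrid(lr.regParam, [0.01, 0.1]) \
    .build()

cv = CrossValidator(estimator=lrPipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3)
cvModel = cv.fit(trainData)
print("Cross-validated AUC: %s" % evaluator.evaluate(cvModel.transform(testData)))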