This notebook demonstrates the basics of using SparkML for Machine Learning. We will build a few different regression models, tune their parameters and evaluate their performance.
We will be using the wine quality dataset from: https://archive.ics.uci.edu/ml/datasets/Wine+Quality, which captures various physicochemical properties of wines (alcohol content, acidity, etc.) and their quality as assessed by wine experts (data/winequality-white.csv).
Let's preview the data first:
%%sh
# preview the data file
head -n 5 data/winequality-white.csv
It's a semicolon-delimited text file with a header.
SparkML operates on DataFrames so we need to create one with our data:
# load the contents of winequality-white.csv into a Spark DataFrame
# we need to specify the custom separator `;`
inputDF = spark.read.csv('data/winequality-white.csv',header='true', inferSchema='true', sep=';')
# let's see the schema and the number of rows
inputDF.printSchema()
print("Rows: %s" % inputDF.count())
Our DataFrame is relatively small. In fact, it is so small that we could easily analyse it using standard Python tools. However, the code below, which is currently running on a single computer, can just as easily run on a large Spark cluster to analyse DataFrames with billions of rows.
Let's see a sample of the data:
display(inputDF.limit(5))
Here we will show a few examples of using Spark ML to build regression models that predict the quality of wine based on its properties.
We will call the assembled DataFrame dataDF
and apply caching to it. This tells Spark to try to keep the data in memory for faster access.
We will now build a simple linear regression model.
Spark requires that all the predictors (features) are combined into a single feature vector.
We can use VectorAssembler
to build it from selected columns of our DataFrame
(we will use all properties):
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
# select the columns to be used as the features (all except `quality`)
featureColumns = [c for c in inputDF.columns if c != 'quality']
# create and configure the assembler
assembler = VectorAssembler(inputCols=featureColumns,
outputCol="features")
# transform the original data and cache the result for faster repeated access
dataDF = assembler.transform(inputDF).cache()
dataDF.printSchema()
display(dataDF.limit(3))
We used the assembler to transform our input DataFrame into one that includes the feature vector (features
) shown above.
Now we are ready to train a simple regression model:
from pyspark.ml.regression import LinearRegression
# fit a `LinearRegression` model using features in column `features` and label in column `quality`
lr = LinearRegression(maxIter=30, regParam=0.3, elasticNetParam=0.3, featuresCol="features", labelCol="quality")
lrModel = lr.fit(dataDF)
We can now look at the linear regression coefficients:
for t in zip(featureColumns, lrModel.coefficients):
    print(t)
You may notice that for most properties the coefficients are zero, which means that they do not contribute (according to this specific model) to wine quality.
This model uses elastic net regularization, which naturally 'selects' the most important variables (since it includes a LASSO penalty).
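As a small sketch (using the featureColumns list and lrModel from the cells above), we can list just the predictors the model actually kept:
# list only the predictors with non-zero coefficients, i.e. the ones 'selected' by the elastic net penalty
for name, coef in zip(featureColumns, lrModel.coefficients):
    if coef != 0.0:
        print("%s: %.4f" % (name, coef))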
We can now use the model to make predictions:
# predict the quality, the predicted quality will be saved in `prediction` column
predictionsDF = lrModel.transform(dataDF)
display(predictionsDF.limit(3))
Now we can evaluate the performance of our model with Root Mean Squared Error:
from pyspark.ml.evaluation import RegressionEvaluator
# create a regression evaluator with RMSE metrics
evaluator = RegressionEvaluator(
labelCol='quality', predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictionsDF)
print("Root Mean Squared Error (RMSE) = %g" % rmse)
To get some idea of how good the model is, we can compare it to the 'zero' model that always predicts the average quality.
from pyspark.sql.functions import *
# calculate the average wine quality
avgQuality = inputDF.groupBy().avg('quality').first()[0]
print(avgQuality)
# compute the 'zero' model predictions
# `lit` function creates a 'literal' column that is column with the provided value in all rows
zeroModelPredictionsDF = dataDF.select(col('quality'), lit(avgQuality).alias('prediction'))
# evaluate the 'zero' model
zeroModelRmse = evaluator.evaluate(zeroModelPredictionsDF)
print("RMSE of 'zero model' = %g" % zeroModelRmse)
So we did a little bit better. But this is actually a very biased 'best case' estimate of the RMSE.
Why?
We will first split our data into training and test sets, then train a regression model only on the training set and use the test set for evaluation.
Rather than using our transformed dataDF
, we will chain the preprocessing (vector assembling) and model training into a simple Pipeline
.
In general, pipelines may include many steps dealing with feature preprocessing, extraction, etc.
# split the input data into training and test dataframes with 70% to 30% weights
(trainingDF, testDF) = inputDF.randomSplit([0.7, 0.3])
from pyspark.ml import Pipeline
# construct the `Pipeline` with two stages: the `vector assembler` and the `regression model estimator`
pipeline = Pipeline(stages=[assembler, lr])
# train the pipeline on the training data
lrPipelineModel = pipeline.fit(trainingDF)
# make predictions
trainingPredictionsDF = lrPipelineModel.transform(trainingDF)
testPredictionsDF = lrPipelineModel.transform(testDF)
# evaluate the model on training and test data
print("RMSE on training data = %g" % evaluator.evaluate(trainingPredictionsDF))
print("RMSE on test data = %g" % evaluator.evaluate(testPredictionsDF))
As we can see, the RMSE on the test data is slightly worse than on the training data. The difference is not significant, though.
Why?
For many models, performance depends significantly on the model parameters. They need to be chosen for each data set individually, and the process of finding the best parameters is called 'model tuning'.
In our previous example we used two parameters for LinearRegression
: regParam=0.3, elasticNetParam=0.3
, with ad hoc values.
Perhaps we can get a better model if we tune these parameters.
Spark ML comes with a ready-to-use parameter optimiser that uses cross-validation to select the best set of parameters from the provided search grid.
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
# create a search grid with the cross-product of the parameter values (9 pairs)
search_grid = ParamGridBuilder() \
.addGrid(lr.regParam, [0.0, 0.3, 0.6]) \
.addGrid(lr.elasticNetParam, [0.4, 0.6, 0.8]).build()
# use `CrossValidator` to tune the model
cv = CrossValidator(estimator = pipeline, estimatorParamMaps = search_grid, evaluator = evaluator, numFolds = 3)
cvModel = cv.fit(trainingDF)
# evaluate the tuned model
cvTestPredictionsDF = cvModel.transform(testDF)
print("RMSE on test data with CV = %g" % evaluator.evaluate(cvTestPredictionsDF))
This model is slightly better than the one with the guessed parameters.
We can also check the cross-validation estimates of RMSE for each of the tested models:
print(cvModel.avgMetrics)
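The metrics in avgMetrics are in the same order as the parameter grid, so we can pair them up to see which combination did best; a small sketch using the search_grid defined above:
# pair each parameter combination with its cross-validated RMSE
for params, metric in zip(search_grid, cvModel.avgMetrics):
    settings = {p.name: v for p, v in params.items()}
    print("%s -> RMSE = %.4f" % (settings, metric))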
Linear regression is a very simple model. Perhaps we can do better with a more complex one? Let's try to use RandomForest.
from pyspark.ml.regression import RandomForestRegressor
# define the random forest estimator
rf = RandomForestRegressor(featuresCol="features", labelCol="quality", numTrees=100, maxBins=128, maxDepth=20, \
minInstancesPerNode=5, seed=33)
rfPipeline = Pipeline(stages=[assembler, rf])
# train the random forest model
rfPipelineModel = rfPipeline.fit(trainingDF)
Let's evaluate the model:
rfTrainingPredictions = rfPipelineModel.transform(trainingDF)
rfTestPredictions = rfPipelineModel.transform(testDF)
print("Random Forest RMSE on traning data = %g" % evaluator.evaluate(rfTrainingPredictions))
print("Random Forest RMSE on test data = %g" % evaluator.evaluate(rfTestPredictions))
Random forest indeed does better than linear regression. Please also notice the difference between the performance on the training and test sets.
We can 'extract' the actual random forest model from the Pipeline
stage and use it, for example, to look at the variable importance measures:
rfModel = rfPipelineModel.stages[1]
rfModel.featureImportances
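The importances are reported in the same order as the assembler's input columns, so we can pair them with the feature names; a small sketch assuming featureColumns and rfModel from above:
# pair each feature name with its importance and sort from most to least important
importances = sorted(zip(featureColumns, rfModel.featureImportances.toArray()), key=lambda x: x[1], reverse=True)
for name, importance in importances:
    print("%s: %.4f" % (name, importance))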
The model, as well as the entire pipeline, can be saved to disk and then loaded when needed.
# save the random forest pipeline to the disk
rfPipelineModel.write().overwrite().save('output/rf.model')
# load the random forest pipeline from disk
from pyspark.ml import PipelineModel
loadedModel = PipelineModel.load('output/rf.model')
loadedPredictionsDF = loadedModel.transform(testDF)
# evaluate the model again to see if we get the same performance
print("Loaded model RMSE = %g" % evaluator.evaluate(loadedPredictionsDF))
Here we will use Principal Component Analysis (PCA) to reduce the data dimensionality so that we can plot the data in 2D space.
As before, we will create a multi-step pipeline that will assemble the feature vector, normalize it, and apply PCA:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import PCA
from pyspark.ml.feature import Normalizer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
all_assembler = VectorAssembler(
inputCols=featureColumns,
outputCol="features")
normalizer = Normalizer(inputCol="features", outputCol="norm_features")
pca = PCA(k=2, inputCol="norm_features", outputCol="pca_features")
pca_pipeline = Pipeline(stages=[all_assembler, normalizer, pca])
pca_model = pca_pipeline.fit(inputDF)
display(pca_model.transform(inputDF).select('features', 'norm_features', 'pca_features').limit(3))
The pca_features
column now contains the reduced 2D representation of all other features.
We can now use it to visualise a 50% sample of the data. The colour represents the quality of the wine.
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (10,7)
pca_data = pca_model.transform(inputDF)
sampling_fraction = 0.5
# sample once so that the coordinates and the colours come from the same rows
sampled_rows = pca_data.select('pca_features', 'quality').sample(False, sampling_fraction, 13).collect()
pca_xy = np.array([r.pca_features.toArray() for r in sampled_rows])
pca_colors = [float(r.quality) for r in sampled_rows]
plt.close()
plt.scatter(pca_xy[:,0], pca_xy[:,1], c=pca_colors, alpha=0.4, cmap=plt.get_cmap('RdYlGn'), edgecolors='none', s=50)
plt.grid(True)
plt.show()
display()