This notebook demonstrates various ways of analysing data with SparkSQL DataFrames.
We will analyse the Air Temperature data set from BOM, which contains historical maximum daily temperatures from multiple weather stations in New South Wales going back to the 19th century.
Let's have a look at the data first (data/nsw_temp.csv file):
%%sh 
# print the first five lines of the raw CSV file
head -n 5 data/nsw_temp.csv
# configure pandas for local mode notebooks: render at most five rows per frame
import pandas as pd
pd.options.display.max_rows = 5
# load the raw BOM file as-is, asking Spark to infer the column types and to
# take the column names from the first line
badBomDF = spark.read.csv('data/nsw_temp.csv', inferSchema=True, header=True)
badBomDF.printSchema()
# will fail because the file is corrupted
# badBomDF.groupBy().avg('Maximum temperature (Degree C)')
As mentioned in the previous section, this file is actually corrupted, as it has multiple header lines embedded in the data.
We can clean it up by filtering out all the header lines.
%%sh
# remove the conditioned file (left over from a previous run) if it exists
rm -rf 'output/nsw_temp_ok.csv'
# keep only the data lines (the ones starting with IDC), dropping the stray
# header lines, and write the result to `output/nsw_temp_ok.csv`
rawLines = sc.textFile('data/nsw_temp.csv')
dataLines = rawLines.filter(lambda line: line.startswith('IDC'))
dataLines.saveAsTextFile('output/nsw_temp_ok.csv')
# read the cleaned, header-less CSV and give its columns meaningful names.
# `toDF` renames the columns directly on the DataFrame, so the rows do not have
# to be converted to an RDD and back, and the column types inferred by the CSV
# reader are kept as they are.
bomDF = spark.read.csv('output/nsw_temp_ok.csv', inferSchema=True, header=False) \
    .toDF('product_id', 'station_id', 'year', 'month', 'day', 'max_temp', 'days_of_acc', 'quality')
# cache the DataFrame for performance
bomDF.cache()
bomDF.printSchema()
# overall average of `max_temp`, aggregated over every row
bomDF.agg({'max_temp': 'avg'})
# how many distinct `year` values occur in the data
bomDF.select('year').dropDuplicates().count()
# number of observations per year, ordered by year
bomDF.groupBy('year').count().orderBy('year')
# plot how the number of observations changes from year to year
from matplotlib import pyplot as plt
plt.rcParams["figure.figsize"] = (8,6)
plt.close()
obsPerYearPD = bomDF.groupBy('year').count().sort('year').toPandas()
obsPerYearPD.set_index('year').plot()
display()
# station with the largest number of observations:
# count rows per station, order by that count descending and keep the top row
from pyspark.sql.functions import *
bomDF.groupBy('station_id').count().orderBy(col('count').desc()).limit(1)
# per-year average and standard deviation of `max_temp` at station `66062`
station66062DF = bomDF.filter(col('station_id') == 66062)
station66062DF.groupBy('year').agg(
    avg('max_temp').alias('avg_max_temp'),
    stddev('max_temp').alias('sd_max_temp')
).orderBy('year')
# find the monthly average and standard deviation of `max_temp` for all stations,
# restricted to years strictly after 1990 and strictly before 2001
yearInRange = (col('year') > 1990) & (col('year') < 2001)
monthlyStats = [
    avg('max_temp').alias('avg_max_temp'),
    stddev('max_temp').alias('sd_max_temp'),
]
tempByStationAndMonth = bomDF.filter(yearInRange) \
    .groupBy('station_id', 'month') \
    .agg(*monthlyStats) \
    .orderBy('station_id', 'month')
tempByStationAndMonth
# plot the average monthly `max_temp`, one line per station
import pandas as pd
tempByStationAndMonthPD = tempByStationAndMonth.toPandas()
plt.close()
# pivot the stations into columns, then keep only the averages for plotting
statsByMonthPD = tempByStationAndMonthPD.set_index(['month', 'station_id']).unstack()
statsByMonthPD['avg_max_temp'].plot()
display()
from pyspark.sql.functions import *
from pyspark.sql.types import *


def _scaled_temp(temp, quality):
    """Scale `temp` by 0.1 when `quality` is 'Y', by 0.3 otherwise; None stays None."""
    if temp is None:
        return None
    factor = 0.1 if quality == 'Y' else 0.3
    return temp * factor


# define a UDF uncertanity that wraps the helper above and returns a double
uncertanity = udf(_scaled_temp, DoubleType())
# apply the UDF uncertanity to every row
bomDF.select(uncertanity(col('max_temp'), col('quality')).alias('uncertanity'))
Window functions allow users of Spark SQL to calculate results such as the rank of a given row or a moving average over a range of input rows. For more information on Window functions please see: Introduction to Window Functions
# extend the bom DataFrame with a `date` column:
# format year/month/day as a zero-padded `yyyy-mm-dd` string and convert it to a date
isoDateText = format_string("%04d-%02d-%02d", col('year'), col('month'), col('day'))
bomWithDateDF = bomDF.withColumn('date', to_date(isoDateText))
from pyspark.sql import Window
# rank the stations each day in year 2000 based on the `max_temp`
# (the stations with higher temperatures are ranked higher)
# one window partition per date, hottest station first
dailyHottestFirst = Window.partitionBy(col('date')).orderBy(desc('max_temp')) \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
# The year filter is applied before the projection: `year` is not one of the
# selected columns, and filtering first means only the rows of 2000 are ranked.
# Every partition holds a single date, so the resulting ranks are unaffected.
bomRankDF = bomWithDateDF.dropna().where(col('year') == 2000) \
    .select(col('date'), col('station_id'), col('max_temp'),
            rank().over(dailyHottestFirst).alias('rank')) \
    .sort('date', 'rank', 'station_id')
display(bomRankDF.limit(10))
# for each station, count the days in `bomRankDF` on which it held rank 1
# (i.e. was the `hottest` one), most frequent first
topRankedDF = bomRankDF.filter(col('rank') == 1)
hotestStationDF = topRankedDF.groupBy('station_id').count().orderBy(col('count').desc())
display(hotestStationDF)
plt.close()
hotestStationDF.toPandas().set_index('station_id').plot(kind='bar')
display()
# use Window functions to calculate 14 days moving average for station `66062` in the year `2000`
from pyspark.sql import Window
plt.close()
# The frame covers the 13 preceding rows plus the current row, i.e. 14 observations.
# NOTE: the frame counts rows, not calendar days, so it spans exactly 14 days
# only when the station has one observation for every day.
trailing14Rows = Window.partitionBy(col('station_id')).orderBy(col('date')) \
    .rowsBetween(-13, Window.currentRow)
bomWithDateDF.where((col('station_id') == 66062) & (col('year') == 2000)) \
    .select(col('date'), col('max_temp'),
            avg(col('max_temp')).over(trailing14Rows).alias('run_avg')) \
    .sort('date') \
    .toPandas().set_index('date').plot()
display()
SparkSQL DataFrames can also be queried with SQL (Structured Query Language), which is commonly used to query relational databases.
# register the DataFrame as a temporary view named `bom` so it can be queried with SQL
# (`createOrReplaceTempView` supersedes the deprecated `registerTempTable` and
# allows this cell to be re-run without error)
bomDF.createOrReplaceTempView('bom')
# run a simple query that counts number of rows in table `bom`
# (queries go through the `spark` session rather than a notebook-provided `sql` global)
display(spark.sql('SELECT COUNT(*) FROM bom'))
# run a query that computes the average monthly `max_temp` for all stations 
# in years after 2000.
display(spark.sql('''SELECT station_id, month, avg(max_temp) AS avg_max_temperature FROM bom
        WHERE year > 2000 GROUP by station_id, month ORDER BY station_id, month
    '''))
In the Databricks Platform the SQL queries can be entered directly in %sql cells.
#%sql
#SELECT station_id, month, avg(max_temp) AS avg_max_temperature FROM bom
#        WHERE year > 2000 GROUP by station_id, month ORDER BY station_id, month