2.1 Structured Data Introduction

This notebook demonstrates the basics of processing structured data with Spark SQL.

Spark SQL is a higher-level API for working with structured data. The data are represented as DataFrames: table-like objects with columns and rows, conceptually similar to pandas or R data frames.

`spark` is the main entry point for Spark SQL related operations. It is an instance of `SparkSession`, and pyspark automatically creates one for you.
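
If you use PySpark from a standalone script rather than the pyspark shell or a notebook, the session is not created automatically. A minimal sketch of obtaining one yourself (the application name below is just an example):

# Create (or reuse) a SparkSession explicitly; inside pyspark this simply
# returns the session that already exists.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("structured-data-intro").getOrCreate()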

Let's look at some ways to create and display DataFrames:

In [1]:
# import pyspark.sql classes and functions
from pyspark.sql import *


# The most direct way to create a DataFrame is to 
# build it from a list of `Row`s

# Create a few `Row`s describing product items 

item1 = Row(id=1, name="iPhone 6", price=1000.00)
item2 = Row(id=2, name="iPhone 7", price=1200.00)
item3 = Row(id=2, name="Samsung Galaxy", price=900.00)

# Create a `DataFrame` from the list of `Row`s
itemsDF = spark.createDataFrame([item1,  item2, item3])

# Each `DataFrame` is associated with a `schema` which defines the names 
# and types of the columns in the DataFrame. 
# `createDataFrame` by default infers the schema from the provided Rows, 
# but later we will see how to specify the schema explicitly.

# Let's print out the schema

print("Inferred schema:")

itemsDF.printSchema()

# Display the DataFrame with the `show()` function

itemsDF.show()
Inferred schema:
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)

+---+--------------+------+
| id|          name| price|
+---+--------------+------+
|  1|      iPhone 6|1000.0|
|  2|      iPhone 7|1200.0|
|  2|Samsung Galaxy| 900.0|
+---+--------------+------+
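
The same DataFrame can also be built from plain Python tuples plus a list of column names; a small sketch (the variable name is just illustrative):

# Build a DataFrame from tuples; the second argument names the columns
# and the types are again inferred.
itemsFromTuplesDF = spark.createDataFrame(
    [(1, "iPhone 6", 1000.00), (2, "iPhone 7", 1200.00), (2, "Samsung Galaxy", 900.00)],
    ["id", "name", "price"])
itemsFromTuplesDF.show()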

We can also define a reusable `Row` "class" with named fields and use it to create rows:

In [2]:
# Create a Review Row definition
Review = Row('date', 'product_id', 'user_id', 'rating', 'comment')

# Create some Reviews

review1 = Review('2017-01-01', 1, 'jonh', 4, 'Very nice')
review2 = Review('2017-01-02', 1, 'karl', 3, None)
review3 = Review('2017-01-02', 1, 'adam', 5, 'Super')
review4 = Review('2017-01-03', 2, 'greg', 3, None)

# Create a `DataFrame` from the list of Rows with an inferred schema

reviewsDF = spark.createDataFrame([review1,  review2, review3, review4])

# Print out the inferred schema
reviewsDF.printSchema()
root
 |-- date: string (nullable = true)
 |-- product_id: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- rating: long (nullable = true)
 |-- comment: string (nullable = true)

If the inferred schema is not satisfactory, we can define an explicit one:

In [3]:
from pyspark.sql.types import *

# Define an explicit schema with the column names, types and nullability
reviewSchema = StructType([
    StructField('date', StringType(), True),
    StructField('product_id', LongType(), False),
    StructField('user_id', StringType(), False),
    StructField('rating', IntegerType(), True),
    StructField('text', StringType(), True)
])

# Create a `DataFrame` from the list of Rows with the specified schema
reviewsDF = spark.createDataFrame([review1,  review2, review3, review4], schema=reviewSchema)
reviewsDF.printSchema()

# We can use `display` for a nicely formatted preview of the DataFrame content
display(reviewsDF)
root
 |-- date: string (nullable = true)
 |-- product_id: long (nullable = false)
 |-- user_id: string (nullable = false)
 |-- rating: integer (nullable = true)
 |-- text: string (nullable = true)

date product_id user_id rating text
0 2017-01-01 1 jonh 4 Very nice
1 2017-01-02 1 karl 3 None
2 2017-01-02 1 adam 5 Super
3 2017-01-03 2 greg 3 None
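
Besides `printSchema()`, the schema can also be inspected programmatically; a small sketch:

# Column names, (name, type) pairs and the full StructType object
print(reviewsDF.columns)   # e.g. ['date', 'product_id', 'user_id', 'rating', 'text']
print(reviewsDF.dtypes)    # e.g. [('date', 'string'), ('product_id', 'bigint'), ...]
print(reviewsDF.schema)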

Now let's have a look at some basic operations and ways of working with DataFrames:

In [4]:
# count the rows in the DataFrame
reviewsDF.count()
Out[4]:
4
In [5]:
# We can use transformation operations to produce new `DataFrames`,
# for example `filter` to include only rows for which the column `product_id` has value 1 
# and `sort` to order the results by the `rating` column.
# As was the case with RDDs, the transformations are `lazy`.

filteredDF = reviewsDF.filter(reviewsDF.product_id == 1).sort(reviewsDF.rating)
display(filteredDF)
date product_id user_id rating text
0 2017-01-02 1 karl 3 None
1 2017-01-01 1 jonh 4 Very nice
2 2017-01-02 1 adam 5 Super
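
To sort in the opposite direction we can call `desc()` on the column; a small sketch (the `topRatedDF` name is just illustrative):

# Same filter, but with the highest ratings first
topRatedDF = reviewsDF.filter(reviewsDF.product_id == 1).sort(reviewsDF.rating.desc())
display(topRatedDF)
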
In [6]:
from pyspark.sql.functions import col

# We can also `filter` using complex expressions,
# for example: Reviews with `product_id` == 1 and `user_id` == "jonh"
# (Use `&` instead of `and` and `|` instead of `or`)

# There are two ways to reference columns in expressions: either using the `col` function 
# with the column name (as below) or by referring to them through their DataFrame (as above)

filteredDF = reviewsDF.filter((col('product_id') == 1) & (col('user_id') == 'jonh'))
display(filteredDF)
date product_id user_id rating text
0 2017-01-01 1 jonh 4 Very nice
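
`filter` also accepts a SQL-style string expression, which can be more readable for simple conditions; a minimal sketch:

# Equivalent filter expressed as a SQL string
filteredDF = reviewsDF.filter("product_id = 1 AND user_id = 'jonh'")
display(filteredDF)
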
In [7]:
from pyspark.sql.functions import desc, log

# We can use `select` to choose columns or to create derived columns 
# and `alias` to rename them.

# Here we create a derived column `logRating` as the log(`rating`)

selectedDF = reviewsDF \
    .select(col('product_id'), col('user_id'), log(col('rating')).alias('logRating')) \
    .sort(desc('logRating'))
display(selectedDF)
product_id user_id logRating
0 1 adam 1.609438
1 1 jonh 1.386294
2 1 karl 1.098612
3 2 greg 1.098612
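
An alternative to `select` for adding a single derived column is `withColumn`, which keeps all existing columns; a small sketch (the `withLogDF` name is just illustrative):

from pyspark.sql.functions import col, log  # already imported above

# Add `logRating` as a new column alongside the original ones
withLogDF = reviewsDF.withColumn('logRating', log(col('rating')))
withLogDF.printSchema()
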
In [8]:
# We can group the data by a column (or columns) and compute aggregate statistics for each group,
# for example the average `rating` for each `product_id`:

groupedDF = reviewsDF.groupBy(col('product_id')).avg('rating')
display(groupedDF)
product_id avg(rating)
0 1 4.0
1 2 3.0
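
Several aggregates per group can be computed at once with `agg`; a small sketch (the `statsDF` name is just illustrative):

from pyspark.sql.functions import avg, count, max as max_

# Average, count and maximum rating for each product
statsDF = reviewsDF.groupBy('product_id').agg(
    avg('rating').alias('avgRating'),
    count('rating').alias('numReviews'),
    max_('rating').alias('maxRating'))
display(statsDF)
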
In [9]:
# We can join two `DataFrames` together on a common column 
# (here the `product_id`)

reviewsWithItemsDF = reviewsDF.join(itemsDF,  itemsDF.id == reviewsDF.product_id)
display(reviewsWithItemsDF)
date product_id user_id rating text id name price
0 2017-01-01 1 jonh 4 Very nice 1 iPhone 6 1000.0
1 2017-01-02 1 karl 3 None 1 iPhone 6 1000.0
2 2017-01-02 1 adam 5 Super 1 iPhone 6 1000.0
3 2017-01-03 2 greg 3 None 2 iPhone 7 1200.0
4 2017-01-03 2 greg 3 None 2 Samsung Galaxy 900.0
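
Note that `product_id` 2 matches both items with `id` 2, so that review appears twice in the result. `join` also takes an optional `how` argument (inner by default), and the redundant key column can be dropped afterwards; a small sketch (the `leftJoinedDF` name is just illustrative):

# Left outer join, keeping reviews even without a matching item,
# and dropping the duplicated `id` column
leftJoinedDF = reviewsDF.join(itemsDF, itemsDF.id == reviewsDF.product_id, how='left').drop('id')
display(leftJoinedDF)
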
In [10]:
# We can convert (small) Spark SQL `DataFrames` to `pandas` data frames

reviewsWithItemsPD = reviewsWithItemsDF.limit(3).toPandas()
print(reviewsWithItemsPD)
         date  product_id user_id  rating       text  id      name   price
0  2017-01-01           1    jonh       4  Very nice   1  iPhone 6  1000.0
1  2017-01-02           1    karl       3       None   1  iPhone 6  1000.0
2  2017-01-02           1    adam       5      Super   1  iPhone 6  1000.0
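
The conversion also works in the other direction: `createDataFrame` accepts a pandas data frame; a small sketch (the `backToSparkDF` name is just illustrative):

# Turn the pandas data frame back into a Spark DataFrame
backToSparkDF = spark.createDataFrame(reviewsWithItemsPD)
backToSparkDF.printSchema()
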
In [11]:
# Finally we can save a `DataFrame` in one of the supported formats, 
# for example `csv`

reviewsDF.write.csv('output/reviews.csv', mode='overwrite')

Let's preview the results:

In [12]:
%%sh

ls -l output/reviews.csv

echo
echo "Content:"

cat output/reviews.csv/*.csv
total 32
-rw-r--r--  1 szu004  staff   0 10 Jul 16:25 _SUCCESS
-rw-r--r--  1 szu004  staff  30 10 Jul 16:25 part-00000-17ad680f-9ea3-4e5b-9147-fcea196092af.csv
-rw-r--r--  1 szu004  staff  21 10 Jul 16:25 part-00001-17ad680f-9ea3-4e5b-9147-fcea196092af.csv
-rw-r--r--  1 szu004  staff  26 10 Jul 16:25 part-00002-17ad680f-9ea3-4e5b-9147-fcea196092af.csv
-rw-r--r--  1 szu004  staff  21 10 Jul 16:25 part-00003-17ad680f-9ea3-4e5b-9147-fcea196092af.csv

Content:
2017-01-01,1,jonh,4,Very nice
2017-01-02,1,karl,3,
2017-01-02,1,adam,5,Super
2017-01-03,2,greg,3,
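
The saved files can be read back with `spark.read.csv`; since plain `csv` carries no type information, we can pass the explicit schema defined earlier (or ask Spark to infer one). A small sketch (the `reviewsBackDF` name is just illustrative):

# Read the CSV directory back using the explicit schema from above
reviewsBackDF = spark.read.csv('output/reviews.csv', schema=reviewSchema)
reviewsBackDF.show()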

To find out more about Spark SQL and DataFrames, please check the official Spark SQL programming guide.

You can now play around by modifying pieces of the code.

When you are done, and you are running on your local machine, remember to close the notebook with File/Close and Halt.