This notebook demonstrates the basics of processing structured data with Spark SQL.
Spark SQL is a higher-level API for working with structured data. The data are represented as DataFrames: table-like objects with columns and rows, conceptually similar to pandas or R data frames.
`spark` is the main entry point for Spark SQL related operations. It is an instance of SparkSession, and pyspark automatically creates one for you.
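If you are working outside an environment that pre-creates `spark` (for example a standalone Python script), the session can be obtained through the `SparkSession` builder. A minimal sketch, assuming a working pyspark installation (the application name below is arbitrary):
# A minimal sketch: obtain (or create) a SparkSession manually.
# Not needed in this notebook, where pyspark already provides `spark`.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("spark-sql-basics") \
    .getOrCreate()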
Let's look at some ways to create and display DataFrames:
# import pyspark.sql classes and functions
from pyspark.sql import *
# The most direct way to create a DataFrame is to
# build it from a list of `Row`s
# Create a few `Row`s describing product items
item1 = Row(id=1, name="iPhone 6", price=1000.00)
item2 = Row(id=2, name="iPhone 7", price=1200.00)
item3 = Row(id=3, name="Samsung Galaxy", price=900.00)
# Create a `DataFrame` from a list of `Row`s
itemsDF = spark.createDataFrame([item1, item2, item3])
# Each `DataFrame` is associated with a `schema` which defines the names
# and types of the columns in the DataFrame.
# `createDataFrame` by default infers the schema from the provided Rows,
# but later we will see how to specify the schema explicitly.
# Let's print out the schema
print("Inferred schema:")
itemsDF.printSchema()
# Display the DataFrame with the `show()` function
itemsDF.show()
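`show()` also accepts optional arguments, for example a row limit and `truncate=False` to print column values in full:
# Show only the first 2 rows without truncating long values
itemsDF.show(2, truncate=False)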
We can also define a reusable `Row` class with named columns and use it to create rows:
# Create a Review Row definition
Review = Row('date', 'product_id', 'user_id', 'rating', 'comment')
# Create some Reviews
review1 = Review('2017-01-01', 1, 'jonh', 4, 'Very nice')
review2 = Review('2017-01-02', 1, 'karl', 3, None)
review3 = Review('2017-01-02', 1, 'adam', 5, 'Super')
review4 = Review('2017-01-03', 2, 'greg', 3, None)
# Create a `DataFrame` from the list of Rows with an inferred schema
reviewsDF = spark.createDataFrame([review1, review2, review3, review4])
# Print out the inferred schema
reviewsDF.printSchema()
If the inferred schema is not satisfactory, we can define an explicit one:
from pyspark.sql.types import *
# Define an explicit schema with the column names, types and nullability
reviewSchema = StructType([
    StructField('date', StringType(), True),
    StructField('product_id', LongType(), False),
    StructField('user_id', StringType(), False),
    StructField('rating', IntegerType(), True),
    StructField('comment', StringType(), True)
])
# Create a `DataFrame` from the list of Rows with the specified schema
reviewsDF = spark.createDataFrame([review1, review2, review3, review4], schema=reviewSchema)
reviewsDF.printSchema()
# We can use `display` for a nicely formatted preview of the DataFrame content
display(reviewsDF)
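As a side note, recent Spark versions (2.3+) also accept a schema given as a DDL-formatted string; a quick sketch of the same schema (the `reviewsDF2` name is just for illustration):
# The same schema expressed as a DDL string (all columns nullable here)
reviewsDF2 = spark.createDataFrame(
    [review1, review2, review3, review4],
    schema='date string, product_id long, user_id string, rating int, comment string')
reviewsDF2.printSchema()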
Now let's have a look at some basic operations and ways of working with DataFrames:
# count the rows in the DataFrame
reviewsDF.count()
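A few other basic inspection methods are available on every DataFrame; a short sketch:
# Peek at the first row and the column names
print(reviewsDF.first())
print(reviewsDF.columns)
# Basic summary statistics for the `rating` column
reviewsDF.describe('rating').show()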
# We can use transformation operations to produce new `DataFrames`,
# for example `filter` to include only rows for which the column `product_id` has value 1,
# and `sort` to order the results by the `rating` column.
# As was the case with RDDs, the transformations are `lazy`.
filteredDF = reviewsDF.filter(reviewsDF.product_id == 1).sort(reviewsDF.rating)
display(filteredDF)
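The same query can also be written with `where` (an alias for `filter`, which additionally accepts SQL-style expression strings) and `orderBy` (an alias for `sort`); a sketch using an illustrative `filteredDF2` name:
# Equivalent query using `where` with a SQL expression string and `orderBy`
filteredDF2 = reviewsDF.where('product_id = 1').orderBy('rating')
display(filteredDF2)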
from pyspark.sql.functions import col
# We can also `filter` using complex expressions,
# for example: reviews with `product_id` == 1 and `user_id` == "jonh"
# (use `&` instead of `and` and `|` instead of `or`).
# There are two ways to reference columns in expressions: either using the `col` function
# with the column name (as below) or by referring to them via the DataFrame (as above)
filteredDF = reviewsDF.filter((col('product_id') == 1) & (col('user_id') == 'jonh'))
display(filteredDF)
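Column objects also provide richer predicates such as `between` and `isin`; a brief sketch:
# Reviews with a rating between 3 and 5 from a given set of users
display(reviewsDF.filter(col('rating').between(3, 5) & col('user_id').isin('jonh', 'karl')))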
from pyspark.sql.functions import desc, log
# We can use `select` to choose columns or to create derived columns,
# and `alias` to rename them.
# Here we create a derived column `logRating` from log(`rating`)
selectedDF = reviewsDF \
.select(col('product_id'), col('user_id'), log(col('rating')).alias('logRating')) \
.sort(desc('logRating'))
display(selectedDF)
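When the goal is only to add a derived column while keeping all the existing ones, `withColumn` is a common alternative to `select`; a sketch (the `withLogDF` name is just for illustration):
# Add a `logRating` column while keeping all the original columns
withLogDF = reviewsDF.withColumn('logRating', log(col('rating')))
display(withLogDF)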
# We can group the data by a column (or columns) and compute aggregate statistics for each group,
# for example the average `rating` for each `product_id`:
groupedDF = reviewsDF.groupBy(col('product_id')).avg('rating')
display(groupedDF)
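`groupBy` can also compute several aggregates at once via `agg`, using aggregate functions from `pyspark.sql.functions`; a short sketch (the `statsDF` name is illustrative):
from pyspark.sql.functions import avg, count
# Average rating and number of reviews per product
statsDF = reviewsDF.groupBy(col('product_id')) \
    .agg(avg('rating').alias('avg_rating'), count('*').alias('num_reviews'))
display(statsDF)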
# We can join two `DataFrames` together on a common column
# (here the `product_id`)
reviewsWithItemsDF = reviewsDF.join(itemsDF, itemsDF.id == reviewsDF.product_id)
display(reviewsWithItemsDF)
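By default `join` performs an inner join; other join types can be requested with the `how` argument. For example, a left outer join keeps items that have no reviews (a sketch, with an illustrative variable name):
# Keep all items, even those without any reviews
itemsWithReviewsDF = itemsDF.join(reviewsDF, itemsDF.id == reviewsDF.product_id, how='left')
display(itemsWithReviewsDF)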
# We can convert (small) Spark SQL `DataFrames` to `pandas` data frames
reviewsWithItemsPD = reviewsWithItemsDF.limit(3).toPandas()
print(reviewsWithItemsPD)
# Finally, we can save a `DataFrame` in one of the supported formats,
# for example `csv`
reviewsDF.write.csv('output/reviews.csv', mode='overwrite')
Let's preview the results:
%%sh
ls -l output/reviews.csv
echo
echo "Content:"
cat output/reviews.csv/*.csv
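The saved files can be read back into a DataFrame with `spark.read`; a minimal sketch reusing the schema defined above (the `reviewsBackDF` name is illustrative):
# Read the CSV files back, reusing the explicit schema
reviewsBackDF = spark.read.csv('output/reviews.csv', schema=reviewSchema)
reviewsBackDF.show()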
To find out more about Spark SQL and DataFrames, please check the official Spark SQL programming guide.
You can now play around by modifying pieces of the code.
When you are done, and if you are running on a local machine, remember to close the notebook with File/Close and Halt.