2.2 Structured Data - File Formats

This notebook demonstrates using SparkSQL to read and write various structured and semi-structured data formats and to convert data between them.

Reading CSV files

Let's start by loading the NSW Air Temperature data set from data/nsw_temp.csv in the CSV format:

In [1]:
%%sh

# preview the file first
head -n 5 data/nsw_temp.csv
Product code,Bureau of Meteorology station number,Year,Month,Day,Maximum temperature (Degree C),Days of accumulation of maximum temperature,Quality
IDCJAC0010,061087,1965,01,01,25.6,,Y
IDCJAC0010,061087,1965,01,02,32.2,1,Y
IDCJAC0010,061087,1965,01,03,23.1,1,Y
IDCJAC0010,061087,1965,01,04,25.6,1,Y

Let's create a DataFrame from this file:

In [2]:
# `spark.read` provides methods for reading various data formats
# including `csv()`

airTemperatureDF = spark.read.csv('data/nsw_temp.csv')
airTemperatureDF.printSchema()
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)

By default the file is assumed to have no header and all columns are read as strings.

This can however be changed with additional options; in our case we want to obtain the column names from the first line (the header) and also infer the types of the columns from the data.

In [3]:
# the file is actually malformed as it includes multiple header-like lines embedded in the data
# we can use `mode='DROPMALFORMED'` to ignore these lines

airTemperatureDF = spark.read.csv('data/nsw_temp.csv', inferSchema=True, header=True, mode='DROPMALFORMED')
airTemperatureDF.printSchema()
root
 |-- Product code: string (nullable = true)
 |-- Bureau of Meteorology station number: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Maximum temperature (Degree C): double (nullable = true)
 |-- Days of accumulation of maximum temperature: integer (nullable = true)
 |-- Quality: string (nullable = true)

In [4]:
display(airTemperatureDF.limit(5))
Product code Bureau of Meteorology station number Year Month Day Maximum temperature (Degree C) Days of accumulation of maximum temperature Quality
0 IDCJAC0010 61087 1965 1 1 25.6 NaN Y
1 IDCJAC0010 61087 1965 1 2 32.2 1.0 Y
2 IDCJAC0010 61087 1965 1 3 23.1 1.0 Y
3 IDCJAC0010 61087 1965 1 4 25.6 1.0 Y
4 IDCJAC0010 61087 1965 1 5 26.7 1.0 Y

Other options include specifying the separator, providing an explicit schema, specifying the NA string and reading compressed files. For the complete list of options please check the documentation for DataFrameReader.
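
As a brief illustration, here is a sketch that re-reads the same file with a few of these options set explicitly: a hand-written schema (which avoids the extra pass over the data that inferSchema needs), an explicit separator, and an NA string (the nullValue choice here is illustrative, not something this particular file requires).

In [ ]:
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType, DoubleType)

# an explicit schema documents the expected layout and skips schema inference
explicitSchema = StructType([
    StructField('Product code', StringType()),
    StructField('Bureau of Meteorology station number', IntegerType()),
    StructField('Year', IntegerType()),
    StructField('Month', IntegerType()),
    StructField('Day', IntegerType()),
    StructField('Maximum temperature (Degree C)', DoubleType()),
    StructField('Days of accumulation of maximum temperature', IntegerType()),
    StructField('Quality', StringType()),
])

df = spark.read.csv('data/nsw_temp.csv',
                    header=True,
                    schema=explicitSchema,
                    sep=',',                 # column separator (the default)
                    nullValue='NA',          # string to treat as null (illustrative)
                    mode='DROPMALFORMED')

# compressed inputs (e.g. a hypothetical data/nsw_temp.csv.gz) would be
# decompressed transparently by the same call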

Working with JSON files

We will create a DataFrame from the JSON file at data/tweets.json, which contains JSON-encoded tweets (one per line).

In [5]:
%%sh

# Let's preview the file first

head -n 2 data/tweets.json
{"CAPTURE_ID":2,"CAPTURED_AT":"2014-05-01 00:00:35","ID":461505248142438400,"CREATED_AT":"2014-05-01 00:00:00","TEXT":"RT @allisimpson: thank you for surprising me @radiodisney & @codysimpson 😊❤️ #RDMAs #RDMAafterparty #sweet16 http:\/\/t.co\/r5apnHxHAK","SCREEN_NAME":"ia_yeah","USER_ID":1100797128,"LANGUAGE":"en","PROFILE_IMAGE_URL":"http:\/\/pbs.twimg.com\/profile_images\/456806985443192832\/hFOsb_G__normal.jpeg","USER_CREATED_AT":"2013-01-18 23:12:49","COUNT":60262,"TIME_ZONE":"Hawaii","UTC_OFFSET":-36000,"FOLLOWERS":298,"FRIENDS":177,"FROM_USER_NAME":"MIDDLE FINGER☠","RETWEET_FLAG":"Y","PROFILE_IMAGE_URL_HTTPS":"https:\/\/pbs.twimg.com\/profile_images\/456806985443192832\/hFOsb_G__normal.jpeg","ORIGINAL_TWEET_ID":461505066588192768,"ORIGINAL_TWEET_LOCATION":"Gold Coast, Australia"}
{"CAPTURE_ID":2,"CAPTURED_AT":"2014-05-01 00:00:35","ID":461505250135142400,"CREATED_AT":"2014-05-01 00:00:00","TEXT":"RT @allisimpson: thank you for surprising me @radiodisney & @codysimpson 😊❤️ #RDMAs #RDMAafterparty #sweet16 http:\/\/t.co\/r5apnHxHAK","SCREEN_NAME":"NinaMustifasari","USER_ID":554329827,"LANGUAGE":"en","PROFILE_IMAGE_URL":"http:\/\/pbs.twimg.com\/profile_images\/460202598784843776\/fVYnO_uN_normal.jpeg","LOCATION":"Not Ireland but Pati,Indonesia","USER_CREATED_AT":"2012-04-15 20:34:55","COUNT":15707,"TIME_ZONE":"Jakarta","UTC_OFFSET":25200,"FOLLOWERS":974,"FRIENDS":415,"FROM_USER_NAME":"Nina Mustifasari","RETWEET_FLAG":"Y","PROFILE_IMAGE_URL_HTTPS":"https:\/\/pbs.twimg.com\/profile_images\/460202598784843776\/fVYnO_uN_normal.jpeg","ORIGINAL_TWEET_ID":461505066588192768,"ORIGINAL_TWEET_LOCATION":"Gold Coast, Australia"}
In [6]:
# load the DataFrame from a JSON file with `spark.read.json()`
tweetsDF = spark.read.json('data/tweets.json')
tweetsDF.printSchema()
root
 |-- CAPTURED_AT: string (nullable = true)
 |-- CAPTURE_ID: long (nullable = true)
 |-- COUNT: long (nullable = true)
 |-- CREATED_AT: string (nullable = true)
 |-- FOLLOWERS: long (nullable = true)
 |-- FRIENDS: long (nullable = true)
 |-- FROM_USER_NAME: string (nullable = true)
 |-- ID: long (nullable = true)
 |-- IN_REPLY_TO_STATUS_ID: long (nullable = true)
 |-- LANGUAGE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ORIGINAL_TWEET_ID: long (nullable = true)
 |-- ORIGINAL_TWEET_LOCATION: string (nullable = true)
 |-- PROFILE_IMAGE_URL: string (nullable = true)
 |-- PROFILE_IMAGE_URL_HTTPS: string (nullable = true)
 |-- RETWEET_FLAG: string (nullable = true)
 |-- SCREEN_NAME: string (nullable = true)
 |-- TEXT: string (nullable = true)
 |-- TIME_ZONE: string (nullable = true)
 |-- TO_USER: string (nullable = true)
 |-- TO_USER_ID: long (nullable = true)
 |-- USER_CREATED_AT: string (nullable = true)
 |-- USER_ID: long (nullable = true)
 |-- UTC_OFFSET: long (nullable = true)

In [7]:
display(tweetsDF.limit(3))
CAPTURED_AT CAPTURE_ID COUNT CREATED_AT FOLLOWERS FRIENDS FROM_USER_NAME ID IN_REPLY_TO_STATUS_ID LANGUAGE ... PROFILE_IMAGE_URL_HTTPS RETWEET_FLAG SCREEN_NAME TEXT TIME_ZONE TO_USER TO_USER_ID USER_CREATED_AT USER_ID UTC_OFFSET
0 2014-05-01 00:00:35 2 60262 2014-05-01 00:00:00 298 177 MIDDLE FINGER☠ 461505248142438400 None en ... https://pbs.twimg.com/profile_images/456806985... Y ia_yeah RT @allisimpson: thank you for surprising me @... Hawaii None None 2013-01-18 23:12:49 1100797128 -36000
1 2014-05-01 00:00:35 2 15707 2014-05-01 00:00:00 974 415 Nina Mustifasari 461505250135142400 None en ... https://pbs.twimg.com/profile_images/460202598... Y NinaMustifasari RT @allisimpson: thank you for surprising me @... Jakarta None None 2012-04-15 20:34:55 554329827 25200
2 2014-05-01 00:00:35 2 22131 2014-05-01 00:00:01 3115 370 Amber (◕‿◕✿) 461505252017975296 None en ... https://pbs.twimg.com/profile_images/460048163... N ambershutup I also really was sour cream Brisbane None None 2011-07-06 15:56:18 330160256 36000

3 rows × 26 columns

As with the csv files, SparkSQL infers the schema (both the column names and the types) from the data.
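
Note that schema inference requires a pass over the data, so for large inputs it can be worth reusing a known schema. A minimal sketch, assuming a hypothetical second file data/tweets2.json with the same layout:

In [ ]:
# reuse the schema already inferred for `tweetsDF` instead of
# re-scanning the new file (`data/tweets2.json` is hypothetical)
tweets2DF = spark.read.json('data/tweets2.json', schema=tweetsDF.schema)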

SparkSQL can also read data from parquet and orc files, and extensions are available for additional formats (e.g. avro).
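
For example (a sketch: the parquet path below is only created later in this notebook, and the orc and avro paths are hypothetical):

In [ ]:
# parquet and orc readers are built in
parquetDF = spark.read.parquet('output/nsw_temp.parquet')
# orcDF = spark.read.orc('data/some_data.orc')

# additional formats are available through extension packages,
# e.g. avro with the spark-avro package:
# avroDF = spark.read.format('avro').load('data/some_data.avro')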

Writing CSV files

We can save any DataFrame (original or transformed) to a csv file:

In [8]:
from pyspark.sql.functions import col, desc

# create a derived DataFrame
tweetsByLocationDF = tweetsDF.groupBy('ORIGINAL_TWEET_LOCATION').count().sort(desc('count')).limit(10)
display(tweetsByLocationDF)
ORIGINAL_TWEET_LOCATION count
0 None 3489
1 Gold Coast, Australia 699
2 NC 284
3 Brisbane, Australia 131
4 Brisbane 64
5 Nc 34
6 Surfers Paradise 21
7 Emerald City 14
8 Townsville 13
9 The City of Townsville 12
In [9]:
# Save the derived DataFrame to a CSV file.
# The `mode` parameter specifies the behavior of the write operation when the data already exists.
# By default an exception is thrown, but setting it to 'overwrite' will overwrite the existing data.

tweetsByLocationDF.write.csv('output/tweets_by_location.csv', mode='overwrite', header=True)
In [10]:
%%sh

# preview the output

ls -l output/tweets_by_location.csv

echo
echo "Content:"

head -n 5 output/tweets_by_location.csv/part*
total 8
-rw-r--r--  1 szu004  staff    0 10 Jul 20:36 _SUCCESS
-rw-r--r--  1 szu004  staff  191 10 Jul 20:36 part-00000-e2998e26-2859-4678-b3b1-14c221aee6d4.csv

Content:
ORIGINAL_TWEET_LOCATION,count
,3489
"Gold Coast, Australia",699
NC,284
"Brisbane, Australia",131

Similarly, we can write DataFrames in the parquet format, which is much more efficient both in terms of data size and processing performance. Let's save the NSW Air Temperature data in the parquet format.

In [11]:
import re

# we need to rename the columns first because the names of `parquet` columns cannot contain
# spaces or other special characters.

renamedDF = airTemperatureDF.select(*[col(c).alias(re.sub(r'[ ()]','_', c)) for c in airTemperatureDF.columns])
renamedDF.printSchema()
renamedDF.write.parquet('output/nsw_temp.parquet', mode='overwrite')
root
 |-- Product_code: string (nullable = true)
 |-- Bureau_of_Meteorology_station_number: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Maximum_temperature__Degree_C_: double (nullable = true)
 |-- Days_of_accumulation_of_maximum_temperature: integer (nullable = true)
 |-- Quality: string (nullable = true)

In [12]:
%%sh

# let's preview the results
ls -lh output/nsw_temp.parquet

echo

# and compare with the size of the original `csv` file
echo "'csv' data size:"

ls -lh data/nsw_temp.csv
total 840
-rw-r--r--  1 szu004  staff     0B 10 Jul 20:37 _SUCCESS
-rw-r--r--  1 szu004  staff   133K 10 Jul 20:37 part-00000-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet
-rw-r--r--  1 szu004  staff   133K 10 Jul 20:37 part-00001-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet
-rw-r--r--  1 szu004  staff   135K 10 Jul 20:37 part-00002-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet
-rw-r--r--  1 szu004  staff    11K 10 Jul 20:37 part-00003-414fca95-21b9-4577-9876-5b6f78c6a350.snappy.parquet

'csv' data size:
-rw-r--r--  1 szu004  staff    13M  9 Jul 20:00 data/nsw_temp.csv

The parquet files are compressed with snappy (or gzip), and in total they are an order of magnitude smaller than the original csv file.
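
The codec can also be selected explicitly with the `compression` option; a minimal sketch (the output path is illustrative):

In [ ]:
# write with gzip instead of the default snappy codec
renamedDF.write.parquet('output/nsw_temp_gzip.parquet',
                        mode='overwrite',
                        compression='gzip')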

For more information on writing DataFrames please check the documentation for DataFrameWriter.
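
One DataFrameWriter feature worth a quick sketch is partitioned output, where one sub-directory is created per value of a chosen column (the output path is illustrative):

In [ ]:
# creates one sub-directory per distinct Year (e.g. Year=1965),
# which lets later reads skip irrelevant partitions
renamedDF.write.parquet('output/nsw_temp_by_year.parquet',
                        mode='overwrite',
                        partitionBy='Year')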

You can now play around modifying pieces of the code.

When you are done, and you are running the notebook on a local machine, remember to close it with File/Close and Halt.