%%sh
# preview the file first
head -n 5 data/nsw_temp.csv
Let's create a DataFrame from this file:
# `spark.read` provides methods for reading various data formats
# including `csv()`
airTemperatureDF = spark.read.csv('data/nsw_temp.csv')
airTemperatureDF.printSchema()
By default the file is assumed not to have a header and all columns are read as strings.
This can however be changed with additional options: in our case we want to take the column names from the first line (the header) and also infer the column types from the data.
# the file is actually malformed as it includes multiple header-like lines embedded in the data
# we can use `mode` = 'DROPMALFORMED' to ignore these lines
airTemperatureDF = spark.read.csv('data/nsw_temp.csv', inferSchema=True, header=True, mode='DROPMALFORMED')
airTemperatureDF.printSchema()
display(airTemperatureDF.limit(5))
Other options include specifying the separator, providing an explicit schema, specifying the NA string and reading compressed files. For the complete list of options please check the documentation for DataFrameReader.
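For instance, a minimal sketch of passing some of these options explicitly is shown below (the column names in the schema are illustrative only, not the actual header of data/nsw_temp.csv):
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
# illustrative explicit schema -- the real column names in data/nsw_temp.csv may differ
exampleSchema = StructType([
    StructField('Station_Name', StringType(), True),
    StructField('Air_Temperature', DoubleType(), True)
])
# pass an explicit schema, separator and NA string instead of relying on inference
exampleDF = spark.read.csv('data/nsw_temp.csv', schema=exampleSchema, header=True,
                           sep=',', nullValue='NA')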
We will create a DataFrame from the JSON file at data/tweets.json, which contains JSON-encoded tweets (one per line).
%%sh
# Let's preview the file first
head -n 2 data/tweets.json
# load the DataFrame from a JSON file with `spark.read.json()`
tweetsDF = spark.read.json('data/tweets.json')
tweetsDF.printSchema()
display(tweetsDF.limit(3))
As with the csv files, SparkSQL infers the schema (both the column names and the types) from the data.
SparkSQL can also read data from parquet and orc files, and extensions are available for additional formats (e.g. avro).
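Reading these formats is analogous; a minimal sketch (the file paths below are placeholders, not files shipped with this notebook):
# each format has a dedicated reader method on `spark.read`
parquetDF = spark.read.parquet('data/some_data.parquet')
orcDF = spark.read.orc('data/some_data.orc')
# additional formats such as avro are available through the generic `format()` API,
# assuming the corresponding extension package is installed on the cluster
avroDF = spark.read.format('avro').load('data/some_data.avro')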
We can save any DataFrame (original or transformed) to a csv file:
from pyspark.sql.functions import col, desc
# create a derived DataFrame
tweetsByLocationDF = tweetsDF.groupBy('ORIGINAL_TWEET_LOCATION').count().sort(desc('count')).limit(10)
display(tweetsByLocationDF)
# Save the derived DataFrame to a CSV file.
# The `mode` parameter specifies the behavior of the write operation when the data already exists.
# By default an exception is thrown, but setting it to 'overwrite' will overwrite the existing data.
tweetsByLocationDF.write.csv('output/tweets_by_location.csv', mode='overwrite', header=True)
%%sh
# preview the output
ls -l output/tweets_by_location.csv
echo
echo "Content:"
head -n 5 output/tweets_by_location.csv/part*
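Note that Spark writes the output as a directory containing one part file per partition. If a single output file is preferred, one option is to coalesce the DataFrame to a single partition before writing; the output path below is illustrative:
# coalesce to one partition so the output directory contains a single part file
# (all data is moved to one task, so this is only practical for small results)
tweetsByLocationDF.coalesce(1).write.csv('output/tweets_by_location_single.csv',
                                         mode='overwrite', header=True)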
Similarly we can save DataFrames in the parquet format, which is much more efficient both in terms of data size and processing performance. Let's save the NSW Air Temperature Data in parquet format.
import re
# we need to rename the columns first because the names of `parquet` columns cannot contain
# spaces or other special characters.
renamedDF = airTemperatureDF.select(*[col(c).alias(re.sub(r'[ ()]','_', c)) for c in airTemperatureDF.columns])
renamedDF.printSchema()
renamedDF.write.parquet('output/nsw_temp.parquet', mode='overwrite')
%%sh
# let's preview the results
ls -lh output/nsw_temp.parquet
echo
# and compare with the size of the original `csv` file
echo "'csv' data size:"
ls -lh data/nsw_temp.csv
The parquet file is compressed with snappy (or gz) and it's an order of magnitude smaller than the original csv file.
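If needed, the codec can be selected explicitly with the `compression` option; a minimal sketch (the output path is illustrative):
# write the parquet output with gzip compression instead of the default snappy
renamedDF.write.parquet('output/nsw_temp_gzip.parquet', mode='overwrite', compression='gzip')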
For more information on writing DataFrames please check the documentation for DataFrameWriter.
You can now play around with modifying pieces of the code.
When you are done, and if you are running on your local machine, remember to close the notebook with File/Close and Halt.