This notebook demonstrates some of the more advanced uses of the Spark RDD API for text processing.
We will analyze the text of 'The Art of War' by Sun Tzu, trying to find the words that best characterise each of its chapters.
Let's have a look at the data first (the data/artwar.1b.txt file):
%%sh
echo "BEGINNING:"
head -n 22 'data/artwar.1b.txt' | tail -n 10
echo
echo "ENDING:"
tail -n 20 'data/artwar.1b.txt' | head -n 12
It's a text file with some introductory content at the beginning; the chapters are numbered with Roman numerals, the paragraphs with Arabic numbers, and the text 'THE END' marks the end of the last chapter.
Our first task is to split the document by chapters.
As the first step, let's find the starting line of each chapter:
import re
# returns a truthy value if the `line` is a chapter heading (a Roman numeral followed by a dot)
# or the "THE END" marker
def is_heading(line):
    return re.match(r'^[IVX]+\..*', line) or line.startswith("THE END")
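# a quick sanity check of `is_heading` on a few illustrative lines
# (the examples below are assumed for demonstration, not verbatim lines from the file)
print(bool(is_heading("II. WAGING WAR")))          # True: a Roman numeral followed by a dot
print(bool(is_heading("THE END")))                 # True: the end-of-text marker
print(bool(is_heading("1. Sun Tzu said: ...")))    # False: an ordinary paragraph line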
# create an RDD with the lines of the text
aowTextRDD = sc.textFile('data/artwar.1b.txt')
# number each line using `zipWithIndex`
# and select the heading lines together with their indexes
chaptersAndBeginnings = aowTextRDD.zipWithIndex() \
.filter(lambda (line, index): is_heading(line)).collect()
display(chaptersAndBeginnings)
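# a tiny, self-contained illustration of `zipWithIndex` (an assumed example, not part of the analysis):
# it pairs every element of an RDD with its position within the RDD
print(sc.parallelize(["a", "b", "c"]).zipWithIndex().collect())
# -> [('a', 0), ('b', 1), ('c', 2)]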
# let's create some useful intermediate variables
chapterBeginnings = [ i for (title,i) in chaptersAndBeginnings]
chapterTitles = [ title for (title,i) in chaptersAndBeginnings[0:-1]]
noOfChapters = len(chapterTitles)
print("Number of chapters: %s" % noOfChapters)
print("Chapter titles: %s" % chapterTitles)
print("Chapter start lines: %s " % chapterBeginnings)
Now let's label each line with its chapter number, and filter out the lines before the first chapter and after the last one.
We will use the Python bisect.bisect() function, which finds the insertion point (index) in a sorted list that keeps the list sorted.
In our case the sorted list is chapterBeginnings from above, so calling bisect with a line number returns the 1-based chapter number that the line belongs to: 0 for lines before the first chapter and 14 for lines after the last one.
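Here is a small illustration of that behaviour with some made-up chapter start lines (the numbers below are assumed for the example only):
import bisect
# assumed start lines of three chapters, for illustration only
starts = [10, 50, 90]
print(bisect.bisect(starts, 5))     # 0 -> the line is before the first chapter
print(bisect.bisect(starts, 60))    # 2 -> the line belongs to the 2nd chapter (1-based)
print(bisect.bisect(starts, 120))   # 3 -> the line is after the last chapter start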
import bisect
# we can use sc.broadcast() to publish a large object to all executors and improve the performance
chapterBeginningsBR = sc.broadcast([ i for (title, i) in chaptersAndBeginnings])
# to refer to the value of the broadcast variable in the computation use its `value` property
# (bisect returns a 1-based chapter number, so we subtract 1 to make it 0-based,
#  and then keep only the lines that fall inside one of the chapters)
linesWithChapterNoRDD = aowTextRDD.zipWithIndex() \
.filter(lambda (line,index): line and not is_heading(line)) \
.map(lambda (line, index): (bisect.bisect_left(chapterBeginningsBR.value, index) - 1, line)) \
.filter(lambda (chapterNo, line): chapterNo >= 0 and chapterNo < noOfChapters)
linesWithChapterNoRDD.take(5)
import operator
from itertools import islice
# find the top 10 words by count in each chapter:
#  - split each line into lower-case words and emit (chapterNo, word) pairs
#  - count the occurrences of each pair with map + reduceByKey
#  - re-key by chapter number, sort by descending count and group by chapter
#  - keep the first 10 (word, count) pairs of each chapter's group
topFrequentWordsByChapterRDD = linesWithChapterNoRDD\
.flatMap(lambda (chapterNo, line): ((chapterNo, word) for word in re.split('[^a-z\']+', line.lower()) if word)) \
.map(lambda chapterNoAndWord: (chapterNoAndWord, 1)) \
.reduceByKey(operator.add) \
.map(lambda ((chapterNo, word), count): (chapterNo, (word, count))) \
.sortBy(lambda (chapterNo, (word, count)): count, False) \
.groupByKey() \
.map(lambda (chapterNo, wordAndCountIterator): (chapterNo, list(islice(wordAndCountIterator, 10))))
# `collectAsMap` can be used to collect a pair RDD to a python `dict`
topFrequentWordsByChapter = topFrequentWordsByChapterRDD.collectAsMap()
print("Result at dict: %s" % topFrequentWordsByChapter.items()[0:2])
And now we can format it nicely:
print("Top frequent words by chapter:")
for chapterNo, title in enumerate(chapterTitles):
    print("%s: %s" % (title, " ".join([ word for word,_ in topFrequentWordsByChapter[chapterNo] ]) ))
Unfortunately, the most frequent words in each chapter do not provide a good summary, because many of them are common to all chapters.
To address this issue we can look at ways to take into account how specific each word is to each chapter.
One of the common approaches in text analysis is to use the TF-IDF statistic (Term Frequency - Inverse Document Frequency).
In this approach the frequency of a term in a document is weighted by the logarithm of the inverse of its document frequency.
For example, in our case the documents are the chapters, so for each word in a chapter:
tfidf(word, chapter) = count(word in chapter) * log(noOfChapters / numberOfChaptersWithWord)
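As a small worked illustration (the counts below are made up for the example, not taken from the actual text):
from math import log
# a hypothetical word that occurs 20 times in a chapter but appears in only 2 of the 13 chapters
tf = 20
chaptersWithWord = 2
print(tf * log(13.0 / chaptersWithWord))   # ~37.4 -> the word keeps a high score
# a word that appears in all 13 chapters is weighted by log(13.0 / 13) == 0.0,
# so it drops to the bottom of the ranking no matter how frequent it is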
Let's see if this approach works:
# for each word, count the number of chapters it appears in (its document frequency)
chapterCounts = linesWithChapterNoRDD \
.flatMap(lambda (chapterNo, line): ((word, chapterNo) for word in re.split('[^a-z\']+', line.lower()) if word)) \
.distinct() \
.countByKey()
sorted(chapterCounts.items(), key=lambda kv:kv[1], reverse=True)[0:10]
from math import log
# calculate (in plain python) the IDF weight for each word
idf = dict( (w, log(float(noOfChapters)/c)) for w,c in chapterCounts.items())
sorted(idf.items(), key=lambda kv:kv[1], reverse=True)[0:5]
# find the top 10 words by TF-IDF in each chapter
# (the same pipeline as before, except each word count is weighted by the word's IDF)
idfBR = sc.broadcast(idf)
topTFIDFWordsByChapter = linesWithChapterNoRDD\
.flatMap(lambda (chapterNo,line): ((chapterNo, word) for word in re.split('[^a-z\']+', line.lower()) if word)) \
.map(lambda chapterNoAndWord: (chapterNoAndWord, 1)) \
.reduceByKey(operator.add) \
.map(lambda ((chapterNo, word), count): (chapterNo, (word, count * idfBR.value[word]))) \
.sortBy(lambda (chapterNo, (word, tfidf)): tfidf, False) \
.groupByKey() \
.map(lambda (chapterNo, wordAndTfidfIterator): (chapterNo, list(islice(wordAndTfidfIterator, 10)))) \
.collectAsMap()
# format the output nicely
print("Top TD-IDF words by chapter:")
for chapterNo, title in enumerate(chapterTitles):
    print("%s: %s" % (title, " ".join([ word for word,_ in topTFIDFWordsByChapter[chapterNo] ]) ))
This (arguably) is a much better summary.
You can now play around by modifying pieces of the code.
When you are done, and you are running the notebook on a local machine, remember to close it with File/Close and Halt.