1.3 RDD Text Processing

This notebook demonstrates some of the more advanced uses of the Spark RDD API for text processing.

We will analyze the text of 'The Art of War' by Sun Tzu, trying to find the words that best characterise each of its chapters.

Let's have a look at the data first (the data/artwar.1b.txt file):

In [1]:
%%sh

echo "BEGINNING:"
head -n 22 'data/artwar.1b.txt' | tail -n 10
echo 
echo "ENDING:"
tail -n 20 'data/artwar.1b.txt' | head -n 12
BEGINNING:

Translated by Lionel Giles

I. Laying Plans

1. Sun Tzu said: The art of war is of vital importance to the State.

2. It is a matter of life and death, a road either to safety or to
ruin. Hence it is a subject of inquiry which can on no account be
neglected. 

ENDING:

27. Hence it is only the enlightened ruler and the wise general who
will use the highest intelligence of the army for purposes of spying
and thereby they achieve great results. Spies are a most important
element in war, because on them depends an army's ability to move.

THE END

----------------------------------------------------------------------

Copyright statement:
The Internet Classics Archive by Daniel C. Stevenson, Web Atomics.

It's a text file with some intro content at the beginning; the chapters are numbered with Roman numerals, the paragraphs with Arabic numerals, and the 'THE END' marker closes the last chapter.

Our first task is to split the document into chapters.

As the first step let's find the starting line of each of the chapters:

In [2]:
import re

# returns a truthy value if the `line` is a chapter heading (starts with a Roman numeral followed by a dot)
# or the "THE END" marker
def is_heading(line):
    return re.match(r'^[IVX]+\..*', line) or line.startswith("THE END")


# create an RDD with the lines of the text
aowTextRDD = sc.textFile('data/artwar.1b.txt')

# number each line using `zipWithIndex`
# and select the heading lines together with their indexes

chaptersAndBeginnings = aowTextRDD.zipWithIndex() \
    .filter(lambda (line, index): is_heading(line)).collect()
    
display(chaptersAndBeginnings)
[(u'I. Laying Plans', 15),
 (u'II. Waging War', 106),
 (u'III. Attack by Stratagem', 184),
 (u'IV. Tactical Dispositions', 266),
 (u'V. Energy', 337),
 (u'VI. Weak Points and Strong', 426),
 (u'VII. Maneuvering', 562),
 (u'VIII. Variation in Tactics', 692),
 (u'IX. The Army on the March', 748),
 (u'X. Terrain', 908),
 (u'XI. The Nine Situations', 1033),
 (u'XII. The Attack by Fire', 1287),
 (u'XIII. The Use of Spies', 1365),
 (u'THE END', 1467)]
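The heading test itself needs no Spark, so it can be sanity-checked in plain Python first (a minimal sketch; the sample strings are illustrative):

```python
import re

def is_heading(line):
    # matches a Roman-numeral chapter heading such as "IV. Tactical Dispositions",
    # or the closing "THE END" marker
    return bool(re.match(r'^[IVX]+\..*', line) or line.startswith("THE END"))

print(is_heading("I. Laying Plans"))             # True
print(is_heading("THE END"))                     # True
print(is_heading("Translated by Lionel Giles"))  # False
```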
In [3]:
# let's create some useful intermediate variables

chapterBeginnings = [ i for (title,i) in chaptersAndBeginnings]
chapterTitles = [ title for (title,i) in chaptersAndBeginnings[0:-1]]
noOfChapters = len(chapterTitles)

print("Number of chapters: %s" % noOfChapters)
print("Chapter titles: %s" % chapterTitles)
print("Chapter start lines: %s " % chapterBeginnings)
Number of chapters: 13
Chapter titles: [u'I. Laying Plans', u'II. Waging War', u'III. Attack by Stratagem', u'IV. Tactical Dispositions', u'V. Energy', u'VI. Weak Points and Strong', u'VII. Maneuvering', u'VIII. Variation in Tactics', u'IX. The Army on the March', u'X. Terrain', u'XI. The Nine Situations', u'XII. The Attack by Fire', u'XIII. The Use of Spies']
Chapter start lines: [15, 106, 184, 266, 337, 426, 562, 692, 748, 908, 1033, 1287, 1365, 1467] 

Now let's assign each line its chapter number and filter out the lines before the first chapter and after the last one.

We will use the Python bisect.bisect() function, which finds the insertion point (index) in a sorted list that keeps it sorted.

In our case the sorted list is chapterBeginnings from above, and bisecting with a line number returns the 1-based chapter number the line belongs to; 0 for lines before the first chapter and 14 for lines after the last one.
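This indexing behaviour is easy to verify on a toy list (a minimal sketch; the start-line values here are made up):

```python
import bisect

# hypothetical sorted chapter start lines
starts = [15, 106, 184]

print(bisect.bisect(starts, 3))    # 0 -> before the first chapter
print(bisect.bisect(starts, 50))   # 1 -> inside chapter I
print(bisect.bisect(starts, 500))  # 3 -> after the last start line
```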

In [4]:
import bisect

# we can use sc.broadcast() to publish a large object to all executors and improve the performance

chapterBeginningsBR = sc.broadcast([ i for (title, i) in chaptersAndBeginnings])

# to refer to the value of the broadcast variable in a computation, use its `value` property

linesWithChapterNoRDD = aowTextRDD.zipWithIndex() \
    .filter(lambda (line,index): line and not is_heading(line)) \
    .map(lambda (line, index): (bisect.bisect_left(chapterBeginningsBR.value, index) - 1, line)) \
    .filter(lambda (chapterNo, line): chapterNo >= 0 and chapterNo < noOfChapters) 
    
linesWithChapterNoRDD.take(5)   
Out[4]:
[(0, u'1. Sun Tzu said: The art of war is of vital importance to the State.'),
 (0, u'2. It is a matter of life and death, a road either to safety or to'),
 (0, u'ruin. Hence it is a subject of inquiry which can on no account be'),
 (0, u'neglected. '),
 (0, u'3. The art of war, then, is governed by five constant factors, to')]
In [5]:
import operator
from itertools import islice

# find the top 10 words by count in each chapter

topFrequentWordsByChapterRDD = linesWithChapterNoRDD\
    .flatMap(lambda (chapterNo, line): ((chapterNo, word) for word in re.split('[^a-z\']+', line.lower()) if word)) \
    .map(lambda chapterNoAndWord: (chapterNoAndWord, 1)) \
    .reduceByKey(operator.add) \
    .map(lambda ((chapterNo, word), count): (chapterNo, (word, count))) \
    .sortBy(lambda (chapterNo, (word, count)): count, False) \
    .groupByKey() \
    .map(lambda (chapterNo, wordAndCountIterator): (chapterNo, list(islice(wordAndCountIterator, 10))))


# `collectAsMap` can be used to collect a pair RDD to a python `dict`

topFrequentWordsByChapter = topFrequentWordsByChapterRDD.collectAsMap()
print("Result as dict: %s" % topFrequentWordsByChapter.items()[0:2])
Result as dict: [(0, [(u'the', 35), (u'to', 20), (u'and', 19), (u'of', 19), (u'is', 17), (u'be', 12), (u'in', 11), (u'are', 9), (u'a', 8), (u'which', 8)]), (1, [(u'the', 38), (u'of', 26), (u'and', 18), (u'be', 17), (u'to', 16), (u'a', 12), (u'will', 12), (u'in', 11), (u'is', 11), (u'your', 7)])]
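The flatMap/reduceByKey pipeline above amounts to counting (chapter, word) pairs; the same logic can be sketched locally with a Counter (the sample lines are made up, and the tokenizer is the same regex split used in the Spark code):

```python
import re
from collections import Counter

lines = [
    (0, "The art of war is of vital importance to the state."),
    (0, "It is a matter of life and death."),
]

# tokenize each line and count (chapterNo, word) pairs
counts = Counter(
    (chapterNo, word)
    for chapterNo, line in lines
    for word in re.split(r"[^a-z']+", line.lower())
    if word
)

print(counts[(0, "of")])   # 3
print(counts[(0, "the")])  # 2
```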

And now we can format it nicely:

In [6]:
print("Top frequent words by chapter:")
for chapterNo, title in enumerate(chapterTitles):
    print("%s: %s" % (title, " ".join([ word for word,_ in topFrequentWordsByChapter[chapterNo] ]) ))
Top frequent words by chapter:
I. Laying Plans: the to and of is be in are a which
II. Waging War: the of and be to a will in is your
III. Attack by Stratagem: the to is of in will and army if a
IV. Tactical Dispositions: the of to is a in and victory enemy defeat
V. Energy: the of a to and is in be more it
VI. Weak Points and Strong: the to be and in of he his is can
VII. Maneuvering: the of and to is a in an be army
VIII. Variation in Tactics: the of to be in not which his must and
IX. The Army on the March: the to and is of a in are if that
X. Terrain: the to and is a of are if you in
XI. The Nine Situations: the of to and ground is a on in be
XII. The Attack by Fire: the to is of an fire a in not be
XIII. The Use of Spies: the of and to spies be is a in it

Unfortunately, the most frequent words in each chapter do not provide a good summary, because many of them are common to all chapters.

To address this issue we can take into account how specific each word is to each chapter.

One of the common approaches in text analysis is to use the tf-idf statistic (term frequency - inverse document frequency).

In this approach the frequency of a term in a document is weighted by the logarithm of the inverse of its document frequency.

For example in our case:

  • a word that appears in all 13 chapters will have IDF = log(13/13) = 0
  • a word that appears only in one chapter will have IDF = log(13/1) ≈ 2.56
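These two numbers are easy to confirm with a quick arithmetic check:

```python
from math import log

noOfChapters = 13

print(log(float(noOfChapters) / 13))  # 0.0     -> word present in every chapter
print(log(float(noOfChapters) / 1))   # ~2.5649 -> word present in one chapter
```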

Let's see if this approach works:

In [7]:
# calculate the chapter counts for each word
chapterCounts = linesWithChapterNoRDD \
    .flatMap(lambda (chapterNo, line): ((word, chapterNo) for word in re.split('[^a-z\']+', line.lower()) if word)) \
    .distinct() \
    .countByKey()

sorted(chapterCounts.items(), key=lambda kv:kv[1], reverse=True)[0:10]
Out[7]:
[(u'army', 13),
 (u'said', 13),
 (u'tzu', 13),
 (u'is', 13),
 (u'in', 13),
 (u'if', 13),
 (u'the', 13),
 (u'for', 13),
 (u'may', 13),
 (u'that', 13)]
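The distinct/countByKey combination counts, for each word, the number of chapters it occurs in; locally the same thing can be done with a set (for distinct) and a Counter (the sample occurrences here are invented):

```python
from collections import Counter

# hypothetical (word, chapterNo) occurrences, with duplicates within a chapter
occurrences = [("army", 0), ("army", 0), ("army", 1), ("spies", 12)]

# deduplicate first, then count chapters per word
chapterCounts = Counter(word for word, chap in set(occurrences))

print(chapterCounts["army"])   # 2 -> appears in two chapters
print(chapterCounts["spies"])  # 1
```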
In [8]:
from math import log

# calculate the IDF (in plain Python)

idf = dict( (w, log(float(noOfChapters)/c)) for w,c in chapterCounts.items())
sorted(idf.items(), key=lambda kv:kv[1], reverse=True)[0:5]
Out[8]:
[(u'secondly', 2.5649493574615367),
 (u'dynasty', 2.5649493574615367),
 (u'rapid', 2.5649493574615367),
 (u'woods', 2.5649493574615367),
 (u'aides', 2.5649493574615367)]
In [9]:
# find the top 10 words by TF-IDF in each chapter
idfBR = sc.broadcast(idf)

topTFIDFWordsByChapter = linesWithChapterNoRDD \
    .flatMap(lambda (chapterNo, line): ((chapterNo, word) for word in re.split('[^a-z\']+', line.lower()) if word)) \
    .map(lambda chapterNoAndWord: (chapterNoAndWord, 1)) \
    .reduceByKey(operator.add) \
    .map(lambda ((chapterNo, word), count): (chapterNo, (word, count * idfBR.value[word]))) \
    .sortBy(lambda (chapterNo, (word, tfidf)): tfidf, False) \
    .groupByKey() \
    .map(lambda (chapterNo, wordAndTfidfIterator): (chapterNo, list(islice(wordAndTfidfIterator, 10)))) \
    .collectAsMap()
In [10]:
# format the output nicely
print("Top TF-IDF words by chapter:")
for chapterNo, title in enumerate(chapterTitles):
    print("%s: %s" % (title, " ".join([ word for word,_ in topTFIDFWordsByChapter[chapterNo] ]) ))
Top TF-IDF words by chapter:
I. Laying Plans: calculations counsel moral law seem deliberations determine believe seeking hearkens
II. Waging War: chariots substance thousand wagons people's ardor spent twenty damped tenths
III. Attack by Stratagem: bulwark win cities ignorant entire next besiege months walled destroy
IV. Tactical Dispositions: estimation quantity measurement balancing defeating defeat sign fighter skilled mistakes
V. Energy: energy indirect yet postulates decision combination effected combined simulated direct
VI. Weak Points and Strong: weaken strengthen van water left rear what places hasten succor
VII. Maneuvering: maneuvering lost drums goal banners fall art deviation refrain alone
VIII. Variation in Tactics: leads dangerous faults roads trouble schemes sensitive varying solicitude affect
IX. The Army on the March: get places river dust sent advancing about grass sign shows
X. Terrain: result six gone towards halfway responsible garrisoned unaware regard ground
XI. The Nine Situations: ground would i facile intersecting serious they desperate dispersive contentious
XII. The Attack by Fire: fire burn days season aid stay developments special proper wind
XIII. The Use of Spies: spies spy converted doomed surviving purposes information inward subtle was
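The ranking step above boils down to weighting each word's count by its IDF and sorting; a toy version of the scoring (all numbers invented for illustration):

```python
# toy term counts in one chapter and a hypothetical IDF table
counts = {"the": 35, "spies": 9, "fire": 1}
idf = {"the": 0.0, "spies": 2.56, "fire": 1.13}

# rank words by tf-idf score, highest first
ranked = sorted(counts, key=lambda w: counts[w] * idf[w], reverse=True)
print(ranked)  # ['spies', 'fire', 'the']
```

Note how "the", despite its large raw count, drops to the bottom because its IDF is zero.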

This (arguably) is a much better summary.

You can now play around by modifying pieces of the code.

When you are done, and if you are running on the local machine, remember to close the notebook with File/Close and Halt.