Cascading for the Impatient, Part 4

In our third installment of this series we showed how to write a custom Operation for a Cascading 2.0 application. If you haven’t read that yet, it’s probably best to start there.

Today’s lesson takes that same Word Count app and expands on it to implement a stop words filter, which is a list of tokens to nix from the stream. We’ll show how to use HashJoin on two pipes, so we can perform that filtering at scale. Again, this code is leading toward an implementation of TF-IDF in Cascading. We’ll show best practices for workflow orchestration and test-driven development (TDD) at scale.

Theory

The first question to consider is, why do we want to use a stop words list? After all, the TF-IDF algorithm is supposed to filter out the less significant words anyway. Why would we need to include additional filtering if TF-IDF is implemented correctly?

Use of a stop words list originated in work by Hans Peter Luhn at IBM Research, during the dawn of computing. The reasons for it are two-fold. On one hand, consider that the most common words in any given natural language are generally not useful for text analytics. For example, in English, words such as “the”, “of”, “and” are probably not what you want to search, and probably not interesting for Word Count metrics. They sit at the head of the token frequency distribution: high frequency, low semantic value. Consequently, they cause the bulk of the processing. Natural languages tend to have on the order of 10^5 words, so the potential size of any stop words list is nicely bounded. Filtering those high-frequency words out of the token stream dramatically reduces the amount of processing required later in the workflow.

On the other hand, you may also want to remove some words explicitly from the token stream. This almost always comes up in practice, especially when working with public discussions such as social network comments. Think about it: what are some of the most common words posted online in comments? Words which are not the most common words in “polite” English? Based on the math for TF-IDF, those would tend to get ranked highest. Do you really want those words to bubble up to the “most significant” positions in your text analytics? In automated systems which leverage unsupervised learning, this can lead to highly embarrassing situations. Caveat machinator.

Next, let’s consider working with a Joiner in Cascading. We will have two pipes, one for the “scrubbed” token stream and another for the stop words list. We want to filter all instances of tokens from the stop words list out of the token stream. If we weren’t working in MapReduce, a naive approach would simply load the stop words list into a hashtable, then iterate through our token stream to look up each token in the hashtable and delete it if found. If we were coding in Hadoop directly, a less naive approach would be to put the stop words list into the distributed cache and have a job step which loads it during setup, then rinse/lather/repeat from the naive coding approach described above.
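To make the naive approach concrete, here is a minimal sketch in plain Java, outside of Cascading and Hadoop. The class name NaiveStopFilter and the sample tokens are hypothetical, made up for illustration; they are not part of the sample app.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the Cascading sample app.
final class NaiveStopFilter {
    // Load the stop words into a hash-based set, then walk the token list
    // and delete every token that is found in the set.
    static List<String> filter(List<String> tokens, List<String> stopWords) {
        Set<String> stop = new HashSet<>(stopWords);
        List<String> kept = new ArrayList<>(tokens);
        kept.removeIf(stop::contains);
        return kept;
    }

    public static void main(String[] args) {
        List<String> tokens = List.of("the", "rain", "in", "the", "shadow");
        List<String> stopWords = List.of("the", "in", "of");
        System.out.println(filter(tokens, stopWords)); // prints [rain, shadow]
    }
}
```

This works only while both the token list and the stop words list fit in the memory of a single process, which is exactly the limit the rest of this section works around.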

Instead we want to leverage the workflow orchestration in Cascading. One might try to write a custom operation in Cascading, as we did in Part 3 (e.g., a custom Filter). That sounds like extra work, plus extra code to verify and maintain, when the built-in primitives will tend to be more efficient anyway.

Cascading provides for joins on pipes, and conceptually a Left Outer Join would solve our requirement to filter stop words. Think of joining the token stream with the stop words list. When the result is non-null, the join has identified a stop word. Discard it.
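The join-then-discard idea can be sketched in plain Java to show what the intermediate rows look like. This is only a model of left outer join semantics under our own assumptions, not how Cascading implements it; the names LeftJoinSketch and Row are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of left-outer-join semantics; names are illustrative only.
final class LeftJoinSketch {
    // One joined row: the token from the left side, plus the matching stop word
    // from the right side, or null when nothing on the right matched.
    static final class Row {
        final String token;
        final String stop;

        Row(String token, String stop) {
            this.token = token;
            this.stop = stop;
        }
    }

    // Every left-side token yields exactly one row, matched or not.
    static List<Row> leftJoin(List<String> tokens, List<String> stopWords) {
        Set<String> stop = new HashSet<>(stopWords);
        List<Row> rows = new ArrayList<>();
        for (String token : tokens) {
            rows.add(new Row(token, stop.contains(token) ? token : null));
        }
        return rows;
    }

    // A non-null right side means the join identified a stop word: discard it.
    static List<String> discardMatches(List<Row> rows) {
        List<String> kept = new ArrayList<>();
        for (Row row : rows) {
            if (row.stop == null) {
                kept.add(row.token);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Row> rows = leftJoin(
            List.of("the", "rain", "in", "the", "shadow"),
            List.of("the", "in", "of"));
        System.out.println(discardMatches(rows)); // prints [rain, shadow]
    }
}
```

Splitting the work into a join step and a discard step mirrors the two-step shape used in the Cascading code later in this article.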

Understand that there’s a big problem with using joins in MapReduce. Outside of the context of a relational database, arbitrary joins do not work efficiently. Suppose you have N items in one tuple stream and M items in another, and want to join them. In the general case, for an arbitrary join, that requires N x M operations and also introduces a data dependency, such that the join cannot be performed in parallel. If both N and M are relatively large, say in the millions of tuples, then we’d end up processing 10^12 operations on a single processor, which kind of defeats the purpose, in terms of leveraging MapReduce.
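To see where the N x M figure comes from, consider a rough sketch of a nested-loop join in plain Java, where the join condition is an arbitrary predicate. The class NestedLoopJoin is hypothetical and only counts work; it does not emit joined tuples.

```java
import java.util.function.BiPredicate;

// Hypothetical sketch: counts the work done by a nested-loop join.
final class NestedLoopJoin {
    // With an arbitrary join condition, every left item must be tested against
    // every right item. Returns { comparisons, matches }.
    static long[] join(String[] left, String[] right, BiPredicate<String, String> condition) {
        long comparisons = 0;
        long matches = 0;
        for (String l : left) {
            for (String r : right) {
                comparisons++;
                if (condition.test(l, r)) {
                    matches++;
                }
            }
        }
        return new long[] { comparisons, matches };
    }

    public static void main(String[] args) {
        String[] left = { "a", "b", "c" };
        String[] right = { "b", "c" };
        long[] result = join(left, right, String::equals);
        System.out.println(result[0] + " comparisons, " + result[1] + " matches"); // prints 6 comparisons, 2 matches

        // The same arithmetic at the scale described above: N = M = 10^6.
        System.out.println(1_000_000L * 1_000_000L); // prints 1000000000000
    }
}
```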

Fortunately, if some of that data is sparse then we can use specific variants of joins to compute efficiently in parallel. Cascading includes a HashJoin which joins two or more tuple streams into a single stream via a Joiner, provided that all but one of the tuple streams are small enough to fit into memory. In other words, given some insights about the “shape” of the data, when you have a large data set (non-sparse) you can join with one or more small data sets (sparse) in memory.

A join has a left-hand side (LHS) and a right-hand side (RHS); in Cascading we put the sparser data on the right-hand side. So the HashJoin implements a non-blocking “asymmetrical join” or “replicated join”, where the left-most side will not block (accumulate into memory) in order to complete the join, but the right-most sides will.
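The asymmetry can be illustrated with a small sketch using Java streams. This is an analogy under our own assumptions rather than the HashJoin implementation: the right-hand side is accumulated into a set before any work starts, while the left-hand side is consumed lazily, one token at a time. The class name ReplicatedJoinSketch and the sample tokens are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical analogy for an asymmetrical join; not Cascading code.
final class ReplicatedJoinSketch {
    // RHS: accumulated into memory up front. LHS: never materialized as a whole.
    static Stream<String> withoutStopWords(Stream<String> lhsTokens, List<String> rhsStopWords) {
        Set<String> inMemory = new HashSet<>(rhsStopWords);
        return lhsTokens.filter(token -> !inMemory.contains(token));
    }

    public static void main(String[] args) {
        String[] cycle = { "the", "rain", "in", "shadow" };
        // An unbounded left-hand stream, which could never be loaded into memory.
        Stream<String> endless = Stream.iterate(0, i -> (i + 1) % cycle.length).map(i -> cycle[i]);
        List<String> firstFive = withoutStopWords(endless, List.of("the", "in"))
            .limit(5)
            .collect(Collectors.toList());
        System.out.println(firstFive); // prints [rain, shadow, rain, shadow, rain]
    }
}
```

Because the left side is only ever touched one element at a time, the sketch completes even though the left stream has no end. Only the right side has to be small.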

Recall that stop words lists tend to be bounded at approximately 10^5, which is relatively sparse when compared with an arbitrarily large token stream. In typical “web scale” text analytics use cases for TF-IDF, that might be in the range of billions of tokens, i.e., several orders of magnitude larger than our largest possible stop words list. Sounds like a great use case for HashJoin.

A conceptual diagram for this implementation of Word Count in Cascading is shown as:

Source

Download source for this example on GitHub. You’ll need to clone the whole of this multi-part series:

git clone git://github.com/Cascading/Impatient.git

For quick reference, the source code and a log for this example are listed in a gist. The input data stays the same as in the earlier code.

This example in Part 4 uses a HashJoin in Cascading to implement a stop words list, filtering some words out of the token stream prior to counting.

First, let’s add another source tap to read the stop words list as an input data set:

String stopPath = args[ 2 ];
Fields stop = new Fields( "stop" );
Tap stopTap = new Hfs( new TextDelimited( stop, true, "\t" ), stopPath );

Next we’ll insert another pipe into the assembly, placing tokenPipe between our “scrub” and “count” sections of our workflow. That’s where the HashJoin gets performed, implementing a left join:

// perform a left join to remove stop words, discarding the rows
// which joined with stop words, i.e., were non-null after left join
Pipe stopPipe = new Pipe( "stop" );
Pipe tokenPipe = new HashJoin( docPipe, token, stopPipe, stop, new LeftJoin() );

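Next we discard the non-null results from the left join, using a RegexFilter. The pattern ^$ matches only an empty value, so the filter keeps the tokens for which the join found no stop word: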

tokenPipe = new Each( tokenPipe, stop, new RegexFilter( "^$" ) );
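For readers less familiar with that regular expression, here is a small standalone check using java.util.regex rather than Cascading itself. It assumes, as the filter step above relies on, that an unmatched (null) stop value is treated as an empty string; the class EmptyMatchSketch and its keep method are hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical standalone check of the "^$" pattern; not Cascading code.
final class EmptyMatchSketch {
    private static final Pattern EMPTY = Pattern.compile("^$");

    // Assumption for this sketch: a null "stop" value is matched as an empty string.
    // Returns true when the tuple would be kept, i.e., no stop word was joined.
    static boolean keep(String stopValue) {
        String value = (stopValue == null) ? "" : stopValue;
        return EMPTY.matcher(value).find();
    }

    public static void main(String[] args) {
        System.out.println(keep(null));  // prints true
        System.out.println(keep("the")); // prints false
    }
}
```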

Now this new tokenPipe can be fitted back into the wcPipe which we used before. The workflow continues on much the same from there:

Pipe wcPipe = new Pipe( "wc", tokenPipe );

Last, we’ll add the additional source tap to the FlowDef, to include input data for our stop words list:

// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
 .setName( "wc" )
 .addSource( docPipe, docTap )
 .addSource( stopPipe, stopTap )
 .addTailSink( wcPipe, wcTap );

Modify the Main method to make those changes, then build a JAR file. You should be good to go. For those keeping score, the resulting physical plan in MapReduce for Part 4 still uses one mapper and one reducer.

The diagram for the Cascading flow will be in the dot/ subdirectory after the app runs. Here we have annotated it to show where the mapper and reducer phases are running, and also the section which was added since Part 3:

If you want to read in more detail about the classes in the Cascading API which were used, see the Cascading 2.0 User Guide and JavaDoc.

Build

The build for this example is based on using Gradle. The script is in build.gradle and to generate an IntelliJ project use:

gradle ideaModule

To build the sample app from the command line use:

gradle clean jar

What you should have at this point is a JAR file which is nearly ready to drop into your Maven repo — almost. Actually, we provide a community jar repository for Cascading libraries and extensions at http://conjars.org

Run

Before running this sample app, you’ll need to have a supported release of Apache Hadoop installed. Here’s what was used to develop and test our example code:

$ hadoop version
Hadoop 1.0.3

Be sure to set your HADOOP_HOME environment variable. Then clear the output directory (Apache Hadoop insists, if you’re running in standalone mode) and run the app:

rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc data/en.stop

Output text gets stored in the partition file output/wc which you can then verify:

more output/wc/part-00000

Here’s a log file from our run of the sample app, part 4. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascading-user email forum. Or visit one of our user group meetings. [Coming up real soon…]

Also, compare these other excellent implementations of the example apps here – by Sujit Pal in Scalding and by Paul Lam in Cascalog.

For those familiar with Apache Pig, we have included a comparable script. To run it:

rm -rf output
mkdir -p dot
pig -p docPath=./data/rain.txt -p wcPath=./output/wc -p stopPath=./data/en.stop ./src/scripts/wc.pig

Stay tuned for the next installments of our Cascading for the Impatient series.