Cascading for the Impatient, Part 2

In our first installment of this series we showed how to create the simplest possible Cascading 2.0 application. If you haven’t read that yet, it’s probably best to start there.

Today’s lesson takes the same app and stretches it a bit further. Undoubtedly you’ve seen Word Count before. We’d feel remiss if we did not provide a Word Count example. It’s the “Hello World” of MapReduce apps. Fortunately, this code is one of the basic steps toward developing a TF-IDF implementation. How convenient. We’ll also show how to use Cascading to generate a visualization of your MapReduce app.

Theory

Our example code in Part 1 of this series showed how to move data from point A to point B. It was simply a distributed file copy — loading data via distributed tasks, an instance of the “L” in ETL.

That may seem overly simple, and it may seem like Cascading is overkill for that kind of work. However, moving important data from point A to point B reliably can be a crucial job to perform. This helps illustrate one of the key reasons to use Cascading.

Let’s use an analogy of building a small ferris wheel. With a little bit of imagination and some background in welding, a person could cobble one together using old bicycle parts. In fact, those DIY ferris wheels show up at events such as Maker Faire. Starting out, a person might construct a little ferris wheel just for a demo. It might not hold anything larger than hamsters, but it’s not a hard problem. With a bit more skill, a person could probably build a somewhat larger instance, one that’s big enough for small children to ride.

Let me ask this: how robust would a DIY ferris wheel need to be before you let your kids ride on it? That’s precisely part of the challenge at an event like Maker Faire. Makers must be able to build a device such as a ferris wheel, out of spare bicycle parts, that is robust enough for strangers to let their kids ride it. Let’s hope those welds were made using best practices and good materials, to avoid catastrophes.

That’s a key reason why Cascading was created. When you need to move a few GB from point A to point B, it’s probably simple enough to write a Bash script, or just use a single command line copy. When your work requires some reshaping of the data, then a few lines of Python will probably work fine. Run that Python code from your Bash script and you’re done. I’ve used that approach many times, when it fit the use case requirements. However, suppose you’re not moving just GB around, but TB, or PB? Bash scripts won’t get you very far. Also think about this: suppose your app not only needs to move data from point A to point B, but it must run within the constraints of an enterprise IT shop. Millions of dollars, and potentially even some jobs, ride on the fact that your app performs correctly, day in and day out. That’s not unlike strangers trusting a ferris wheel; they want to make sure it wasn’t just built out of spare bicycle parts by some amateur welder. Robustness is key.

Or taking this analogy a few steps in another interesting direction… Perhaps you’re not only moving data and reshaping it a little, but you’re applying some interesting machine learning algorithms, some natural language processing, resequencing genes… who knows. Those imply lots of resource use, lots of potential expense in case of failures. Or lots of customer exposure. You’ll want to use an application framework which is significantly more robust than a bunch of scripts cobbled together.

With Cascading, you can package your entire MapReduce application, including its orchestration and testing, within a single JAR file. You define all of that within the context of a single programming language, whether that language is Java, Scala, Clojure, Python, Ruby, etc. That way your tests are included within a single program, not spread across several scripts written in different languages. Having a single JAR file define the app also allows for the typical tooling required in enterprise IT: unit tests, stream assertions, revision control, continuous integration, Maven repos, role-based configuration management, advanced schedulers, monitoring and notifications, etc.

Those are key reasons why we make Cascading, and why people use it for robust MapReduce apps that run at scale.

Meanwhile, a conceptual diagram for this implementation of Word Count in Cascading is shown below:

Source

Download the source for this example on GitHub. You’ll need to clone the repo for this entire multi-part series:

git clone git://github.com/Cascading/Impatient.git

For quick reference, the source code and a log for this example are listed in a gist. The input data stays the same as in the earlier code.

Note that the names of the taps have changed. Instead of inTap and outTap, we’re now using docTap and wcTap. We’ll be adding more taps in later installments, so more descriptive names help keep all the plumbing easy to follow.
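For reference, here’s a minimal sketch of the renamed taps, assuming the same headered TSV scheme and the docPath/wcPath command line arguments as before (see the gist for the exact source):

// sketch of the renamed source and sink taps, reading and writing headered TSV
String docPath = args[ 0 ];
String wcPath = args[ 1 ];
Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );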

Previously we defined a simple pipe to connect the taps. This example shows a more complex pipe. We use a generator inside an Each to split the document text into a token stream, with a regex that splits on word boundaries:

Fields token = new Fields( "token" );
Fields text = new Fields( "text" );
RegexSplitGenerator splitter = new RegexSplitGenerator( token, regex_string );
// only returns "token"
Pipe docPipe = new Each( "token", text, splitter, Fields.RESULTS );

Unfortunately, markdown on the GitHub wiki seems to turn psychotic when it parses that regex string value, even with escapes. See the gist or the src code repo.

Out of that pipe, we’ll get a tuple stream of token values to feed into the count. You can change the regex to handle more complex cases of token splitting, without having to write a different generator class.
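For instance, purely as a hypothetical illustration (this is not the regex used in the repo), a splitter that also breaks on semicolons and colons might look like:

// hypothetical variant: split on whitespace plus a few extra punctuation characters
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\t;:,.]" );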

Next, we use a GroupBy to count the occurrences of each token:

Pipe wcPipe = new Pipe( "wc", docPipe );
wcPipe = new GroupBy( wcPipe, token );
wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );

From that pipe, we’ll have a resulting tuple stream of token and count for the output. So we connect up the plumbing with a FlowDef:

FlowDef flowDef = FlowDef.flowDef().setName( "wc" )
 .addSource( docPipe, docTap )
 .addTailSink( wcPipe, wcTap );

Finally, we generate a DOT file, to depict the Cascading flow graphically. You can load the DOT file into OmniGraffle or Visio. Those diagrams are really helpful for troubleshooting MapReduce workflows in Cascading:

Flow wcFlow = flowConnector.connect( flowDef );
wcFlow.writeDOT( "dot/wc.dot" );
wcFlow.complete();
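
If you have Graphviz installed, you can also render that DOT file straight from the command line, for example:

dot -Tpng dot/wc.dot -o dot/wc.png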

Place all those source lines into a Main method (a sketch follows below), then build a JAR file, and you should be good to go.
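
To give a sense of how it all fits together, here’s a minimal sketch of such a Main class. It assumes a HadoopFlowConnector and the standard Cascading 2.0 packages, and it uses a placeholder regex; the source in the repo is the authoritative version:

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexSplitGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

public class Main
  {
  public static void main( String[] args )
    {
    String docPath = args[ 0 ];
    String wcPath = args[ 1 ];

    // set the application JAR and create a Hadoop-backed flow connector
    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    FlowConnector flowConnector = new HadoopFlowConnector( properties );

    // source and sink taps, reading and writing headered TSV
    Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
    Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );

    // split the "text" field into a stream of "token" tuples
    // placeholder regex for illustration; see the gist for the exact pattern
    Fields token = new Fields( "token" );
    Fields text = new Fields( "text" );
    RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\t,.]" );
    Pipe docPipe = new Each( "token", text, splitter, Fields.RESULTS );

    // group by token and count occurrences
    Pipe wcPipe = new Pipe( "wc", docPipe );
    wcPipe = new GroupBy( wcPipe, token );
    wcPipe = new Every( wcPipe, Fields.ALL, new Count(), Fields.ALL );

    // wire the taps to the head and tail of the pipe assembly
    FlowDef flowDef = FlowDef.flowDef().setName( "wc" )
     .addSource( docPipe, docTap )
     .addTailSink( wcPipe, wcTap );

    // plan the flow, write its DOT diagram, then run it
    Flow wcFlow = flowConnector.connect( flowDef );
    wcFlow.writeDOT( "dot/wc.dot" );
    wcFlow.complete();
    }
  }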

The diagram for the Cascading flow will be in the dot/ subdirectory. Here we have annotated it to show where the mapper and reducer phases are running:

If you want to read in more detail about the classes in the Cascading API which were used, see the Cascading 2.0 User Guide and JavaDoc.

Build

The build for this example uses Gradle. The script is in build.gradle; to generate an IntelliJ project, use:

gradle ideaModule

To build the sample app from the command line use:

gradle clean jar

What you should have at this point is a JAR file which is nearly ready to drop into your Maven repo. Speaking of which, we provide a community jar repository for Cascading libraries and extensions at http://conjars.org

Run

Before running this sample app, you’ll need to have a supported release of Apache Hadoop installed. Here’s what was used to develop and test our example code:

$ hadoop version
Hadoop 1.0.3

Be sure to set your HADOOP_HOME environment variable. Then clear the output directory (Apache Hadoop insists on this when running in standalone mode) and run the app:

rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/rain

Notice how those command line arguments align with args[] in the source. The text in data/rain.txt gets tokenized and counted, and the results get stored in the partition file output/rain, which you can then verify:

more output/rain/part-00000

Again, here’s a log file from our run of the sample app, part 2. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascading-user email forum. Or visit one of our user group meetings. [Coming up real soon...]

Also, compare these other excellent implementations of the example app: one by Sujit Pal in Scalding and one by Paul Lam in Cascalog.

For those who are familiar with Apache Pig, we have included a comparable script:

docPipe = LOAD '$docPath' USING PigStorage('\t', 'tagsource') AS (doc_id, text);
docPipe = FILTER docPipe BY doc_id != 'doc_id';
-- specify a regex operation to split the "document" text lines into a token stream
tokenPipe = FOREACH docPipe GENERATE doc_id, FLATTEN(TOKENIZE(text, ' [](),.')) AS token;
tokenPipe = FILTER tokenPipe BY token MATCHES '\\w.*';
-- determine the word counts
tokenGroups = GROUP tokenPipe BY token;
wcPipe = FOREACH tokenGroups GENERATE group AS token, COUNT(tokenPipe) AS count;
-- output
STORE wcPipe INTO '$wcPath' using PigStorage('\t', 'tagsource');
EXPLAIN -out dot/wc_pig.dot -dot wcPipe;

To run that, use:

rm -rf output
mkdir -p dot
pig -p docPath=./data/rain.txt -p wcPath=./output/wc ./src/scripts/wc.pig

So that’s our Word Count example. Twenty lines of yummy goodness. Stay tuned for the next installments of our Cascading for the Impatient series.