Cascading for the Impatient, Part 1

The lesson today is how to write a simple Cascading 2.0 app. The goal is clear and concise: create the simplest application possible in Cascading, while following best practices. No bangs, no whistles, just good solid code.

https://github.com/Cascading/Impatient/tree/master/part1

Here’s a brief Java program, about a dozen lines long. It copies lines of text from file “A” to file “B”. It uses one mapper in Apache Hadoop; no reducer is needed. Conceptually, the Cascading implementation reads from a source tap, passes each line through a single “copy” pipe, and writes to a sink tap.

Certainly this same work could be done in much quicker ways, such as using cp on Linux. However, this Cascading example is merely a starting point. We’ll build on it, adding new pieces of code to explore the features and strengths of Cascading, until we have a MapReduce implementation of TF-IDF for scoring the relative “importance” of keywords in a set of documents. In other words, Text Mining 101: the kind of thing you might find when you peek inside Lucene, for example, or some other text-indexing framework. Along the way we’ll also show how to use the TDD features of Cascading to build robust MapReduce apps at scale.

Source

Download the source for this example from GitHub. You’ll need to clone the repository for the whole multi-part series:

git clone git://github.com/Cascading/Impatient.git

For quick reference, the source code, input data, and a log for this example are listed in a gist.

First, we create a source tap to specify the input data. That data happens to be formatted as tab-separated values (TSV) with a header row:

String inPath = args[ 0 ];
// TextDelimited( true, "\t" ) means tab-delimited text, with the first line treated as a header of field names
Tap inTap = new Hfs( new TextDelimited( true, "\t" ), inPath );

Next we create a sink tap to specify the output data, which is also TSV:

String outPath = args[ 1 ];
Tap outTap = new Hfs( new TextDelimited( true, "\t" ), outPath );

Then we create a pipe to connect the taps:

Pipe copyPipe = new Pipe( "copy" );

Here comes the fun part. Get your tool belt ready, because we need to do a little plumbing… Connect the taps and pipes into a flow:

FlowDef flowDef = FlowDef.flowDef()
.addSource( copyPipe, inTap )
.addTailSink( copyPipe, outTap );

The notion of a workflow lives at the heart of Cascading. Instead of thinking in terms of mapper and reducer steps in a MapReduce job, we prefer to think about apps. Real-world apps tend to use lots of job steps. Those are connected and have dependencies, which are typically specified by a directed acyclic graph (DAG). Cascading uses FlowDef objects to define how a MapReduce app — a.k.a., a DAG of MapReduce job steps — must be connected.
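
To make that concrete, here’s a hypothetical sketch (not part of this example) of a FlowDef that wires two branches into a single DAG. The pipe names, field names, and file paths are made up for illustration; CoGroup and Fields come from the Cascading 2.0 API (cascading.pipe.CoGroup, cascading.tuple.Fields):

// two illustrative source taps with explicit fields, plus a sink tap
Tap docsTap = new Hfs( new TextDelimited( new Fields( "doc_id", "text" ), "\t" ), "data/docs.tsv" );
Tap statsTap = new Hfs( new TextDelimited( new Fields( "doc_id", "count" ), "\t" ), "data/stats.tsv" );
Tap joinedTap = new Hfs( new TextDelimited( true, "\t" ), "output/joined" );

// two branches, joined on "doc_id" into one result pipe
Pipe docsPipe = new Pipe( "docs" );
Pipe statsPipe = new Pipe( "stats" );
Pipe joinPipe = new CoGroup( docsPipe, new Fields( "doc_id" ),
    statsPipe, new Fields( "doc_id" ),
    new Fields( "doc_id", "text", "stats_doc_id", "count" ) );

// one FlowDef describes the whole DAG: two sources, one tail sink
FlowDef joinDef = FlowDef.flowDef()
    .setName( "branching-example" )
    .addSource( docsPipe, docsTap )
    .addSource( statsPipe, statsTap )
    .addTailSink( joinPipe, joinedTap );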

Now that we have a flow defined, the last line of code runs it (flowConnector is a Cascading flow connector for Hadoop; the assembled sketch below shows where it comes from):

flowConnector.connect( flowDef ).complete();

Place those source lines into the main() method of a Main class, then build a JAR file, and you should be good to go.
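
For reference, here’s a sketch of how those lines might be assembled. The package name is illustrative, and the Properties / AppProps / HadoopFlowConnector setup (not shown in the walkthrough above) is the usual Cascading 2.0 way to tell Hadoop which application JAR to ship and is where flowConnector comes from:

package impatient;

import java.util.Properties;

import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;

public class Main {
  public static void main( String[] args ) {
    String inPath = args[ 0 ];
    String outPath = args[ 1 ];

    // identify the application JAR and create a connector that runs flows on Hadoop
    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // source and sink taps for TSV data with a header row
    Tap inTap = new Hfs( new TextDelimited( true, "\t" ), inPath );
    Tap outTap = new Hfs( new TextDelimited( true, "\t" ), outPath );

    // a pipe to connect the taps
    Pipe copyPipe = new Pipe( "copy" );

    // connect the taps and pipes into a flow, then run it
    FlowDef flowDef = FlowDef.flowDef()
        .addSource( copyPipe, inTap )
        .addTailSink( copyPipe, outTap );

    flowConnector.connect( flowDef ).complete();
  }
}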

If you want to read in more detail about the classes in the Cascading API which were used, see the Cascading 2.0 User Guide and JavaDoc.

Build

The build for this example is based on Gradle. The script is in build.gradle; to generate an IntelliJ project, use:

gradle ideaModule

To build the sample app from the command line use:

gradle clean jar

What you should have at this point is a JAR file which is nearly ready to drop into your Maven repo — almost. Actually, we provide a community jar repository for Cascading libraries and extensions at http://conjars.org

Run

Before running this sample app, you’ll need to have a supported release of Apache Hadoop installed. Here’s what was used to develop and test our example code:

$ hadoop version
Hadoop 1.0.3

Be sure to set your HADOOP_HOME environment variable. Then clear the output directory (Apache Hadoop insists that the output directory not already exist, if you’re running in standalone mode) and run the app:

rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/rain

Notice how those command-line arguments align with args[] in the source. The file data/rain.txt gets copied, TSV row by TSV row. The output text gets stored in part files under the output/rain directory, which you can then verify:

more output/rain/part-00000

Again, here’s a log file from our run of the sample app. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascading-user email forum. Plenty of experienced Cascading users are discussing taps and pipes and flows there, and eager to help. Or visit one of our user group meetings. [Coming up soon...]

Also, compare other excellent implementations of these example apps: one by Sujit Pal in Scalding and one by Paul Lam in Cascalog.

For those who are familiar with Apache Pig, we have included a comparable script:

copyPipe = LOAD '$inPath' USING PigStorage('\t', 'tagsource');
STORE copyPipe INTO '$outPath' USING PigStorage('\t', 'tagsource');

To run that, use:

rm -rf output
pig -p inPath=./data/rain.txt -p outPath=./output/rain ./src/scripts/copy.pig

That’s it in a nutshell, our simplest app possible in Cascading. Not quite a “Hello World”, but more like a “Hi there, bus stop”. Or something. Stay tuned for the next installments of our Cascading for the Impatient series.