The lesson today is how to write a simple Cascading 2.0 app. The goal is clear and concise: create the simplest application possible in Cascading while following best practices. No bells, no whistles, just good solid code.
Here’s a brief Java program, about a dozen lines long. It copies lines of text from file “A” to file “B”. It uses one mapper in Apache Hadoop; no reducer is needed. A conceptual diagram for this implementation in Cascading is shown as:
Certainly this same work could be performed in much quicker ways, such as using cp on Linux. However, this Cascading example is merely a starting point. We’ll build on this example, adding new pieces of code to explore features and strengths of Cascading. We’ll keep building until we have a MapReduce implementation of TF-IDF for scoring the relative “importance” of keywords in a set of documents. In other words, Text Mining 101: what you might find when you peek inside Lucene, for example, or some other text indexing framework. Moreover, we’ll show how to use the TDD features of Cascading to build robust MapReduce apps for scale.
Download source for this example on GitHub. You’ll need to clone the whole of this multi-part series:
git clone git://github.com/Cascading/Impatient.git
For quick reference, the source code, input data, and a log for this example are listed in a gist.
First, we create a source tap to specify the input data. That data happens to be formatted as tab-separated values (TSV) with a header row:
String inPath = args[ 0 ];
Tap inTap = new Hfs( new TextDelimited( true, "\t" ), inPath );
Next we create a sink tap to specify the output data, which is also TSV:
String outPath = args[ 1 ];
Tap outTap = new Hfs( new TextDelimited( true, "\t" ), outPath );
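To make the "TSV with a header row" idea concrete, here is a minimal sketch in plain Java of a header-aware, row-by-row copy. It does not use Cascading and is not how TextDelimited is implemented. The class TsvCopySketch, its methods, and the sample rows are all hypothetical, made up for illustration. Rejecting rows whose field count differs from the header is a choice made for this sketch only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: plain Java, no Cascading classes involved.
class TsvCopySketch {

    /** Splits one TSV line into its fields, keeping trailing empty fields. */
    static List<String> parseRow(String line) {
        return Arrays.asList(line.split("\t", -1));
    }

    /** Copies TSV text (header first, then data rows) and returns the output lines. */
    static List<String> copy(List<String> inputLines) {
        List<String> out = new ArrayList<>();
        if (inputLines.isEmpty()) {
            return out;
        }
        // The first line names the fields; it is written to the output as well.
        List<String> header = parseRow(inputLines.get(0));
        out.add(String.join("\t", header));

        // Every remaining line is one data row, copied field by field.
        for (String line : inputLines.subList(1, inputLines.size())) {
            List<String> row = parseRow(line);
            if (row.size() != header.size()) {
                throw new IllegalArgumentException(
                    "expected " + header.size() + " fields but found " + row.size());
            }
            out.add(String.join("\t", row));
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample data, not the contents of data/rain.txt.
        List<String> input = Arrays.asList("id\ttext", "1\tfirst row", "2\tsecond row");
        for (String line : copy(input)) {
            System.out.println(line);
        }
    }
}
```

In the real app, the two taps take care of this work: the `true` argument says a header row is present and `"\t"` is the field delimiter.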
Then we create a pipe to connect the taps:
Pipe copyPipe = new Pipe( "copy" );
Here comes the fun part. Get your tool belt ready, because we need to do a little plumbing… Connect the taps and pipes into a flow:
FlowDef flowDef = FlowDef.flowDef()
  .addSource( copyPipe, inTap )
  .addTailSink( copyPipe, outTap );
The notion of a workflow lives at the heart of Cascading. Instead of thinking in terms of mapper and reducer steps in a MapReduce job, we prefer to think about apps. Real-world apps tend to use lots of job steps. Those steps are connected and have dependencies, which are typically specified by a directed acyclic graph (DAG). Cascading uses FlowDef objects to define how a MapReduce app, that is, a DAG of MapReduce job steps, must be connected.
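For readers new to the DAG idea, here is a minimal sketch in plain Java of how dependencies between job steps determine a valid run order. It is a textbook topological sort (Kahn's algorithm), not Cascading's planner, and it uses no Cascading classes. The class JobDagSketch and the step names are hypothetical, chosen only to echo the TF-IDF app this series builds toward.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: a generic topological sort, not Cascading's planner.
class JobDagSketch {

    /** Returns the steps in an order where each step comes after all of its dependencies. */
    static List<String> executionOrder(Map<String, List<String>> dependsOn) {
        Map<String, Integer> pending = new LinkedHashMap<>();   // unmet dependencies per step
        Map<String, List<String>> dependents = new HashMap<>(); // reverse edges

        for (Map.Entry<String, List<String>> entry : dependsOn.entrySet()) {
            String step = entry.getKey();
            pending.putIfAbsent(step, 0);
            for (String dep : entry.getValue()) {
                pending.putIfAbsent(dep, 0);
                pending.merge(step, 1, Integer::sum);
                dependents.computeIfAbsent(dep, k -> new ArrayList<>()).add(step);
            }
        }

        // Steps with no unmet dependencies can run right away.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : pending.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String step = ready.remove();
            order.add(step);
            // Finishing a step may unblock the steps that depend on it.
            for (String next : dependents.getOrDefault(step, Collections.<String>emptyList())) {
                if (pending.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }

        if (order.size() != pending.size()) {
            throw new IllegalStateException("cycle detected: the steps do not form a DAG");
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> dependsOn = new LinkedHashMap<>();
        dependsOn.put("tokenize", new ArrayList<String>());
        dependsOn.put("termFrequency", Collections.singletonList("tokenize"));
        dependsOn.put("documentFrequency", Collections.singletonList("tokenize"));
        List<String> tfidfDeps = new ArrayList<>();
        tfidfDeps.add("termFrequency");
        tfidfDeps.add("documentFrequency");
        dependsOn.put("tfidf", tfidfDeps);
        System.out.println(executionOrder(dependsOn));
    }
}
```

With Cascading you never write this yourself: you describe the sources, sinks, and pipes in a FlowDef and let the framework work out the job steps.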
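Now that we have a flow defined, the last line of code runs it. Here flowConnector is a HadoopFlowConnector instance created earlier in the program (see the full source on GitHub):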
flowConnector.connect( flowDef ).complete();
Place those source lines all into a Main method, then build a JAR file. You should be good to go.
To build the sample app from the command line use:
gradle clean jar
At this point you should have a JAR file that is nearly ready to drop into your Maven repo. We also provide a community JAR repository for Cascading libraries and extensions at http://conjars.org
Before running this sample app, you’ll need to have a supported release of Apache Hadoop installed. Here’s what was used to develop and test our example code:
$ hadoop version
Be sure to set your HADOOP_HOME environment variable. Then clear the output directory (Apache Hadoop insists, if you’re running in standalone mode) and run the app:
rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/rain
Notice how those command line arguments align with args in the source. The file data/rain.txt gets copied, TSV row by TSV row. Output text gets stored in a partition file under output/rain, which you can then verify:
Again, here’s a log file from our run of the sample app. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascading-user email forum. Plenty of experienced Cascading users discuss taps, pipes, and flows there, and they are eager to help. Or visit one of our user group meetings. [Coming up soon...]
For those who are familiar with Apache Pig, we have included a comparable script:
copyPipe = LOAD '$inPath' USING PigStorage('\t', 'tagsource');
STORE copyPipe INTO '$outPath' USING PigStorage('\t', 'tagsource');
To run that, use:
rm -rf output
pig -p inPath=./data/rain.txt -p outPath=./output/rain ./src/scripts/copy.pig
That’s it in a nutshell, our simplest app possible in Cascading. Not quite a “Hello World”, but more like a “Hi there, bus stop”. Or something. Stay tuned for the next installments of our Cascading for the Impatient series.