Cascading for the Impatient, Part 3

In our second installment of this series we showed how to implement Word Count as a Cascading 2.0 application. If you haven’t read that yet, it’s probably best to start there.

Today’s lesson takes the same app and stretches it even more. We’ll show how to write a custom Operation. Again, this code is leading toward an implementation of TF-IDF in Cascading. We’ll show best practices for workflow orchestration and test-driven development (TDD) at scale.

Theory

This example in Part 3 uses a custom operation in Cascading to “scrub” the token stream, prior to counting the tokens. Previously we used a RegexSplitGenerator to tokenize the text, which is a built-in operation and works pretty well. However, one thing you’ll find when working with almost any text analytics at scale is that there are lots and lots of edge cases. Cleaning up the edge cases generally represents the bulk of the engineering work in text analytics: hyphens, exponents, different kinds of quotes, etc. If you try to incorporate every possible edge case into a regex, that tends to become complex and brittle. Identifying edge cases is an iterative process, based on what you learn over time from operating at scale. Also, each application tends to have its own nuances, which makes it difficult to leverage standard libraries for text processing.

So we subclass Cascading’s BaseOperation class and work through all the edge cases, adding more as they get identified. An added benefit is that we can also add unit tests to our custom class, with test coverage increasing as each new issue gets found. More about unit tests later.

Operations get used in Cascading to perform the “T” part of ETL. In other words, operations act on the data to transform the tuple stream, filter it, analyze it, etc. Think of the roles that command-line utilities such as grep or awk play in Unix shell scripts. Cascading provides a rich library of standard operations to encode the “business logic” of transforming “Big Data”. It’s relatively simple to develop your own custom operations, and our text “scrubbing” example here shows a good use case. However, if you find yourself developing lots of custom operations every time you begin to write a Cascading app, that’s a good indication that you need to step back and reevaluate. For one thing, the standard operations have been refined over the years and cover a large class of MapReduce applications. For another, if you aren’t careful when defining a custom operation, you may inadvertently introduce a bottleneck; the standard operations encapsulate best practices and design patterns for parallelism. Something to think about.
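
For instance, a trivial transformation like trimming and lowercasing a field usually doesn’t justify a custom class. Here is a hedged sketch (not part of the Impatient source) assuming Cascading 2.0’s built-in ExpressionFunction covers the case:

// hypothetical example: a built-in ExpressionFunction handles a one-line
// "scrub" without a custom class
// (uses cascading.operation.expression.ExpressionFunction, backed by Janino)
docPipe = new Each( docPipe, new Fields( "token" ),
  new ExpressionFunction( new Fields( "token" ), "token.trim().toLowerCase()", String.class ),
  Fields.REPLACE );

The custom ScrubFunction below earns its keep once the scrubbing logic outgrows what a one-line expression or regex can express cleanly.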

Meanwhile, here is a conceptual diagram for this implementation of Word Count in Cascading:

Source

Download source for this example on GitHub. You’ll need to clone the whole of this multi-part series:

git clone git://github.com/Cascading/Impatient.git

For quick reference, the source code and a log for this example are listed in a gist. The input data stays the same as in the earlier parts of this series.

First, let’s add our custom operation ScrubFunction to clean up the token stream. We place it into the docPipe assembly, immediately after the regex which splits the raw text:

Fields scrubArguments = new Fields( "doc_id", "token" );
docPipe = new Each( docPipe, scrubArguments, new ScrubFunction( scrubArguments ), Fields.RESULTS );
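
For context, the regex split in question is the tokenizer carried over from Part 2; recapped from memory of that installment, it looks roughly like this (the exact pattern in the repo may differ slightly):

// recap of the Part 2 tokenizer that feeds the scrubbing step
Fields token = new Fields( "token" );
Fields text = new Fields( "text" );
RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
Fields fieldSelector = new Fields( "doc_id", "token" );
Pipe docPipe = new Each( "token", text, splitter, fieldSelector );

Note the Fields.RESULTS selector on the scrubbing Each above: only the fields declared by ScrubFunction flow downstream.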

Next, we need to define a constructor for our custom operation:

public ScrubFunction( Fields fieldDeclaration )
  {
  super( 2, fieldDeclaration );
  }

The fieldDeclaration parameter lets us name the tuple fields which this operation declares in the flow. Since we’ll output the results as TSV, those field names will also be used to create the file header.
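
For orientation, that super( 2, fieldDeclaration ) call is the BaseOperation constructor, declaring that the function expects two argument fields and emits the declared fields. A minimal sketch of the surrounding class, assuming it extends BaseOperation and implements Function as that constructor suggests (the version in the repo may differ in small details such as generics):

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class ScrubFunction extends BaseOperation implements Function
  {
  // the constructor, operate(), and scrubText() shown in this post go here
  }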

Next, we define an operate method. In other words, we define the function which operates on the tuple stream. This is essentially a wrapper which pulls tuples from the input stream, applies our scrubText method to each token, then inserts new tuples into the output stream:

public void operate( FlowProcess flowProcess, FunctionCall functionCall )
  {
  // pull the argument tuple from the input stream
  TupleEntry argument = functionCall.getArguments();
  String doc_id = argument.getString( 0 );
  String token = scrubText( argument.getString( 1 ) );

  // emit a result tuple only when the scrubbed token is non-empty
  if( token.length() > 0 )
    {
    Tuple result = new Tuple();
    result.add( doc_id );
    result.add( token );
    functionCall.getOutputCollector().add( result );
    }
  }
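
One design note: because the function only adds to the output collector when the scrubbed token is non-empty, it can emit zero tuples for a given argument tuple; that is how empty tokens get dropped from the stream. As a purely stylistic variant (not from the original source), the result tuple can also be built in one step:

// equivalent, more compact construction of the result tuple
functionCall.getOutputCollector().add( new Tuple( doc_id, token ) );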

Last but not least, we define the scrubText method to clean up tokens. This version is relatively simple, and in practice it would have many more cases. It’s also relatively simple to write unit tests against:

public String scrubText( String text )
  {
  return text.trim().toLowerCase();
  }
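
To give a flavor of how this grows in practice, here is a hypothetical extension of scrubText (not the version in the repo) that handles a couple of the edge cases mentioned earlier, such as curly quotes and stray punctuation:

public String scrubText( String text )
  {
  // hypothetical sketch: a few extra edge cases, added as they get identified
  String scrubbed = text.trim().toLowerCase();

  // normalize "curly" quotes to plain ASCII quotes
  scrubbed = scrubbed.replace( '\u2019', '\'' ).replace( '\u201c', '"' ).replace( '\u201d', '"' );

  // strip punctuation left dangling at either end of the token
  scrubbed = scrubbed.replaceAll( "^\\p{Punct}+|\\p{Punct}+$", "" );

  return scrubbed;
  }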

Place the first snippet of source into your Main method, put the rest of the source shown into a separate ScrubFunction class, then build a JAR file. You should be good to go.
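
Since scrubText is an ordinary public method with no Hadoop dependencies, it’s also easy to cover with a plain JUnit test before you even build the JAR. A minimal sketch, assuming JUnit 4 is on the test classpath (the test class name here is hypothetical):

import static org.junit.Assert.assertEquals;

import org.junit.Test;
import cascading.tuple.Fields;

public class ScrubFunctionTest
  {
  @Test
  public void scrubTrimsAndLowercases()
    {
    ScrubFunction scrub = new ScrubFunction( new Fields( "doc_id", "token" ) );

    // the current scrubText should trim whitespace and lowercase the token
    assertEquals( "rain", scrub.scrubText( " Rain " ) );
    assertEquals( "shadow", scrub.scrubText( "SHADOW" ) );
    }
  }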

The diagram for the Cascading flow will be in the dot/ subdirectory after the app runs. Here we have annotated it to show where the mapper and reducer phases are running, and also the section which was added since Part 2:

If you want to read in more detail about the classes in the Cascading API which were used, see the Cascading 2.0 User Guide and JavaDoc.

Build

The build for this example uses Gradle. The script is in build.gradle; to generate an IntelliJ IDEA project, use:

gradle ideaModule

To build the sample app from the command line use:

gradle clean jar

What you should have at this point is a JAR file which is nearly ready to drop into your Maven repo. Speaking of which, we provide a community JAR repository for Cascading libraries and extensions at http://conjars.org

Run

Before running this sample app, you’ll need to have a supported release of Apache Hadoop installed. Here’s what was used to develop and test our example code:

$ hadoop version
Hadoop 1.0.3

Be sure to set your HADOOP_HOME environment variable. Then clear the output directory (Apache Hadoop insists on this when you’re running in standalone mode) and run the app:

rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc

Output text gets stored in partition files under output/wc, which you can then verify:

more output/wc/part-00000

Here’s a log file from our run of the sample app, part 3. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascading-user email forum, or visit one of our user group meetings. [Coming up real soon…]

Also, compare these other excellent implementations of the example apps: by Sujit Pal in Scalding and by Paul Lam in Cascalog.

For those familiar with Apache Pig, we have included a comparable script. To run it:

rm -rf output
mkdir -p dot
pig -p docPath=./data/rain.txt -p wcPath=./output/wc -p stopPath=./data/en.stop ./src/scripts/wc.pig

Stay tuned for the next installments of our Cascading for the Impatient series.