Cascading for the Impatient, Part 6

In our fifth installment of this series we showed how to implement TF-IDF in Cascading application. If you haven’t read that yet, it’s probably best to start there.

Today’s post extends the TF-IDF app to show best practices for test-driven development (TDD) at scale. We’ll incorporate unit tests into the build (should have done so sooner), plus show how to leverage TDD features which are unique to Cascading: checkpoints, traps, assertions, etc. These features are based on using Checkpoint, Debug, and AssertMatches.

We’ll keep building on this example to show how to leverage “local mode”.

Theory

At first glance, the notion of test-driven development (TDD) might seem a bit antithetical in the context of Big Data. After all, TDD is all about short development cycles, writing automated test cases which are intended to fail, and lots of refactoring. Those descriptions would not appear to fit with batch jobs involving terabytes of data and huge clusters running apps that take days to complete.

Stated in a different way, according to Kent Beck, TDD “encourages simple designs and inspires confidence.” That statement does actually fit well with Cascading. The API is intended to provide simple design patterns for working with data – GroupBy, Join, Count, Regex, Filter – so that the need for writing custom functions becomes relatively rare. That speaks to “encouraging simple designs” directly. The practice in Cascading of modeling business process and orchestrating MapReduce workflows – that speaks to “inspiring confidence” in a big way.

So now we’ll let the cat out of the bag for a little secret… Working with unstructured data at scale has been shown to be quite valuable (Google, Amazon, LinkedIn, Twitter, etc.) however most of the “heavy lifting” which we perform in MapReduce workflows is essentially cleaning up data. DJ Patil explained this point quite eloquently in Data Jujitsu: “It’s impossible to overstress this: 80% of the work in any data project is in cleaning the data … Work done up front in getting clean data will be amply repaid over the course of the project.”

Cleaning up the data allows for subsequent use of sampling techniques, dimensional reduction, and other practices which help alleviate some of the bottlenecks which might otherwise be encountered in Big Data. In other words, there are great use cases for formalisms which help demonstrate that “dirty” data at scale has been cleaned up. Those turn out to be quite valuable in practice.

However, TDD practices tend to be based on unit tests or mocks … how does one write a quick unit test for a Godzilla-sized dataset?

The short answer is: you don’t. However, you can greatly reduce the need for writing unit test coverage by limiting the amount of custom code required. Hopefully we’ve shown that aspect of Cascading by now. Beyond that aspect, you can use sampling techniques to quantify the confidence for an app running correctly. You can also define system tests at scale in relatively simple ways. Furthermore, you can define contingencies for what to do when assumptions fail … as they inevitably do, at scale.

Let’s discuss sampling… generally speaking, large MapReduce workflows are relatively opaque processes which are difficult to observe. However, Cascading provides two techniques for observing portions of a workflow. One very simple approach is to insert a Debug into a pipe, to see the tuple values passing through a particular part of a workflow. Debug output goes to the log instead of a file, but it can be turned off, e.g., with a command line option. If the data is large, one can use a Sample filter to sample the tuple values which get written to the log.

Another approach is to use a Checkpoint, which forces intermediate data to be written out to HDFS. This may also become important for performance reasons, i.e., forcing results to disk to avoid recomputing – e.g., when there are multiple uses for the output of a pipe downstream such as with the right side of a HashJoin. Sampling may be performed either before (like with Debug) or after the data gets persisted to HDFS.

Next, let’s talk about system tests. Cascading include support for stream assertions. These provide mechanisms for asserting that the values in a tuple stream meet certain criteria – similar to the assert keyword in Java, or an assert not null in a unit test. We can assert patterns strictly as unit tests during development, then run testing against regression data. For performance reasons, we might use command line options to turn off assertions in production. Or keep them, if a use case requires that level of guarantees.

Lastly, what to do when assumptions fail? One lesson of working with data at scale is that the best assumptions will inevitably fail. Unexpected things happen, and 80% of the work will be cleaning up problems. Cascading defines failure traps which capture data that causes an Operation to fail, e.g., throw an Exception. For example, perhaps 99% of the cases in your log files can be rolled up into a set of standard reports… but 1% requires manual review. Great, process the 99% which work and shunt the 1% failure cases into a special file, marked “for manual review”. Keep in mind, however, that traps are intended for handling exceptional cases. If you know in advance how to categorize good vs. bad data, then use a filter instead of a trap.

Meanwhile, a conceptual diagram for this implementation of TF-IDF in Cascading is shown as:

Source

Download source for this example on GitHub. You’ll need to clone the whole of this multi-part series:

git clone git://github.com/Cascading/Impatient.git

For quick reference, the source code and a log for this example are listed in a gist. The input data stays the same as in the earlier code.

Let’s add a unit test and show how that works into this example. In the Gradle build script build.gradle we need to modify the compile task to include JUnit and other testing dependencies:

compile( 'cascading:cascading-hadoop:2.0.1' ) { transitive = true }
testCompile( 'org.apache.hadoop:hadoop-test:1.0.3' )
testCompile( 'junit:junit:4.8.+' )

Then we’ll add a test task:

test {
  include 'impatient/**'
  //makes the standard streams (err and out) visible at console when running tests
  testLogging.showStandardStreams = true
  //listening to test execution events
  beforeTest { descriptor ->
     logger.lifecycle("Running test: " + descriptor)
  }
  onOutput { descriptor, event ->
     logger.lifecycle("Test: " + descriptor + " produced standard out/err: " + event.message )
  }
}

A little restructuring of the source directories is requried – see our GitHub code repo, where it’s all set up property. Then we add a unit test for our custom function to “scrub” tokens, which was created in Part 3. This goes into a new class ScrubTest.java:

public class ScrubTest
  {
  @Test
  public void testMain() throws Exception
    {
    ScrubTest tester = new ScrubTest();
    Fields fieldDeclaration = new Fields( "doc_id", "token" );
    ScrubFunction scrub = new ScrubFunction( fieldDeclaration );
    assertEquals( "Scrub", "foo bar", scrub.scrubText( "FoO BAR  " ) );
    }
  }

This is a particularly good place for a unit test. Scrubbing tokens is a likely point at which edge cases get encountered at scale. In practice, you’d probably want to define even more unit tests.

Next, going back to the Main.java module, let’s add sink taps for writing out trapped data and checkpointed data:

String trapPath = args[ 4 ];
String checkPath = args[ 5 ];
Tap trapTap = new Hfs( new TextDelimited( true, "\t" ), trapPath );
Tap checkTap = new Hfs( new TextDelimited( true, "\t" ), checkPath );

Next we’ll modify the head of the existing pipe assembly for TF-IDF to incorporate a Stream Assertion. We use an AssertMatches to define the expected pattern for input data. Then we apply AssertionLevel.STRICT to force validation of the data:

// use a stream assertion to validate the input data
Pipe docPipe = new Pipe( "token" );
AssertMatches assertMatches = new AssertMatches( "doc\\d+\\s.*" );
docPipe = new Each( docPipe, AssertionLevel.STRICT, assertMatches );

Next we’ll add a Debug and DebugLevel.VERBOSE to the D branch, to trace the tuple values in the flow there:

// example use of a debug, to observe tuple stream; turn off below
dfPipe = new Each( dfPipe, DebugLevel.VERBOSE, new Debug( true ) );

Next we’ll add a Checkpoint after the join of the DF and D branches. That forces the tuples at this point in the workflow to be persisted to HDFS:

// create a checkpoint, to observe the intermediate data in DF stream
Checkpoint idfCheck = new Checkpoint( "checkpoint", idfPipe );
Pipe tfidfPipe = new CoGroup( tfPipe, tf_token, idfCheck, df_token );

Next we have a relatively more complex set of taps to connect in the FlowDef, to include output data for TDD-related features:

// connect the taps, pipes, traps, checkpoints, etc., into a flow                                                                                         
FlowDef flowDef = FlowDef.flowDef()
 .setName( "tfidf" )
 .addSource( docPipe, docTap )
 .addSource( stopPipe, stopTap )
 .addTailSink( tfidfPipe, tfidfTap )
 .addTailSink( wcPipe, wcTap )
 .addTrap( docPipe, trapTap )
 .addCheckpoint( idfCheck, checkTap );

Last, we’ll specify the verbosity level for the debug trace, and the strictness level for the stream assertion:

// set to DebugLevel.VERBOSE for trace, or DebugLevel.NONE in production
flowDef.setDebugLevel( DebugLevel.VERBOSE );
// set to AssertionLevel.STRICT for all assertions, or AssertionLevel.NONE in production
flowDef.setAssertionLevel( AssertionLevel.STRICT );

Modify the Main method to make those changes, then build a JAR file. You should be good to go. For those keeping score, the resulting physical plan in MapReduce for Part 6 now uses twelve mappers and nine reducers. In other words, we added one mapper as the overhead for gaining lots of test features.

The diagram for the Cascading flow will be in the dot/ subdirectory after the app runs. Here we have annotated it to show where the mapper and reducer phases are running, and also the sections which were added since Part 5:

If you want to read in more detail about the classes in the Cascading API which were used, see the Cascading 2.0 User Guide and JavaDoc.

Build

The build for this example is based on using Gradle. The script is in build.gradle and to generate an IntelliJ project use:

gradle ideaModule

To build the sample app from the command line use:

gradle clean jar

What you should have at this point is a JAR file which is nearly ready to drop into your Maven repo — almost. Actually, we provide a community jar repository for Cascading libraries and extensions at http://conjars.org

Run

Before running this sample app, you’ll need to have a supported release of Apache Hadoop installed. Here’s what was used to develop and test our example code:

$ hadoop version
Hadoop 1.0.3

Be sure to set your HADOOP_HOME environment variable. Then clear the output directory (Apache Hadoop insists, if you’re running in standalone mode) and run the app:

rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc data/en.stop output/tfidf output/trap output/check

The output log should include a warning, based on the stream assertion, which looks like this:

12/08/06 14:15:07 WARN stream.TrapHandler: exception trap on branch: 'token', for fields: [{2}:'doc_id', 'text'] tuple: ['zoink', 'null']
cascading.operation.AssertionException: argument tuple: ['zoink', 'null'] did not match: doc\d+\s.*
    at cascading.operation.assertion.BaseAssertion.throwFail(BaseAssertion.java:107)
    at cascading.operation.assertion.AssertMatches.doAssert(AssertMatches.java:84)
    at cascading.flow.stream.ValueAssertionEachStage.receive(ValueAssertionEachStage.java:57)
    at cascading.flow.stream.ValueAssertionEachStage.receive(ValueAssertionEachStage.java:33)
    at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
    at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
    at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:124)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)

That is expected behavior. We directed the API to show warning when stream assertions failed. The data which caused this warning will get trapped.

Not too far after that point in the log, there should be debug output which looks like the following:

12/08/06 14:15:46 INFO hadoop.FlowReducer: sinking to: TempHfs["SequenceFile[ ['df_count', 'df_token', 'lhs_join']]"][DF/93669/]
['df_count', 'df_token', 'lhs_join']
['1', 'air', '1']
['3', 'area', '1']
['1', 'australia', '1']
['1', 'broken', '1']

… plus several more lines. That is the result of our debug trace.

Output text gets stored in the partition file output/tfidf which you can then verify:

more output/tfidf/part-00000
more output/trap/part-m-00001-00000 
more output/check/part-00000

Notice the data tuple output/trap:

zoink   null

That did not match the regex doc\\d+\\s.* which was specified by the stream assertion.

Here’s a log file from our run of the sample app, part 6. If your run looks terribly different, something is probably not set up correctly.

To run this same app on the Amazon AWS Elastic MapReduce service, based on their command line interface, use the following commands. Be sure to replace temp.cascading.org with your own S3 bucket name:

s3cmd put build/libs/impatient.jar s3://temp.cascading.org/impatient/part6.jar
s3cmd put data/rain.txt s3://temp.cascading.org/impatient/
s3cmd put data/en.stop s3://temp.cascading.org/impatient/

elastic-mapreduce --create --name "TF-IDF" \
  --jar s3n://temp.cascading.org/impatient/part6.jar \
  --arg s3n://temp.cascading.org/impatient/rain.txt \
  --arg s3n://temp.cascading.org/impatient/out/wc \
  --arg s3n://temp.cascading.org/impatient/en.stop \
  --arg s3n://temp.cascading.org/impatient/out/tfidf \
  --arg s3n://temp.cascading.org/impatient/out/trap \
  --arg s3n://temp.cascading.org/impatient/out/check

Drop us a line on the cascading-user email forum. Or visit one of our user group meetings. [Coming up real soon…]

Also, compare these other excellent implementations of the example apps here – by Sujit Pal in Scalding and by Paul Lam in Cascalog.

Stay tuned for the next installments of our Cascading for the Impatient series.