6.3 Failure Traps

Figure: A Flow with Failure Traps

Failure Traps are the same as a Tap sink (as opposed to a source), except that instead of being bound to a particular tail element of the pipe assembly, traps can be bound to intermediate pipe assembly segments, such as a Stream Assertion.

Whenever an operation fails and throws an exception, and there is an associated trap, the offending Tuple will be saved to the resource specified by the trap Tap. This allows the job to continue processing without any data loss.
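The behavior described above can be illustrated with a small stand-alone Java model. This is a hypothetical sketch, not Cascading's implementation: `TrapSketch`, `process`, and `trapBound` are illustrative names. Each record passes through an operation; if the operation throws and a trap is bound, the offending record is saved to the trap and processing continues, while without a trap the exception propagates and would fail the job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Hypothetical model of a failure trap; NOT Cascading's implementation.
 */
public class TrapSketch {

    /** Normal output plus the records diverted to the trap. */
    public static final class Result<O> {
        public final List<O> output = new ArrayList<>();
        public final List<String> trapped = new ArrayList<>();
    }

    public static <O> Result<O> process(List<String> input,
                                        Function<String, O> operation,
                                        boolean trapBound) {
        Result<O> result = new Result<>();
        for (String record : input) {
            try {
                result.output.add(operation.apply(record));
            } catch (RuntimeException e) {
                if (!trapBound) {
                    throw e; // no trap: the failure stops processing
                }
                // trap bound: save the offending record and keep going
                result.trapped.add(record);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // "oops" makes Integer.parseInt throw NumberFormatException
        Result<Integer> r = process(List.of("1", "oops", "3"), Integer::parseInt, true);
        System.out.println("output=" + r.output + " trapped=" + r.trapped);
    }
}
```

Running `main` prints `output=[1, 3] trapped=[oops]`: the bad record is kept for later inspection instead of being lost or aborting the run.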

By design, clusters are hardware fault tolerant. If a node is lost, the cluster continues working.

But software fault tolerance is a little different. Failure Traps provide a means for processing to continue without losing track of the data that caused the fault. For high fidelity applications this may not be so attractive, but for low fidelity applications (such as web page indexing) it can dramatically improve processing reliability.

Example 6.7. Setting Traps

// ...some useful pipes here

// name this pipe assembly segment; the following Pipes inherit the name "assertions"
assembly = new Pipe( "assertions", assembly );

// fail any Tuple that contains a null value
AssertNotNull notNull = new AssertNotNull();
assembly = new Each( assembly, AssertionLevel.STRICT, notNull );

// fail any Tuple that does not have exactly 6 values
AssertSizeEquals equals = new AssertSizeEquals( 6 );
assembly = new Each( assembly, AssertionLevel.STRICT, equals );

// fail any Tuple whose "method" value does not match (GET|HEAD|POST)
AssertMatchesAll matchesAll = new AssertMatchesAll( "(GET|HEAD|POST)" );
assembly = new Each( assembly, new Fields( "method" ),
                     AssertionLevel.STRICT, matchesAll );

// ...some more useful pipes here

// map segment names to trap Taps; "trap", like "source" and "sink",
// is a Tap declared elsewhere
Map<String,Tap> traps = new HashMap<String,Tap>();

traps.put( "assertions", trap );

FlowConnector flowConnector = new FlowConnector();
Flow flow =
  flowConnector.connect( "log-parser", source, sink, traps, assembly );

In the above example, we bind our trap Tap to the pipe assembly segment named "assertions". Note that a branch or segment is named with a single Pipe instance, and that name applies to all subsequent Pipe instances until another Pipe renames it.
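The naming rule can be modeled in plain Java to show why one `traps.put( "assertions", trap )` covers all three assertions. This is a hypothetical sketch, not Cascading's classes: `SegmentNaming`, `Element`, and `resolveNames` are illustrative names. An element with a name starts a new segment; an element with a `null` name inherits the current one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of segment-name inheritance; NOT Cascading's classes.
 */
public class SegmentNaming {

    /** One element of a linear assembly; a null name means "inherit". */
    public record Element(String label, String name) {}

    /** Returns the effective segment name of each element, in order. */
    public static List<String> resolveNames(List<Element> assembly) {
        List<String> names = new ArrayList<>();
        String current = "unnamed";
        for (Element e : assembly) {
            if (e.name() != null) {
                current = e.name(); // a named Pipe starts a new segment
            }
            names.add(current);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Element> assembly = List.of(
            new Element("parse", "parser"),
            new Element("Pipe", "assertions"),
            new Element("AssertNotNull", null),
            new Element("AssertSizeEquals", null),
            new Element("AssertMatchesAll", null));

        // one trap registered under the segment name
        Map<String, String> traps = new HashMap<>();
        traps.put("assertions", "trap");

        List<String> names = resolveNames(assembly);
        for (int i = 0; i < assembly.size(); i++) {
            String trap = traps.getOrDefault(names.get(i), "none");
            System.out.println(assembly.get(i).label() + " -> " + names.get(i) + ", trap: " + trap);
        }
    }
}
```

Every element from the `Pipe` named "assertions" onward resolves to that segment, so a failure in any of the three assertions would be routed to the same trap.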

Note

Even though Hadoop 0.19.x supports file appends on HDFS, this functionality has not been exposed through the interfaces that Mappers and Reducers use to output data. Consequently, the Cascading planner will throw a PlannerException if a trap is bound to a pipe assembly segment that crosses a Map or Reduce boundary. This restriction prevents a subsequent Map or Reduce task from writing to the same path after the trap Tap has been written to and closed by a previous Map or Reduce task. To get around these errors, either insert new Pipe instances that rename the current branch, or give GroupBy or CoGroup pipes explicit names.
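The planner rule in this note can be approximated with a simplified model. This is a hypothetical sketch, not Cascading's planner: `TrapBoundaryCheck`, `Element`, and `checkTrap` are illustrative names, and treating each GroupBy/CoGroup as the start of a new task phase is a simplifying assumption. A trap is rejected when its segment has elements in more than one phase, which is what happens when an unnamed GroupBy inherits the trapped segment's name.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, simplified model of the trap boundary rule; NOT Cascading's planner.
 */
public class TrapBoundaryCheck {

    /** An element's effective segment name and whether it is a GroupBy/CoGroup. */
    public record Element(String segment, boolean isGrouping) {}

    /** Throws IllegalStateException if the trapped segment spans more than one phase. */
    public static void checkTrap(List<Element> assembly, String trappedSegment) {
        Set<Integer> phases = new HashSet<>();
        int phase = 0;
        for (Element e : assembly) {
            if (e.isGrouping()) {
                phase++; // assumption: each grouping starts a new task phase
            }
            if (e.segment().equals(trappedSegment)) {
                phases.add(phase);
            }
        }
        if (phases.size() > 1) {
            throw new IllegalStateException(
                "trap on '" + trappedSegment + "' crosses a Map/Reduce boundary");
        }
    }

    public static void main(String[] args) {
        // an unnamed GroupBy inherits "assertions", so the segment spans both sides
        List<Element> unnamed = List.of(
            new Element("assertions", false),
            new Element("assertions", true),
            new Element("assertions", false));
        // an explicitly named GroupBy ends the trapped segment at the boundary
        List<Element> named = List.of(
            new Element("assertions", false),
            new Element("grouped", true),
            new Element("grouped", false));

        checkTrap(named, "assertions"); // accepted
        try {
            checkTrap(unnamed, "assertions");
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

In this model, naming the GroupBy is enough to confine the "assertions" segment, and its trap, to a single phase, mirroring the workaround described in the note.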

Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.