Technical Overview
Here are a few Cascading pipe assembly patterns, with descriptions of how they translate into MapReduce jobs. Keep in mind that these abstractions allow pipes to be reassembled into new patterns, and that the linkage between the resulting jobs is handled internally.
First, a quick review of some core concepts.
There are five core components: the Tuple, Pipe, Tap, Flow, and Cascade. Pipe instances are chained together into pipe assemblies, which are connected to Tap instances to form a Flow.
Optionally, multiple Flow instances can be connected into a Cascade, typically when a given Flow must participate in multiple processes. Cascades act like build or make files: when run, they only execute the Flows that have stale targets.
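The make-like behavior of a Cascade can be modeled in a few lines of plain Java. This is a minimal sketch, not the Cascading API: FlowSpec, isStale, and runStale are hypothetical names, the Flows are assumed to be listed in dependency order already, and each resource's freshness is reduced to a single logical timestamp.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of make-style Cascade behavior. This is NOT the Cascading
// API: FlowSpec, isStale, and runStale are illustrative names, and a resource's
// freshness is reduced to a single logical timestamp.
final class StaleFlows {

    // A Flow described only by the resources it reads and the one it writes.
    record FlowSpec(String name, List<String> sources, String sink) {}

    // A target is stale if it does not exist yet or any source is newer than it.
    static boolean isStale(FlowSpec flow, Map<String, Long> modified) {
        Long sinkTime = modified.get(flow.sink());
        if (sinkTime == null) return true;
        for (String src : flow.sources()) {
            Long srcTime = modified.get(src);
            if (srcTime != null && srcTime > sinkTime) return true;
        }
        return false;
    }

    // Assumes `ordered` lists producers before consumers; "runs" only stale Flows.
    // Running a Flow here just stamps its sink with a newer logical time.
    static List<String> runStale(List<FlowSpec> ordered, Map<String, Long> modified) {
        long clock = modified.values().stream().mapToLong(Long::longValue).max().orElse(0L);
        List<String> executed = new ArrayList<>();
        for (FlowSpec flow : ordered) {
            if (isStale(flow, modified)) {
                modified.put(flow.sink(), ++clock);
                executed.add(flow.name());
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        List<FlowSpec> cascade = List.of(
            new FlowSpec("parse", List.of("logs"), "parsed"),
            new FlowSpec("count", List.of("parsed"), "counts"),
            new FlowSpec("report", List.of("counts"), "report"));
        // "counts" has never been written, so "count" runs, which in turn makes "report" stale.
        Map<String, Long> modified = new HashMap<>(Map.of("logs", 5L, "parsed", 10L, "report", 12L));
        System.out.println(runStale(cascade, modified)); // [count, report]
        System.out.println(runStale(cascade, modified)); // []
    }
}
```

The second run executes nothing, since every target is now at least as new as its sources.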
A Tap represents a resource such as a data file on the local filesystem, on a Hadoop distributed file system, or even on Amazon S3. Note that when a Tap is shared between Flow instances, it can be the sink of one Flow and the source of another.
There are three subtypes of Pipe: Each, Group, and Every. The Each pipe applies a function or filter to each record that passes through it. The Every pipe applies an aggregate function (like count or sum) to every group of records that passes through it.
The Group pipe groups records on any set of their fields. Group is further subtyped into GroupBy and CoGroup: GroupBy manages a single input, while CoGroup can handle two or more inputs. By default, CoGroup performs an inner join.
In Cascading, we call each record a Tuple, and a series of Tuples is a tuple stream. Pipes apply functions to each Tuple in the stream, and a Group groups the Tuples on one or more of their fields.
In the diagrams below, E, G, and A represent the Each, Group, and Every pipe types, respectively. T represents a Tap, which is either a source or a sink depending on which end of the assembly it sits at.
Consider the following diagram.
It consists of a source Tap and a sink Tap, one at each end of the assembly. An Each is connected to a Group, which is connected to an Every. Fe and Fa are the functions given to the Each and Every pipes, respectively. In code, this would look something like the following.
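// This assembly counts the number of times a given IP address makes a
// request to our server.
// inputFileApache, outputPath, and jobConf are assumed to be defined elsewhere.

// source: read each log line as an ("offset", "line") tuple
Tap source = new Dfs( new TextLine( new Fields( "offset", "line" ) ), inputFileApache );

Pipe pipe = new Pipe( "parse log" );

// Each: parse the leading IP address out of the "line" field
RegexParser parser = new RegexParser( new Fields( "ip" ), "^[^ ]*" );
pipe = new Each( pipe, new Fields( "line" ), parser, new Fields( "ip" ) );

// Group on the IP address, then Every: count the tuples in each group
pipe = new GroupBy( pipe, new Fields( "ip" ) );
Count count = new Count( new Fields( "count" ) );
pipe = new Every( pipe, count, new Fields( "ip", "count" ) );

// sink: write each (ip, count) tuple out as a line of text
Tap sink = new Dfs( new TextLine(), outputPath + "/count", true );

Flow flow = new FlowConnector( jobConf ).connect( source, sink, pipe );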
Again, consider the diagram above. All the Tap and Pipe instances are part of the Flow, but at runtime the Pipe instances are organized into Steps. A Step represents a single MapReduce job. If you are up on your MapReduce, you will recognize that the mapper emits the values to group on as keys, and the grouping itself happens in the reducer. So each Step has two Stacks: a Map Stack and a Reduce Stack.
You can see that our Each and Every pipe instances, and their functions, each live in one Stack or the other: the Each in the Map Stack and the Every in the Reduce Stack.
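To make the split between the two Stacks concrete, here is a plain-Java stand-in for the single Step of the ip-count assembly, with no Hadoop or Cascading involved. It is a sketch under simplifying assumptions: the log lines are held in memory, a TreeMap plays the part of the framework's sort-and-group shuffle, and the method names mapStack, mapPhase, shuffle, and reduceStack are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java stand-in (no Hadoop, no Cascading) for the single Step of the
// ip-count assembly. All method names are hypothetical.
final class IpCountStep {

    private static final Pattern IP = Pattern.compile("^[^ ]*");

    // Map Stack: the Each pipe. Extracts the "ip" field from a log line.
    static String mapStack(String line) {
        Matcher m = IP.matcher(line);
        m.lookingAt(); // "^[^ ]*" always matches, possibly the empty string
        return m.group();
    }

    // Map phase: run the Map Stack on every line, emitting (key, value) pairs.
    static List<Map.Entry<String, String>> mapPhase(List<String> lines) {
        List<Map.Entry<String, String>> emitted = new ArrayList<>();
        for (String line : lines) emitted.add(Map.entry(mapStack(line), line));
        return emitted;
    }

    // Shuffle: the framework sorts and groups the emitted pairs by key.
    static SortedMap<String, List<String>> shuffle(List<Map.Entry<String, String>> emitted) {
        SortedMap<String, List<String>> groups = new TreeMap<>();
        for (Map.Entry<String, String> e : emitted)
            groups.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        return groups;
    }

    // Reduce Stack: the Every pipe with a Count aggregator, called once per group.
    static SortedMap<String, Integer> reduceStack(SortedMap<String, List<String>> groups) {
        SortedMap<String, Integer> counts = new TreeMap<>();
        groups.forEach((ip, values) -> counts.put(ip, values.size()));
        return counts;
    }

    public static void main(String[] args) {
        List<String> log = List.of(
            "10.0.0.1 - - \"GET / HTTP/1.0\" 200",
            "10.0.0.2 - - \"GET /a HTTP/1.0\" 200",
            "10.0.0.1 - - \"GET /b HTTP/1.0\" 404");
        System.out.println(reduceStack(shuffle(mapPhase(log)))); // {10.0.0.1=2, 10.0.0.2=1}
    }
}
```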
Now consider this diagram.
This assembly is twice as long and has two Group pipe instances. Consequently, it has two Steps, and each Step has its own pair of Stacks. The grayed-out Tap is added at runtime to complete the assembly so it can execute.
The new Tap writes to and reads from a temporary file that is removed when the job completes. Within a Flow, temporary Tap instances are added between consecutive Steps, and they are cleaned up after every Flow execution.
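The lifecycle of such a temporary Tap can be sketched on the local filesystem. In this hypothetical example a temporary file stands in for the runtime-added Tap: Step 1 writes to it, Step 2 reads from it, and a finally block deletes it whether or not the run succeeds. The step bodies are placeholders chosen only for illustration; they are not Cascading operations.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;

// Local-filesystem stand-in for a two-Step Flow. The temporary file plays the
// role of the Tap added at runtime. Both step bodies are hypothetical.
final class TempTapChain {

    static List<String> run(List<String> input) throws IOException {
        Path temp = Files.createTempFile("step1-", ".tmp"); // the temporary "Tap"
        try {
            // Step 1 (sink = temp): keep only the first space-separated token of each line.
            List<String> firstTokens = input.stream().map(l -> l.split(" ", 2)[0]).toList();
            Files.write(temp, firstTokens);

            // Step 2 (source = temp): read Step 1's output, then de-duplicate and sort it.
            try (Stream<String> lines = Files.lines(temp)) {
                return lines.distinct().sorted().toList();
            }
        } finally {
            Files.deleteIfExists(temp); // cleaned up after every run, even on failure
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(run(List.of("10.0.0.2 GET /a", "10.0.0.1 GET /", "10.0.0.2 GET /b")));
        // [10.0.0.1, 10.0.0.2]
    }
}
```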
Two more diagrams follow. The first is a split; the second is a join.
This split is interesting because a single Flow has two parallel branches that will execute simultaneously, if possible. Note also that the first Each is shared by both branches.
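The shape of a split can be imitated in memory: parse once, then hand the same parsed stream to two independent branches that are started concurrently. This is a sketch, not how Cascading schedules Steps; the "ip status" line format, the Hit record, and the two counting branches are assumptions made for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

final class SplitSketch {

    // Hypothetical parsed record: each input line is assumed to be "ip status".
    record Hit(String ip, String status) {}

    // Shared Each: parses every line once; both branches reuse its output.
    static List<Hit> parse(List<String> lines) {
        return lines.stream()
            .map(line -> line.split(" "))
            .map(fields -> new Hit(fields[0], fields[1]))
            .toList();
    }

    // One branch: group on a field and count each group (a GroupBy followed by a counting Every).
    static Map<String, Long> countBy(List<Hit> hits, Function<Hit, String> key) {
        return hits.stream().collect(Collectors.groupingBy(key, TreeMap::new, Collectors.counting()));
    }

    // The split: two independent branches started concurrently from the same parsed stream.
    static List<Map<String, Long>> run(List<String> lines) {
        List<Hit> parsed = parse(lines);
        CompletableFuture<Map<String, Long>> byIp =
            CompletableFuture.supplyAsync(() -> countBy(parsed, Hit::ip));
        CompletableFuture<Map<String, Long>> byStatus =
            CompletableFuture.supplyAsync(() -> countBy(parsed, Hit::status));
        return List.of(byIp.join(), byStatus.join());
    }

    public static void main(String[] args) {
        List<String> lines = List.of("10.0.0.1 200", "10.0.0.1 404", "10.0.0.2 200");
        System.out.println(run(lines)); // [{10.0.0.1=2, 10.0.0.2=1}, {200=2, 404=1}]
    }
}
```

Because neither branch reads the other's output, nothing forces them to run one after the other; the shared parse is the only common dependency.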
It is also very important to note that there are two Each instances in the second Step. Cascading chains multiple consecutive Each instances into a single Map Stack, and multiple Every instances into a single Reduce Stack. It can also add trailing Each instances to the Reduce Stack when doing so reduces the overall number of MapReduce jobs.
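These placement rules can be approximated by a small planner over a linear assembly written as a string of E, G, and A characters. This is a hypothetical sketch, not Cascading's planner, and it assumes that each Group starts a new Step, that Each pipes before a Step's Group chain into its Map Stack, that Group and Every pipes run in the Reduce Stack, that Each pipes after the last Every join the last Reduce Stack, and that consecutive Steps are linked by a temporary Tap.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical planner for a linear assembly ('E' = Each, 'G' = Group, 'A' = Every).
// NOT Cascading's planner; it only encodes the assumed rules stated in the lead-in.
final class StepPlanner {

    // One MapReduce job: where it reads, what runs on each side, where it writes.
    record Step(String source, String mapStack, String reduceStack, String sink) {}

    static List<Step> plan(String assembly) {
        List<Step> steps = new ArrayList<>();
        StringBuilder map = new StringBuilder();
        StringBuilder reduce = null;                 // null until this Step's Group is seen
        StringBuilder pending = new StringBuilder(); // Each pipes seen after an Every
        String source = "source";
        int temps = 0;
        for (char p : assembly.toCharArray()) {
            switch (p) {
                case 'E' -> (reduce == null ? map : pending).append('E');
                case 'G' -> {
                    if (reduce != null) { // close the current Step with a temporary Tap
                        String temp = "temp" + (++temps);
                        steps.add(new Step(source, map.toString(), reduce.toString(), temp));
                        source = temp;
                        map = pending;    // Each pipes between Groups chain into the next Map Stack
                        pending = new StringBuilder();
                    }
                    reduce = new StringBuilder("G");
                }
                case 'A' -> {
                    if (reduce == null || pending.length() > 0)
                        throw new IllegalArgumentException("an Every must follow a Group or an Every");
                    reduce.append('A');
                }
                default -> throw new IllegalArgumentException("unknown pipe type: " + p);
            }
        }
        // Trailing Each pipes join the last Reduce Stack instead of costing an extra job.
        String lastReduce = reduce == null ? "" : reduce.append(pending).toString();
        steps.add(new Step(source, map.toString(), lastReduce, "sink"));
        return steps;
    }

    public static void main(String[] args) {
        plan("EGAEGA").forEach(System.out::println);
        plan("EEGAE").forEach(System.out::println);
    }
}
```

Under these assumptions, plan("EGAEGA") yields two Steps linked by temp1, while plan("EEGAE") fits in a single Step whose Map Stack chains both leading Each pipes and whose Reduce Stack also runs the trailing Each.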
Also interesting is that a join can happen in a single Step, even though there are two source Tap instances.
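A join fits in one Step because both sources can feed the same shuffle. The plain-Java sketch here tags each record with the source it came from on the map side, then pairs left and right records per key on the reduce side, which gives inner-join semantics (CoGroup's default). The record layout and the names Tagged and innerJoin are assumptions for illustration; Cascading's own CoGroup implementation is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical reduce-side join: not Cascading's CoGroup, just the same idea in memory.
final class ReduceSideJoin {

    // A value tagged with the source it came from (0 = left, 1 = right).
    record Tagged(int source, String value) {}

    static List<String> innerJoin(List<Map.Entry<String, String>> left,
                                  List<Map.Entry<String, String>> right) {
        // Map side: each source's mapper emits (join key, tagged value) into one shuffle.
        SortedMap<String, List<Tagged>> shuffled = new TreeMap<>();
        for (Map.Entry<String, String> e : left)
            shuffled.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(new Tagged(0, e.getValue()));
        for (Map.Entry<String, String> e : right)
            shuffled.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(new Tagged(1, e.getValue()));

        // Reduce side: per key, pair every left value with every right value.
        // Keys seen on only one side yield nothing, which is what makes it an inner join.
        List<String> joined = new ArrayList<>();
        shuffled.forEach((key, group) -> {
            for (Tagged l : group) {
                if (l.source() != 0) continue;
                for (Tagged r : group) {
                    if (r.source() == 1) joined.add(key + "," + l.value() + "," + r.value());
                }
            }
        });
        return joined;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> users = List.of(
            Map.entry("10.0.0.1", "alice"), Map.entry("10.0.0.2", "bob"));
        List<Map.Entry<String, String>> hits = List.of(
            Map.entry("10.0.0.1", "/index"), Map.entry("10.0.0.1", "/about"), Map.entry("10.0.0.3", "/x"));
        System.out.println(innerJoin(users, hits));
        // [10.0.0.1,alice,/index, 10.0.0.1,alice,/about]
    }
}
```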
For more examples, check out our wiki. Specifically...