Pipe assemblies define what work should be done against a tuple stream. At runtime, tuple streams are read from Tap sources and written to Tap sinks. Pipe assemblies may have multiple sources and multiple sinks, and they can define splits, merges, and joins to manipulate how the tuple streams interact.

There are only six Pipe types: Pipe, Each, GroupBy, CoGroup, Every, and SubAssembly.
The cascading.pipe.Pipe class is used
to name branches of pipe assemblies. These names are used during
planning to bind Taps as either sources or sinks (or as traps, an
advanced topic). It is also the base class for all other pipes
described below.
The cascading.pipe.Each pipe applies
a Function or Filter
Operation to each Tuple that passes through it.
cascading.pipe.GroupBy manages one
input Tuple stream and does exactly as it sounds, that is, groups
the stream on selected fields in the tuple stream.
GroupBy also allows for "merging" of two or
more tuple streams that share the same field names.
cascading.pipe.CoGroup allows for
"joins" on a common set of values, just like a SQL join. The
output tuple stream of CoGroup is the
joined input tuple streams, where a join can be an Inner, Outer,
Left, or Right join.
The cascading.pipe.Every pipe applies
an Aggregator (like count, or sum) or
Buffer (a sliding window) Operation to
every group of Tuples that pass through it.
The cascading.pipe.SubAssembly pipe
allows for nesting reusable pipe assemblies into a Pipe class for
inclusion in a larger pipe assembly. See the section on SubAssemblies.
Pipe assemblies are created by chaining
cascading.pipe.Pipe classes and
Pipe subclasses together. Chaining is
accomplished by passing previous Pipe instances
to the constructor of the next Pipe
instance.
Example 3.1. Chaining Pipes
// the "left hand side" assembly head
Pipe lhs = new Pipe( "lhs" );

lhs = new Each( lhs, new SomeFunction() );
lhs = new Each( lhs, new SomeFilter() );

// the "right hand side" assembly head
Pipe rhs = new Pipe( "rhs" );

rhs = new Each( rhs, new SomeFunction() );

// joins the lhs and rhs
Pipe join = new CoGroup( lhs, rhs );

join = new Every( join, new SomeAggregator() );

join = new GroupBy( join );

join = new Every( join, new SomeAggregator() );

// the tail of the assembly
join = new Each( join, new SomeFunction() );
The above example, if visualized, would look like the diagram below.

Besides defining the paths tuple streams take through splits, merges, grouping, and joining, pipe assemblies transform or filter the stored values in each Tuple. This is accomplished by applying an Operation to each Tuple or group of Tuples as the tuple stream passes through the pipe assembly. To do that, the values in the Tuple typically are given field names, in the same fashion columns are named in a database so that they can be referenced or selected.
Operations
(cascading.operation.Operation) accept an
input argument Tuple, and output zero or more result Tuples.
There are a few sub-types of operations defined below. Cascading
has a number of generic Operations that can be reused, or
developers can create their own custom Operations.
In Cascading, we call each record of data a Tuple
(cascading.tuple.Tuple), and a series of
Tuples is a tuple stream. Think of a Tuple as an array of
values where each value can be any
java.lang.Comparable Java type.
Fields (cascading.tuple.Fields)
either declare the field names in a Tuple or reference values
in a Tuple as a selector. Fields can be string names
("first_name"), integer positions (-1 for the last
value), or a substitution (Fields.ALL to select
all values in the Tuple, like an asterisk (*) in
SQL).
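The selector styles described above can be illustrated with a minimal plain-Java sketch. It is only a model of the idea: the class SelectorSketch and its methods are hypothetical names invented here, and nothing in it is part of the cascading.tuple.Fields API.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative model only; this is not the cascading.tuple.Fields implementation.
class SelectorSketch {

    // Resolve a position selector; negative positions count back from the end
    // of the tuple, so -1 selects the last value.
    static Object byPosition(List<Object> tuple, int pos) {
        int index = pos < 0 ? tuple.size() + pos : pos;
        return tuple.get(index);
    }

    // Resolve a name selector against the declared field names of the tuple.
    static Object byName(List<String> declared, List<Object> tuple, String name) {
        int index = declared.indexOf(name);
        if (index < 0) {
            throw new IllegalArgumentException("unknown field: " + name);
        }
        return tuple.get(index);
    }

    public static void main(String[] args) {
        List<String> declared = Arrays.asList("first_name", "last_name", "age");
        List<Object> tuple = Arrays.<Object>asList("Ada", "Lovelace", 36);

        System.out.println(byName(declared, tuple, "first_name"));
        System.out.println(byPosition(tuple, -1));
    }
}
```

Because names are resolved to positions before any value is read, the same tuple can be addressed either way.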
The Each and Every
pipe types are the only pipes that can be used to apply Operations to
the tuple stream.
The Each pipe applies an Operation to
"each" Tuple as it passes through the pipe assembly. The
Every pipe applies an Operation to "every"
group of Tuples as they pass through the pipe assembly, on the tail
end of a GroupBy or
CoGroup pipe.
new Each( previousPipe, argumentSelector, operation, outputSelector )
new Every( previousPipe, argumentSelector, operation, outputSelector )
Both the Each and
Every pipes take a Pipe instance, an argument
selector, an Operation instance, and an output selector on the
constructor, where each selector is a Fields instance.
The Each pipe may only apply
Functions and Filters to
the tuple stream as these operations may only operate on one Tuple at
a time.
The Every pipe may only apply
Aggregators and Buffers
to the tuple stream as these operations may only operate on groups of
tuples, one grouping at a time.

The argument selector selects values from the input Tuple to be
passed to the Operation as argument values. The output selector
selects the output Tuple from an "appended" version of the input Tuple
and the Operation result Tuple. The output Tuple becomes the input
Tuple to the next pipe in the pipe assembly. If a
Function emits more than one Tuple, this
process is repeated for each result Tuple against the original
input Tuple. Depending on the output selector, input Tuple values
could be duplicated across each output Tuple.
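The following plain-Java sketch models how the argument selector, the Operation, and the output selector compose for a single input Tuple. It is a simplification under stated assumptions: tuples are modeled as ordered maps from field name to value, the class EachSketch and the toy splitWords function are hypothetical, and none of this is Cascading's own code. With a map, a result field sharing a name with an input field simply overwrites it, which is not how Cascading treats duplicate names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustrative model only; names here are hypothetical and not part of the Cascading API.
class EachSketch {

    // Keep only the named fields, in selector order.
    static Map<String, Object> select(Map<String, Object> tuple, List<String> selector) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String name : selector) {
            out.put(name, tuple.get(name));
        }
        return out;
    }

    // Apply a function to one input tuple and build one output tuple per result tuple.
    static List<Map<String, Object>> each(
            Map<String, Object> input,
            List<String> argumentSelector,
            Function<Map<String, Object>, List<Map<String, Object>>> function,
            List<String> outputSelector) {
        Map<String, Object> arguments = select(input, argumentSelector);
        List<Map<String, Object>> outputs = new ArrayList<>();
        for (Map<String, Object> result : function.apply(arguments)) {
            // "Append" the result values to the original input values.
            Map<String, Object> appended = new LinkedHashMap<>(input);
            appended.putAll(result);
            outputs.add(select(appended, outputSelector));
        }
        return outputs;
    }

    // A toy function that emits one result tuple per whitespace-separated word in "line".
    static List<Map<String, Object>> splitWords(Map<String, Object> arguments) {
        List<Map<String, Object>> results = new ArrayList<>();
        for (String word : ((String) arguments.get("line")).split("\\s+")) {
            Map<String, Object> result = new LinkedHashMap<>();
            result.put("word", word);
            results.add(result);
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Object> input = new LinkedHashMap<>();
        input.put("url", "http://example.org/");
        input.put("line", "hello world");

        // Selecting "url" and "word" repeats the input "url" value in every output tuple.
        System.out.println(each(input, Arrays.asList("line"), EachSketch::splitWords,
                Arrays.asList("url", "word")));
    }
}
```

Here the function emits two result tuples for one input tuple, so the selected input value "url" appears in both output tuples.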

If the argument selector is not given, the whole input Tuple
(Fields.ALL) is passed to the Operation as argument
values. If the output selector is not given on an
Each pipe, the Operation results are returned
by default (Fields.RESULTS), replacing the input Tuple
values in the tuple stream. This really only applies to
Functions, as Filters
either discard the input Tuple or return the input Tuple intact,
so there is no opportunity to provide an output selector for a Filter.

For the Every pipe, the Aggregator
results are appended to the input Tuple (Fields.ALL) by
default.
It is important to note that the Every
pipe associates Aggregator results with the current group Tuple. For
example, if you are grouping on the field "department" and counting
the number of "names" grouped by that department, the output Fields
would be ["department","num_employees"]. This is true for both
Aggregator, seen above, and
Buffer.
If you were also adding up the salaries associated with each
"name" in each "department", the output Fields would be
["department","num_employees","total_salaries"]. This is only true for
chains of Aggregator Operations; you may not
chain Buffer Operations.
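As a rough illustration of the department example, the plain-Java sketch below groups rows on "department" and applies a count and a sum to each group. It is a model of the resulting field layout only; the class EverySketch is hypothetical and does not use the Aggregator interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative model only; not Cascading code.
class EverySketch {

    // Input rows are [department, name, salary]; output rows are
    // [department, num_employees, total_salaries], one per group.
    static List<List<Object>> countAndSum(List<Object[]> rows) {
        // A TreeMap keeps the groups in sort order of the grouping value.
        Map<String, long[]> groups = new TreeMap<>();
        for (Object[] row : rows) {
            long[] acc = groups.computeIfAbsent((String) row[0], k -> new long[2]);
            acc[0] += 1;               // first aggregator: count of names
            acc[1] += (Long) row[2];   // second aggregator: sum of salaries
        }
        List<List<Object>> output = new ArrayList<>();
        for (Map.Entry<String, long[]> e : groups.entrySet()) {
            // Each aggregator result is appended after the grouping value.
            output.add(Arrays.<Object>asList(e.getKey(), e.getValue()[0], e.getValue()[1]));
        }
        return output;
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.<Object[]>asList(
                new Object[] {"sales", "ann", 50L},
                new Object[] {"eng", "bob", 70L},
                new Object[] {"sales", "cy", 40L});
        System.out.println(countAndSum(rows));
    }
}
```

Each chained aggregator contributes one more value after the grouping value, which is why the second aggregator extends the output from two fields to three.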

For the Every pipe when used with a
Buffer the behavior is slightly different.
Instead of associating the Buffer results with the current grouping
Tuple, they are associated with the current values Tuple, just like an
Each pipe does with a
Function. This might be slightly more
confusing, but provides much more flexibility.
The GroupBy and
CoGroup pipes serve two roles. First, they emit
sorted grouped tuple streams, allowing Operations to be applied to
sets of related Tuple instances. Here "sorted" means the tuple groups
are emitted from the GroupBy and
CoGroup pipes in sort order of the field values
the groups were grouped on.
Second, they allow two streams to be either merged or joined. Merging allows two or more tuple streams originating from different sources to be treated as a single stream. Joining allows two or more streams to be "joined" (in the SQL sense) on a common key or set of values in a Tuple.
It is not required that an Every follow
either GroupBy or CoGroup; an
Each may follow immediately after. But an
Every may not follow an
Each.
It is important to note, for both GroupBy
and CoGroup, the values being grouped on must
be the same type. If your application attempts to
GroupBy on the field "size", but the value
alternates between a String and a
Long, Hadoop will fail internally attempting to
apply a Java Comparator to them. This also
holds true for the sort-by fields in
GroupBy.
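The type restriction can be reproduced in plain Java without Hadoop. The sketch below is an illustration only (the class MixedTypeSketch is a hypothetical name) and shows that comparing a String with a Long through the generic Comparable contract fails with a ClassCastException, which is the kind of failure described above.

```java
// Illustrative only: shows why mixed value types cannot be compared for grouping or sorting.
class MixedTypeSketch {

    // Compare two values the way a generic Comparable-based comparator would.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static int compare(Object lhs, Object rhs) {
        return ((Comparable) lhs).compareTo(rhs);
    }

    // Returns true if the two values can be compared without a ClassCastException.
    static boolean comparable(Object lhs, Object rhs) {
        try {
            compare(lhs, rhs);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(comparable(10L, 20L));      // both values are Long
        System.out.println(comparable("large", 20L));  // String versus Long
    }
}
```

One way to avoid the problem is to normalize a field such as "size" to a single type in an upstream Function before grouping on it.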
GroupBy accepts one or more tuple
streams. If two or more, they must all have the same field
names.
Example 3.2. Grouping a Tuple Stream
Pipe merge = new GroupBy( assembly, new Fields( "group1", "group2" ) );
The example above simply creates a new tuple stream where Tuples
with the same values in "group1" and "group2" can be processed as a
set by an Aggregator or
Buffer Operation. The resulting stream of
tuples will be sorted by the values in "group1" and "group2".
Example 3.3. Merging a Tuple Stream
Pipe[] pipes = Pipe.pipes( lhs, rhs );

Pipe merge = new GroupBy( pipes, new Fields( "group1", "group2" ) );
This example merges two streams ("lhs" and "rhs") into one tuple stream and groups the resulting stream on the fields "group1" and "group2", in the same fashion as the previous example.
CoGroup accepts two or more tuple streams and does not require any common field names. The grouping fields must be provided for each tuple stream.
Example 3.4. Joining a Tuple Stream
Fields lhsFields = new Fields( "fieldA", "fieldB" );
Fields rhsFields = new Fields( "fieldC", "fieldD" );

Pipe merge = new CoGroup( lhs, lhsFields, rhs, rhsFields, new InnerJoin() );
This example joins two streams ("lhs" and "rhs") on common values. Note that common field names are not required here. Actually, if there were any common field names, the Cascading planner would throw an error as duplicate field names are not allowed.
This is significant because of the nature of joining streams.
The first stage of joining has to do with identifying field names that represent the grouping key for a given stream. The second stage is emitting a new Tuple with the joined values, which includes the grouping values and the other values.

In the above example, we see what "logically" happens during a
join. Here we join two streams on the "url" field which happens to be
common to both streams. The result is simply two Tuple instances with
the same "url" appended together into a new Tuple. In practice this
would fail since the result Tuple has duplicate field names. The
CoGroup pipe has the
declaredFields argument allowing the developer
to declare new unique field names for the resulting tuple.
Example 3.5. Joining a Tuple Stream with Duplicate Fields
Fields common = new Fields( "url" );
Fields declared = new Fields( "url1", "word", "wd_count", "url2", "sentence", "snt_count" );

Pipe merge = new CoGroup( lhs, common, rhs, common, declared, new InnerJoin() );

Here we see an example of what the developer could have named the fields so the planner would not fail.
It is important to note that Cascading could just magically create a new Tuple by removing the duplicate grouping field names so the user isn't left renaming them. In the above example, the duplicate "url" columns could be collapsed into one, as they hold the same value. This is not done because field names are a user convenience; the primary mechanism for manipulating Tuples is through positions, not field names. So the result of every Pipe (Each, Every, CoGroup, GroupBy) needs to be deterministic. This gives Cascading a big performance boost, provides a means for sub-assemblies to be built without coupling to any "domain level" concepts (like "first_name" or "url"), and allows higher-level abstractions to be built on top of Cascading simply.
In the example above, we explicitly set a Joiner class to join
our data. The reason CoGroup is named "CoGroup"
and not "Join" is because joining data is done after all the parallel
streams are co-grouped by their common keys. The details are not
terribly important, but note that a "bag" of data for every input
tuple stream must be created before a join operation can be
performed. Each bag consists of all the Tuple instances associated
with a given grouping Tuple.
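The co-grouping step can be pictured with a small plain-Java sketch. It is a model only, not how Cascading or Hadoop build bags: the class CoGroupBagSketch and its Bags holder are hypothetical, tuples are plain arrays, and the grouping value is assumed to sit in position 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative model only; this is not how Cascading or Hadoop implement co-grouping.
class CoGroupBagSketch {

    // One bag per input stream for a single grouping key; either bag may stay empty.
    static final class Bags {
        final List<Object[]> lhs = new ArrayList<>();
        final List<Object[]> rhs = new ArrayList<>();
    }

    // Co-group two streams on the value in position 0 of each tuple.
    static Map<String, Bags> coGroup(List<Object[]> lhs, List<Object[]> rhs) {
        Map<String, Bags> grouped = new TreeMap<>();
        for (Object[] tuple : lhs) {
            grouped.computeIfAbsent((String) tuple[0], k -> new Bags()).lhs.add(tuple);
        }
        for (Object[] tuple : rhs) {
            grouped.computeIfAbsent((String) tuple[0], k -> new Bags()).rhs.add(tuple);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Object[]> lhs = Arrays.<Object[]>asList(
                new Object[] {"a.example", "hello", 2},
                new Object[] {"a.example", "world", 1},
                new Object[] {"b.example", "hello", 1});
        List<Object[]> rhs = Arrays.<Object[]>asList(
                new Object[] {"a.example", "hello world hello", 1});

        for (Map.Entry<String, Bags> e : coGroup(lhs, rhs).entrySet()) {
            System.out.println(e.getKey() + ": lhs bag=" + e.getValue().lhs.size()
                    + ", rhs bag=" + e.getValue().rhs.size());
        }
    }
}
```

Only once both bags for a key are complete can a joiner decide how to combine them, including what to do when one bag is empty.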

Above we see two bags, one for each tuple stream ("lhs" and
"rhs"). Each Tuple in a bag is independent but all Tuples in both bags
have the same "url" value since we are grouping on "url", from the
previous example. A Joiner will match up every Tuple on the "lhs" with
a Tuple on the "rhs". An InnerJoin is the most common. This is where
each Tuple on the "lhs" is matched with every Tuple on the "rhs". This
is the default behaviour one would see in SQL when doing a join. If
one of the bags was empty, no Tuples would be joined. An OuterJoin
allows for either bag to be empty, and if that is the case, a Tuple
full of null values would be substituted.

Above we see all supported Joiner types.
LHS = [0,a] [1,b] [2,c]
RHS = [0,A] [2,C] [3,D]
Using the simple data sets above, we will define each join type, where the values are joined on the first position, a numeric value. Note that when Cascading joins Tuples, the resulting Tuple contains all the incoming values. Duplicate common keys, if any, are not discarded. On outer joins, where there is no equivalent key in the alternate stream, null values are used as placeholders.
An Inner join will only return a joined Tuple if neither bag is empty.
[0,a,0,A] [2,c,2,C]
An Outer join will join if either the left or right bag is empty.
[0,a,0,A] [1,b,null,null] [2,c,2,C] [null,null,3,D]
A Left join can also be stated as a Left Inner and Right Outer join, where it is fine if the right bag is empty.
[0,a,0,A] [1,b,null,null] [2,c,2,C]
A Right join can also be stated as a Left Outer and Right Inner join, where it is fine if the left bag is empty.
[0,a,0,A] [2,c,2,C] [null,null,3,D]
A Mixed join is where 3 or more tuple streams are
joined, and each pair must be joined differently. See the
cascading.pipe.cogroup.MixedJoin class
for more details.
A custom join is where the developer subclasses the
cascading.pipe.cogroup.Joiner
class.
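The Inner, Outer, Left, and Right results listed above can be reproduced with a short plain-Java sketch. It is a model of the semantics only, under the assumption of two-value tuples joined on position 0. The class JoinSketch, its Type enum, and its helper methods are hypothetical and are not the joiner classes shipped with Cascading.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Illustrative model only; it does not use the Cascading joiner classes.
class JoinSketch {

    enum Type { INNER, OUTER, LEFT, RIGHT }

    static List<Object> tuple(Object... values) {
        return Arrays.asList(values);
    }

    // Collect tuples into bags keyed by the numeric value in position 0.
    static Map<Integer, List<List<Object>>> bags(List<List<Object>> stream) {
        Map<Integer, List<List<Object>>> bags = new TreeMap<>();
        for (List<Object> t : stream) {
            bags.computeIfAbsent((Integer) t.get(0), k -> new ArrayList<>()).add(t);
        }
        return bags;
    }

    // Join two streams of two-value tuples on position 0.
    static List<List<Object>> join(List<List<Object>> lhs, List<List<Object>> rhs, Type type) {
        Map<Integer, List<List<Object>>> left = bags(lhs);
        Map<Integer, List<List<Object>>> right = bags(rhs);
        TreeSet<Integer> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());

        // A single all-null tuple stands in for an empty bag on an "outer" side.
        List<List<Object>> nulls = new ArrayList<>();
        nulls.add(tuple(null, null));

        List<List<Object>> joined = new ArrayList<>();
        for (Integer key : keys) {
            List<List<Object>> l = left.get(key);
            List<List<Object>> r = right.get(key);
            if (l == null) {
                if (type == Type.INNER || type == Type.LEFT) continue; // left side is "inner"
                l = nulls;
            }
            if (r == null) {
                if (type == Type.INNER || type == Type.RIGHT) continue; // right side is "inner"
                r = nulls;
            }
            // Match every tuple in the left bag with every tuple in the right bag.
            for (List<Object> a : l) {
                for (List<Object> b : r) {
                    List<Object> out = new ArrayList<>(a);
                    out.addAll(b);
                    joined.add(out);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<List<Object>> lhs = Arrays.asList(tuple(0, "a"), tuple(1, "b"), tuple(2, "c"));
        List<List<Object>> rhs = Arrays.asList(tuple(0, "A"), tuple(2, "C"), tuple(3, "D"));
        for (Type type : Type.values()) {
            System.out.println(type + " " + join(lhs, rhs, type));
        }
    }
}
```

The only difference between the four types in this model is which side may have an empty bag; the matching of tuples within non-empty bags is identical.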
Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.