A Buffer expects a set of argument Tuples in the same grouping, and may return zero or more result Tuples.
The Buffer is very similar to an
Aggregator, except that it receives the current
Grouping Tuple and an iterator of all the arguments it expects for every
value Tuple in the current grouping, all in the same method call. This
is very similar to the typical Reducer interface, and is best used for
operations that need greater visibility into the previous and next
elements in the stream; for example, smoothing a series of timestamps
where some values are missing.
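The value of seeing a whole grouping at once can be sketched without Cascading. In the plain-Java example below, the hypothetical `GapFiller.fillGaps` method plays the role of a Buffer's operate method: it receives one grouping's timestamps through a single Iterator (representing missing values as `null` is an assumption made for illustration) and fills each gap by linear interpolation between the known neighbors, which requires looking both backward and forward in the stream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java illustration, not Cascading code: a buffer-style method that sees
// every value of one grouping through a single Iterator.
final class GapFiller {

    // Fills each run of nulls by linear interpolation between the nearest known
    // values before and after it. Leading and trailing nulls have no neighbor on
    // one side, so they are left unchanged. Integer division rounds toward zero.
    static List<Long> fillGaps(Iterator<Long> timestamps) {
        List<Long> series = new ArrayList<>();
        timestamps.forEachRemaining(series::add); // hold the whole grouping

        int prevKnown = -1; // index of the last non-null value seen
        for (int i = 0; i < series.size(); i++) {
            if (series.get(i) == null) {
                continue;
            }
            if (prevKnown >= 0 && i - prevKnown > 1) {
                long start = series.get(prevKnown);
                long end = series.get(i);
                int span = i - prevKnown;
                for (int j = prevKnown + 1; j < i; j++) {
                    series.set(j, start + (end - start) * (j - prevKnown) / span);
                }
            }
            prevKnown = i;
        }
        return series;
    }
}
```

For the input 10, null, null, 40 it returns 10, 20, 30, 40. Because the logic needs the values on both sides of a gap, it fits a Buffer better than an Aggregator, which is handed one argument Tuple at a time.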
A Buffer may only be used with an
Every pipe, and it may only follow a
GroupBy or CoGroup pipe
type.
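Why a Buffer must follow a GroupBy or CoGroup can be seen in a small plain-Java model of the two steps. This is not Cascading code: `GroupThenEvery`, `MiniBuffer`, and `run` are hypothetical names. The grouping step collects values by key in sorted order, and the Every step then calls the buffer exactly once per group with an Iterator over that group's values, concatenating whatever results (zero or more) it returns.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of GroupBy followed by Every with a Buffer.
// Hypothetical types and names; not the Cascading API.
final class GroupThenEvery {

    // Receives the group key and an iterator over that group's argument
    // values, and returns zero or more results.
    interface MiniBuffer<K, V, R> {
        List<R> operate(K group, Iterator<V> arguments);
    }

    static <K extends Comparable<K>, V, R> List<R> run(
            List<Map.Entry<K, V>> records, MiniBuffer<K, V, R> buffer) {
        // "GroupBy": collect the values for each key, sorted by key.
        Map<K, List<V>> groups = new TreeMap<>();
        for (Map.Entry<K, V> record : records) {
            groups.computeIfAbsent(record.getKey(), k -> new ArrayList<>())
                  .add(record.getValue());
        }
        // "Every": invoke the buffer once per group and collect its output.
        List<R> output = new ArrayList<>();
        for (Map.Entry<K, List<V>> group : groups.entrySet()) {
            output.addAll(buffer.operate(group.getKey(), group.getValue().iterator()));
        }
        return output;
    }
}
```

Without the grouping step there would be no per-group Iterator to hand to the buffer, which is the dependency the restriction above expresses.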
To create a custom Buffer, subclass the
class cascading.operation.BaseOperation and implement the
interface cascading.operation.Buffer. Because
BaseOperation has been subclassed, the operate
method, as defined on the Buffer interface, is the only
method that must be implemented.
Example 5.7. Custom Buffer
import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class SomeBuffer extends BaseOperation implements Buffer
{
  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
  {
    // get the group values for the current grouping
    TupleEntry group = bufferCall.getGroup();

    // get all the current argument values for this grouping
    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    while( arguments.hasNext() )
    {
      TupleEntry argument = arguments.next();
      // insert some values into the result Tuple based on the arguments
    }

    // return the result Tuple
    bufferCall.getOutputCollector().add( result );
  }
}
Buffers should declare both the number of argument values they expect and the field names of the Tuples they will return.
Buffers must accept one or more values in a Tuple as arguments; by
default they accept any number (Operation.ANY) of
values. Cascading will verify that the number of arguments selected matches the
number of arguments expected.
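The argument-count check can be modeled in a few lines of plain Java. The class below is hypothetical and only illustrates the rule described above, not how Cascading implements it; in particular, the `ANY` constant is a stand-in for `Operation.ANY`, and its value here is an assumption rather than Cascading's definition.

```java
// Plain-Java model of the argument-count rule; not Cascading's implementation.
final class ArgumentCheck {

    // Hypothetical stand-in for Operation.ANY ("accept any number of values").
    static final int ANY = Integer.MAX_VALUE;

    // Throws if the selected argument count violates the declared expectation.
    static void verify(int expectedArgs, int selectedArgs) {
        if (selectedArgs < 1) {
            throw new IllegalArgumentException("a Buffer needs at least one argument value");
        }
        if (expectedArgs != ANY && expectedArgs != selectedArgs) {
            throw new IllegalArgumentException(
                "expected " + expectedArgs + " argument(s) but " + selectedArgs + " were selected");
        }
    }
}
```

A Buffer declaring one argument, such as the averaging example in this section, would pass `verify(1, 1)` and fail `verify(1, 2)`.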
Buffers may optionally declare the field names they return; by
default, Buffers declare
Fields.UNKNOWN.
Both declarations must be made in the constructor, either by
passing default values to the super constructor or by
accepting the values from the user via a constructor
implementation.
Example 5.8. Average Buffer
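import java.util.Iterator;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class AverageBuffer extends BaseOperation implements Buffer
{
  public AverageBuffer()
  {
    // expect one argument value, declare "average" as the result field name
    super( 1, new Fields( "average" ) );
  }

  public AverageBuffer( Fields fieldDeclaration )
  {
    // expect one argument value, let the user name the result field
    super( 1, fieldDeclaration );
  }

  public void operate( FlowProcess flowProcess, BufferCall bufferCall )
  {
    // init the count and sum
    long count = 0;
    long sum = 0;

    // get all the current argument values for this grouping
    Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();

    while( arguments.hasNext() )
    {
      count++;
      sum += arguments.next().getInteger( 0 );
    }

    // create a Tuple to hold our result value; divide as a double
    // so the average is not truncated by integer division
    Tuple result = new Tuple( (double) sum / count );

    // return the result Tuple
    bufferCall.getOutputCollector().add( result );
  }
}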
The example above implements a fully functional Buffer that accepts one value in each argument Tuple, sums those values across all argument Tuples in the current grouping, and returns, in a new Tuple, the sum divided by the number of argument Tuples counted.
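Because the division happens after the summing loop, the arithmetic is worth checking on its own. The plain-Java sketch below, with the hypothetical name `GroupAverage.average`, performs the same sum-and-count loop over one grouping's values but divides in floating point. Dividing one `long` by another truncates toward zero, so with integer division the values 1, 2, and 4 would average to 2 rather than 2.333...

```java
import java.util.Iterator;

// Plain-Java version of the averaging loop, without Cascading.
final class GroupAverage {

    // Sums one grouping's values and divides by the count in floating point.
    static double average(Iterator<Integer> values) {
        long count = 0;
        long sum = 0;
        while (values.hasNext()) {
            count++;
            sum += values.next();
        }
        if (count == 0) {
            throw new IllegalArgumentException("empty grouping");
        }
        return (double) sum / count;
    }
}
```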
The first constructor assumes a default field name for the value this
Buffer returns, but it is a best practice to
always give the user the option to override the declared field names, to
prevent field name collisions that would cause the planner to
fail.
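The kind of collision the second constructor guards against can be illustrated with a hypothetical helper, `FieldCollisions.collisions`, which is not part of Cascading: it lists the declared result field names that already appear among the incoming field names. Cascading's planner performs its own validation; this sketch only shows the idea.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Cascading: reports declared result field
// names that already exist among the incoming field names.
final class FieldCollisions {

    static List<String> collisions(List<String> incoming, List<String> declared) {
        Set<String> existing = new HashSet<>(incoming);
        List<String> clashes = new ArrayList<>();
        for (String name : declared) {
            if (existing.contains(name)) {
                clashes.add(name);
            }
        }
        return clashes;
    }
}
```

If a stream already carries an `average` field, passing a different name through the `AverageBuffer( Fields )` constructor is how a user would sidestep such a clash.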
Note that this example is somewhat contrived; in practice, an
Aggregator should be implemented to compute
averages. A Buffer would be better suited for computing
"running averages" across very large spans, for example.
Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.