An Aggregator expects a set of argument Tuples in the same grouping,
and may return zero or more result Tuples.
An Aggregator may only be used with an Every pipe, and an Every pipe
may only follow a GroupBy, a CoGroup, or another Every pipe.
To create a custom Aggregator, subclass the class
cascading.operation.BaseOperation and implement the interface
cascading.operation.Aggregator. Because BaseOperation supplies default
implementations of the remaining Operation methods, the start,
aggregate, and complete methods defined on the Aggregator interface
are the only methods that must be implemented.
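The order in which these three methods are called can be modeled in plain Java without Cascading. In this sketch, `MiniAggregator`, `AggregatorLifecycleDemo`, and `runEvery` are hypothetical stand-ins for Aggregator, AggregatorCall, and an Every following a GroupBy; the assumption is that start runs once per group, aggregate once per argument value, and complete once per group, where it may emit zero or more results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for cascading.operation.Aggregator; NOT the Cascading API.
interface MiniAggregator<C> {
  C start(String groupKey);                                     // once, when a group begins
  void aggregate(C context, long argument);                     // once per argument value in the group
  void complete(C context, String groupKey, List<String> out);  // once, when the group ends; emits 0..n results
}

class AggregatorLifecycleDemo {
  // Models a GroupBy followed by an Every: group (key, value) pairs by key,
  // then drive start/aggregate/complete once per group.
  static <C> List<String> runEvery(List<Map.Entry<String, Long>> tuples, MiniAggregator<C> aggregator) {
    Map<String, List<Long>> groups = new TreeMap<>(); // keys kept sorted, as a grouping would be
    for (Map.Entry<String, Long> t : tuples) {
      groups.computeIfAbsent(t.getKey(), k -> new ArrayList<>()).add(t.getValue());
    }
    List<String> results = new ArrayList<>();
    for (Map.Entry<String, List<Long>> group : groups.entrySet()) {
      C context = aggregator.start(group.getKey()); // a fresh context for every group
      for (long value : group.getValue()) {
        aggregator.aggregate(context, value);
      }
      aggregator.complete(context, group.getKey(), results);
    }
    return results;
  }

  // Counts values per group, but only emits a result for groups with at least two values,
  // showing that complete may emit nothing for a group.
  static final MiniAggregator<long[]> COUNT_IF_AT_LEAST_TWO = new MiniAggregator<long[]>() {
    public long[] start(String groupKey) { return new long[] {0}; }
    public void aggregate(long[] context, long argument) { context[0]++; }
    public void complete(long[] context, String groupKey, List<String> out) {
      if (context[0] >= 2) out.add(groupKey + "=" + context[0]);
    }
  };

  public static void main(String[] args) {
    List<Map.Entry<String, Long>> tuples = List.of(
        Map.entry("b", 5L), Map.entry("a", 1L), Map.entry("b", 7L),
        Map.entry("c", 3L), Map.entry("b", 9L));
    System.out.println(runEvery(tuples, COUNT_IF_AT_LEAST_TWO)); // prints [b=3]
  }
}
```

Groups "a" and "c" each hold a single value, so complete emits nothing for them; only "b" produces a result.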
Example 5.5. Custom Aggregator
import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class SomeAggregator extends BaseOperation<SomeAggregator.Context>
  implements Aggregator<SomeAggregator.Context>
{
  // holds the intermediate state for a single grouping
  public static class Context
  {
    Object value;
  }

  public void start( FlowProcess flowProcess,
                     AggregatorCall<Context> aggregatorCall )
  {
    // get the group values for the current grouping
    TupleEntry group = aggregatorCall.getGroup();

    // create a new custom context object
    Context context = new Context();

    // optionally, populate the context object (for example, from the group values)

    // set the context object
    aggregatorCall.setContext( context );
  }

  public void aggregate( FlowProcess flowProcess,
                         AggregatorCall<Context> aggregatorCall )
  {
    // get the current argument values
    TupleEntry arguments = aggregatorCall.getArguments();

    // get the context for this grouping
    Context context = aggregatorCall.getContext();

    // update the context object from the arguments
  }

  public void complete( FlowProcess flowProcess,
                        AggregatorCall<Context> aggregatorCall )
  {
    Context context = aggregatorCall.getContext();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // insert some values into the result Tuple based on the context

    // return the result Tuple
    aggregatorCall.getOutputCollector().add( result );
  }
}
Aggregators should declare both the number of argument values they
expect and the field names of the Tuple they will return.
Aggregators must accept one or more values in a Tuple as arguments; by
default they accept any number of values (Operation.ANY). Cascading
verifies that the number of arguments selected matches the number of
arguments expected.
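The argument-count check described above can be sketched in plain Java. `ArityCheck`, `verifyArguments`, and the `ANY` sentinel are hypothetical; the sentinel value used here is an assumption standing in for Operation.ANY, and the real check lives inside Cascading, not in user code.

```java
import java.util.List;

class ArityCheck {
  // Hypothetical sentinel meaning "accept any number of arguments" (stands in for Operation.ANY).
  static final int ANY = Integer.MAX_VALUE;

  // Throws if no arguments were selected, or if the count does not match a fixed expectation.
  static void verifyArguments(int expectedNumArgs, List<String> selectedFields) {
    if (selectedFields.isEmpty()) {
      throw new IllegalArgumentException("an Aggregator requires at least one argument value");
    }
    if (expectedNumArgs != ANY && selectedFields.size() != expectedNumArgs) {
      throw new IllegalArgumentException("expected " + expectedNumArgs + " argument(s), but "
          + selectedFields.size() + " were selected: " + selectedFields);
    }
  }

  public static void main(String[] args) {
    verifyArguments(ANY, List.of("a", "b", "c")); // accepted: any number of values
    verifyArguments(1, List.of("value"));        // accepted: exactly one value
    try {
      verifyArguments(1, List.of("a", "b"));     // rejected: two values selected, one expected
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```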
Aggregators may optionally declare the field names they return; by
default, Aggregators declare Fields.UNKNOWN.
Both declarations must be made in the constructor, either by passing
default values to the super constructor or by accepting the values
from the user as constructor arguments.
Example 5.6. Add Tuples Aggregator
import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.AggregatorCall;
import cascading.operation.BaseOperation;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public class AddTuplesAggregator
  extends BaseOperation<AddTuplesAggregator.Context>
  implements Aggregator<AddTuplesAggregator.Context>
{
  // running total for a single grouping
  public static class Context
  {
    long value = 0;
  }

  public AddTuplesAggregator()
  {
    // expects 1 argument, fail otherwise; declares the default field name "sum"
    super( 1, new Fields( "sum" ) );
  }

  public AddTuplesAggregator( Fields fieldDeclaration )
  {
    // expects 1 argument, fail otherwise; lets the caller choose the result field name
    super( 1, fieldDeclaration );
  }

  public void start( FlowProcess flowProcess,
                     AggregatorCall<Context> aggregatorCall )
  {
    // set the context object, starting at zero
    aggregatorCall.setContext( new Context() );
  }

  public void aggregate( FlowProcess flowProcess,
                         AggregatorCall<Context> aggregatorCall )
  {
    TupleEntry arguments = aggregatorCall.getArguments();
    Context context = aggregatorCall.getContext();

    // add the current argument value to the current sum
    // (read as a long to match the long accumulator)
    context.value += arguments.getLong( 0 );
  }

  public void complete( FlowProcess flowProcess,
                        AggregatorCall<Context> aggregatorCall )
  {
    Context context = aggregatorCall.getContext();

    // create a Tuple to hold our result values
    Tuple result = new Tuple();

    // set the sum
    result.add( context.value );

    // return the result Tuple
    aggregatorCall.getOutputCollector().add( result );
  }
}
The example above implements a fully functional Aggregator that accepts
one value in the argument Tuple, sums that value across all argument
Tuples in the current grouping, and returns the total as a new Tuple.
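To check what AddTuplesAggregator computes without running a Flow, the plain-Java sketch below mirrors its three methods over an in-memory grouping. `SumModel` and `sumPerGroup` are hypothetical, and the `Map<String, List<Long>>` is an assumed stand-in for the grouped Tuple stream an Every would see; each "result tuple" is modeled as a one-element list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SumModel {
  // Mirrors AddTuplesAggregator.Context: one running total per group.
  static final class Context {
    long value = 0;
  }

  // For each group: "start" creates a fresh Context, "aggregate" adds each argument,
  // and "complete" emits a single one-element result holding the sum.
  static Map<String, List<Long>> sumPerGroup(Map<String, List<Long>> groups) {
    Map<String, List<Long>> results = new LinkedHashMap<>();
    for (Map.Entry<String, List<Long>> group : groups.entrySet()) {
      Context context = new Context();            // start
      for (long argument : group.getValue()) {
        context.value += argument;                // aggregate
      }
      List<Long> resultTuple = new ArrayList<>(); // complete
      resultTuple.add(context.value);
      results.put(group.getKey(), resultTuple);
    }
    return results;
  }

  public static void main(String[] args) {
    Map<String, List<Long>> groups = new LinkedHashMap<>();
    groups.put("a", List.of(1L, 2L, 3L));
    groups.put("b", List.of(10L));
    System.out.println(sumPerGroup(groups)); // prints {a=[6], b=[10]}
  }
}
```

Creating the Context in start, rather than reusing one across groups, is what keeps each group's total independent.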
The first constructor declares a default field name, "sum", for the
value this Aggregator returns. It is a best practice, however, to always
give users the option to override the declared field names, so that
they can avoid field name collisions that would cause the planner to
fail.
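The field name collision mentioned above can be illustrated with a hypothetical check in plain Java: when an Aggregator's declared fields are appended to the fields already flowing through the pipe, a repeated name is ambiguous. `FieldCollisionDemo` and `appendDeclared` are assumptions for illustration only, not the Cascading planner's implementation.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class FieldCollisionDemo {
  // Appends the declared field names to the incoming ones, failing on any duplicate
  // (a hypothetical stand-in for the planner's field resolution).
  static List<String> appendDeclared(List<String> incoming, List<String> declared) {
    List<String> result = new ArrayList<>(incoming);
    Set<String> seen = new HashSet<>(incoming);
    for (String name : declared) {
      if (!seen.add(name)) {
        throw new IllegalStateException("field name collision: " + name);
      }
      result.add(name);
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> incoming = List.of("key", "sum"); // upstream already carries a "sum" field
    try {
      appendDeclared(incoming, List.of("sum"));    // the default declaration collides
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
    System.out.println(appendDeclared(incoming, List.of("total"))); // an override avoids it
  }
}
```

With the second constructor, a user could declare `new Fields( "total" )` instead of relying on the default "sum".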
Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.