The cascading.operation.Identity function
is used to "shape" a tuple stream. Here are some common patterns.
Here Identity passes its arguments out as results, thanks
to the Fields.ARGS field declaration.
// incoming -> "ip", "time", "method", "event", "status", "size"
Identity identity = new Identity( Fields.ARGS );
pipe = new Each( pipe, new Fields( "ip", "method" ), identity,
Fields.RESULTS );
// outgoing -> "ip", "method"
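To make the selection step concrete, here is a minimal plain-Java sketch of what the pattern above does to a single tuple. It does not use the Cascading API: the class IdentitySelectSketch, the method selectArgs, and the sample values are all hypothetical, and a tuple is modeled simply as an ordered map from field name to value.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IdentitySelectSketch {

    // Hypothetical model, not Cascading code: copy the fields named by the
    // argument selector into a new result tuple, in selector order. This mirrors
    // an Identity declared with Fields.ARGS whose results replace the input.
    public static Map<String, Object> selectArgs(Map<String, Object> incoming,
                                                 List<String> argumentSelector) {
        Map<String, Object> results = new LinkedHashMap<>();
        for (String name : argumentSelector) {
            if (!incoming.containsKey(name)) {
                throw new IllegalArgumentException("no such field: " + name);
            }
            results.put(name, incoming.get(name));
        }
        return results;
    }

    public static void main(String[] args) {
        // Made-up sample values for the six incoming fields.
        Map<String, Object> incoming = new LinkedHashMap<>();
        incoming.put("ip", "10.0.0.1");
        incoming.put("time", "01/Jan/2008:00:00:00");
        incoming.put("method", "GET");
        incoming.put("event", "/index.html");
        incoming.put("status", "200");
        incoming.put("size", "1024");

        // Only the selected arguments survive as results.
        System.out.println(selectArgs(incoming, Arrays.asList("ip", "method")).keySet());
    }
}
```

The ordered map keeps the result fields in the order the selector names them, which is the property the "outgoing" comments in this section rely on.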
In practice the field declaration can be left out, as
Fields.ARGS is the default declaration for the
Identity function. Additionally, Fields.RESULTS can
be left off, as it is the default output selector for the
Each pipe.
// incoming -> "ip", "time", "method", "event", "status", "size"
pipe = new Each( pipe, new Fields( "ip", "method" ), new Identity() );
// outgoing -> "ip", "method"
Here Identity renames the incoming arguments. Since Fields.RESULTS is implied, the incoming Tuple is replaced by the arguments selected and given new field names as declared on Identity.
// incoming -> "ip", "method"
Identity identity = new Identity( new Fields( "address", "request" ) );
pipe = new Each( pipe, new Fields( "ip", "method" ), identity );
// outgoing -> "address", "request"
In the above example, if the incoming stream had more fields than "ip" and "method", it would still work; the extra fields would simply be discarded. If the same were true for the next example, the planner would fail.
// incoming -> "ip", "method"
Identity identity = new Identity( new Fields( "address", "request" ) );
pipe = new Each( pipe, Fields.ALL, identity );
// outgoing -> "address", "request"
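The difference between the two renaming examples can be sketched in plain Java. This is only a model, and it rests on an assumption: that the failure with Fields.ALL comes from the number of selected arguments no longer matching the number of declared field names. The class IdentityRenameSketch, the method rename, and the sample values are hypothetical and are not part of Cascading.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IdentityRenameSketch {

    // Hypothetical model, not Cascading code: pair each argument value with the
    // declared field name at the same position. Assumes that a count mismatch
    // is what makes the declaration invalid.
    public static Map<String, Object> rename(List<Object> argumentValues,
                                             List<String> declared) {
        if (argumentValues.size() != declared.size()) {
            throw new IllegalArgumentException("declared " + declared.size()
                + " fields but received " + argumentValues.size() + " arguments");
        }
        Map<String, Object> results = new LinkedHashMap<>();
        for (int i = 0; i < declared.size(); i++) {
            results.put(declared.get(i), argumentValues.get(i));
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> declared = Arrays.asList("address", "request");

        // Two arguments, two declared names: the rename succeeds.
        System.out.println(rename(Arrays.<Object>asList("10.0.0.1", "GET"), declared));

        // Selecting every field of a six-field stream passes six arguments.
        try {
            rename(Arrays.<Object>asList("10.0.0.1", "t", "GET", "/", "200", "1024"), declared);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With an explicit argument selector the extra fields never reach the function, which is why only the Fields.ALL variant is sensitive to the width of the incoming stream.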
Since Fields.ALL is the default argument
selector for the Each pipe, it can be
left out.
// incoming -> "ip", "method"
Identity identity = new Identity( new Fields( "address", "request" ) );
pipe = new Each( pipe, identity );
// outgoing -> "address", "request"
Here we rename a single field, but return it along with an input Tuple field as the result.
// incoming -> "ip", "time", "method", "event", "status", "size"
Fields fieldSelector = new Fields( "address", "method" );
Identity identity = new Identity( new Fields( "address" ) );
pipe = new Each( pipe, new Fields( "ip" ), identity, fieldSelector );
// outgoing -> "address", "method"
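The role of the output selector in this pattern can be modeled in plain Java as a lookup over the incoming fields plus the function results. This is a sketch under stated assumptions, not Cascading code: the class OutputSelectorSketch and the method select are hypothetical, the sample values are made up, and the sketch assumes that result names do not collide with incoming names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OutputSelectorSketch {

    // Hypothetical model, not Cascading code: the output selector picks named
    // fields from the incoming tuple combined with the function results.
    // Assumes no name appears in both maps.
    public static Map<String, Object> select(Map<String, Object> incoming,
                                             Map<String, Object> results,
                                             List<String> outputSelector) {
        Map<String, Object> combined = new LinkedHashMap<>(incoming);
        combined.putAll(results);

        Map<String, Object> outgoing = new LinkedHashMap<>();
        for (String name : outputSelector) {
            if (!combined.containsKey(name)) {
                throw new IllegalArgumentException("no such field: " + name);
            }
            outgoing.put(name, combined.get(name));
        }
        return outgoing;
    }

    public static void main(String[] args) {
        Map<String, Object> incoming = new LinkedHashMap<>();
        incoming.put("ip", "10.0.0.1");
        incoming.put("method", "GET");
        incoming.put("status", "200");

        // Result of renaming the "ip" argument to "address".
        Map<String, Object> results = new LinkedHashMap<>();
        results.put("address", incoming.get("ip"));

        // "address" comes from the results, "method" from the incoming tuple.
        System.out.println(select(incoming, results, Arrays.asList("address", "method")));
    }
}
```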
Here we coerce the Tuple String values "status" and
"size" to int and
long, respectively. Since Fields.RESULTS is implied, only those two fields appear in the output.
// incoming -> "ip", "time", "method", "event", "status", "size"
Identity identity = new Identity( Integer.TYPE, Long.TYPE );
pipe = new Each( pipe, new Fields( "status", "size" ), identity );
// outgoing -> "status", "size"
Or we can replace just the Tuple String value "status"
with int while keeping all the other
values in the output Tuple.
// incoming -> "ip", "time", "method", "event", "status", "size"
Identity identity = new Identity( Integer.TYPE );
pipe = new Each( pipe, new Fields( "status" ), identity,
Fields.REPLACE );
// outgoing -> "ip", "time", "method", "event", "status", "size"
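As a rough illustration of the coerce-and-replace pattern, the plain-Java sketch below converts one String field in place and leaves every other field untouched. It is a model only: the class CoerceReplaceSketch and its methods coerce and replace are hypothetical names, the sample values are made up, only int and long targets are handled, and the parsing calls stand in for whatever coercion Cascading actually performs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CoerceReplaceSketch {

    // Hypothetical helper: convert a String to the boxed form of the given
    // primitive type. Only int and long are supported in this sketch.
    public static Object coerce(String value, Class<?> type) {
        if (type == Integer.TYPE) {
            return Integer.parseInt(value);
        }
        if (type == Long.TYPE) {
            return Long.parseLong(value);
        }
        throw new IllegalArgumentException("unsupported type: " + type);
    }

    // Hypothetical model of the replace behavior: same field names and order,
    // only the named field's value changes.
    public static Map<String, Object> replace(Map<String, Object> incoming,
                                              String field, Class<?> type) {
        if (!incoming.containsKey(field)) {
            throw new IllegalArgumentException("no such field: " + field);
        }
        Map<String, Object> outgoing = new LinkedHashMap<>(incoming);
        outgoing.put(field, coerce((String) incoming.get(field), type));
        return outgoing;
    }

    public static void main(String[] args) {
        Map<String, Object> incoming = new LinkedHashMap<>();
        incoming.put("ip", "10.0.0.1");
        incoming.put("method", "GET");
        incoming.put("status", "200");
        incoming.put("size", "1024");

        Map<String, Object> outgoing = replace(incoming, "status", Integer.TYPE);
        System.out.println(outgoing.keySet());
        System.out.println(outgoing.get("status").getClass().getSimpleName());
    }
}
```

Copying the incoming map before overwriting the one field is what keeps the field order and the remaining values intact.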
Copyright © 2007-2008 Concurrent, Inc. All Rights Reserved.