Cook Book

Some common patterns.

Copy a Tuple

Tuple orig = new Tuple();
orig.add( "some value" );
// copy constructor
Tuple copy = new Tuple( orig );

Nest a Tuple in a Tuple

Tuple parent = new Tuple();
Tuple child = new Tuple( "value1", "value2" );
parent.add( child );

Split a stream and name the branches

pipe = new Each( pipe, new SomeFunction() )
Pipe lhs = new Pipe( "lhs", pipe );
Pipe rhs = new Pipe( "rhs", pipe );

Copy a value

pipe = new Each( pipe, new Fields( "field" ), new Identity( new Fields( "copy" ) ), 
  Fields.ALL );

Rename a field

// two incoming fields: "other" and "field"
pipe = new Each( pipe, new Fields( "field" ), new Identity( new Fields( "renamed" ) ), 
  new Fields( "other", "renamed" ) );

Convert strings to primitive types

Class[] types = new Class[] {Long.class, Float.class, Boolean.class};
pipe = new Each( pipe, new Fields( "longField", "floatField", "booleanField" ), 
  new Identity( types ) );

Reorder fields

// two incoming fields: "field1" and "field2"
pipe = new Each( pipe, new Fields( "field2", "field1" ), new Identity() );

Discard unused fields

// two incoming fields: "field1" and "field2"
pipe = new Each( pipe, new Fields( "field2" ), new Identity() );

Sort values passed to Aggregator

pipe = new GroupBy( pipe, new Fields( "group1", "group2" ), 
  new Fields( "value" ) );

Insert constant values into a value stream

pipe = new Each( pipe, new Insert( new Fields( "field1", "field2" ), 
  "value1", "value2" ), Fields.ALL );

Create 'timestamp' from date/time fields

// build 'datetime' string.
FieldFormatter formatter = new FieldFormatter( new Fields( "datetime" ), 
  "%s:%s:%s:%s:%s:%s.%s" );
Fields timeFields = new Fields( "year", "month", "day", "hour", "minute", 
  "second", "millisecond" );
pipe = new Each( pipe, timeFields, formatter, Fields.ALL );

pipe = new Each( pipe, new Fields( "datetime" ), 
  new DateParser( new Fields( "ts" ), "yyyy:MM:dd:HH:mm:ss.SSS" ), Fields.ALL );

Create date/time string from 'timestamp' field

DateFormatter timeFormatter = new DateFormatter( 
  new Fields( "datetime" ), "HH:mm:ss.SSS" );
// pass field "ts" into the timeFormatter function, which returns a "datetime" field
pipe = new Each( pipe, new Fields( "ts" ), timeFormatter, Fields.ALL );

Get 'DISTINCT' (unique) values from a Tuple stream

// group on all values
pipe = new GroupBy( pipe, Fields.ALL );
// only take the first tuple in the grouping, ignore the rest
pipe = new Every( pipe, Fields.ALL, new First(), Fields.RESULTS );

Merge two streams with the same column names

// merge streams, group on 'last' field name
// lhs = 'first', 'last', 'middle'
// rhs = 'first', 'last', 'middle'
pipe = new GroupBy( lhs, rhs, new Fields("last") );

Passing properties to an Operation

// set propery on Flow
Properties props = new Properties();
props.set( "key", "value");
FlowConnector conn = new FlowConnector( props );
....
// get property from within an Operation (Function,Filter,etc)
String value = flowProcess.getProperty( "key" );

Passing a JobConf to a Flow and Operation

// set JobConf on Flow
JobConf jobConf = new JobConf();
jobConf.set( "key", "value" );
Properties props = new Properties();
MultiMapReducePlanner.setJobConf( properties, jobConf );
FlowConnector conn = new FlowConnector( props );
....
// get property from within an Operation (Function,Filter,etc)
String value = flowProcess.getProperty( "key" );
// or
JobConf jobConf = ((HadoopFlowProcess) flowProcess).getJobConf();