Cook Book
Some common patterns.
Copy a Tuple
Tuple orig = new Tuple();
orig.add( "some value" );
// copy constructor
Tuple copy = new Tuple( orig );
Nest a Tuple in a Tuple
Tuple parent = new Tuple();
Tuple child = new Tuple( "value1", "value2" );
parent.add( child );
Split a stream and name the branches
pipe = new Each( pipe, new SomeFunction() );
Pipe lhs = new Pipe( "lhs", pipe );
Pipe rhs = new Pipe( "rhs", pipe );
Copy a value
pipe = new Each( pipe, new Fields( "field" ), new Identity( new Fields( "copy" ) ), Fields.ALL );
Rename a field
// two incoming fields: "other" and "field"
pipe = new Each( pipe, new Fields( "field" ),
  new Identity( new Fields( "renamed" ) ), new Fields( "other", "renamed" ) );
Convert strings to primitive types
Class[] types = new Class[] {Long.class, Float.class, Boolean.class};
pipe = new Each( pipe, new Fields( "longField", "floatField", "booleanField" ),
new Identity( types ) );
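As a rough illustration of what this conversion amounts to, the plain-Java sketch below coerces strings to the same three target types with the standard `valueOf` methods. `CoercionSketch` and its methods are hypothetical names used only for this example, and Cascading's own coercion rules (for instance its handling of null or malformed values) may differ.

```java
import java.util.Arrays;

// Hypothetical helper, not part of Cascading: mimics string-to-type coercion.
class CoercionSketch {
    // Convert one string to the requested wrapper type; unknown types pass through.
    static Object coerce(String value, Class<?> type) {
        if (type == Long.class) return Long.valueOf(value);
        if (type == Float.class) return Float.valueOf(value);
        if (type == Boolean.class) return Boolean.valueOf(value);
        return value;
    }

    // Convert each value using the type at the same position.
    static Object[] coerceAll(String[] values, Class<?>[] types) {
        Object[] result = new Object[values.length];
        for (int i = 0; i < values.length; i++) {
            result[i] = coerce(values[i], types[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        Class<?>[] types = {Long.class, Float.class, Boolean.class};
        String[] values = {"42", "1.5", "true"};
        System.out.println(Arrays.toString(coerceAll(values, types)));
    }
}
```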
Reorder fields
// two incoming fields: "field1" and "field2"
pipe = new Each( pipe, new Fields( "field2", "field1" ), new Identity() );
Discard unused fields
// two incoming fields: "field1" and "field2"
pipe = new Each( pipe, new Fields( "field2" ), new Identity() );
Sort values passed to Aggregator
pipe = new GroupBy( pipe, new Fields( "group1", "group2" ), new Fields( "value" ) );
Insert constant values into a value stream
pipe = new Each( pipe, new Insert( new Fields( "field1", "field2" ), "value1", "value2" ), Fields.ALL );
Create 'timestamp' from date/time fields
// build 'datetime' string
FieldFormatter formatter = new FieldFormatter( new Fields( "datetime" ), "%s:%s:%s:%s:%s:%s.%s" );
Fields timeFields = new Fields( "year", "month", "day", "hour", "minute", "second", "millisecond" );
pipe = new Each( pipe, timeFields, formatter, Fields.ALL );
// parse the 'datetime' string into a 'ts' field
pipe = new Each( pipe, new Fields( "datetime" ),
  new DateParser( new Fields( "ts" ), "yyyy:MM:dd:HH:mm:ss.SSS" ), Fields.ALL );
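To check that the format string and the date pattern agree, the two steps can be tried outside a flow. The sketch below is plain Java and assumes the two operations behave like `String.format` and `SimpleDateFormat` respectively; `TimestampSketch` is a hypothetical name, and the UTC time zone is an assumption made so the result is reproducible.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Hypothetical helper, not part of Cascading.
class TimestampSketch {
    // Join the seven date/time parts with the same format string as the recipe.
    static String toDatetime(Object... parts) {
        return String.format("%s:%s:%s:%s:%s:%s.%s", parts);
    }

    // Parse the joined string into milliseconds since the epoch (UTC assumed).
    static long toTimestamp(String datetime) {
        SimpleDateFormat parser = new SimpleDateFormat("yyyy:MM:dd:HH:mm:ss.SSS");
        parser.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            return parser.parse(datetime).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("unparseable datetime: " + datetime, e);
        }
    }

    public static void main(String[] args) {
        String datetime = toDatetime("2009", "01", "15", "10", "30", "45", "123");
        System.out.println(datetime + " -> " + toTimestamp(datetime));
    }
}
```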
Create date/time string from 'timestamp' field
DateFormatter timeFormatter = new DateFormatter( new Fields( "datetime" ), "HH:mm:ss.SSS" );
// pass field "ts" into the timeFormatter function, which returns a "datetime" field
pipe = new Each( pipe, new Fields( "ts" ), timeFormatter, Fields.ALL );
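The pattern can be previewed in plain Java before wiring it into a pipe. This sketch assumes the "ts" value is milliseconds since the epoch and that the formatter follows `SimpleDateFormat` pattern rules; `DateStringSketch` is a hypothetical name, and UTC is chosen only to make the output deterministic.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper, not part of Cascading.
class DateStringSketch {
    // Render a millisecond timestamp with the same pattern as the recipe (UTC assumed).
    static String toDatetime(long ts) {
        SimpleDateFormat formatter = new SimpleDateFormat("HH:mm:ss.SSS");
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        return formatter.format(new Date(ts));
    }

    public static void main(String[] args) {
        // 1 hour, 2 minutes, 3 seconds and 4 milliseconds after midnight UTC
        System.out.println(toDatetime(3723004L));
    }
}
```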
Get 'DISTINCT' (unique) values from a Tuple stream
// group on all values
pipe = new GroupBy( pipe, Fields.ALL );
// only take the first tuple in the grouping, ignore the rest
pipe = new Every( pipe, Fields.ALL, new First(), Fields.RESULTS );
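The idea behind this recipe (group on every value, keep one tuple per group) can be sketched in plain Java with a map keyed on the whole tuple. `DistinctSketch` is a hypothetical name and tuples are modeled here as simple lists; unlike a real grouping, this sketch keeps first-seen order rather than sorted order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Cascading.
class DistinctSketch {
    // Convenience constructor for a list-backed "tuple".
    static List<Object> tuple(Object... values) {
        return Arrays.asList(values);
    }

    // Group on all values and keep only the first tuple seen in each group.
    static List<List<Object>> distinct(List<List<Object>> tuples) {
        Map<List<Object>, List<Object>> firstPerGroup = new LinkedHashMap<>();
        for (List<Object> t : tuples) {
            firstPerGroup.putIfAbsent(t, t);
        }
        return new ArrayList<>(firstPerGroup.values());
    }

    public static void main(String[] args) {
        List<List<Object>> input = new ArrayList<>();
        input.add(tuple("a", 1));
        input.add(tuple("b", 2));
        input.add(tuple("a", 1));
        System.out.println(distinct(input));
    }
}
```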
Merge two streams with the same column names
// merge streams, group on 'last' field name
// lhs = 'first', 'last', 'middle'
// rhs = 'first', 'last', 'middle'
pipe = new GroupBy( lhs, rhs, new Fields("last") );
Passing properties to an Operation
// set property on Flow
Properties props = new Properties();
props.setProperty( "key", "value" );
FlowConnector conn = new FlowConnector( props );
....
// get property from within an Operation (Function, Filter, etc.)
String value = (String) flowProcess.getProperty( "key" );
Passing a JobConf to a Flow and Operation
// set JobConf on Flow
JobConf jobConf = new JobConf();
jobConf.set( "key", "value" );
Properties props = new Properties();
MultiMapReducePlanner.setJobConf( props, jobConf );
FlowConnector conn = new FlowConnector( props );
....
// get property from within an Operation (Function, Filter, etc.)
String value = (String) flowProcess.getProperty( "key" );
// or
JobConf jobConf = ((HadoopFlowProcess) flowProcess).getJobConf();