A Gentle Introduction
Before reading on, check out the Cascading presentation (PDF) and About page to get up to speed on the core concepts behind Cascading.
In this introduction we will simply read an Apache log file, parse it into a few fields, and then write it back out as a text file. This is pretty much the simplest thing you can do with Cascading, but once you get these basics down, you can continue to add more complexity as needed.
Instructions on how to download and run this example are at the bottom of this page.
Our final 'pipe assembly' will look something like the diagram below. On the left, we take a "Source" file, pass it through a "Parser" function, and write it out to a "Sink" file. The "Each" element says we apply the "Parser" to each line in the file.

So, how do we represent this in code?
The first thing we do is create the "source" Tap that can read from the local filesystem. A Tap class takes a Scheme object and a String path to the file we are going to read. In this example, we will be using the TextLine scheme. This tells the Lfs Tap ('Lfs' for 'local file system') that the file we are reading is composed of lines of text.
By default the TextLine scheme declares two field names, "offset" and "line". The offset is the byte position at which each line starts, measured from the head of the file; this value is inherited from Hadoop.
// create Source tap to read a resource from the local file system
// by default the TextLine scheme declares two fields, "offset" and "line"
Tap localLogTap = new Lfs( new TextLine(), inputPath );
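To make the "offset" and "line" fields concrete, here is a small plain-Java sketch that splits text into lines and records the byte position where each one starts. It is an illustration only, assuming UTF-8 input with '\n' line endings. It is not how TextLine or Hadoop is implemented, and the class name OffsetLineDemo is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration (not Cascading or Hadoop code): models the
// ("offset", "line") pairs the TextLine scheme declares, assuming UTF-8
// text with '\n' line endings.
public class OffsetLineDemo {

    // One record holding the two default TextLine field values.
    public record OffsetLine(long offset, String line) {}

    // Splits text into lines; each offset is the byte position where that line starts.
    public static List<OffsetLine> toOffsetLines(String text) {
        List<OffsetLine> records = new ArrayList<>();
        long offset = 0;
        int start = 0;
        while (start < text.length()) {
            int end = text.indexOf('\n', start);
            if (end < 0) {
                end = text.length(); // last line has no trailing newline
            }
            String line = text.substring(start, end);
            records.add(new OffsetLine(offset, line));
            // Advance by the line's UTF-8 byte length plus one byte for '\n' (if present).
            offset += line.getBytes(StandardCharsets.UTF_8).length + (end < text.length() ? 1 : 0);
            start = end + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        for (OffsetLine r : toOffsetLines("ab\ncde\nf")) {
            System.out.println(r.offset() + "\t" + r.line());
        }
    }
}
```

Running main prints the offsets 0, 3 and 7 next to "ab", "cde" and "f". Because the offset counts bytes rather than characters, a two-byte UTF-8 character such as "é" advances it by two.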
Next we create a Fields object with all the field names we will be using, define the regular expression (regex) we will use to parse each line from the log file, and declare the regex groups we want to bind to the fields. In this example, regex group 1 will be bound to the 'ip' field, group 2 to the 'time' field, and so on.
// declare the field names we will parse out of the log file
Fields apacheFields = new Fields( "ip", "time", "method", "event", "status", "size" );
// define the regular expression to parse the log file with
String apacheRegex = "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*)" +
" [^ ]*\\\" ([^ ]*) ([^ ]*).*$";
// declare the groups from the above regex we want to keep. each regex group will be
// given a field name from 'apacheFields', above, respectively
int[] allGroups = {1, 2, 3, 4, 5, 6};
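To see how the regex groups bind to the field names, the following plain-Java sketch applies the same pattern with java.util.regex to one made-up log line in the Apache access-log format, pairing group 1 with "ip", group 2 with "time", and so on. It assumes a single space between the request path and the protocol version (for example "HTTP/1.1"), as in a standard Apache request line. It is not Cascading's RegexParser, and the class name and sample line are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration (not Cascading's RegexParser): binds regex groups
// 1..6, in order, to the field names used in 'apacheFields'.
public class ApacheRegexDemo {

    static final String[] FIELD_NAMES = {"ip", "time", "method", "event", "status", "size"};

    // Same pattern as the tutorial, with a space before the protocol version.
    static final Pattern APACHE = Pattern.compile(
        "^([^ ]*) +[^ ]* +[^ ]* +\\[([^]]*)\\] +\\\"([^ ]*) ([^ ]*) [^ ]*\\\" ([^ ]*) ([^ ]*).*$");

    static final int[] GROUPS = {1, 2, 3, 4, 5, 6};

    // Made-up sample line for illustration only.
    public static final String SAMPLE = "75.185.76.245 - - [01/Sep/2007:00:01:03 +0000] "
        + "\"POST /mt-tb.cgi/235 HTTP/1.1\" 403 174 \"-\" \"Opera/9.10\"";

    // Returns field name -> group value, or an empty map if the line does not match.
    public static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = APACHE.matcher(line);
        if (!m.matches()) {
            return fields;
        }
        for (int i = 0; i < GROUPS.length; i++) {
            fields.put(FIELD_NAMES[i], m.group(GROUPS[i]));
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(parse(SAMPLE));
    }
}
```

Running main prints {ip=75.185.76.245, time=01/Sep/2007:00:01:03 +0000, method=POST, event=/mt-tb.cgi/235, status=403, size=174}. The trailing ".*$" lets the pattern ignore any extra fields (referrer, user agent) after the response size.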
Here we create a RegexParser instance with the above values. The RegexParser will take a single field value as input, apply the above regular expression, and will return values for each field as a result.
// create the stream parser
RegexParser parser = new RegexParser( apacheFields, apacheRegex, allGroups );
For the RegexParser to do any work, it needs to be plugged into a pipe assembly. So we create an Each Pipe instance. The Each pipe applies a given Function (like RegexParser) to each 'record' or tuple in the input file. In our case, our TextLine scheme will create a new tuple for each line, where the tuples have the fields "offset" and "line". Then our Each pipe will take the "line" value and pass it into the RegexParser function. The parser function will return 6 fields, one for each regex group. Finally, our Each pipe will throw away (by default) the incoming tuple, and replace it in the stream with the results of the RegexParser.
// create the parser pipe element, with the name 'parser',
// and with the input field name 'line'
Pipe importPipe = new Each( "parser", new Fields( "line" ), parser );
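The default behavior of the Each pipe can be modeled in a few lines of plain Java. In this hypothetical sketch (not Cascading's API), a tuple is a Map from field name to value, the argument selector is the single field name "line", and a toy function stands in for RegexParser by returning only an "ip" field. The incoming "offset"/"line" tuple is dropped and replaced by the function's result.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of an Each pipe's default behavior (not Cascading's API):
// select one argument field from every tuple, apply a function to its value,
// and emit only the function's result in place of the incoming tuple.
public class EachModel {

    // Builds an incoming tuple with the two default TextLine fields.
    public static Map<String, Object> textLineTuple(long offset, String line) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("offset", offset);
        tuple.put("line", line);
        return tuple;
    }

    public static List<Map<String, Object>> each(
            List<Map<String, Object>> input,
            String argumentField,
            Function<Object, Map<String, Object>> function) {
        List<Map<String, Object>> output = new ArrayList<>();
        for (Map<String, Object> tuple : input) {
            // Pass only the selected argument value to the function...
            Map<String, Object> result = function.apply(tuple.get(argumentField));
            // ...and replace the whole incoming tuple with the function's result.
            output.add(result);
        }
        return output;
    }

    // Toy stand-in for RegexParser: returns the first space-separated token as "ip".
    public static Map<String, Object> firstTokenAsIp(Object value) {
        Map<String, Object> result = new LinkedHashMap<>();
        result.put("ip", value.toString().split(" ")[0]);
        return result;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> input = List.of(
            textLineTuple(0, "10.0.0.1 GET /a"),
            textLineTuple(16, "10.0.0.2 GET /b"));
        System.out.println(each(input, "line", EachModel::firstTokenAsIp));
    }
}
```

Running main prints [{ip=10.0.0.1}, {ip=10.0.0.2}]: neither "offset" nor "line" survives into the output stream, which mirrors what the Each pipe does by default.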
Now we create a new "sink" Tap to store our tuple stream values into. Here we use a Hfs ("Hfs" for the Hadoop default file system) Tap class. If you have a cluster configured, the Hfs instance will use the configured cluster filesystem. If your Hadoop installation is (by default) configured in 'local' mode, the Hfs tap will use the local filesystem.
We must also declare the output file type. In this case we want to save the stream as a text file, so we use TextLine again. So that all the fields are written, we must tell the TextLine scheme which fields to expect by passing in the "apacheFields" Fields instance.
// create a Sink tap to write to the default filesystem
Tap remoteLogTap = new Hfs( new TextLine( apacheFields ), outputPath );
To finish the assembly we must connect our source, pipe, and sink together.
// connect the assembly to the SOURCE and SINK taps
Flow parsedLogFlow = new FlowConnector().connect( localLogTap, remoteLogTap, importPipe );
Optionally, we can write the new Flow out to disk as a graph file so we can see what it does. This is very useful for complex assemblies.
// optionally print out the parsedLogFlow to a graph file for import
// into a graphics package. this is useful for visualizing the flow
// to help with debugging
//parsedLogFlow.writeDOT( "logparser.dot" );
Finally we start the flow. The start() method returns immediately. We call complete() to block the application until the Flow completes.
// start execution of the flow (either locally or on the cluster)
parsedLogFlow.start();
// block until the flow completes
parsedLogFlow.complete();
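The start()/complete() split follows a familiar pattern: launch work asynchronously, then wait for it. As a plain-Java analogy only (no Cascading classes are involved, and the class name is hypothetical), Thread.start() likewise returns immediately and Thread.join() blocks until the work has finished.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Plain-Java analogy for Flow.start()/Flow.complete(); not Cascading code.
public class StartCompleteAnalogy {

    // Starts background work, then blocks until it finishes; returns true if the work ran.
    public static boolean runAndWait() throws InterruptedException {
        AtomicBoolean finished = new AtomicBoolean(false);
        Thread job = new Thread(() -> {
            // stands in for the flow's processing
            finished.set(true);
        });
        job.start(); // like parsedLogFlow.start(): returns without waiting
        // the caller could do other work here while the job runs
        job.join();  // like parsedLogFlow.complete(): blocks until the job is done
        return finished.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("finished: " + runAndWait());
    }
}
```

Running main prints "finished: true". Without the join() call, the application could move on (or exit) before the background work is done, which is why the tutorial calls complete() after start().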
A working version of this 'logparser' example can be found on the downloads page. For the example to run, you must also download the latest version of Cascading and a copy of Hadoop. Unarchive all the files into the same parent directory so it looks something like this:
~/working/
    /hadoop-0.xx.yy
    /cascading      <- you may need to rename the directory
    /logparser
Next, JAVA_HOME and HADOOP_HOME must be set, and PATH updated:
export JAVA_HOME=/path/to/java/
export HADOOP_HOME=~/working/hadoop-0.xx.yy
export PATH=$HADOOP_HOME/bin:$PATH
To compile and run with Hadoop, do the following.
cd logparser
ant jar
hadoop jar ./build/logparser.jar data/apache.200.txt output
Note that if you are running a cluster, the above 'output' directory will show up in the Hadoop filesystem (HDFS), not in the current local directory.