Lingual is now available for download or build. See the Lingual page for details, or visit the Lingual project page.
Lingual’s Architecture
Julian Hyde discusses how Optiq and Cascading work together to become Lingual.
Cascading Lingual – True SQL for Cascading and Hadoop
Announcing Lingual, a new framework that executes ANSI SQL queries as Cascading applications on Apache Hadoop clusters.
Read more about it on the Lingual project page, sign up for announcements on the mailing list, or read the press release.
Cascading 2.2 WIP and CoercibleTypes
Cascading 2.2 is starting to take shape for those interested in test driving emerging features.
Of note is support for “field types”. This allows type information for fields read from an input file to be retained all the way through to where the data is sunk (stored) to a file.
This is important for a few reasons:
- Detecting incompatible comparisons during joins and sorting at planner time
- Retaining canonical types in a Tuple
- Reading and writing field type information from/into long-term archive files (Avro, Thrift, etc.)
- Reducing intermediate file size by guaranteeing field type information
- Custom type coercion via the CoercibleType interface
The CoercibleType interface is of particular importance.
Consider reading a CSV file with a date column containing values like 28/Dec/2012:16:17:12:931 -0800.
Internally, date information is best handled as a long timestamp. But when externalized as a String, it should read as a date string, not as a stringified long value.
The DateType implementation of CoercibleType can be used when declaring the date field. Given the correct date format string, the value of the date field will be stored as its canonical type, long.
So if an Operation or sink Scheme wants the value as a String, calling tupleEntry.getString("date") will automatically convert it back to the proper date string.
Conversely, to store the date string as a long, the code can call tupleEntry.setString("date", "28/Dec/2012:16:17:12:931 -0800"), after which
tupleEntry.getObject("date") instanceof Long is true.
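To make the round trip concrete, here is a minimal plain-Java sketch of the idea behind DateType. It uses java.time rather than Cascading. The class DateCoercionSketch, its methods, the pattern string, and the fixed -0800 offset used for rendering are all assumptions for illustration. This is not the Cascading DateType API.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical illustration of the CoercibleType idea; NOT the Cascading DateType API.
// The canonical (stored) form is a long epoch timestamp; the external form is a date string.
final class DateCoercionSketch {
    // Assumed pattern for values like 28/Dec/2012:16:17:12:931 -0800
    static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss:SSS Z", Locale.US);

    // Assumed offset used when rendering the canonical long back to text
    static final ZoneOffset RENDER_OFFSET = ZoneOffset.ofHours(-8);

    private DateCoercionSketch() {}

    // External String -> canonical long (epoch millis), analogous to setString("date", ...)
    static long toCanonical(String dateText) {
        return OffsetDateTime.parse(dateText, FORMAT).toInstant().toEpochMilli();
    }

    // Canonical long -> external String, analogous to getString("date")
    static String toExternal(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atOffset(RENDER_OFFSET).format(FORMAT);
    }
}
```

The sketch stores only the long value and renders the string on demand, which mirrors the getString/setString behaviour described above. In Cascading itself, the format string is supplied when the DateType field is declared.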
CoercibleType isn’t a replacement for data-cleansing code that can handle contingencies in the data, but for data that is known to be clean, even data emitted from prior Cascading Flows, it is quite handy.
This opens the door to more complex types that may have multiple representations. Consider a hypothetical Person object that is serialized to disk as binary, but also has a JSON String representation, or a Map representation in memory at runtime.
See conjars or the Concurrent site for downloads.
Cascading 2.1
We are happy to announce that Cascading 2.1 is now publicly available for download.
http://www.cascading.org/downloads/
This release includes a number of new features. Specifically:
- Restartable Flows using Checkpointing
- Improved memory utilization and GC
- Refactored build system, source and javadoc jars now available through conjars.org
For more details see:
Cascading for the Impatient, Part 6
In our fifth installment of this series we showed how to implement TF-IDF in a Cascading application. If you haven’t read that yet, it’s probably best to start there.
Today’s post extends the TF-IDF app to show best practices for test-driven development (TDD) at scale. We’ll incorporate unit tests into the build (should have done so sooner), plus show how to leverage TDD features which are unique to Cascading: checkpoints, traps, assertions, etc. These features are demonstrated using Checkpoint, Debug, and AssertMatches, along with a failure trap.
We’ll keep building on this example to show how to leverage “local mode”.
Theory
At first glance, the notion of test-driven development (TDD) might seem a bit antithetical in the context of Big Data. After all, TDD is all about short development cycles, writing automated test cases which fail until the code that satisfies them is written, and lots of refactoring. Those descriptions would not appear to fit with batch jobs involving terabytes of data and huge clusters running apps that take days to complete.
Stated in a different way, according to Kent Beck, TDD “encourages simple designs and inspires confidence.” That statement does actually fit well with Cascading. The API is intended to provide simple design patterns for working with data – GroupBy, Join, Count, Regex, Filter – so that the need for writing custom functions becomes relatively rare. That speaks to “encouraging simple designs” directly. The practice in Cascading of modeling business process and orchestrating MapReduce workflows – that speaks to “inspiring confidence” in a big way.
So now we’ll let the cat out of the bag for a little secret… Working with unstructured data at scale has been shown to be quite valuable (Google, Amazon, LinkedIn, Twitter, etc.); however, most of the “heavy lifting” which we perform in MapReduce workflows is essentially cleaning up data. DJ Patil explained this point quite eloquently in Data Jujitsu: “It’s impossible to overstress this: 80% of the work in any data project is in cleaning the data … Work done up front in getting clean data will be amply repaid over the course of the project.”
Cleaning up the data allows for subsequent use of sampling techniques, dimensional reduction, and other practices which help alleviate some of the bottlenecks which might otherwise be encountered in Big Data. In other words, there are great use cases for formalisms which help demonstrate that “dirty” data at scale has been cleaned up. Those turn out to be quite valuable in practice.
However, TDD practices tend to be based on unit tests or mocks … how does one write a quick unit test for a Godzilla-sized dataset?
The short answer is: you don’t. However, you can greatly reduce the need for writing unit test coverage by limiting the amount of custom code required. Hopefully we’ve shown that aspect of Cascading by now. Beyond that aspect, you can use sampling techniques to quantify the confidence for an app running correctly. You can also define system tests at scale in relatively simple ways. Furthermore, you can define contingencies for what to do when assumptions fail … as they inevitably do, at scale.
Let’s discuss sampling… generally speaking, large MapReduce workflows are relatively opaque processes which are difficult to observe. However, Cascading provides two techniques for observing portions of a workflow. One very simple approach is to insert a Debug into a pipe, to see the tuple values passing through a particular part of a workflow. Debug output goes to the log instead of a file, but it can be turned off, e.g., with a command line option. If the data is large, one can use a Sample filter to sample the tuple values which get written to the log.
Another approach is to use a Checkpoint, which forces intermediate data to be written out to HDFS. This may also become important for performance reasons, i.e., forcing results to disk to avoid recomputing – e.g., when there are multiple uses for the output of a pipe downstream such as with the right side of a HashJoin. Sampling may be performed either before (like with Debug) or after the data gets persisted to HDFS.
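To make the sample-then-log idea concrete, here is a minimal plain-Java sketch. It is not Cascading code. The class SampledDebugSketch and its methods are hypothetical stand-ins for a Sample filter followed by a Debug. The sketch assumes that each record is kept independently with a fixed probability, drawn from a seeded random generator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical plain-Java stand-in for "Sample, then Debug"; NOT the Cascading API.
final class SampledDebugSketch {
    private SampledDebugSketch() {}

    // Keeps each record independently with probability `fraction` (0.0 to 1.0).
    // A fixed seed makes the sample repeatable from run to run.
    static <T> List<T> sample(List<T> records, double fraction, long seed) {
        if (fraction < 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("fraction must be between 0 and 1");
        }
        Random random = new Random(seed);
        List<T> kept = new ArrayList<>();
        for (T record : records) {
            if (random.nextDouble() < fraction) {
                kept.add(record);
            }
        }
        return kept;
    }

    // Logs only the sampled records; debugEnabled = false plays the role of turning Debug off.
    static <T> int logSample(List<T> records, double fraction, long seed, boolean debugEnabled) {
        if (!debugEnabled) {
            return 0;
        }
        List<T> kept = sample(records, fraction, seed);
        for (T record : kept) {
            System.err.println("DEBUG " + record);
        }
        return kept.size();
    }
}
```

With a fraction of 0.01, roughly one record in a hundred reaches the log. This keeps the trace readable even when the stream is large.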
Next, let’s talk about system tests. Cascading includes support for stream assertions. These provide mechanisms for asserting that the values in a tuple stream meet certain criteria – similar to the assert keyword in Java, or an assert not null in a unit test. We can assert patterns strictly as unit tests during development, then run testing against regression data. For performance reasons, we might use command line options to turn off assertions in production. Or keep them, if a use case requires that level of guarantee.
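A stream assertion behaves roughly like the following plain-Java sketch. StreamAssertionSketch and its Level enum are hypothetical, loosely modeled on AssertMatches and AssertionLevel. The sketch joins fields with a tab before matching; that is our assumption, chosen so that a pattern such as doc\d+\s.* can match a doc_id followed by text.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical plain-Java model of a regex stream assertion with an on/off level;
// NOT the Cascading AssertMatches / AssertionLevel API.
final class StreamAssertionSketch {
    enum Level { STRICT, NONE }

    private StreamAssertionSketch() {}

    // At STRICT, checks every tuple (fields joined with a tab) against the pattern
    // and throws on the first mismatch. At NONE, the check is skipped entirely.
    static void assertMatches(List<String[]> tuples, String regex, Level level) {
        if (level == Level.NONE) {
            return; // assertions switched off, e.g. in production
        }
        Pattern pattern = Pattern.compile(regex);
        for (String[] tuple : tuples) {
            String joined = String.join("\t", tuple);
            if (!pattern.matcher(joined).matches()) {
                throw new IllegalStateException("tuple " + joined + " did not match: " + regex);
            }
        }
    }
}
```

Switching the level to NONE removes the cost of the check without touching the rest of the code. That is the trade-off described above for production runs.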
Lastly, what to do when assumptions fail? One lesson of working with data at scale is that the best assumptions will inevitably fail. Unexpected things happen, and 80% of the work will be cleaning up problems. Cascading defines failure traps which capture data that causes an Operation to fail, e.g., throw an Exception. For example, perhaps 99% of the cases in your log files can be rolled up into a set of standard reports… but 1% requires manual review. Great, process the 99% which work and shunt the 1% failure cases into a special file, marked “for manual review”. Keep in mind, however, that traps are intended for handling exceptional cases. If you know in advance how to categorize good vs. bad data, then use a filter instead of a trap.
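The trap idea can be sketched in plain Java as a try/catch around each record. TrapSketch is hypothetical and is not how Cascading implements traps. It only shows the contract: records that make an operation throw are set aside for review, while the rest continue through the flow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical plain-Java model of a failure trap; NOT the Cascading trap API.
final class TrapSketch {
    private TrapSketch() {}

    static final class Result<I, O> {
        final List<O> output = new ArrayList<>();  // records that processed normally
        final List<I> trapped = new ArrayList<>(); // records set aside "for manual review"
    }

    // Applies the operation to each record; any record whose processing throws
    // is captured in `trapped` instead of failing the whole run.
    static <I, O> Result<I, O> processWithTrap(List<I> records, Function<I, O> operation) {
        Result<I, O> result = new Result<>();
        for (I record : records) {
            try {
                result.output.add(operation.apply(record));
            } catch (RuntimeException e) {
                result.trapped.add(record);
            }
        }
        return result;
    }
}
```

If bad records can be recognized up front, for example by a regex, filtering them out is the better choice, as noted above. The catch path is for the cases nobody anticipated.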
Meanwhile, a conceptual diagram for this implementation of TF-IDF in Cascading is shown as:
Source
Download source for this example on GitHub. You’ll need to clone the whole of this multi-part series:
git clone git://github.com/Cascading/Impatient.git
For quick reference, the source code and a log for this example are listed in a gist. The input data stays the same as in the earlier code.
Let’s add a unit test and show how it fits into this example. In the Gradle build script build.gradle we need to modify the dependencies to include JUnit and other testing dependencies:
compile( 'cascading:cascading-hadoop:2.0.1' ) { transitive = true }
testCompile( 'org.apache.hadoop:hadoop-test:1.0.3' )
testCompile( 'junit:junit:4.8.+' )
Then we’ll add a test task:
test {
include 'impatient/**'
//makes the standard streams (err and out) visible at console when running tests
testLogging.showStandardStreams = true
//listening to test execution events
beforeTest { descriptor ->
logger.lifecycle("Running test: " + descriptor)
}
onOutput { descriptor, event ->
logger.lifecycle("Test: " + descriptor + " produced standard out/err: " + event.message )
}
}
A little restructuring of the source directories is required – see our GitHub code repo, where it’s all set up properly. Then we add a unit test for our custom function to “scrub” tokens, which was created in Part 3. This goes into a new class ScrubTest.java:
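package impatient;

import static org.junit.Assert.assertEquals;

import cascading.tuple.Fields;
import org.junit.Test;

public class ScrubTest
  {
  @Test
  public void testMain() throws Exception
    {
    // the assertion below expects scrubText() (from ScrubFunction, Part 3)
    // to trim surrounding whitespace and lower-case the token
    Fields fieldDeclaration = new Fields( "doc_id", "token" );
    ScrubFunction scrub = new ScrubFunction( fieldDeclaration );
    assertEquals( "Scrub", "foo bar", scrub.scrubText( "FoO BAR " ) );
    }
  }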
This is a particularly good place for a unit test. Scrubbing tokens is a likely point at which edge cases get encountered at scale. In practice, you’d probably want to define even more unit tests.
Next, going back to the Main.java module, let’s add sink taps for writing out trapped data and checkpointed data:
String trapPath = args[ 4 ];
String checkPath = args[ 5 ];

Tap trapTap = new Hfs( new TextDelimited( true, "\t" ), trapPath );
Tap checkTap = new Hfs( new TextDelimited( true, "\t" ), checkPath );
Next we’ll modify the head of the existing pipe assembly for TF-IDF to incorporate a Stream Assertion. We use an AssertMatches to define the expected pattern for input data. Then we apply AssertionLevel.STRICT to force validation of the data:
// use a stream assertion to validate the input data
Pipe docPipe = new Pipe( "token" );
AssertMatches assertMatches = new AssertMatches( "doc\\d+\\s.*" );
docPipe = new Each( docPipe, AssertionLevel.STRICT, assertMatches );
Next we’ll add a Debug and DebugLevel.VERBOSE to the DF branch, to trace the tuple values in the flow there:
// example use of a debug, to observe tuple stream; turn off below
dfPipe = new Each( dfPipe, DebugLevel.VERBOSE, new Debug( true ) );
Next we’ll add a Checkpoint after the join of the DF and D branches. That forces the tuples at this point in the workflow to be persisted to HDFS:
// create a checkpoint, to observe the intermediate data in DF stream
Checkpoint idfCheck = new Checkpoint( "checkpoint", idfPipe );
Pipe tfidfPipe = new CoGroup( tfPipe, tf_token, idfCheck, df_token );
Next we have a relatively more complex set of taps to connect in the FlowDef, to include output data for TDD-related features:
// connect the taps, pipes, traps, checkpoints, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
  .setName( "tfidf" )
  .addSource( docPipe, docTap )
  .addSource( stopPipe, stopTap )
  .addTailSink( tfidfPipe, tfidfTap )
  .addTailSink( wcPipe, wcTap )
  .addTrap( docPipe, trapTap )
  .addCheckpoint( idfCheck, checkTap );
Last, we’ll specify the verbosity level for the debug trace, and the strictness level for the stream assertion:
// set to DebugLevel.VERBOSE for trace, or DebugLevel.NONE in production
flowDef.setDebugLevel( DebugLevel.VERBOSE );

// set to AssertionLevel.STRICT for all assertions, or AssertionLevel.NONE in production
flowDef.setAssertionLevel( AssertionLevel.STRICT );
Modify the Main method to make those changes, then build a JAR file. You should be good to go. For those keeping score, the resulting physical plan in MapReduce for Part 6 now uses twelve mappers and nine reducers. In other words, we added one mapper as the overhead for gaining lots of test features.
The diagram for the Cascading flow will be in the dot/ subdirectory after the app runs. Here we have annotated it to show where the mapper and reducer phases are running, and also the sections which were added since Part 5:
If you want to read in more detail about the classes in the Cascading API which were used, see the Cascading 2.0 User Guide and JavaDoc.
Build
The build for this example is based on using Gradle. The script is in build.gradle and to generate an IntelliJ project use:
gradle ideaModule
To build the sample app from the command line use:
gradle clean jar
What you should have at this point is a JAR file which is nearly ready to drop into your Maven repo — almost. Actually, we provide a community jar repository for Cascading libraries and extensions at http://conjars.org
Run
Before running this sample app, you’ll need to have a supported release of Apache Hadoop installed. Here’s what was used to develop and test our example code:
$ hadoop version
Hadoop 1.0.3
Be sure to set your HADOOP_HOME environment variable. Then clear the output directory (Apache Hadoop insists, if you’re running in standalone mode) and run the app:
rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc data/en.stop output/tfidf output/trap output/check
The output log should include a warning, based on the stream assertion, which looks like this:
12/08/06 14:15:07 WARN stream.TrapHandler: exception trap on branch: 'token', for fields: [{2}:'doc_id', 'text'] tuple: ['zoink', 'null']
cascading.operation.AssertionException: argument tuple: ['zoink', 'null'] did not match: doc\d+\s.*
at cascading.operation.assertion.BaseAssertion.throwFail(BaseAssertion.java:107)
at cascading.operation.assertion.AssertMatches.doAssert(AssertMatches.java:84)
at cascading.flow.stream.ValueAssertionEachStage.receive(ValueAssertionEachStage.java:57)
at cascading.flow.stream.ValueAssertionEachStage.receive(ValueAssertionEachStage.java:33)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:124)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
That is expected behavior. We directed the API to show a warning when stream assertions fail. The data which caused this warning will get trapped.
Not too far after that point in the log, there should be debug output which looks like the following:
12/08/06 14:15:46 INFO hadoop.FlowReducer: sinking to: TempHfs["SequenceFile[ ['df_count', 'df_token', 'lhs_join']]"][DF/93669/]
['df_count', 'df_token', 'lhs_join']
['1', 'air', '1']
['3', 'area', '1']
['1', 'australia', '1']
['1', 'broken', '1']
… plus several more lines. That is the result of our debug trace.
Output text gets stored in partition files under output/tfidf, output/trap, and output/check, which you can then verify:
more output/tfidf/part-00000
more output/trap/part-m-00001-00000
more output/check/part-00000
Notice the data tuple in output/trap:
zoink null
That tuple did not match the regex doc\d+\s.* (written as "doc\\d+\\s.*" in the Java source), which was specified by the stream assertion.
Here’s a log file from our run of the sample app, part 6. If your run looks terribly different, something is probably not set up correctly.
To run this same app on the Amazon AWS Elastic MapReduce service, based on their command line interface, use the following commands. Be sure to replace temp.cascading.org with your own S3 bucket name:
s3cmd put build/libs/impatient.jar s3://temp.cascading.org/impatient/part6.jar
s3cmd put data/rain.txt s3://temp.cascading.org/impatient/
s3cmd put data/en.stop s3://temp.cascading.org/impatient/
elastic-mapreduce --create --name "TF-IDF" \
--jar s3n://temp.cascading.org/impatient/part6.jar \
--arg s3n://temp.cascading.org/impatient/rain.txt \
--arg s3n://temp.cascading.org/impatient/out/wc \
--arg s3n://temp.cascading.org/impatient/en.stop \
--arg s3n://temp.cascading.org/impatient/out/tfidf \
--arg s3n://temp.cascading.org/impatient/out/trap \
--arg s3n://temp.cascading.org/impatient/out/check
Drop us a line on the cascading-user email forum. Or visit one of our user group meetings. [Coming up real soon…]
Also, compare these other excellent implementations of the example apps here – by Sujit Pal in Scalding and by Paul Lam in Cascalog.
Stay tuned for the next installments of our Cascading for the Impatient series.
Cascading for the Impatient, Part 5
In our fourth installment of this series we showed how to use HashJoin on two pipes, to perform “stop words” filtering at scale in a Cascading 2.0 application. If you haven’t read that yet, it’s probably best to start there.
Today’s lesson builds on that same Word Count app and now implements TF-IDF in Cascading. We’ll show how to use a SumBy and a CoGroup to aggregate the data needed, and then how to use an ExpressionFunction to calculate the TF-IDF weights. We also continue to show best practices for workflow orchestration and test-driven development (TDD) at scale.
Theory
Fortunately, most of the data required to calculate TF-IDF weights was already available in our Word Count example in Part 4. However, we’ll need to revise the overall workflow, adding more pipe assemblies to it.
TF-IDF calculates a metric for each token which indicates how “important” that token is to a document within the context of a collection of documents. The metric is calculated based on relative frequencies. On one hand, tokens which appear in most documents tend to have very low TF-IDF weights. On the other hand, tokens which are less common but appear multiple times in a few documents tend to have very high TF-IDF weights. Consequently, the TF-IDF algorithm gets used to drive the indexing in some text search engines, such as Apache Lucene. In particular, TF-IDF provides an effective way to rank documents for a search query. For a good discussion of this in gory detail, see the Similarity class in Lucene.
Note that for this sample app, token and term can be used interchangeably, as they often are in the literature. More advanced text analytics might look at sequences of words, in which case a term becomes a more complex structure. However, we’re only looking at single words.
We’ll need the following components to calculate TF-IDF:
- term count: number of times a given term appears in a given document
- document frequency: number of documents in which a given term appears
- number of terms: total number of terms in a given document
- document count: total number of documents
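For reference, here is a minimal single-machine sketch in plain Java of how those components combine. It uses the same weighting formula as the ExpressionFunction later in this post: tf_count * ln( n_docs / (1 + df_count) ). The class and method names are hypothetical, and the input is assumed to be already-scrubbed tokens per document. Like the app's expression, the sketch does not use the per-document number of terms.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java sketch of the TF-IDF weighting used in this example (not Cascading code).
final class TfIdfSketch {
    private TfIdfSketch() {}

    // docs maps doc_id -> scrubbed tokens; returns doc_id -> (token -> weight)
    static Map<String, Map<String, Double>> tfidf(Map<String, List<String>> docs) {
        long nDocs = docs.size(); // document count (D)

        // document frequency (DF): number of documents containing each token
        Map<String, Integer> df = new HashMap<>();
        for (List<String> tokens : docs.values()) {
            Set<String> unique = new HashSet<>(tokens);
            for (String token : unique) {
                df.merge(token, 1, Integer::sum);
            }
        }

        Map<String, Map<String, Double>> weights = new HashMap<>();
        for (Map.Entry<String, List<String>> doc : docs.entrySet()) {
            // term count (TF): occurrences of each token within this document
            Map<String, Integer> tf = new HashMap<>();
            for (String token : doc.getValue()) {
                tf.merge(token, 1, Integer::sum);
            }
            Map<String, Double> docWeights = new HashMap<>();
            for (Map.Entry<String, Integer> e : tf.entrySet()) {
                double w = (double) e.getValue()
                    * Math.log((double) nDocs / (1.0 + df.get(e.getKey())));
                docWeights.put(e.getKey(), w);
            }
            weights.put(doc.getKey(), docWeights);
        }
        return weights;
    }
}
```

The denominator uses 1 + df_count. As a result, a token that appears in every document gets a negative weight, consistent with the most common tokens ranking lowest.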
Slight modifications to Word Count provide the means to get both term count and document frequency, along with the other two components, which get calculated almost as by-products. In this sense, we get to leverage Cascading by re-using the results of some pipes within our workflow. A conceptual diagram for this implementation of TF-IDF in Cascading is shown as:
Source
Download source for this example on GitHub. You’ll need to clone the whole of this multi-part series:
git clone git://github.com/Cascading/Impatient.git
For quick reference, the source code and a log for this example are listed in a gist. The input data stays the same as in the earlier code.
First, let’s add another sink tap to write the TF-IDF weights as an output data set:
String tfidfPath = args[ 3 ];
Tap tfidfTap = new Hfs( new TextDelimited( true, "\t" ), tfidfPath );
Next we’ll modify the existing pipe assemblies for Word Count, beginning immediately after the “stop words” filter. We add the following line to retain only the doc_id and token fields:
tokenPipe = new Retain( tokenPipe, fieldSelector );
Then we re-use the intermediate results from tokenPipe, creating three different branches in the workflow. The first addresses term counts:
// one branch of the flow tallies the token counts for term frequency (TF)
Pipe tfPipe = new Pipe( "TF", tokenPipe );
tfPipe = new GroupBy( tfPipe, new Fields( "doc_id", "token" ) );

Fields tf_count = new Fields( "tf_count" );
tfPipe = new Every( tfPipe, Fields.ALL, new Count( tf_count ), Fields.ALL );

Fields tf_token = new Fields( "tf_token" );
tfPipe = new Rename( tfPipe, token, tf_token );
At that point, we have TF values for each token.
In a second branch we’ll calculate D, the total number of documents, in a way which can be consumed later in a join. This uses a built-in partial aggregate operation called SumBy:
// one branch counts the number of documents (D)
Fields doc_id = new Fields( "doc_id" );
Fields tally = new Fields( "tally" );
Fields rhs_join = new Fields( "rhs_join" );
Fields n_docs = new Fields( "n_docs" );
Pipe dPipe = new Unique( "D", tokenPipe, doc_id );
dPipe = new Each( dPipe, new Insert( tally, 1 ), Fields.ALL );
dPipe = new Each( dPipe, new Insert( rhs_join, 1 ), Fields.ALL );
dPipe = new SumBy( dPipe, rhs_join, tally, n_docs, long.class );
This part may seem less than intuitive… and it is a bit odd. We need the total document count as a field in each tuple on the RHS of the join. That keeps our processing parallel, allowing this calculation to scale out horizontally.
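The constant-key trick can be illustrated on a single machine with the plain-Java sketch below. ConstantKeyJoinSketch is hypothetical. countDocs mirrors the Insert/SumBy steps of the D branch, and joinOnConstantKey mirrors joining DF rows (lhs_join = 1) against D (rhs_join = 1). In the sketch the loops run sequentially. In the Cascading flow, the point is that each DF tuple receives n_docs through the join rather than through shared state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the "constant join key" trick (not Cascading code).
final class ConstantKeyJoinSketch {
    private ConstantKeyJoinSketch() {}

    // D branch: each unique doc_id gets tally = 1 and rhs_join = 1; summing tally
    // grouped by rhs_join collapses everything into a single row {1 -> n_docs}.
    static Map<Integer, Long> countDocs(List<String> uniqueDocIds) {
        Map<Integer, Long> nDocsByKey = new HashMap<>();
        for (String docId : uniqueDocIds) {
            int rhsJoin = 1; // same constant key on every row
            long tally = 1;
            nDocsByKey.merge(rhsJoin, tally, Long::sum);
        }
        return nDocsByKey;
    }

    // DF rows carry lhs_join = 1, so the equi-join on that key attaches n_docs to every row.
    // Output rows are {df_token, df_count, n_docs}.
    static List<Object[]> joinOnConstantKey(Map<String, Long> dfCounts, Map<Integer, Long> nDocsByKey) {
        List<Object[]> joined = new ArrayList<>();
        for (Map.Entry<String, Long> df : dfCounts.entrySet()) {
            int lhsJoin = 1;
            Long nDocs = nDocsByKey.get(lhsJoin); // small side looked up in memory
            if (nDocs != null) {
                joined.add(new Object[] { df.getKey(), df.getValue(), nDocs });
            }
        }
        return joined;
    }
}
```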
The third branch calculates DF as a step toward inverse document frequency per token:
// one branch tallies the token counts for document frequency (DF)
Pipe dfPipe = new Unique( "DF", tokenPipe, Fields.ALL );
dfPipe = new GroupBy( dfPipe, token );
Fields df_count = new Fields( "df_count" );
Fields df_token = new Fields( "df_token" );
Fields lhs_join = new Fields( "lhs_join" );
dfPipe = new Every( dfPipe, Fields.ALL, new Count( df_count ), Fields.ALL );
dfPipe = new Rename( dfPipe, token, df_token );
dfPipe = new Each( dfPipe, new Insert( lhs_join, 1 ), Fields.ALL );
Now we have all the components needed to calculate TF-IDF weights. We’ll use two kinds of joins – a HashJoin followed by a CoGroup – to merge the three branches together:
// join to bring together all the components for calculating TF-IDF
// the D side of the join is smaller, so it goes on the RHS
Pipe idfPipe = new HashJoin( dfPipe, lhs_join, dPipe, rhs_join );

// the IDF side of the join is smaller, so it goes on the RHS
Pipe tfidfPipe = new CoGroup( tfPipe, tf_token, idfPipe, df_token );
Then we calculate the weights using an ExpressionFunction in Cascading:
// calculate the TF-IDF weights, per token, per document
Fields tfidf = new Fields( "tfidf" );
String expression = "(double) tf_count * Math.log( (double) n_docs / ( 1.0 + df_count ) )";
ExpressionFunction tfidfExpression = new ExpressionFunction( tfidf, expression, Double.class );
Fields tfidfArguments = new Fields( "tf_count", "df_count", "n_docs" );
tfidfPipe = new Each( tfidfPipe, tfidfArguments, tfidfExpression, Fields.ALL );

fieldSelector = new Fields( "tf_token", "doc_id", "tfidf" );
tfidfPipe = new Retain( tfidfPipe, fieldSelector );
tfidfPipe = new Rename( tfidfPipe, tf_token, token );
Now we can get back to the remainder of the workflow. We’ll keep the actual Word Count metrics, since those are useful for testing:
// keep track of the word counts, which are useful for QA
Pipe wcPipe = new Pipe( "wc", tfPipe );
Fields count = new Fields( "count" );
wcPipe = new SumBy( wcPipe, tf_token, tf_count, count, long.class );
wcPipe = new Rename( wcPipe, tf_token, token );
Last, we’ll add another sink tap to the FlowDef, to include output data for our TF-IDF weights:
// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
  .setName( "tfidf" )
  .addSource( docPipe, docTap )
  .addSource( stopPipe, stopTap )
  .addTailSink( tfidfPipe, tfidfTap )
  .addTailSink( wcPipe, wcTap );
We’ll change the name of the resulting Flow too, to keep our code properly descriptive:
// write a DOT file and run the flow
Flow tfidfFlow = flowConnector.connect( flowDef );
tfidfFlow.writeDOT( "dot/tfidf.dot" );
tfidfFlow.complete();
Modify the Main method to make those changes, then build a JAR file. You should be good to go. For those keeping score, the resulting physical plan in Cascading for Part 5 now uses eleven mappers and nine reducers. That amount jumped by 5x since our previous example.
The diagram for the Cascading flow will be in the dot/ subdirectory after the app runs. Here we have annotated it to show where the mapper and reducer phases are running, and also the sections which were added since Part 4:
If you want to read in more detail about the classes in the Cascading API which were used, see the Cascading 2.0 User Guide and JavaDoc.
Build
The build for this example is based on using Gradle. The script is in build.gradle and to generate an IntelliJ project use:
gradle ideaModule
To build the sample app from the command line use:
gradle clean jar
What you should have at this point is a JAR file which is nearly ready to drop into your Maven repo — almost. Actually, we provide a community jar repository for Cascading libraries and extensions at http://conjars.org
Run
Before running this sample app, you’ll need to have a supported release of Apache Hadoop installed. Here’s what was used to develop and test our example code:
$ hadoop version
Hadoop 1.0.3
Be sure to set your HADOOP_HOME environment variable. Then clear the output directory (Apache Hadoop insists, if you’re running in standalone mode) and run the app:
rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc data/en.stop output/tfidf
Output text gets stored in the partition file output/tfidf which you can then verify:
more output/tfidf/part-00000
BTW, did you notice what the TF-IDF weights for the tokens rain and shadow were? Those represent what the documents have in common. How do those compare with weights for the other tokens? Conversely, consider the weights for australia (high weight) or area (different weights).
Here’s a log file from our run of the sample app, part 5. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascading-user email forum. Or visit one of our user group meetings. [Coming up real soon…]
Also, compare these other excellent implementations of the example apps here – by Sujit Pal in Scalding and by Paul Lam in Cascalog.
For those familiar with Apache Pig, we have included a comparable script, and to run that:
rm -rf output
mkdir -p dot
pig -p docPath=./data/rain.txt -p wcPath=./output/wc -p stopPath=./data/en.stop -p tfidfPath=./output/tfidf ./src/scripts/tfidf.pig
Stay tuned for the next installments of our Cascading for the Impatient series.
Cascading for the Impatient, Part 4
In our third installment of this series we showed how to write a custom Operation for a Cascading 2.0 application. If you haven’t read that yet, it’s probably best to start there.
Today’s lesson takes that same Word Count app and expands on it to implement a stop words filter, which is a list of tokens to nix from the stream. We’ll show how to use HashJoin on two pipes, so we can perform that filtering at scale. Again, this code is leading toward an implementation of TF-IDF in Cascading. We’ll show best practices for workflow orchestration and test-driven development (TDD) at scale.
Theory
The first question to consider is, why do we want to use a stop words list? After all, the TF-IDF algorithm is supposed to filter out the less significant words anyway. Why would we need to include additional filtering if the TF-IDF is implemented correctly?
Use of a stop words list originated in work by Hans Peter Luhn at IBM Research, during the dawn of computing. The reasons for it are two-fold. On one hand, consider that the most common words in any given natural language are generally not useful for text analytics. For example in English, words such as “the”, “of”, “and” are probably not what you want to search, and probably not interesting for Word Count metrics. They represent the head of the token distribution, the opposite of the long tail: high frequency, low semantic value. Consequently, they cause the bulk of the processing. Natural languages tend to have on the order of 10^5 words, so the potential size of any stop words list is nicely bounded. Filtering those high-frequency words out of the token stream dramatically reduces the amount of processing required later in the workflow.
On the other hand, you may also want to remove some words explicitly from the token stream. This almost always comes up in practice, especially when working with public discussions such as social network comments. Think about it, what are some of the most common words posted online in comments? Words which are not the most common words in “polite” English? Based on the math for TF-IDF, those would tend to get ranked highest. Do you really want those words to bubble up to the “most significant” positions in your text analytics? In automated systems which leverage unsupervised learning, this can lead to highly embarrassing situations. Caveat machinator.
Next, let’s consider working with a Joiner in Cascading. We will have two pipes, one for the “scrubbed” token stream and another for the stop words list. We want to filter all instances of tokens from the stop words list out of the token stream. If we weren’t working in MapReduce, a naive approach would simply load the stop words list into a hashtable, then iterate through our token stream to look up each token in the hashtable and delete it if found. If we were coding in Hadoop directly, a less naive approach would be to put the stop words list into the distributed cache and have a job step which loads it during setup, then rinse/lather/repeat from the naive coding approach described above.
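For contrast, the naive single-machine approach just described looks roughly like this in plain Java. NaiveStopWordFilter is a hypothetical illustration and is not part of the example code. It only works while the whole token stream can be processed by a single process.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// The naive approach in plain Java (not Cascading code): load the stop words into
// a hash table, then stream the tokens and drop any that are found in it.
final class NaiveStopWordFilter {
    private NaiveStopWordFilter() {}

    static List<String> removeStopWords(List<String> tokens, List<String> stopWords) {
        Set<String> stop = new HashSet<>(stopWords); // expected O(1) lookup per token
        List<String> kept = new ArrayList<>();
        for (String token : tokens) {
            if (!stop.contains(token)) {
                kept.add(token);
            }
        }
        return kept;
    }
}
```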
Instead we want to leverage the workflow orchestration in Cascading. One might try to write a custom operation in Cascading, as we did in Part 3, e.g., a custom Filter. That sounds like extra work, plus extra code to verify and maintain, when the built-in primitives will tend to be more efficient anyway.
Cascading provides for joins on pipes, and conceptually a Left Outer Join would solve our requirement to filter stop words. Think of joining the token stream with the stop words list. When the result is non-null, the join has identified a stop word. Discard it.
Understand that there’s a big problem with using joins in MapReduce. Outside of the context of a relational database, arbitrary joins do not work efficiently. Suppose you have N items in one tuple stream and M items in another, and want to join them? In the general case, for an arbitrary join, that requires N x M operations and also introduces a data dependency, such that the join cannot be performed in parallel. If both N and M are relatively large, say in the millions of tuples, then we’d end up processing 10^12 operations on a single processor, which kind of defeats the purpose in terms of leveraging MapReduce.
Fortunately, if some of that data is sparse then we can use specific variants of joins to compute efficiently in parallel. Cascading includes a HashJoin which joins two or more tuple streams into a single stream via a Joiner — when all but one tuple stream are small enough to fit into memory. In other words, given some insights about the “shape” of the data, when you have a large data set (non-sparse) you can join with one or more small data sets (sparse) in memory.
A join has a left-hand side (LHS) and a right-hand side (RHS); in Cascading we put the sparser data on the right-hand side. So the HashJoin implements a non-blocking “asymmetrical join” or “replicated join”, where the left-most side will not block (accumulate into memory) in order to complete the join, but the right-most sides will.
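The semantics of the stop words join can be modeled in plain Java as follows. LeftJoinStopWordSketch is hypothetical and is not the HashJoin/LeftJoin API. The model works in four steps:
- The right-hand side (the stop words) is accumulated into a hash map.
- The left-hand side (the token stream) is streamed through it.
- Every LHS row survives the left join, with a null stop field when there is no match.
- A final step keeps only those null rows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a replicated (hash) left join followed by a null filter;
// NOT the Cascading HashJoin / LeftJoin API.
final class LeftJoinStopWordSketch {
    private LeftJoinStopWordSketch() {}

    // Each output row is {doc_id, token, stop}; stop is null when the token is not a stop word.
    // (Duplicate stop words collapse to a single map entry in this sketch.)
    static List<String[]> hashLeftJoin(List<String[]> docTokens, List<String> stopWords) {
        Map<String, String> rhs = new HashMap<>(); // RHS accumulated into memory
        for (String stop : stopWords) {
            rhs.put(stop, stop);
        }
        List<String[]> joined = new ArrayList<>();
        for (String[] row : docTokens) { // LHS streamed, never buffered
            joined.add(new String[] { row[0], row[1], rhs.get(row[1]) });
        }
        return joined;
    }

    // Discard the rows that joined with a stop word, i.e. keep rows whose stop field is null.
    static List<String[]> dropStopWords(List<String[]> joined) {
        List<String[]> kept = new ArrayList<>();
        for (String[] row : joined) {
            if (row[2] == null) {
                kept.add(row);
            }
        }
        return kept;
    }
}
```

Only the small stop words list has to fit in memory. The token stream can be arbitrarily large, which is why the sparse data goes on the right-hand side.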
Recall that stop words lists tend to be bounded at approximately 10^5 entries, which is relatively sparse when compared with an arbitrarily large token stream. In typical “web scale” text analytics use cases for TF-IDF, the token stream might be in the range of billions of tokens, i.e., several orders of magnitude larger than our largest possible stop words list. Sounds like a great use case for HashJoin.
A conceptual diagram for this implementation of Word Count in Cascading is shown as:
Source
Download source for this example on GitHub. You’ll need to clone the whole of this multi-part series:
git clone git://github.com/Cascading/Impatient.git
For quick reference, the source code and a log for this example are listed in a gist. The input data stays the same as in the earlier code.
This example in Part 4 uses a HashJoin in Cascading to implement a stop words list, filtering some words out of the token stream prior to counting.
First, let’s add another source tap to read the stop words list as an input data set:
String stopPath = args[ 2 ];
Fields stop = new Fields( "stop" );
Tap stopTap = new Hfs( new TextDelimited( stop, true, "\t" ), stopPath );
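As a rough plain-Java analog of what the `TextDelimited( stop, true, "\t" )` scheme does here, the sketch below skips the header line and keeps the first tab-separated column as the `stop` field. It is a hypothetical helper for illustration, not Cascading's parser, and it assumes the stop words file has a single header line such as `stop`.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class StopWordsReader {
    // Hypothetical analog of reading a tab-delimited file that has a header:
    // the first line names the field and is not data; column 0 holds each stop word.
    static Set<String> readStopWords(String tsv) {
        Set<String> stops = new LinkedHashSet<>();
        boolean header = true;
        for (String line : tsv.split("\n")) {
            if (header) {
                header = false; // skip the header line, e.g. "stop"
                continue;
            }
            if (line.isEmpty()) {
                continue; // ignore blank lines
            }
            stops.add(line.split("\t", -1)[0]);
        }
        return stops;
    }

    public static void main(String[] args) {
        String tsv = "stop\nthe\na\nof\n";
        System.out.println(readStopWords(tsv)); // [the, a, of]
    }
}
```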
Next we’ll insert another pipe into the assembly, placing tokenPipe between our “scrub” and “count” sections of our workflow. That’s where the HashJoin gets performed, implementing a left join:
// perform a left join to remove stop words, discarding the rows
// which joined with stop words, i.e., were non-null after left join
Pipe stopPipe = new Pipe( "stop" );
Pipe tokenPipe = new HashJoin( docPipe, token, stopPipe, stop, new LeftJoin() );
Next we discard the non-null results from the left join, using a RegexFilter:
tokenPipe = new Each( tokenPipe, stop, new RegexFilter( "^$" ) );
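The combined effect of the LeftJoin and `RegexFilter( "^$" )` can be sketched in plain Java. The pattern `^$` matches only an empty string, so a token survives only when its `stop` value is empty. This sketch assumes that a null `stop` value (no match in the left join) is treated like an empty string; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

public class StopWordFilter {
    // Matches only the empty string, like the "^$" regex passed to RegexFilter.
    private static final Pattern EMPTY = Pattern.compile("^$");

    static List<String> removeStopWords(List<String> tokens, Set<String> stopWords) {
        List<String> kept = new ArrayList<>();
        for (String token : tokens) {
            // Left-join result for the "stop" field: the stop word, or null if none matched.
            String stop = stopWords.contains(token) ? token : null;
            // Assumption: a null field is treated as "" before matching.
            String value = (stop == null) ? "" : stop;
            if (EMPTY.matcher(value).matches()) {
                kept.add(token); // no stop word joined, so keep the token
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> tokens = List.of("rain", "the", "shadow", "a", "rain");
        System.out.println(removeStopWords(tokens, Set.of("the", "a"))); // [rain, shadow, rain]
    }
}
```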
Now this new tokenPipe can be fitted back into the wcPipe which we used before. The workflow continues on much the same from there:
Pipe wcPipe = new Pipe( "wc", tokenPipe );
Last, we’ll add the additional source tap to the FlowDef, to include input data for our stop words list:
// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
 .setName( "wc" )
 .addSource( docPipe, docTap )
 .addSource( stopPipe, stopTap )
 .addTailSink( wcPipe, wcTap );
Modify the Main method to make those changes, then build a JAR file. You should be good to go. For those keeping score, the resulting physical plan in MapReduce for Part 4 still uses one mapper and one reducer.
The diagram for the Cascading flow will be in the dot/ subdirectory after the app runs. Here we have annotated it to show where the mapper and reducer phases are running, and also the section which was added since Part 3:
If you want to read in more detail about the classes in the Cascading API which were used, see the Cascading 2.0 User Guide and JavaDoc.
Build
The build for this example is based on using Gradle. The script is in build.gradle and to generate an IntelliJ project use:
gradle ideaModule
To build the sample app from the command line use:
gradle clean jar
What you should have at this point is a JAR file which is nearly ready to drop into your Maven repo (almost). Actually, we provide a community jar repository for Cascading libraries and extensions at http://conjars.org.
Run
Before running this sample app, you’ll need to have a supported release of Apache Hadoop installed. Here’s what was used to develop and test our example code:
$ hadoop version
Hadoop 1.0.3
Be sure to set your HADOOP_HOME environment variable. Then clear the output directory (Apache Hadoop insists, if you’re running in standalone mode) and run the app:
rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc data/en.stop
Output text gets stored in a partition file in the output/wc directory, which you can then verify:
more output/wc/part-00000
Here’s a log file from our run of the sample app, part 4. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascading-user email forum. Or visit one of our user group meetings. [Coming up real soon…]
Also, compare these other excellent implementations of the example apps here – by Sujit Pal in Scalding and by Paul Lam in Cascalog.
For those familiar with Apache Pig, we have included a comparable script, and to run that:
rm -rf output
mkdir -p dot
pig -p docPath=./data/rain.txt -p wcPath=./output/wc -p stopPath=./data/en.stop ./src/scripts/wc.pig
Stay tuned for the next installments of our Cascading for the Impatient series.
Cascading Software Development Kit
The Cascading SDK is now available for download.
The SDK includes Cascading source and jars, and many of the Cascading based tools like Load and Multitool.
It also includes an Amazon Elastic MapReduce install script (bootstrap action) that will pre-install all included tools on the master node.
Cascading for the Impatient, Part 3
In our second installment of this series we showed how to implement Word Count as a Cascading 2.0 application. If you haven’t read that yet, it’s probably best to start there.
Today’s lesson takes the same app and stretches it even more. We’ll show how to write a custom Operation. Again, this code is leading toward an implementation of TF-IDF in Cascading. We’ll show best practices for workflow orchestration and test-driven development (TDD) at scale.
Theory
This example in Part 3 uses a custom operation in Cascading to “scrub” the token stream prior to counting the tokens. Previously we used a RegexSplitGenerator to tokenize the text, which is a built-in operation and works pretty well. However, one thing you’ll find when working with almost any text analytics at scale is that there are lots and lots of edge cases: hyphens, exponents, different kinds of quotes, etc. Cleaning up the edge cases generally represents the bulk of the engineering work in text analytics. If you try to incorporate every possible edge case into a regex, it tends to become complex and brittle. Identifying edge cases is an iterative process, driven by what you learn over time from running operations at scale. Also, each application will tend to have its own nuances, which makes it difficult to leverage standard libraries for text processing.
So we write a custom Function (a subclass of BaseOperation that implements the Function interface) and work through all the edge cases, adding more as they get identified. An added benefit is that we can also add unit tests to our custom class, with test coverage increasing as each new issue gets found. More about unit tests later.
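Here is a hypothetical illustration of that iterative approach in plain Java: each rule in `scrubText` handles one edge case, and each has a matching check. The specific rules (curly apostrophes, leading and trailing punctuation) are assumptions chosen for the example, not the rules used in this series; in a real project the checks would normally live in a unit test framework such as JUnit.

```java
public class EdgeCaseScrubber {
    // Hypothetical scrubber: each rule below was "added" for one edge case.
    static String scrubText(String text) {
        String s = text.trim().toLowerCase();
        // Edge case 1 (hypothetical): normalize curly apostrophes to straight ones.
        s = s.replace('\u2019', '\'').replace('\u2018', '\'');
        // Edge case 2 (hypothetical): strip leading/trailing ASCII punctuation, e.g. "(rain)" or "rain,".
        s = s.replaceAll("^\\p{Punct}+|\\p{Punct}+$", "");
        return s;
    }

    // Minimal assertion helper so the sketch needs no test framework.
    static void check(String actual, String expected) {
        if (!actual.equals(expected)) {
            throw new AssertionError("expected '" + expected + "' but got '" + actual + "'");
        }
    }

    public static void main(String[] args) {
        // One check per edge case, added as each issue is found.
        check(scrubText("  Rain "), "rain");
        check(scrubText("(shadow)"), "shadow");
        check(scrubText("it\u2019s"), "it's");
        check(scrubText("..."), "");
        System.out.println("all checks passed");
    }
}
```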
Operations get used in Cascading to perform the “T” part of ETL. In other words, operations act on the data to transform the tuple stream, filter it, analyze it, etc. Think of the roles that command line utilities such as grep or awk play in Unix shell scripts. Cascading provides a rich library of standard operations to encode the “business logic” of transforming “Big Data”. It’s relatively simple to develop your own custom operations, and our text “scrubbing” example here shows a good use case. However, if you find yourself developing lots of custom operations every time you begin to write a Cascading app, that’s a good indication that you need to step back and reevaluate. On one hand, the standard operations have been developed over the years and they tend to cover a large class of MapReduce applications. On the other hand, if you aren’t careful while defining a custom operation, you may inadvertently introduce a bottleneck. The standard operations encapsulate best practices and design patterns for parallelism. Something to think about.
Meanwhile, a conceptual diagram for this implementation of Word Count in Cascading is shown as:
Source
Download source for this example on GitHub. You’ll need to clone the whole of this multi-part series:
git clone git://github.com/Cascading/Impatient.git
For quick reference, the source code and a log for this example are listed in a gist. The input data stays the same as in the earlier code.
First, let’s make room for using our custom operation ScrubFunction to clean up the token stream. We place it into the docPipe assembly, immediately after the regex which splits the raw text:
Fields scrubArguments = new Fields( "doc_id", "token" );
docPipe = new Each( docPipe, scrubArguments, new ScrubFunction( scrubArguments ), Fields.RESULTS );
Next, we need to define a constructor for our custom operation:
public ScrubFunction( Fields fieldDeclaration )
{
// expect 2 argument fields (doc_id, token); fieldDeclaration names the output fields
super( 2, fieldDeclaration );
}
The fieldDeclaration parameter allows us to name the tuple fields that this function emits into the flow. Since we’ll output the results as TSV, a header will be created from those field names.
Next, we define an operate method. In other words, we define the function which operates on the tuple stream. This is essentially a wrapper which pulls tuples from the input stream, applies our scrubText method to each token, then inserts new tuples into the output stream:
public void operate( FlowProcess flowProcess, FunctionCall functionCall )
{
TupleEntry argument = functionCall.getArguments();
// argument positions follow scrubArguments: 0 = doc_id, 1 = token
String doc_id = argument.getString( 0 );
String token = scrubText( argument.getString( 1 ) );
// drop tokens that become empty after scrubbing
if( token.length() > 0 )
{
Tuple result = new Tuple();
result.add( doc_id );
result.add( token );
functionCall.getOutputCollector().add( result );
}
}
Last but not least, we define the scrubText method to clean up tokens. This version is relatively simple, and in practice it would have many more cases. It’s also relatively simple to write unit tests against:
public String scrubText( String text )
{
return text.trim().toLowerCase();
}
Place the first source lines shown (the scrubArguments and Each statements) into the Main method, create an additional class named ScrubFunction for the rest of the source shown, then build a JAR file. You should be good to go.
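To exercise the scrubbing logic without Hadoop, here is a minimal plain-Java stand-in for the operate and scrubText methods above. Rows are modeled as `String[]` pairs of `{doc_id, token}` instead of Cascading tuples; `ScrubSketch` and `scrubAll` are hypothetical names, and `List.of` assumes Java 9 or later.

```java
import java.util.ArrayList;
import java.util.List;

public class ScrubSketch {
    // Same logic as the scrubText method above.
    static String scrubText(String text) {
        return text.trim().toLowerCase();
    }

    // Plain-Java stand-in for operate(): each input row is {doc_id, token};
    // a row is emitted only if the scrubbed token is non-empty.
    static List<String[]> scrubAll(List<String[]> rows) {
        List<String[]> out = new ArrayList<>();
        for (String[] row : rows) {
            String docId = row[0];
            String token = scrubText(row[1]);
            if (token.length() > 0) {
                out.add(new String[] { docId, token });
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
            new String[] { "doc01", " A " },
            new String[] { "doc01", "Rain" },
            new String[] { "doc02", "   " });
        for (String[] r : scrubAll(rows)) {
            System.out.println(r[0] + "\t" + r[1]);
        }
    }
}
```

Running main should print two tab-separated rows, doc01 with a and doc01 with rain; the blank token is dropped because it scrubs to an empty string.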
The diagram for the Cascading flow will be in the dot/ subdirectory after the app runs. Here we have annotated it to show where the mapper and reducer phases are running, and also the section which was added since Part 2:
If you want to read in more detail about the classes in the Cascading API which were used, see the Cascading 2.0 User Guide and JavaDoc.
Build
The build for this example is based on using Gradle. The script is in build.gradle and to generate an IntelliJ project use:
gradle ideaModule
To build the sample app from the command line use:
gradle clean jar
What you should have at this point is a JAR file which is nearly ready to drop into your Maven repo (almost). Actually, we provide a community jar repository for Cascading libraries and extensions at http://conjars.org.
Run
Before running this sample app, you’ll need to have a supported release of Apache Hadoop installed. Here’s what was used to develop and test our example code:
$ hadoop version
Hadoop 1.0.3
Be sure to set your HADOOP_HOME environment variable. Then clear the output directory (Apache Hadoop insists, if you’re running in standalone mode) and run the app:
rm -rf output
hadoop jar ./build/libs/impatient.jar data/rain.txt output/wc
Output text gets stored in a partition file in the output/wc directory, which you can then verify:
more output/wc/part-00000
Here’s a log file from our run of the sample app, part 3. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascading-user email forum. Or visit one of our user group meetings. [Coming up real soon…]
Also, compare these other excellent implementations of the example apps here – by Sujit Pal in Scalding and by Paul Lam in Cascalog.
For those familiar with Apache Pig, we have included a comparable script, and to run that:
rm -rf output
mkdir -p dot
pig -p docPath=./data/rain.txt -p wcPath=./output/wc -p stopPath=./data/en.stop ./src/scripts/wc.pig
Stay tuned for the next installments of our Cascading for the Impatient series.







