Multitool

Multitool is a simple command line interface for building large-scale data processing jobs based on Cascading. It provides the functions of many popular Unix/Linux command line utilities which are typically connected by pipes – such as grep, sed, awk, cut, join, uniq, wc, tr, cat, sort – with Apache Hadoop doing the heavy lifting.

The open source repo is available on GitHub at https://github.com/Cascading/cascading.multitool where the “README.md” file has instructions for build and installation, and the “COMMANDS.md” file has a full list of command line options.

Multitool can be run in Hadoop as a JAR file:

hadoop jar multitool.jar param1 param2 .. paramN

Or, after installing on your laptop or server, as a command suitable for use in bash scripts, cron jobs, etc.:

multitool param1 param2 .. paramN

For an example of running in the cloud, see this app note on Amazon AWS: https://aws.amazon.com/articles/2293


Why use Multitool?

Multitool can run on your laptop with a small data set, using Hadoop in local mode. The very same commands can also be used to run on a large Hadoop cluster with petabytes of data. For example, you can prototype a large, complex ETL workflow simply using sample data on a Linux command line prompt. Then deploy that same app at scale on a Hadoop cluster.

Another benefit is learning about efficient MapReduce programming, because Multitool can generate DOT files to represent its workflows. These can be viewed with popular drawing apps such as OmniGraffle and Visio. In other words, you can work through a problem with small data sets on a command line, ensuring that correct results get produced. Then generate a DOT file to see how to program the same workflow in Cascading. In addition to Java, that could also be coded in Python, Scala, Clojure, or Ruby – with all the benefits of libraries, tools, and software lifecycle process which those languages provide.

Example 1: Grep

Consider having a file called “days.txt” which contains the following text:

I read in the newspapers they are going to have 30 minutes of intellectual stuff on television every Monday from 7:30 to 8.
Sometimes it pays to stay in bed in Monday, rather than spending the rest of the week debugging Monday's code.
Monday's child is fair in face, Tuesday's child is full of grace, Wednesday's child is full of woe.
Monday religion is better than Sunday profession.
A schedule so tight that it would only work if I didn't sleep on Monday nights.
We're still investigating. I heard that Monday or Tuesday we will probably be having a press conference announcing more.
We take time to go to a restaurant two times a week. A little candlelight, dinner, soft music and dancing. She goes Tuesdays, I go Fridays.

One could use the grep utility to find text lines in that file which match a given pattern. For example, to write all the lines containing the string “Tuesday” into a file called “output/tuesday.txt”:

grep Tuesday data/days.txt > output/tuesday.txt

The results in the “output/tuesday.txt” file would be:

Monday's child is fair in face, Tuesday's child is full of grace, Wednesday's child is full of woe.
We're still investigating. I heard that Monday or Tuesday we will probably be having a press conference announcing more.
We take time to go to a restaurant two times a week. A little candlelight, dinner, soft music and dancing. She goes Tuesdays, I go Fridays.

To use Multitool for the exact same work:

multitool source=data/days.txt select=Tuesday sink=output/tuesday.txt

Note that Multitool uses Cascading operations for its parameters – a source for input and a sink for output in this case. That allows for a graph of MapReduce jobs to be connected together as a workflow, run as a single application.

Example 2: Word Count

Everybody loves the ubiquitous MapReduce example of “Word Count”, so let’s take a look at it in terms of Multitool compared with Linux command line utilities. First, the Multitool version:

hadoop jar multitool.jar source=data/days.txt expr="\$0.toLowerCase()" gen gen.delim=" " group=0 count group=1 group.secondary=0 group.secondary.reverse=true sink=output/ sink.part=1

Note that you may need to escape some characters when running from a Linux shell, such as in the “expr” statement above. That may not be needed in other Hadoop environments, such as Elastic MapReduce.

Log output from an example run is stored at: https://gist.github.com/2696821

The first 10 lines in the results would be:

to	5
a	5
of	4
monday	4
is	4
in	4
i	4
the	3
child	3
we	2

The same work performed with Linux command line utilities would be:
cat data/days.txt | tr [A-Z] [a-z] | tr " " \\n | sort | uniq -c | tr -s " " | cut -f 2,3 -d" " | sort -nr | awk -F'[ ]' '{ printf "%s\t%s\n", $2, $1 }'

These two different approaches use roughly the same length of command lines. However, the Multitool approach is stated in terms of set operations used in database theory, and is much closer conceptually to what’s happening in MapReduce. When you need to think in terms of data processing with MapReduce, then Multitool provides a simpler approach. It also scales as big as your Hadoop cluster.

We can look at this differently by adding another Multitool parameter “–dot=flow.dot” to generate a file “flow.dot”, which depicts the MapReduce workflow as a directed acyclic graph or “DAG”.

hadoop jar multitool.jar --dot=flow.dot source=data/days.txt expr="\$0.toLowerCase()" gen gen.delim=" " group=0 count group=1 group.secondary=0 group.secondary.reverse=true sink=output/ sink.part=1

The generated graph (in the center column of the following diagram) shows how this workflow is built out of two MapReduce jobs connected in a flow:

Directed graph for Word Count example

The first mapper phase loads from “days.txt” and processes the text. It splits each line on the spaces, converts the words to lowercase, then emits those words as tuples for the following reducer. The first reducer phase performs a “group by” to count the occurrence of each word. The second mapper phase simply loads this input (an “identity” mapper), and the second reducer phase sorts the results in decreasing order based on count.

Back to the point about learning to program MapReduce efficiently, let’s examine each of the Multitool commands which were used:

hadoop jar multitool.jar \
  source=data/days.txt \
  expr="\$0.toLowerCase()" \
  gen gen.delim=" " \
  group=0 count \
  group=1 group.secondary=0 group.secondary.reverse=true \
  sink=output/ sink.part=1

The source argument specifies that the input should be read from the file “days.txt” for the initial map phase. That is called a tap in Cascading.

The expr argument applies a function to the data, in this case a Java String function to convert the data to lower case.

The two gen arguments split the input text using a space delimiter, where the resulting stream of words is used for the input tuples to the rest of the flow.

The next two arguments, group=0 and count specify that fields emitted by the regular expression for the preceding gen should be grouped and counted according to the first column, i.e. count the number of occurrences of each word in the stream. This will produce two columns: words and counts. Note that a group must be used before a count argument.

The next set of group arguments cause the output to be sorted in descending order, based on the counts.

Finally, the two sink arguments write the output to the “output/” directory, where sink.part=1 means that all output should be consolidated into a single file.

Example 3: Join

Next, let’s join a view of the data in “days.txt” with a view of the data in another file called “rhymes.txt” which contains the following text:

Hey diddle diddle! The cat and the fiddle the cow jumped over the moon
Humpty Dumpty sat on a wall, Humpty Dumpty had a great fall. All the King's horses, and all the King's men cannot put Humpty Dumpty together again.
Jack be nimble, Jack be quick, Jack jump over the candlestick.
Hickory, dickory dock! The mouse ran up the clock. The clock struck one, and down her un, hickory, dickory, dock!
If you sneeze on Monday, you sneeze for danger; sneeze on a Tuesday, kiss a stranger.

For this example, we take two flows, one based on “days.txt” which we label as “lhs” and another based on “rhymes.txt” which we will label as “rhs”, apply “unique” to both, and then join them to produce the output in a directory “output/join/”:

hadoop jar multitool.jar \
  source=data/days.txt source.name=lhs \
  expr="\$0.toLowerCase()" gen gen.delim="[^a-z]+" \
  group=0 unique \
  source=data/rhymes.txt source.name=rhs \
  expr="\$0.toLowerCase()" gen gen.delim="[^a-z]+" \
  group=0 unique \
  join join.lhs=lhs join.rhs=rhs \
  cut=0 \
  sink=output/join sink.part=1

The results provide the intersection of words common to both input files:

a
and
be
if
monday
on
s
the
tuesday

Since this workflow is a graph and not a linear pipeline, this example becomes more complex to state as a pipe of Unix/Linux command line utilities. We could use temporary files, and multiple command lines. However, keep in mind that the Multitool example only had two flows being joined, while it could have many flows joined in a complex DAG. That’s where the power of Cascading really begins to shine.

One Response to "Multitool"