Homework #1 - Internet Search Technologies 15-505
Introduction to Hadoop/Eclipse/Map/Reduce
Deliverables
Run the LineIndexer (explained below), both on the hadoop.cs.washington.edu cluster and locally, using The Complete Works of Shakespeare (available at http://www.gutenberg.org/etext/100) as input. Take a look at the output.
- What are some problems with the LineIndexer as it's presented here?
- What difficulties did you run into when trying to run the LineIndexer?
Write your thoughts down in one or two paragraphs. Hand them in as a type-written response in class September 11, 2007.
Getting Started with Eclipse and Hadoop
(Special notes for Windows users are at the end of this document.)
- Download and install Eclipse (http://www.eclipse.org/downloads/)
- You must have Java installed (Java 5 recommended)
- If you're unfamiliar with Eclipse, it might be helpful to look at a tutorial such as Eclipse And Java For Total Beginners. The cheat sheets found under the Help menu in Eclipse may also be of use (note that there is a Hadoop cheat sheet too!).
- Download the Hadoop plug-in from http://www.cs.cmu.edu/~knigam/15-505/hadoop-eclipse-plugin.jar. Put the Hadoop plug-in in your eclipse plugin directory (located at <eclipse-base-dir>/plugins/).
- Download & unpack our pre-packaged Hadoop workspace from http://www.cs.cmu.edu/~knigam/15-505/wkspc-ex.zip
- Start eclipse. You should be asked for a workspace on startup. Point eclipse to the workspace directory created in the previous step (or switch to this workspace after startup using File->Switch workspace)
- You should see the Package Explorer on the left-hand side. Expand hadoop-0.13.1, then src/examples, then org.apache.hadoop.examples. RunExample.java shows how to run three different mapreduces. If you want to use these examples, the directories passed to WordCount and RandomWriter will have to point to your own input and output directories.
Change your passwords and create a home directory on the HDFS
HDFS is the Hadoop implementation of a distributed file system. It's how all of your mapreduces share data.
- Log into hadoop.cs.washington.edu (i.e. ssh <your username>@hadoop.cs.washington.edu). The first time you log in you should be asked to agree to IBM's terms and conditions and change your password.
- Execute
/hadoop/cmu60/bin/hadoop dfs -mkdir /user/<your username>
- Log into the CMU cluster (ssh <your username>@10.1.133.44). Again, you should be prompted to change your password.
Because you will be sharing the HDFS with your classmates it is very important that you keep all your files in your own directory. Be very careful when deleting files and directories. It is possible to delete other people's work in HDFS. We're assuming you will take this responsibility seriously.
Running a small mapreduce
- Select Window->Show View->Other->MapReduce Tools->MapReduce Servers
- Click the blue elephant that appears at the top of the mapreduce servers frame (this frame is usually at the bottom).
- Fill in with the following information:
- Server Name: CMU Cluster
- Hostname: 10.1.133.44
- Installation directory: /hadoop/hadoop-0.13.1
- username: (your username)
- Select Tunnel Connections
- Tunnel via: hadoop.cs.washington.edu
- Tunnel username: (your username)
- Clicking validate location should ask you for your password at hadoop.cs, and then your password for the CMU cluster. It should then display a "Found Hadoop" message at the top of the window.
- Click Finish
Now you're ready to run a mapreduce.
Congratulations, you've just computed the value of pi. Ah, the sweet smell of successful pie. Er, pi.
Next we'll cover the details of mapreduce.
Hadoop Concepts & LineIndexer Example
To use Hadoop, you write two classes -- a Mapper and a Reducer. The Mapper class
contains a map function, which
is called once for each input record and outputs any number of intermediate <key,
value> pairs. What code you put in the
map function depends on the problem you are trying to solve. Let's start with a
short example.
Suppose the goal is to create an "index" of a body of text -- we are given a
text file, and we want to output a list of
words annotated with the line-number at which each word appears.
For that problem, an appropriate Map strategy is: for each word in the input,
output the pair <word, line-number>
For example, suppose we have this five-line high school football coach quote as
our input data set:
We are not what
we want to be,
but at least
we are not what
we used to be.
Running the Map code, which outputs a pair <word, line-number> for each word,
yields the set of pairs...
<we, 1>
<are, 1>
<not, 1>
<what, 1>
<we, 2>
<want, 2>
<to, 2>
<be, 2>
<but, 3>
etc...
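The Map strategy above can be sketched in plain Java, without any Hadoop classes. This is only an illustration under stated assumptions: MapSketch and mapLine are hypothetical names, and lowercasing the text and stripping punctuation are choices made here so that "We" and "we" (or "be," and "be") produce the same key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Plain-Java sketch of the Map step; it does not use the Hadoop API.
class MapSketch {
    // Emits one "<word, line-number>" string per word in the line.
    static List<String> mapLine(int lineNumber, String line) {
        List<String> pairs = new ArrayList<String>();
        // Assumption: lowercase and drop punctuation so that variants of a
        // word share one key.
        String cleaned = line.toLowerCase().replaceAll("[^a-z ]", "");
        StringTokenizer itr = new StringTokenizer(cleaned);
        while (itr.hasMoreTokens()) {
            pairs.add("<" + itr.nextToken() + ", " + lineNumber + ">");
        }
        return pairs;
    }

    public static void main(String[] args) {
        String[] quote = {
            "We are not what", "we want to be,", "but at least",
            "we are not what", "we used to be."
        };
        // Line numbers start at 1, as in the example above.
        for (int i = 0; i < quote.length; i++) {
            for (String pair : mapLine(i + 1, quote[i])) {
                System.out.println(pair);
            }
        }
    }
}
```

Each call to mapLine stands in for one call to map(); in a real job Hadoop makes those calls for you, one per input record.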
For now we can think of the <key, value> pairs as a nice linear list, but
in reality, the Hadoop process runs in
parallel on many machines. Each process has a little part of the overall Map
input (called a map shard), and
maintains its own local cache of the Map output. (For a description of how it
really works, see Hadoop/MapReduce
or the MapReduce paper you read for class.)
After the Map phase produces the intermediate <key, value> pairs they are
efficiently and automatically grouped by
key by the Hadoop system in preparation for the Reduce phase (this grouping is
known as the Shuffle phase of a
map-reduce). For the above example, that means all the "we" pairs are grouped
together, all the "are" pairs are
grouped together, and so on. The result looks like this, showing each group as a line...
<we, 1> <we, 2> <we, 4> <we, 5>
<are, 1> <are, 4>
<not, 1> <not, 4>
<what, 1> <what, 4>
<want, 2>
<to, 2> <to, 5>
<be, 2> <be, 5>
<but, 3>
<at, 3>
<least, 3>
<used, 5>
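The grouping that the Shuffle phase performs can be mimicked in a few lines of plain Java. This is a rough sketch, not how Hadoop implements it: ShuffleSketch and groupByKey are hypothetical names, and everything happens in memory on one machine.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the Shuffle step; Hadoop does this for you.
class ShuffleSketch {
    // keys[i] and values[i] together form one intermediate <key, value> pair.
    static Map<String, List<Integer>> groupByKey(String[] keys, int[] values) {
        // LinkedHashMap keeps keys in first-seen order, like the listing above.
        Map<String, List<Integer>> groups =
            new LinkedHashMap<String, List<Integer>>();
        for (int i = 0; i < keys.length; i++) {
            List<Integer> group = groups.get(keys[i]);
            if (group == null) {
                group = new ArrayList<Integer>();
                groups.put(keys[i], group);
            }
            group.add(values[i]);
        }
        return groups;
    }

    public static void main(String[] args) {
        // The pairs emitted for the first two lines of the quote.
        String[] keys = {"we", "are", "not", "what", "we", "want", "to", "be"};
        int[] values = {1, 1, 1, 1, 2, 2, 2, 2};
        Map<String, List<Integer>> groups = groupByKey(keys, values);
        for (Map.Entry<String, List<Integer>> e : groups.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```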
The Reducer class contains a reduce function, which is then called once for each
key -- one reduce call for "we", one
for "are", and so on. Each reduce looks at all the values for that key and
outputs a "summary" value for that key in
the final output. So in the above example, the reduce is called once for the
"we" key and passed the values the
mapper output: 1, 4, 2, and 5 (the values going into reduce are not in any
particular order). If reduce computes
a summary value string made of the line numbers sorted into increasing order,
then the Reduce phase
produces the pairs shown below. The Reduce phase also
sorts the output <key, value> pairs
into increasing order by key:
<are, 1 4>
<at, 3>
<be, 2 5>
<but, 3>
<least, 3>
<not, 1 4>
<to, 2 5>
<used, 5>
<want, 2>
<we, 1 2 4 5>
<what, 1 4>
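A reduce function of this kind can be sketched in plain Java as follows. ReduceSketch and reduce are hypothetical names, and the TreeMap in main is only a stand-in for the key ordering that the Reduce phase provides; it is not part of the Hadoop API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the Reduce step for the line-number index.
class ReduceSketch {
    // Builds "<key, n1 n2 ...>" with the line numbers in increasing order.
    static String reduce(String key, List<Integer> values) {
        // Copy before sorting: the values arrive in no particular order.
        List<Integer> sorted = new ArrayList<Integer>(values);
        Collections.sort(sorted);
        StringBuilder summary = new StringBuilder();
        for (Integer lineNumber : sorted) {
            if (summary.length() > 0) {
                summary.append(' ');
            }
            summary.append(lineNumber);
        }
        return "<" + key + ", " + summary + ">";
    }

    public static void main(String[] args) {
        // TreeMap iterates keys in sorted order, mimicking the sorted output.
        Map<String, List<Integer>> groups = new TreeMap<String, List<Integer>>();
        groups.put("we", Arrays.asList(1, 4, 2, 5));
        groups.put("are", Arrays.asList(4, 1));
        groups.put("be", Arrays.asList(5, 2));
        for (Map.Entry<String, List<Integer>> e : groups.entrySet()) {
            System.out.println(reduce(e.getKey(), e.getValue()));
        }
    }
}
```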
Like Map, Reduce is also run in parallel on a group of machines. Each machine is
assigned a subset of the keys to
work on (known as a reduce shard), and outputs its results into a separate file.
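One common way to assign keys to reduce shards is to hash each key and take the remainder modulo the number of reducers, so that every pair with the same key lands on the same machine. The sketch below illustrates that idea in plain Java. PartitionSketch and shardFor are hypothetical names, and whether the cluster uses exactly this scheme is an assumption.

```java
// Plain-Java sketch of hash partitioning of keys across reducers.
class PartitionSketch {
    // Returns a shard index in the range [0, numReducers).
    static int shardFor(String key, int numReducers) {
        // Masking with Integer.MAX_VALUE clears the sign bit, so the
        // remainder is never negative even if hashCode() is.
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        String[] words = {"we", "are", "not", "what", "want", "to", "be"};
        int numReducers = 4; // assumed number of reduce shards
        for (String word : words) {
            System.out.println(word + " -> shard " + shardFor(word, numReducers));
        }
    }
}
```

Because the shard depends only on the key, all values for one key meet in a single reduce call.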
Consider a simple "line indexer". Given
an input text, the line indexer uses Hadoop to produce an index of all the words
in the text. For each word, the index
has a list of all the locations where the word appears and a text excerpt of
each line where the word appears.
Running the line indexer on the complete works of Shakespeare yields the
following entry for the word "cipher".
38624 To cipher what is writ in learned books,
12046 To cipher me how fondly I did dote;
34739 Mine were the very cipher of a function,
16844 MOTH To prove you a cipher.
66001 ORLANDO Which I take to be either a fool or a cipher.
(If you're interested the complete works of Shakespeare are available at http://www.gutenberg.org/etext/100)
The Hadoop code below for the line indexer is actually pretty short. The Map
code extracts one word at a time from
the input, and the Reduce code combines all the data for one word.
Line Indexer Map
A Java Mapper class is defined in terms of its input and intermediate <key,
value> pairs. To declare one, simply
subclass MapReduceBase and implement the Mapper interface. The Mapper
interface provides a single
method: public void map(WritableComparable key, Writable value,
OutputCollector output, Reporter reporter).
Note: when the Mapper and Reducer are written as inner classes, they probably need to be declared "static". If you get an
error saying ClassName.<init>() is not
defined, try declaring your class static. The map function takes four parameters,
which in this example correspond to:
- WritableComparable key - the byte-offset of the line within the input file
- Writable value - the line from the file
- OutputCollector output - this has the .collect method to output a <key,
value> pair
- Reporter reporter - you can ignore this for now
The Hadoop system divides the (large) input data set into logical "records" and
then calls map() once for each
record. How much data constitutes a record depends on the input data type; for
text files, a record is a single line of
text. The main method is responsible for setting output key and value types.
Since in this example we want to output <word, offset:line> pairs, the types
will both be Text (a basic string wrapper,
with UTF8 support). It is necessary to wrap the more basic types because
keys in Hadoop must implement WritableComparable and values must
implement Writable, the interfaces that handle the writing and reading from disk.
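To make the record idea concrete, the following plain-Java sketch splits a text into lines and pairs each line with the byte offset at which it starts, which is the <key, value> shape the map function receives for text input. It assumes '\n' line endings and UTF-8 encoding; RecordSketch and records are hypothetical names, and Hadoop's own input handling is more involved than this.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of how a text file becomes <byte-offset, line> records.
class RecordSketch {
    // Returns one "offset:line" string per line of the text.
    static List<String> records(String text) {
        List<String> out = new ArrayList<String>();
        long offset = 0;
        for (String line : text.split("\n")) {
            out.add(offset + ":" + line);
            // Advance past the line's bytes plus one byte for the '\n'.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return out;
    }

    public static void main(String[] args) {
        String text = "We are not what\nwe want to be,\nbut at least\n";
        for (String record : records(text)) {
            System.out.println(record);
        }
    }
}
```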
Line Indexer Map
For the line indexer problem, the map code takes in a line of text. Then, for each
word in the line the mapper outputs one key/value pair:
<word, offset:line>
The Map code below accomplishes that by...
- Parsing each word out of value. For the parsing, the code delegates to a
utility StringTokenizer object that
implements hasMoreTokens() and nextToken() to iterate through the tokens.
- For each word, getting the location from key.
- Calling output.collect(word, summary) to output a <key, value> pair for
each word, where summary holds the offset:line string.
Line Indexer Map Code
public static class LineIndexerMapper extends MapReduceBase implements Mapper {
  // Reused for every output pair to avoid allocating new objects per word.
  private final static Text word = new Text();
  private final static Text summary = new Text();

  public void map(WritableComparable key, Writable val,
      OutputCollector output, Reporter reporter)
      throws IOException {
    String line = val.toString();
    // The value emitted for every word on this line is "offset:line".
    summary.set(key.toString() + ":" + line);
    StringTokenizer itr = new StringTokenizer(line.toLowerCase());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, summary);
    }
  }
}
When run on many machines, each mapper gets part of the input -- so for example
with 100 Gigabytes of data on
200 mappers, each mapper would get roughly its own 500 Megabytes of data to go
through. On a single mapper,
map() is called going through the data in its natural order, from start to
finish.
The Map phase outputs <key, value> pairs, but what data makes up the key
and value is totally up to the Mapper
code. In this case, the Mapper uses each word as a key, so the reduction below
ends up with pairs grouped by word.
We could instead have chosen to use the line-length as the key, in which case
the data in the reduce phase would
have been grouped by line length. In fact, the map() code is not required to
call output.collect() at all. It may have its
own logic to prune out data simply by omitting collect. Pruning things in the
Mapper is efficient, since it is highly
parallel, and already has the data in memory. By shrinking its output, we shrink
the expense of organizing and
moving the data in preparation for the Reduce phase.
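As an illustration of pruning, the plain-Java sketch below skips a small set of very common words by simply not emitting a pair for them. It does not use the Hadoop API; PruneSketch, mapLine, and the STOP_WORDS list are hypothetical and chosen only for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

// Plain-Java sketch of a map step that prunes its output.
class PruneSketch {
    // Hypothetical list of words considered too common to index.
    static final Set<String> STOP_WORDS =
        new HashSet<String>(Arrays.asList("a", "of", "the", "to"));

    // Emits "<word, offset:line>" for every word that is not a stop word.
    static List<String> mapLine(long offset, String line) {
        List<String> pairs = new ArrayList<String>();
        StringTokenizer itr = new StringTokenizer(line.toLowerCase());
        while (itr.hasMoreTokens()) {
            String word = itr.nextToken();
            if (STOP_WORDS.contains(word)) {
                continue; // pruned: nothing is emitted for this word
            }
            pairs.add("<" + word + ", " + offset + ":" + line + ">");
        }
        return pairs;
    }

    public static void main(String[] args) {
        for (String pair : mapLine(12046, "To cipher me how fondly I did dote;")) {
            System.out.println(pair);
        }
    }
}
```

Every pair that is never emitted is a pair that never has to be sorted, grouped, or sent over the network.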
Line Indexer Reduce
Defining a Reducer is just as easy. Simply subclass MapReduceBase and implement
the Reducer interface:
public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter).
The reduce()
method is called once for each key; the values parameter contains all of the
values for that key. The Reduce code
looks at all the values and then outputs a single "summary" value.
Given all the values for the key, the Reduce code typically iterates over all
the values and either concats the values
together in some way to make a large summary object, or combines and reduces the
values in some way to yield a
short summary value.
The reduce() method produces its final value in the same manner as map() did, by
calling output.collect(key,
summary). In this way, the Reduce specifies the final output value for the
(possibly new) key. It is important to note
that when running over text files, the input key is the byte-offset within the
file. If the key is propagated to the
output, even for an identity map/reduce, the file will be filled with the offset
values. Not only does this use up a lot
of space, but successive operations on this file will have to eliminate them.
For text files, make sure you don't output
the key unless you need it (be careful with the IdentityMapper and
IdentityReducer).
Line Indexer Reduce Code
The line indexer Reducer takes in all the <word, offset:line> key/value pairs
output by the Mapper for a single word. For
example, for the word "cipher", the pairs look like:
<cipher, 38624:To cipher what is writ in learned books>,
<cipher, 12046:To cipher me how fondly I did dote;>,
<cipher, ... >.
Given all those <key, value> pairs, the reduce outputs a single value
string. For the line indexer problem, the
strategy is simply to concat all the values together to make a single large
string, using "^" to separate the values. The
choice of "^" is arbitrary -- later code can split on the "^" to recover the
separate values. So for the key "cipher" the
output value string will look like
"38624:To cipher what is writ in learned books,^12046:To cipher me how fondly I did dote;^34739:Mine were the very cipher of a function,^ ...".
To do this, the Reducer code simply iterates over values to get all the value
strings, and concats them together into
our output String.
public static class LineIndexerReducer extends MapReduceBase implements Reducer {
  public void reduce(WritableComparable key, Iterator values,
      OutputCollector output, Reporter reporter)
      throws IOException {
    boolean first = true;
    StringBuilder toReturn = new StringBuilder();
    // Concatenate all values for this key, separated by '^'.
    while (values.hasNext()) {
      if (!first)
        toReturn.append('^');
      first = false;
      toReturn.append(values.next().toString());
    }
    output.collect(key, new Text(toReturn.toString()));
  }
}
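The "^" convention can be tried out in plain Java, including the later step of splitting the summary string back into its parts. This is a sketch only: CaretSketch, join, and split are hypothetical names. Note that "^" is a regular-expression metacharacter, so it has to be escaped when passed to String.split.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of joining values with '^' and splitting them again.
class CaretSketch {
    // Mirrors the Reducer's loop: values separated by '^', no trailing '^'.
    static String join(List<String> values) {
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        for (String value : values) {
            if (!first) {
                sb.append('^');
            }
            first = false;
            sb.append(value);
        }
        return sb.toString();
    }

    // Recovers the separate values from a summary string.
    static String[] split(String summary) {
        // "\\^" matches a literal caret; a bare "^" would match line start.
        return summary.split("\\^");
    }

    public static void main(String[] args) {
        String summary = join(Arrays.asList(
            "38624:To cipher what is writ in learned books,",
            "12046:To cipher me how fondly I did dote;"));
        System.out.println(summary);
        for (String value : split(summary)) {
            System.out.println(value);
        }
    }
}
```

The round trip only works if no input line contains a "^" of its own, which is worth keeping in mind when judging the choice of separator.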
Line Indexer Main Program
Given the Mapper and Reducer code, the short main() below starts the
Map-Reduction running. The Hadoop system
picks up a bunch of values from the command line on its own, and then the main()
also specifies a few key
parameters of the problem in the JobConf object, such as what Map and Reduce
classes to use and the format of the
input and output files. Other parameters, e.g. the number of machines to use, are
optional and the system will
determine good values for them if not specified.
public static void main(String[] args) throws IOException {
  JobConf conf = new JobConf(LineIndexer.class);
  conf.setJobName("LineIndexer");
  // The keys are words (strings):
  conf.setOutputKeyClass(Text.class);
  // The values are offsets+line (strings):
  conf.setOutputValueClass(Text.class);
  conf.setMapperClass(LineIndexer.LineIndexerMapper.class);
  conf.setReducerClass(LineIndexer.LineIndexerReducer.class);
  if (args.length < 2) {
    System.out.println("Usage: LineIndexer <input path> <output path>");
    System.exit(0);
  }
  conf.setInputPath(new Path(args[0]));
  conf.setOutputPath(new Path(args[1]));
  JobClient.runJob(conf);
}
Running A Map-Reduction
The LineIndexer is a mapreduce which requires input data. To upload input data you can use the Eclipse plugin. Ensure you have a connection to a Hadoop server (see "Running a small mapreduce" above) and click the blue elephant in the top right-hand corner of the Eclipse window to open the Hadoop perspective. This should make a blue elephant appear (representing the Hadoop server) on the left side of your Eclipse window, in the Package Explorer. Expand the folders until you get to the directory you created for yourself by following the directions earlier in this document. Right click on your directory, select Import from local directory, browse to a directory containing your data on your machine and click OK. After the upload is done, refresh the Hadoop directories (by right clicking and selecting Refresh) and you should see your data. Please use your own home directory on HDFS for all of your input and output. Help keep things neat for everyone.
You can also upload data to the cluster using scp. Then you can ssh into hadoop.cs.washington.edu and transfer data to the distributed file system (HDFS) for use in mapreduces.
If the data files
are in the localInput/ directory, this is
accomplished by executing:
/hadoop/cmu60/bin/hadoop dfs -put localInput dfsInput
The files will then be copied onto the dfs into the directory dfsInput. It is
important to copy files into a well named
directory that is unique. These files can be viewed with
/hadoop/cmu60/bin/hadoop dfs -ls dir
where dir is the name of the directory to be viewed.
You can also use
/hadoop/cmu60/bin/hadoop dfs -lsr dir
to recursively view the directories. Note that all "relative" paths given will
be resolved under the /user/$USER/[dir]
directory.
Before you run your mapreduce, make sure that the dfsOutput directory does not already exist. If the output directory exists when the mapreduce is executed, you will be
presented with an error and your job
will not run. (This prevents the accidental overwriting of data, but it can be
overridden. Please be very careful when overriding this functionality.) You can remove directories using /hadoop/cmu60/bin/hadoop dfs -rmr dir or by right clicking on the directory in the HDFS view of the plugin and selecting Delete.
There are three ways to run a mapreduce: two distributed and one local. Running locally is the best way to develop and debug a mapreduce. Once you're using lots of data, a distributed mapreduce is best.
Running A Distributed Map-Reduce using the plugin
The first method of running a distributed mapreduce was described in the section "Running a small mapreduce" in this document. Simply give the main method your input data and output directory.
Running A Distributed Map-Reduce by ssh-ing into the cluster
The second method of running a distributed mapreduce involves scp-ing your .jar to hadoop.cs.washington.edu and then ssh-ing in using your username and password. Then you simply execute:
/hadoop/cmu60/bin/hadoop jar
The job should be run across the worker machines, copying input and intermediate
data as needed. The output of the
reduce stage will be left in the dfsOutput directory. To copy these files to
your local machine in the directory
localOutput, execute:
/hadoop/cmu60/bin/hadoop dfs -get dfsOutput localOutput
Running A Map-Reduction Locally
During testing, you may want to run your Map-Reduces locally so as not to
adversely affect the compute clusters.
This is easily accomplished by adding a line to the main method before you call runJob:
conf.set("mapred.job.tracker", "local");
If you get an error that looks like
07/08/31 11:41:03 WARN mapred.LocalJobRunner: job_gi625
java.lang.OutOfMemoryError: Java heap space
Exception in thread "main" java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob (JobClient.java:604)
at org.apache.hadoop.examples.LineIndexer.main(LineIndexer.java:70)
at org.apache.hadoop.examples.RunExample.LineIndexer(RunExample.java:32)
at org.apache.hadoop.examples.RunExample.main (RunExample.java:12)
you need to increase the heap size available to your Java VM. Do this using the Run->"Open Run Dialog" menu, clicking the Arguments tab and setting the VM arguments to include -Xmx1024m (or something larger, if need be). It doesn't really make sense to run jobs over huge datasets on your puny little laptop, so you should debug and develop only on a small portion of the actual data.
Seeing Job Progress
When you submit your job, a line will be printed saying:
Running job: job_12345
where 'job_12345' will correspond to whatever name your job has been given.
Further status information will be printed in that terminal as the job progresses. However, it is also possible to
monitor a job given its name from any
node in the cluster. This is done by the command:
/hadoop/cmu60/bin/hadoop job -status job_12345
A small amount of status information will be displayed, along with a link to a
tracking URL (e.g., http://jobtrackermachinename:<portnumber>/). This page will be a job-specific status
page, and provide links to main status
pages for other jobs and the Hadoop cluster itself. To view this page you need to set up the correct tunneling parameters. To do this, execute
ssh -L <portnumber>:10.1.133.44:<portnumber> <your login>@hadoop.cs.washington.edu
Windows users
To run locally under Windows, you'll need to install Cygwin and launch Eclipse from a Cygwin shell to make sure some Cygwin executables/commands are in the path. See http://www.nabble.com/-jira--Created:-(HADOOP-1792)-df-command-doesn't-exist-under-windows-t4340981.html
Cygwin is available at http://www.cygwin.com/