Homework #1 - Internet Search Technologies 15-505

Introduction to Hadoop/Eclipse/Map/Reduce

Deliverables

Run the LineIndexer (explained below), both on the hadoop.cs.washington.edu cluster and locally, using The Complete Works of Shakespeare (available at http://www.gutenberg.org/etext/100) as input. Take a look at the output. Write your thoughts down in one or two paragraphs. Hand them in as a type-written response in class September 11, 2007.

Getting Started with Eclipse and Hadoop

(Special notes for Windows users are at the end of this document.)
  1. Download and install Eclipse (http://www.eclipse.org/downloads/)
  2. Download the Hadoop plug-in from http://www.cs.cmu.edu/~knigam/15-505/hadoop-eclipse-plugin.jar. Put the Hadoop plug-in in your eclipse plugin directory (located at <eclipse-base-dir>/plugins/).
  3. Download & unpack our pre-packaged Hadoop workspace from http://www.cs.cmu.edu/~knigam/15-505/wkspc-ex.zip
  4. Start eclipse. You should be asked for a workspace on startup. Point eclipse to the workspace directory created in the previous step (or switch to this workspace after startup using File->Switch workspace)
  5. You should see the Package Explorer on the left-hand side. Expand hadoop-0.13.1, then src/examples, then org.apache.hadoop.examples. RunExample.java shows how to run three different mapreduces. If you want to use the WordCount and RandomWriter examples, change the directories passed to them so that they point to your own input and output directories.

Change your passwords and create a home directory on the HDFS

HDFS is the Hadoop implementation of a distributed file system. It's how all of your mapreduces share data.
  1. Log into hadoop.cs.washington.edu (i.e. ssh <your username>@hadoop.cs.washington.edu). The first time you log in you should be asked to agree to IBM's terms and conditions and change your password.
  2. Execute
    /hadoop/cmu60/bin/hadoop  dfs -mkdir /user/<your username>
    
  3. Log into the CMU cluster (ssh <your username>@10.1.133.44). Again, you should be prompted to change your password.
Because you will be sharing the HDFS with your classmates it is very important that you keep all your files in your own directory. Be very careful when deleting files and directories. It is possible to delete other people's work in HDFS. We're assuming you will take this responsibility seriously.

Running a small mapreduce

Now you're ready to run a mapreduce. Congratulations, you've just computed the value of pi. Ah, the sweet smell of successful pie. Er, pi.

Next we'll cover the details of mapreduce.

Hadoop Concepts & LineIndexer Example

To use Hadoop, you write two classes -- a Mapper and a Reducer. The Mapper class contains a map function, which is called once for each input and outputs any number of intermediate <key, value> pairs. What code you put in the map function depends on the problem you are trying to solve. Let's start with a short example.

Suppose the goal is to create an "index" of a body of text -- we are given a text file, and we want to output a list of words annotated with the line-number at which each word appears. For that problem, an appropriate Map strategy is: for each word in the input, output the pair <word, line-number>
For example, suppose we have this five-line quote from a high school football coach as our input data set:
We are not what
we want to be,
but at least
we are not what
we used to be.
Running the Map code, which outputs a pair <word, line-number> for each word, yields the set of pairs...
<we, 1>
<are, 1>
<not, 1>
<what, 1>
<we, 2>
<want, 2>
<to, 2>
<be, 2>
<but, 3>
etc...
For now we can think of the <key, value> pairs as a nice linear list, but in reality, the Hadoop process runs in parallel on many machines. Each process has a little part of the overall Map input (called a map shard), and maintains its own local cache of the Map output. (For a description of how it really works, see Hadoop/MapReduce or the Mapreduce paper you read for class)
After the Map phase produces the intermediate <key, value> pairs, the Hadoop system automatically and efficiently groups them by key in preparation for the Reduce phase (this grouping is known as the Shuffle phase of a map-reduce). For the above example, that means all the "we" pairs are grouped together, all the "are" pairs are grouped together, and so on. The result looks like this, with each group shown on one line...
<we, 1> <we, 2> <we, 4> <we, 5>
<are, 1> <are, 4>
<not, 1> <not, 4>
<what, 1> <what, 4>
<want, 2>
<to, 2> <to, 5>
<be, 2> <be, 5>
<but, 3>
<at, 3>
<least, 3>
<used, 5>
The Reducer class contains a reduce function, which is then called once for each key -- one reduce call for "we", one for "are", and so on. Each reduce looks at all the values for that key and outputs a "summary" value for that key in the final output. So in the above example, reduce is called once for the "we" key and passed the values the mapper output for it: 1, 4, 2, and 5 (the values going into reduce are not in any particular order). Suppose reduce computes a summary value string made of the line numbers sorted into increasing order. Then the Reduce phase produces the pairs shown below. The Reduce phase also sorts the output <key, value> pairs into increasing order by key:
<are, 1 4>
<at, 3>
<be, 2 5>
<but, 3>
<least, 3>
<not, 1 4>
<to, 2 5>
<used, 5>
<want, 2>
<we, 1 2 4 5>
<what, 1 4>
Like Map, Reduce is also run in parallel on a group of machines. Each machine is assigned a subset of the keys to work on (known as a reduce shard), and outputs its results into a separate file.
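The Map, Shuffle, and Reduce steps described above can be sketched in plain Java, without Hadoop, to make the data flow concrete. This is only a single-machine illustration: the class name MiniLineIndex and its methods are hypothetical, and stripping punctuation so that "be," and "be." both count as "be" is an assumption made to match the pairs listed above.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-machine illustration; real Hadoop runs these phases in parallel.
class MiniLineIndex {
    /** Map: for each word on the line, emit the pair <word, line-number>. */
    static List<Map.Entry<String, Integer>> map(int lineNumber, String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        // Assumption: lower-case the line and treat every non-letter as a separator.
        for (String word : line.toLowerCase().split("[^a-z]+")) {
            if (!word.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(word, lineNumber));
            }
        }
        return pairs;
    }

    /** Shuffle: group all values by key. The TreeMap keeps the keys sorted. */
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return groups;
    }

    /** Reduce: sort the line numbers for one key and join them with spaces. */
    static String reduce(String key, List<Integer> values) {
        List<Integer> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        StringBuilder summary = new StringBuilder();
        for (int v : sorted) {
            if (summary.length() > 0) summary.append(' ');
            summary.append(v);
        }
        return summary.toString();
    }

    /** Runs map on every line, then shuffle, then reduce once per key. */
    static Map<String, String> buildIndex(String[] lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (int i = 0; i < lines.length; i++) {
            pairs.addAll(map(i + 1, lines[i])); // line numbers start at 1
        }
        Map<String, String> index = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffle(pairs).entrySet()) {
            index.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return index;
    }

    public static void main(String[] args) {
        String[] quote = {
            "We are not what", "we want to be,", "but at least",
            "we are not what", "we used to be."
        };
        for (Map.Entry<String, String> e : buildIndex(quote).entrySet()) {
            System.out.println("<" + e.getKey() + ", " + e.getValue() + ">");
        }
    }
}
```

Running main prints one pair per key in increasing key order, beginning with <are, 1 4>. In real Hadoop the grouping and sorting happen inside the framework, so only the map and reduce bodies are code you write.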
Consider a simple "line indexer". Given an input text, the line indexer uses Hadoop to produce an index of all the words in the text. For each word, the index has a list of all the locations where the word appears and a text excerpt of each line where the word appears. Running the line indexer on the complete works of Shakespeare yields the following entry for the word "cipher".
38624 To cipher what is writ in learned books,
12046 To cipher me how fondly I did dote;
34739 Mine were the very cipher of a function,
16844 MOTH To prove you a cipher.
66001 ORLANDO Which I take to be either a fool or a cipher.
(If you're interested the complete works of Shakespeare are available at http://www.gutenberg.org/etext/100)

The Hadoop code below for the line indexer is actually pretty short. The Map code extracts one word at a time from the input, and the Reduce code combines all the data for one word.

Line Indexer Map

A Java Mapper class is defined in terms of its input and intermediate <key, value> pairs. To declare one, simply subclass MapReduceBase and implement the Mapper interface. The Mapper interface provides a single method: public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter). Note: these inner classes probably need to be declared "static". If you get an error saying ClassName.<init>() is not defined, try declaring your class static.
The map function takes four parameters, which in this example correspond to: key, the byte offset of the line within the input file; value, the line of text itself; output, the collector that receives the intermediate <key, value> pairs; and reporter, which a task can use to report progress and status.
The Hadoop system divides the (large) input data set into logical "records" and then calls map() once for each record. How much data constitutes a record depends on the input data type; for text files, a record is a single line of text. The main method is responsible for setting output key and value types.
Since in this example we want to output <word, offset> pairs, both types will be Text (a basic string wrapper with UTF-8 support). The more basic types must be wrapped because Hadoop requires keys to implement WritableComparable and values to implement Writable; these interfaces handle writing to and reading from disk.
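To illustrate why a plain String has to be wrapped, here is a minimal sketch of the idea behind such a wrapper: an object that knows how to write itself to a byte stream and read itself back. MiniWritable and MiniText are hypothetical stand-ins written for this handout, not Hadoop classes, and the encoding chosen here (writeUTF) is an assumption that need not match what Text actually does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class WritableSketch {
    /** Hypothetical stand-in for the write/read contract of a Writable type. */
    interface MiniWritable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    /** Hypothetical string wrapper playing the role that Text plays in Hadoop. */
    static class MiniText implements MiniWritable, Comparable<MiniText> {
        private String value = "";

        void set(String s) { value = s; }

        public void write(DataOutput out) throws IOException {
            out.writeUTF(value); // assumption: any self-delimiting encoding would do
        }

        public void readFields(DataInput in) throws IOException {
            value = in.readUTF();
        }

        // Keys must also be comparable so the framework can sort and group them.
        public int compareTo(MiniText other) { return value.compareTo(other.value); }

        @Override public String toString() { return value; }
    }

    /** Serializes a string to bytes and reads it back into a fresh object. */
    static String roundTrip(String s) {
        try {
            MiniText original = new MiniText();
            original.set(s);
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            MiniText copy = new MiniText();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("cipher"));
    }
}
```

The set() method mirrors how the mapper code reuses one wrapper object for many words instead of allocating a new one each time.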

For the line indexer problem, the map code takes in a line of text. Then, for each word in the line, the mapper outputs one key/value pair:
 <word, offset:line>
The Map code below accomplishes that by...

Line Indexer Map Code

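// Imports needed at the top of the enclosing LineIndexer.java:
//   java.io.IOException, java.util.Iterator, java.util.StringTokenizer,
//   org.apache.hadoop.fs.Path, org.apache.hadoop.io.*, org.apache.hadoop.mapred.*
public static class LineIndexerMapper extends MapReduceBase implements Mapper {
   // Reused across calls so that no new Text is allocated for every word.
   private final static Text word = new Text();
   private final static Text summary = new Text();
   public void map(WritableComparable key, Writable val,
                   OutputCollector output, Reporter reporter)
                   throws IOException {
      // For text input, key is the byte offset of the line and val is the line.
      String line = val.toString();
      summary.set(key.toString() + ":" + line);
      // Split the lower-cased line on whitespace; emit <word, offset:line> per token.
      StringTokenizer itr = new StringTokenizer(line.toLowerCase());
      while(itr.hasMoreTokens()) {
         word.set(itr.nextToken());
         output.collect(word, summary);
      }
   }
}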
When run on many machines, each mapper gets part of the input -- so for example with 100 Gigabytes of data on 200 mappers, each mapper would get roughly its own 500 Megabytes of data to go through. On a single mapper, map() is called going through the data in its natural order, from start to finish.
The Map phase outputs <key, value> pairs, but what data makes up the key and value is totally up to the Mapper code. In this case, the Mapper uses each word as a key, so the reduction below ends up with pairs grouped by word. We could instead have chosen to use the line-length as the key, in which case the data in the reduce phase would have been grouped by line length. In fact, the map() code is not required to call output.collect() at all. It may have its own logic to prune out data simply by omitting collect. Pruning things in the Mapper is efficient, since it is highly parallel, and already has the data in memory. By shrinking its output, we shrink the expense of organizing and moving the data in preparation for the Reduce phase.
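As a sketch of pruning inside the mapper, the plain-Java stand-in below mirrors the tokenizing loop of the line indexer's map() but skips a few common words by simply not collecting them. The class name, the stop-word list, and the use of a returned list in place of output.collect() are all assumptions made so the example runs without Hadoop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;

class PruningMapSketch {
    // Hypothetical stop-word list, chosen only for illustration.
    static final Set<String> STOP_WORDS =
        new HashSet<>(Arrays.asList("the", "a", "to", "of", "and"));

    /**
     * Stand-in for map(): returns the <word, offset:line> pairs that a real
     * mapper would have passed to output.collect().
     */
    static List<String[]> map(long offset, String line) {
        List<String[]> collected = new ArrayList<>();
        String summary = offset + ":" + line;
        StringTokenizer itr = new StringTokenizer(line.toLowerCase());
        while (itr.hasMoreTokens()) {
            String word = itr.nextToken();
            if (STOP_WORDS.contains(word)) {
                continue; // prune: the pair is never emitted, so it is never shuffled
            }
            collected.add(new String[] {word, summary});
        }
        return collected;
    }

    public static void main(String[] args) {
        for (String[] pair : map(12046, "To cipher me how fondly I did dote;")) {
            System.out.println("<" + pair[0] + ", " + pair[1] + ">");
        }
    }
}
```

Every pair dropped here is one that the Shuffle phase no longer has to sort or move between machines. Changing the key, for example to the line length, would only require replacing pair[0] with a different value.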

Line Indexer Reduce

Defining a Reducer is just as easy. Simply subclass MapReduceBase and implement the Reducer interface:
public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
The reduce() method is called once for each key; the values parameter contains all of the values for that key. The Reduce code looks at all the values and then outputs a single "summary" value. Given all the values for the key, the Reduce code typically iterates over them and either concatenates them in some way to make a large summary object, or combines and reduces them in some way to yield a short summary value.
The reduce() method produces its final value in the same manner as map() did, by calling output.collect(key, summary). In this way, the Reduce specifies the final output value for the (possibly new) key. It is important to note that when running over text files, the input key is the byte offset within the file. If the key is propagated to the output, even for an identity map/reduce, the file will be filled with the offset values. Not only does this use up a lot of space, but successive operations on this file will have to eliminate them. For text files, make sure you don't output the key unless you need it (be careful with the IdentityMapper and IdentityReducer).
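The effect of propagating the byte-offset key can be seen in a small plain-Java sketch. It assumes that lines end in a single '\n', that the file is UTF-8, and that output is written as key, tab, value; the class and method names are hypothetical and no Hadoop classes are involved.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class OffsetKeySketch {
    /** Mimics an identity pass over text input: each output line is "offset<TAB>line". */
    static List<String> identityOutput(String fileContents) {
        List<String> out = new ArrayList<>();
        long offset = 0; // byte offset at which the current line starts
        for (String line : fileContents.split("\n")) {
            out.add(offset + "\t" + line);
            // Advance past the line's bytes plus the '\n' terminator (assumption).
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return out;
    }

    /** The same pass with the key left out of the output. */
    static List<String> valuesOnly(String fileContents) {
        List<String> out = new ArrayList<>();
        for (String line : fileContents.split("\n")) {
            out.add(line);
        }
        return out;
    }

    public static void main(String[] args) {
        String text = "We are not what\nwe want to be,\n";
        System.out.println(identityOutput(text));
        System.out.println(valuesOnly(text));
    }
}
```

If the first form were fed into another identity pass, each line would gain yet another offset prefix, which is why the key should be dropped unless it is needed.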

Line Indexer Reduce Code

The line indexer Reducer takes in all the <word, offset> key/value pairs output by the Mapper for a single word. For example, for the word "cipher", the pairs look like:
<cipher, 38624:To cipher what is writ in learned books,>,
<cipher, 12046:To cipher me how fondly I did dote;>,
<cipher, ... >.
Given all those <key, value> pairs, the reduce outputs a single value string. For the line indexer problem, the strategy is simply to concatenate all the values to make a single large string, using "^" to separate the values. The choice of "^" is arbitrary -- later code can split on the "^" to recover the separate values. So for the key "cipher" the output value string will look like
"38624:To cipher what is writ in learned books,^12046:To cipher me how fondly I did dote;^34739:Mine were the very cipher of a function,^ ...".
To do this, the Reducer code simply iterates over values to get all the value strings, and concatenates them into our output String.
public static class LineIndexerReducer extends MapReduceBase implements Reducer {
   public void reduce(WritableComparable key, Iterator values,
                      OutputCollector output, Reporter reporter)
                      throws IOException {
      boolean first = true;
      StringBuilder toReturn = new StringBuilder();
      while(values.hasNext()) {
         if(!first)
            toReturn.append('^');
         first = false;
         toReturn.append(values.next().toString());
      }
      output.collect(key, new Text(toReturn.toString()));
   }
}
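As an illustration of how later code might recover the separate values, the sketch below splits a reducer output string on "^" and then splits each piece at its first ":" into an offset and a line. The class and method names are hypothetical; only the "^" separator and the offset:line layout come from the description above.

```java
import java.util.ArrayList;
import java.util.List;

class IndexValueParser {
    /**
     * Splits "offset:line^offset:line^..." into {offset, line} pairs.
     * Only the first ':' in each piece is treated as the separator, so a
     * line of text that itself contains ':' stays intact.
     */
    static List<String[]> parse(String value) {
        List<String[]> entries = new ArrayList<>();
        for (String part : value.split("\\^")) { // '^' is a regex metacharacter
            int colon = part.indexOf(':');
            if (colon < 0) {
                continue; // skip pieces that do not look like offset:line
            }
            entries.add(new String[] {
                part.substring(0, colon), part.substring(colon + 1)
            });
        }
        return entries;
    }

    public static void main(String[] args) {
        String value = "38624:To cipher what is writ in learned books,"
                     + "^12046:To cipher me how fondly I did dote;";
        for (String[] entry : parse(value)) {
            System.out.println(entry[0] + " -> " + entry[1]);
        }
    }
}
```

One caveat of this format: if a line of the input text contains "^" itself, splitting will cut that line in two, so a separator that cannot occur in the data would be a safer choice.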

Line Indexer Main Program

Given the Mapper and Reducer code, the short main() below starts the Map-Reduction running. The Hadoop system picks up a bunch of values from the command line on its own, and then the main() also specifies a few key parameters of the problem in the JobConf object, such as what Map and Reduce classes to use and the format of the input and output files. Other parameters, e.g. the number of machines to use, are optional, and the system will determine good values for them if they are not specified.
public static void main(String[] args) throws IOException {
   JobConf conf = new JobConf(LineIndexer.class);
   conf.setJobName("LineIndexer");
   // The keys are words (strings):
   conf.setOutputKeyClass(Text.class);
   // The values are offsets+line (strings):
   conf.setOutputValueClass(Text.class);
   conf.setMapperClass(LineIndexer.LineIndexerMapper.class);
   conf.setReducerClass(LineIndexer.LineIndexerReducer.class);
   if (args.length < 2) {
      System.out.println("Usage: LineIndexer <input path> <output path>");
      // Exit with a non-zero status so callers can detect the usage error.
      System.exit(1);
   }
   conf.setInputPath(new Path(args[0]));
   conf.setOutputPath(new Path(args[1]));
   JobClient.runJob(conf);
}

Running A Map-Reduction

The LineIndexer is a map reduce which requires input data. To upload input data you can use the eclipse plugin. Ensure you have a connection to a Hadoop Server (see Running a Small mapreduce above) and click the blue elephant in the top right hand of the eclipse window to open the hadoop perspective. This should make a blue elephant appear (representing the Hadoop server) on the left side of your eclipse window, in the Package Explorer. Expand the folders until you get to the directory you created for yourself by following the directions earlier in this document. Right click on your directory, select Import from local directory, browse to a directory containing your data on your machine and click ok. After the upload is done, refresh the Hadoop directories (by right clicking and selecting refresh) and you should see your data. Please use your own home directory on HDFS for all of your input and output. Help keep things neat for everyone.

You can also upload data to the cluster using scp. Then you can ssh into hadoop.cs.washington.edu and transfer data to the distributed file system (HDFS) for use in mapreduces. If the data files are in the localInput/ directory, this is accomplished by executing:
/hadoop/cmu60/bin/hadoop dfs -put localInput dfsInput
The files will then be copied onto the dfs into the directory dfsInput. It is important to copy files into a well-named, unique directory. These files can be viewed with
/hadoop/cmu60/bin/hadoop dfs -ls dir
where dir is the name of the directory to be viewed.
You can also use
/hadoop/cmu60/bin/hadoop dfs -lsr dir
to recursively view the directories. Note that all "relative" paths are resolved against your HDFS home directory, /user/$USER/.
Before you run your mapreduce, make sure that the dfsOutput directory does not already exist. If the output directory exists when the mapreduce is executed, you will be presented with an error and your job will not run. (This prevents the accidental overwriting of data, but can be overridden. Please be very careful overriding this functionality.) You can remove directories using /hadoop/cmu60/bin/hadoop dfs -rmr dir or by right-clicking on the directory in the HDFS view of the plugin and selecting Delete.

There are three ways to run a mapreduce: two ways to run it in a distributed fashion and one way to run it locally. Running locally is the best way to develop and debug a mapreduce. Once you're using lots of data, a distributed mapreduce is best.

Running A Distributed Map-Reduce using the plugin

The first method of running a distributed mapreduce was described in the section "Running a small mapreduce" in this document. Simply give the main method your input data and output directory.

Running A Distributed Map-Reduce by ssh-ing into the cluster

The second method of running a distributed mapreduce involves scp-ing your .jar to hadoop.cs.washington.edu and then ssh-ing in using your username and password. Then you simply execute:
/hadoop/cmu60/bin/hadoop jar  
The job should be run across the worker machines, copying input and intermediate data as needed. The output of the reduce stage will be left in the dfsOutput directory. To copy these files to your local machine in the directory localOutput, execute:
/hadoop/cmu60/bin/hadoop dfs -get dfsOutput localOutput

Running A Map-Reduction Locally

During testing, you may want to run your Map-Reduces locally so as not to adversely affect the compute clusters.
This is easily accomplished by adding the following line to the main method before you call runJob:
conf.set("mapred.job.tracker", "local");
If you get an error that looks like
07/08/31 11:41:03 WARN mapred.LocalJobRunner: job_gi625
java.lang.OutOfMemoryError: Java heap space
Exception in thread "main" java.io.IOException: Job failed!
    at org.apache.hadoop.mapred.JobClient.runJob (JobClient.java:604)
    at org.apache.hadoop.examples.LineIndexer.main(LineIndexer.java:70)
    at org.apache.hadoop.examples.RunExample.LineIndexer(RunExample.java:32)
    at org.apache.hadoop.examples.RunExample.main (RunExample.java:12)
then you need to increase the heap size available to your Java VM. Do this using the Run->"Open Run Dialog" menu, clicking the Arguments tab and setting the VM arguments to include -Xmx1024m (or something larger, if need be). It doesn't really make sense to run jobs over huge datasets on your puny little laptop, so debug and develop only on a small portion of the actual data.

Seeing Job Progress

When you submit your job to run, a line will be printed saying:
Running job: job_12345
where 'job_12345' will correspond to whatever name your job has been given. Further status information will be printed in that terminal as the job progresses. However, it is also possible to monitor a job given its name from any node in the cluster. This is done by the command:
/hadoop/cmu60/bin/hadoop job -status job_12345
A small amount of status information will be displayed, along with a link to a tracking URL (e.g., http://jobtrackermachinename:<portnumber>/). This page is a job-specific status page, and provides links to the main status pages for other jobs and for the Hadoop cluster itself. To view this page you need to set up the correct tunneling parameters. To do this, execute
ssh -L <portnumber>:10.1.133.44:<portnumber> <your login>@hadoop.cs.washington.edu

Windows users

To run locally under Windows, you'll need to install Cygwin and launch Eclipse from a Cygwin shell to make sure some Cygwin executables/commands are in the path. See http://www.nabble.com/-jira--Created:-(HADOOP-1792)-df-command-doesn't-exist-under-windows-t4340981.html
Cygwin is available at http://www.cygwin.com/