11-741 - Information Retrieval
Jamie Callan
Yiming Yang
Due: Feb 11, 2010
Homework #2
Deadline Feb 11 11:59pm
In this homework, you will build a more efficient MapReduce (Hadoop) based indexer based on the Program 1 you built for HW1. You will also build a MapReduce based program that reads out inverted list fragments from a sharded index and merges them.
Expect the cluster to be busier (longer wait times) as the deadline approaches, so start early. Going large scale is not easy; you will need time to debug on small datasets before scaling to the large one. So work on the indexer RIGHT AWAY. And remember to leave 1 day for the report, since it carries a lot of credit.
1. Learning points
- IR:
- simple sharded indexing
- more efficient sharded and positional indexing
- gathering statistics from a sharded index
- Large Scale:
- secondary sort (to produce sorted word-docno records for inverted indexing)
- efficient indexing in Hadoop
- reading from the output index
2. Hand in
- report:
- build a simple sharded indexer (Program 2) based on Program 1 from HW1,
- build an efficient positional indexer (Program 3) based on Program 2,
- include timings for index construction (Program 2 and 3)
- You should vary the #map tasks and #reduce tasks a bit to see how the timing differs.
- and compare the timings for Program 2 and 3.
- show inverted lists for sample terms from the output indexes of both Program 2 and 3.
- discuss the Map/Reduce framework
- discuss design decisions you made during coding Program 2 and 3
mainly data classes used in Program 2 and 3, and their relationship with the Hadoop classes
- discuss the good and the bad of Map/Reduce
- good and bad about Hadoop's implementation
- try to make constructive suggestions
- if you know other existing parallel computation frameworks, you may compare with them.
- code:
- (code should be documented at necessary places to make sure that TA can understand it)
- Program 2: Sharded simple indexer
Your Program 3 will build on top of this one for more efficient indexing and positional indexing. Try to make your program modular, so it will be easier to extend.
- Program 3: efficient positional indexer
For the two programs, make sure you include a) the source code (the src directories), b) the exact path to your jar files on the cluster, so that we can run your code.
You must also turn in your source code, packaged as a .zip, .gz, or .tar file. The instructor will look at your source code, so make sure that it is legible, has reasonable documentation, and can be understood by others. This is a Computer Science class lesson - the instructor will actually care about your source code. The instructor will also run your code, so make sure that you include everything necessary to run it.
Please make it easy for the instructor to see how you have addressed each of the requirements described for each section.
3. Detailed assignment instructions
3.1 Background
Data:
- Same as HW1, a full 10GB of HTML texts in TRECWeb format
- located in HDFS of the cluster (ltilogin.cloud.pdl.cmu.local): /user/data/gov2-10GB
- feel free to use the same set of test files from HW1, at /user/data/gov2-1GB and /user/data/Small.TRECWEB.txt
3.2 Program 2: Secondary sort for simple indexing
Functionality: produce word and its postings (docnos that the word appears in), using secondary sort to sort the docnos for each word
- input: TRECWEB documents
- output: word, its simple inverted list (DF + sorted docnos)
Architecture:
- InputRecordReader for TRECWEB
- Mapper
- output <key=<word,docno>, value=docno>, (simply modify sample program 1)
The output key is a custom data class that you need to implement. To fit into the Hadoop framework, it has to at least implement the Writable interface.
Just use the String-typed docnos (wrap it into a Text object to output as a field in the value). Do NOT change the docnos into integer values.
- Combiner
- (Combiners are typically used to speed up Map/Reduce. They keep too many records from being transferred to the reducers, and thus shorten network transfer and sorting time.)
- Here, it needs to merge records for the same key into a single record <key=<word,docno>, value=docno>
- Combiner will be applied after Hadoop sorts Mapper's output using the comparator specified by JobConf.setOutputKeyComparatorClass(..), e.g. FullKeyComparator detailed below.
- Combine can be applied multiple times: 1) as mapper outputs data, once output buffer memory limit is reached, in-memory sort and combine happens, and 2) after Mapper finishes, all previous combiner outputs will be merge sorted and combined again. After combine finishes, one key <word, docno> will have only one merged record in the output.
This means, you cannot assume that combiner's input will always be mapper's output. A combiner's input may also be combiners' output records.
- Partitioner
- partition by docno (will produce a sharded index)
- custom partitioner extending HashPartitioner<K, V>
- override the method: getPartition
- you should use abs( docno.hashCode() ) % numReduceTasks to generate the partition code.
Here we assume docno is a Text object, and this uses Text.hashCode() to generate the partition number.
The abs() is necessary, because hashCode() may return negative values.
- FullKeyComparator to sort on word and docno (this description was changed at 4:30pm Feb 1st to a simpler design, either design is fine for grading purposes.)
- this is to make sure docno's are sorted for each word
- create a FullKeyComparator that compares on both word and docno, and make sure it does the comparison correctly
- Hint: you'll need to extend the WritableComparator class
- implement a no-args constructor public FullKeyComparator(), which must simply call super(YourKeyClass.class, true);
- override public int compare(Object o1, Object o2)
- Cast the Object-typed arguments to the actual type you use for the key
- make sure to compare on both word and docno, which should both be in the keys
- hook it up with Hadoop
- set it in workflow view of the driver class, HadoopComparator panel
after this, your hadoop program should produce records sorted on both word and docno as input to the reducers. Verify that before going ahead.
- GroupingComparator
- because the key=<word, docno> includes not only the word, but also the docno, the same inverted list is split into multiple records (as many as number of documents that contain the word), so in order to group postings of the same word together,
- you'll need to:
- create your own GroupingComparator class for grouping according to word
- you can simply extend the FullKeyComparator, or create a separate comparator if you want to.
- and just override public int compare(Object o1, Object o2) to compare only on the word
- hook your GroupingComparator up with hadoop, JobConf.setOutputValueGroupingComparator(YourComparator.class)
- The NetBeans plugin does not support GroupingComparator, so you need to manually add the above statement inside your driver class, right after initJobConf(job);
- because the NetBeans plugin doesn't understand GroupingComparators, results as you see in the workflow view will be wrong. You'll need to debug this using Hadoop directly under command line.
- Reducer
- You need to merge the input and output <key=word value=inverted_list>
- The input, because of grouping, will be <key=<word,docno>, value=a list of <docno> records>
Because the records are grouped by word, the docno field in the key may not correspond to the docno in the values; you should use the docnos in the values.
- For each unique word, the value will be a list of docnos,
- For every key (word), if the final merged inverted list will contain more than 100 documents, it's recommended to output every 100 docnos as one record to avoid timeout errors and slow processing time caused by long records.
For example, if word "the" occurs in document 1 - 1000, then the final output for "the" will contain 10 output records, each with "the" as the key and the list of 100 consecutive docnos as the value.
- Note, the Hadoop framework will reuse the key and value objects that are passed into the reducer, therefore the application should clone the objects they want to keep a copy of. Same with Combiner.
- OutputFormat
Just use the default org.apache.hadoop.mapred.TextOutputFormat, it's more human readable.
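The comparator and partitioner roles described above can be sketched in plain Java, without any Hadoop classes. This is only an illustration under stated assumptions: WordDocKey and SecondarySortSketch are hypothetical names, java.util.Comparator stands in for WritableComparator, and String.hashCode() stands in for Text.hashCode(), so the shard numbers it produces will not match a real Hadoop run.

```java
import java.util.Comparator;

// Hypothetical plain-Java stand-in for the custom <word, docno> key.
// A real Hadoop key class would also implement Writable / WritableComparable.
final class WordDocKey {
    final String word;
    final String docno;

    WordDocKey(String word, String docno) {
        this.word = word;
        this.docno = docno;
    }
}

final class SecondarySortSketch {
    // Role of FullKeyComparator: order by word first, then by docno.
    static final Comparator<WordDocKey> FULL_KEY = (a, b) -> {
        int byWord = a.word.compareTo(b.word);
        return byWord != 0 ? byWord : a.docno.compareTo(b.docno);
    };

    // Role of GroupingComparator: keys with the same word compare as equal,
    // so all postings of one word reach a single reduce() call.
    static final Comparator<WordDocKey> GROUP_BY_WORD =
        (a, b) -> a.word.compareTo(b.word);

    // Partition formula from the handout, applied to a String docno.
    // Assumption: String.hashCode() replaces Text.hashCode() here.
    static int partition(String docno, int numReduceTasks) {
        return Math.abs(docno.hashCode()) % numReduceTasks;
    }
}
```

Sorting a list of keys with FULL_KEY and then comparing neighbours with GROUP_BY_WORD shows the intended behaviour: docnos are ordered within each word, and keys of the same word fall into one group. One Java detail to be aware of: Math.abs(Integer.MIN_VALUE) is still negative, so the formula has a rare edge case; it is kept here unchanged because it is the one the handout specifies.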
Deployment
Deploy your program 2 on the cluster using the full input data.
Make sure the docnos are sorted.
Include in the report,
- the first 10 documents of each inverted list for the sample words from HW1 Section 4.2, with reducer# fixed at 4
Gather inverted lists for the same sample words used in HW1 Section 4.2.
Hint: use the same hadoop fs -cat | grep trick in HW1 Section 4.2 to locate the sample words in your output directory.
For frequent words, use head to truncate the long inverted list records, e.g.
hadoop fs -cat /youroutput/part-00000 | grep -a "^WORD{Literal_TAB}" | head -n 1
Here, keep the number of reducers for indexing fixed at 4. This will result in 4 shards (part-00000 to part-00003), and 4 inverted list fragments for each unique word.
Override the toString() method of your data classes to format the output as follows:
(records will be followed by new line characters "\n", and fields within each record separated by TAB)
[word] [DF] [docno] [docno] ...
avatar 4 GX046-73-2232524 GX186-95-16464543 GX240-92-15755572 GX246-39-9037678
avatar 4 GX001-35-5195992 GX006-47-3205930 GX036-51-9241581 GX173-01-16076052
avatar 3 GX021-18-1156827 GX028-50-12367763 GX169-77-0344935
avatar 1 GX241-62-5165601
On the 10GB full dataset, the global DF of "avatar" is 12, distributed in 4 index shards.
Make sure your indexer outputs the same inverted list for "avatar".
The first 5 documents for the word "reduce" in each shard are:
reduce 100 GX000-01-15897683 GX000-01-15979284 GX000-01-2893456 GX000-01-8011551 GX000-02-10992597
reduce 100 GX000-02-11544820 GX000-02-15320763 GX000-02-15881606 GX000-03-3212167 GX000-03-5594674
reduce 100 GX000-00-6435664 GX000-01-12642466 GX000-02-3622562 GX000-05-10797645 GX000-07-4432334
reduce 100 GX000-00-4041288 GX000-02-5834661 GX000-03-7355341 GX000-04-8792333 GX000-06-13138007
The DF in every record above is 100, because the original records are too long and have been broken into 100 docnos per record. Your count may be different if you split the long records differently.
Since there will be many records for a moderately frequent word, the best way to locate the first record of each shard is to do the "grep" separately, and use a "head" to chop off the output: e.g. for the first shard, hadoop fs -cat output/part-00000 | grep -a "^WORD{Literal_TAB}" | head -n 1.
- timings for indexing,
You need to vary the #reduce tasks a bit, to see how the timing differs.
You will be sharing a cluster with your fellow students, so
- Use up to a maximum of 8 reduce tasks.
- Do not keep multiple copies of your output index. Delete old ones when you are building another.
Sorting on both <key1, key2>, but grouping records of the same key1 together is called secondary sort. See a secondary sort example here.
Secondary sort is needed for indexing, if you want to output the inverted list for each word as one single record, and still keep the docnos sorted.
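Once secondary sort delivers the docnos of one word in sorted order, the reducer can emit them in fixed-size records. The sketch below illustrates that step in plain Java, under assumptions: ChunkedPostingsSketch and its methods are hypothetical names, the Iterable stands in for the values iterator Hadoop passes to reduce(), and the records are collected into a list instead of being written to an output collector.

```java
import java.util.ArrayList;
import java.util.List;

final class ChunkedPostingsSketch {
    // Input: docnos for ONE word, already sorted (as after secondary sort).
    // Output: one "[word]\t[DF]\t[docno]..." line per chunk of at most
    // chunkSize docnos. Only one chunk is buffered at a time.
    static List<String> formatRecords(String word, Iterable<String> sortedDocnos, int chunkSize) {
        List<String> records = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        String previous = null;
        for (String docno : sortedDocnos) {
            // Skip adjacent duplicates, which can remain if the combiner did not run.
            if (docno.equals(previous)) {
                continue;
            }
            previous = docno;
            buffer.add(docno);
            if (buffer.size() == chunkSize) {
                records.add(toRecord(word, buffer));
                buffer.clear();
            }
        }
        if (!buffer.isEmpty()) {
            records.add(toRecord(word, buffer));
        }
        return records;
    }

    // The DF field counts only the docnos contained in this record.
    static String toRecord(String word, List<String> docnos) {
        return word + "\t" + docnos.size() + "\t" + String.join("\t", docnos);
    }
}
```

With chunkSize set to 100, a word that occurs in 1000 documents would yield 10 records, each reporting a DF of 100.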
3.3 Program 3: Efficient Sharded Positional Indexer
After the two programs from last HW, your indexer is just a few simple steps away.
Functionality: sharded and positional indexing
- input: TRECWEB documents
- output: word and its postings in shards,
word positions from each document should start from 0.
Architecture:
There are two main differences between this program and Program 2 from HW1:
- this program indexes term occurrence positions, not just TF information
- this program merges Mapper output into inverted list fragments using a Combiner.
This reduces the network traffic and can speed up the whole indexing process.
- InputRecordReader
- Same as before, use Cloud9's source
- Mapper
- output <key=<word, docno>, value=<docno, tf=1, position>>, (the comparator compares keys to group, and values are never passed into the comparator)
- value needs to include docno for secondary sort on docno
- without a docno, after grouping according to word in reducer, the docno in the key will be lost.
- Combiner
- merge records for the same word in the same document,
- resulting in <key=<word, docno>, value=<docno, tf, <position1, position2, ..>>>
- make sure after combine, the term occurrence positions for that doc are sorted in ascending order
- Partitioner
- Partition Combiner's Output <key, value> pairs according to docno - thus, each reducer will create one index shard
- same as in sample program 2
- FullKeyComparator to sort on word and docno
- this makes sure docno's are sorted within each word's inverted index
- same as in sample program 2
- GroupingComparator
- Because the key=<word, docno> includes not only the word, but also the docno, the same inverted list is split into multiple records (as many as number of documents that contain the word), so in order to group postings of the same word together,
- same as in sample program 2
- Reducer
- Input: <key=<word, docno>, value=a list of <docno, positions> records>
- Output <key=word, value=<<docno1, tf, positions>, <docno2, ..>..>>.
- Note, the Hadoop framework will reuse the key and value objects that are passed into the reduce, therefore the application should clone the objects they want to keep a copy of. Same with Combiner.
- For frequent terms such as "the", the reducer output record may exceed the memory limit of the JVM, resulting in an out-of-memory error. This is because Hadoop keeps the whole record (in this case the whole postings list for "the") in memory before sending it to disk. Partitioning the collection into more shards helps, but it's a suboptimal hack. One way to avoid such errors is to partition these large postings into manageably sized chunks, and output several records for the same key (the word "the"). E.g. record1: <key="the", value=<<"doc1", tf, positions>, <"doc2", tf, pos> ...<"doc1000", tf, pos>>>, record2: <key="the", value=<<"doc1001", tf, pos>, <"doc1002", tf, pos> ...<"doc2000", tf, pos>>>, ..., recordN.
- OutputFormat
Just use the default org.apache.hadoop.mapred.TextOutputFormat, it's more human readable.
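The combiner step for positional postings can be sketched in plain Java as follows. This is an illustration under assumptions, not the required design: PositionalPosting is a hypothetical value class, tf is derived from the number of positions instead of being stored as a separate field, and a real Hadoop value class would also implement Writable. Because a combiner may receive either mapper output (one position per record) or earlier combiner output (several positions per record), the merge accepts both.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical value class: <docno, tf, positions> for one word in one document.
final class PositionalPosting {
    final String docno;
    final List<Integer> positions = new ArrayList<>();

    PositionalPosting(String docno) {
        this.docno = docno;
    }

    // Assumption: tf is simply the number of recorded positions.
    int tf() {
        return positions.size();
    }

    // Merge partial postings that share the same <word, docno> key.
    static PositionalPosting merge(String docno, Iterable<PositionalPosting> parts) {
        PositionalPosting merged = new PositionalPosting(docno);
        for (PositionalPosting part : parts) {
            // Copy the values out; do not keep a reference to 'part',
            // since Hadoop reuses the objects it passes in.
            merged.positions.addAll(part.positions);
        }
        Collections.sort(merged.positions); // positions must end up ascending
        return merged;
    }

    // Formats as "[docno]\t[TF]\t[position]\t[position]...".
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder(docno).append('\t').append(tf());
        for (int p : positions) {
            sb.append('\t').append(p);
        }
        return sb.toString();
    }
}
```

Merging partial postings that hold positions 655, then 571 and 710, then 623 for one document gives a single posting with TF 4 and positions 571, 623, 655, 710.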
Deployment
Run the indexer on full input data, and record timings.
Include in the report,
- timings for indexing,
You need to vary the #reduces as in Program 2,
compare with the timings from Program 2, and discuss differences.
- report the first 10 documents of each inverted list for the sample words,
Hint: use the same hadoop fs -cat | grep trick in HW1 Section 4.2 to locate the sample words in your output directory.
Here, keep the number of reducers for indexing fixed at 4. This will result in 4 shards, and 4 inverted list fragments for each unique word.
Override the toString() method of your data classes to make your output look like this:
(records will be followed by new line characters "\n", and fields within each record separated by TAB)
[word] [DF] [docno] [TF] [positions] [docno] [TF] [positions]...
avatar 4 GX046-73-2232524 4 571 623 655 710 GX186-95-16464543 1 5265 GX240-92-15755572 1 165 GX246-39-9037678 1 5046
avatar 4 GX001-35-5195992 1 2370 GX006-47-3205930 2 16636 17687 GX036-51-9241581 1 2683 GX173-01-16076052 1 338
avatar 3 GX021-18-1156827 1 384 GX028-50-12367763 7 1857 1859 1901 1936 1998 2023 2070 GX169-77-0344935 1 215
avatar 1 GX241-62-5165601 1 80
This is very similar to the output of Program 2, except term occurrences are included here.
Note that both DOCNOs and occurrence positions are sorted.
The first 5 documents for the word "reduce" in each shard are:
reduce 100 GX000-01-15897683 1 636 GX000-01-15979284 2 3746 4182 GX000-01-2893456 2 34 278 GX000-01-8011551 4 1111 1176 1684 1774 GX000-02-10992597 3 413 648 953
reduce 100 GX000-02-11544820 2 18898 31448 GX000-02-15320763 1 809 GX000-02-15881606 2 93 187 GX000-03-3212167 2 396 461 GX000-03-5594674 1 107
reduce 100 GX000-00-6435664 3 182 200 262 GX000-01-12642466 2 231 244 GX000-02-3622562 1 1980 GX000-05-10797645 1 136 GX000-07-4432334 1 201
reduce 100 GX000-00-4041288 1 674 GX000-02-5834661 1 550 GX000-03-7355341 1 479 GX000-04-8792333 2 1892 2374 GX000-06-13138007 1 1923
The DF in every record above is 100, because the original records are too long and have been broken into 100 docnos per record. Your count may be different if you split the long records differently.
Since there will be many records for a moderately frequent word, the best way to locate the first record of each shard is to do the "grep" separately, and use a "head" to chop off the output: e.g. for the first shard, hadoop fs -cat output/part-00000 | grep -a "^WORD{Literal_TAB}" | head -n 1.
3.4 Some details about data classes:
- We call every key or value class a data class. Data classes need to implement Writable, in order for the Hadoop framework to transfer that data between map, reduce etc. stages within the Hadoop cluster.
If your MyKey class contains two fields you want to output, one IntWritable, and one Text, then your MyKey.write(DataOutput) method should do IntWritable.write(DataOutput) and Text.write(DataOutput). Similarly for MyKey.readFields(DataInput).
- Key and value classes should also override toString() method in order to output final results in the TextOutputFormat. toString() can simply output all fields of a record in text format separating the fields by Tab: "\t".
- The map output key class needs to be WritableComparable, and implement the compareTo() method, for correct sorting behavior. Even when you create your own comparator class for sorting, you can still call the compareTo() in the key class for the default sorting behavior.
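The write/readFields pattern described above can be tried out with only java.io, since Hadoop's Writable methods take the standard DataOutput and DataInput interfaces. The sketch below is a stand-in under assumptions: MyKeySketch is a hypothetical class, a plain int and String replace IntWritable and Text, and roundTrip is a helper added only to demonstrate that a value survives serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical two-field data class; int and String stand in for IntWritable and Text.
final class MyKeySketch {
    int count;
    String term = "";

    // Same shape as Writable.write(DataOutput): write each field in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeInt(count);
        out.writeUTF(term);
    }

    // Same shape as Writable.readFields(DataInput): read the fields back in the SAME order.
    void readFields(DataInput in) throws IOException {
        count = in.readInt();
        term = in.readUTF();
    }

    // Tab-separated fields, as TextOutputFormat would print them.
    @Override
    public String toString() {
        return count + "\t" + term;
    }

    // Demonstration helper: serialize to bytes, then deserialize into a fresh object.
    static MyKeySketch roundTrip(MyKeySketch original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            MyKeySketch copy = new MyKeySketch();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A round trip like this is a cheap local check that write() and readFields() agree on field order before the class is used on the cluster.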
3.5 Details about controller classes:
- After you extend a mapper or reducer
- if you override the constructor, remember to call "super(..);" with proper arguments to invoke the constructor of the parent class.
- You don't need to store global DF for your sharded index. DF can be calculated on the fly when doing retrieval
- use one M/R process to collect the DF statistics
- Mappers map out the relevant inverted lists in the index, (each mapper knows which terms are being queried, and map out only the queried inverted lists)
- Partition by term, so that all postings for that term will appear in the same reducer
- Reducers accumulate global DF by term (same as WordCount).
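The accumulation step can be illustrated in plain Java: each shard record carries a per-record DF in its second tab-separated field, and the global DF of a term is the sum of those values over all shards. This is a sketch under assumptions (GlobalDfSketch is a hypothetical name, and the records are read from an in-memory list instead of from HDFS); it is not the M/R program itself.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GlobalDfSketch {
    // Each input line is one shard record: "[word]\t[DF]\t[docno]...".
    // Records for the same word (from different shards, or from a long list
    // split into several records) are summed, as in WordCount.
    static Map<String, Integer> globalDf(List<String> shardRecords) {
        Map<String, Integer> df = new TreeMap<>();
        for (String record : shardRecords) {
            String[] fields = record.split("\t");
            df.merge(fields[0], Integer.parseInt(fields[1]), Integer::sum);
        }
        return df;
    }
}
```

Applied to the four "avatar" records shown for Program 2 (DF 4, 4, 3 and 1), this gives the global DF of 12 stated earlier.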
Copyright 2010, Carnegie Mellon University.
Updated on Feb 23, 2010.
Le Zhao