IT462 Lab 7: Introduction to MapReduce
The primary purpose of this assignment is to familiarize you with running Hadoop on our machines in MI316. In the first part, you will work through a tutorial to run the Word Count example on Hadoop. In the second part, you will run the same example on larger input files. In the third part, you will modify the Word Count example to count bigrams instead of words. The assignment requires a lot of command-line work (running Hadoop jobs, copying files around, etc.).
For the MapReduce/Hadoop portion of this class we will work in MI316. We will use Java and the org.apache.hadoop.mapred library rather than org.apache.hadoop.mapreduce, as the former is better tested and has more support available.
Preliminaries:
On your Unix drive, create a folder named "IT462" and another folder inside it called "Lab07" (without quotes). All your work for this lab should be inside the Lab07 directory.
Part 1 Word Count Example
This tutorial is based on the MapReduce tutorial available at: http://hadoop.apache.org/common/docs/current/mapred_tutorial.html
Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.
A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file system, usually the Hadoop Distributed File System (HDFS). The framework takes care of scheduling tasks, monitoring them, and re-executing failed tasks.
Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System (see HDFS Architecture Guide) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.
The MapReduce framework consists of a single master JobTracker and one slave TaskTracker per cluster-node. The master is responsible for scheduling the jobs' component tasks on the slaves, monitoring them and re-executing the failed tasks. The slaves execute the tasks as directed by the master.
Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract-classes. These, and other job parameters, comprise the job configuration. The Hadoop job client then submits the job (jar/executable etc.) and configuration to the JobTracker which then assumes the responsibility of distributing the software/configuration to the slaves, scheduling tasks and monitoring them, providing status and diagnostic information to the job-client.
Although the Hadoop framework is implemented in Java, MapReduce applications need not be written in Java. However, for this class we will use Java.
The MapReduce framework operates exclusively on <key, value> pairs, that is, the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types.
The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework.
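As a rough illustration of what "serializable by the framework" and "sortable" mean for a key class, the sketch below uses only java.io so that it compiles without Hadoop. WordKey is a hypothetical class, not part of the lab code. Its write and readFields methods have the same signatures as the ones declared by Hadoop's Writable interface, and compareTo supplies the ordering; in a real job the class would declare that it implements WritableComparable<WordKey>.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical key type, for illustration only. In a real Hadoop job it would
// declare "implements WritableComparable<WordKey>"; that clause is left out
// here so the sketch compiles without Hadoop on the classpath.
class WordKey implements Comparable<WordKey> {
    private String word = "";

    WordKey() { }                                 // empty instance, to be filled by readFields
    WordKey(String word) { this.word = word; }

    String get() { return word; }

    // Same signature as Writable.write: serialize the fields to a binary stream.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
    }

    // Same signature as Writable.readFields: overwrite the fields from the stream.
    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
    }

    // Ordering used when keys are sorted.
    @Override
    public int compareTo(WordKey other) {
        return word.compareTo(other.word);
    }

    // Serializes a key to bytes and reads it back into a fresh instance.
    static WordKey roundTrip(WordKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            WordKey copy = new WordKey();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new WordKey("Hadoop")).get());                 // prints Hadoop
        System.out.println(new WordKey("Bye").compareTo(new WordKey("Hello")) < 0); // prints true
    }
}
```

In this lab you do not need to write such a class yourself: Text, IntWritable and LongWritable, used in the WordCount example, are ready-made Hadoop types.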
Input and Output types of a MapReduce job:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
Before we jump into the details, let's walk through an example MapReduce application to get a flavour for how such applications work.
WordCount is a simple application that counts the number of occurrences of each word in a given input set.
WordCount.java
1.
2.
3.  import java.io.IOException;
4.  import java.util.*;
5.
6.  import org.apache.hadoop.fs.Path;
7.  import org.apache.hadoop.conf.*;
8.  import org.apache.hadoop.io.*;
9.  import org.apache.hadoop.mapred.*;
10. import org.apache.hadoop.util.*;
11.
12. public class WordCount {
13.
14.     public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
15.         private final static IntWritable one = new IntWritable(1);
16.         private Text word = new Text();
17.
18.         public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
19.             String line = value.toString();
20.             StringTokenizer tokenizer = new StringTokenizer(line);
21.             while (tokenizer.hasMoreTokens()) {
22.                 word.set(tokenizer.nextToken());
23.                 output.collect(word, one);
24.             }
25.         }
26.     }
27.
28.     public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
29.         public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
30.             int sum = 0;
31.             while (values.hasNext()) {
32.                 sum += values.next().get();
33.             }
34.             output.collect(key, new IntWritable(sum));
35.         }
36.     }
37.
38.     public static void main(String[] args) throws Exception {
39.         JobConf conf = new JobConf(WordCount.class);
40.         conf.setJobName("wordcount");
41.
42.         conf.setOutputKeyClass(Text.class);
43.         conf.setOutputValueClass(IntWritable.class);
44.
45.         conf.setMapperClass(Map.class);
46.         conf.setCombinerClass(Reduce.class);
47.         conf.setReducerClass(Reduce.class);
48.
49.         conf.setInputFormat(TextInputFormat.class);
50.         conf.setOutputFormat(TextOutputFormat.class);
51.
52.         FileInputFormat.setInputPaths(conf, new Path(args[0]));
53.         FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54.
55.         JobClient.runJob(conf);
57.     }
58. }
59.
Copy and paste the code above into a file named WordCount.java in your Lab07 directory. Remove the line numbers so that your code conforms to Java syntax.
To compile the code and generate the jar file, use the following commands (from the command prompt - while logged into one of the Unix machines in MI316):
Assuming you are in your Lab07 directory:
To compile:
javac -classpath /usr/lib/hadoop-0.20/hadoop-core.jar WordCount.java
If you now execute ls from the command prompt, you should see the following files in your Lab07 folder: WordCount.class, WordCount.java, WordCount$Map.class and WordCount$Reduce.class
To create the jar file:
jar -cvf count.jar *.class
Create some input files
-create an input directory called "inputPart1" inside Lab07 (mkdir inputPart1)
-create a file01.txt file inside the "inputPart1" directory with content
Hello World Bye World
-create a file02.txt file inside the "inputPart1" directory with content
Hello Hadoop Goodbye Hadoop
Copy files to HDFS
Hadoop expects that the inputs are stored in the Hadoop Distributed File System (HDFS).
-create an HDFS input directory called "hdfsinputPart1" inside Lab07 (mkdir hdfsinputPart1)
Copy the files from "inputPart1" directory into HDFS:
/usr/bin/hadoop-0.20 dfs -copyFromLocal inputPart1/* hdfsinputPart1
Execute WordCount
The following command counts the words in all files in hdfsinputPart1 and stores the results in a newly created "outputPart1" directory:
/usr/bin/hadoop-0.20 jar count.jar WordCount hdfsinputPart1 outputPart1
Output:
Using your favorite editor (or the cat, more, etc. commands), check the content of the outputPart1/part-00000 file. You should have:
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
Take a screenshot of the editor/command window showing the content of the outputPart1/part-00000 file and paste it into yourlastname_Lab07.doc
The WordCount application is quite straight-forward.
The Mapper implementation (lines 14-26), via the map method (lines 18-25), processes one line at a time, as provided by the specified TextInputFormat (line 49). It then splits the line into tokens separated by whitespace, via the StringTokenizer, and emits a key-value pair of <word, 1> for each token.
For the given sample input the first map emits:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
The second map emits:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
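The map step for the sample input can be sketched in plain Java, without any Hadoop classes. MapSketch is a hypothetical stand-in for the body of the map method: it tokenizes a line with StringTokenizer, as WordCount does, and collects one <word, 1> entry per token in a list instead of calling output.collect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical stand-in for the map method of WordCount (no Hadoop required).
class MapSketch {
    // Emits one "<word, 1>" entry per whitespace-separated token, in input order.
    static List<String> map(String line) {
        List<String> emitted = new ArrayList<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            emitted.add("<" + tokenizer.nextToken() + ", 1>");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints [<Hello, 1>, <World, 1>, <Bye, 1>, <World, 1>]
        System.out.println(map("Hello World Bye World"));
        // prints [<Hello, 1>, <Hadoop, 1>, <Goodbye, 1>, <Hadoop, 1>]
        System.out.println(map("Hello Hadoop Goodbye Hadoop"));
    }
}
```

Note that the map step does no counting of its own: a word that appears twice in a line is simply emitted twice.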
We'll learn more about the number of maps spawned for a given job, and how to control them in a fine-grained manner, a bit later.
WordCount also specifies a combiner (line 46). Hence, the output of each map is passed through the local combiner (which is the same as the Reducer, as per the job configuration) for local aggregation, after being sorted on the keys.
The output of the first map:
< Bye, 1>
< Hello, 1>
< World, 2>
The output of the second map:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
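The effect of the combiner on a single map's output can be imitated in plain Java. This is only a sketch under simplifying assumptions: CombineSketch is a hypothetical class, and a TreeMap stands in for the sort-by-key plus local summing that the framework and the Reduce class perform together.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of local aggregation on one map's output.
class CombineSketch {
    // Each word in the list stands for one emitted <word, 1> pair.
    // The TreeMap keeps keys sorted and sums the 1s per key.
    static Map<String, Integer> combine(List<String> emittedWords) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String word : emittedWords) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {Bye=1, Hello=1, World=2}
        System.out.println(combine(Arrays.asList("Hello", "World", "Bye", "World")));
        // prints {Goodbye=1, Hadoop=2, Hello=1}
        System.out.println(combine(Arrays.asList("Hello", "Hadoop", "Goodbye", "Hadoop")));
    }
}
```

The combiner only sees the pairs produced by one map, which is why Hello still appears with a count of 1 in both outputs.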
The Reducer implementation (lines 28-36), via the reduce method (lines 29-35) just sums up the values, which are the occurrence counts for each key (i.e. words in this example).
Thus the output of the job is:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>
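To see how the final counts arise from the two combined map outputs, the reduce step can be sketched in plain Java. ReduceSketch is a hypothetical class: its run method groups the values from all map outputs by key (the framework does this grouping and sorting for you), and its reduce method sums an iterator of values in the same way as the reduce method of WordCount.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the reduce step (no Hadoop required).
class ReduceSketch {
    // Mirrors the loop in WordCount's reduce method: sum all values for one key.
    static int reduce(Iterator<Integer> values) {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        return sum;
    }

    // Groups the values of all map outputs by key, then reduces each group.
    static Map<String, Integer> run(List<Map<String, Integer>> mapOutputs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map<String, Integer> output : mapOutputs) {
            for (Map.Entry<String, Integer> pair : output.entrySet()) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            result.put(group.getKey(), reduce(group.getValue().iterator()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> first = new TreeMap<>();   // combined output of the first map
        first.put("Bye", 1);
        first.put("Hello", 1);
        first.put("World", 2);
        Map<String, Integer> second = new TreeMap<>();  // combined output of the second map
        second.put("Goodbye", 1);
        second.put("Hadoop", 2);
        second.put("Hello", 1);
        // prints {Bye=1, Goodbye=1, Hadoop=2, Hello=2, World=2}
        System.out.println(run(Arrays.asList(first, second)));
    }
}
```

Hello is the only key that receives values from both maps, so it is the only count that changes in this step.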
The main method (lines 38-57) specifies various facets of the job, such as the input/output paths (passed via the command line), key/value types, input/output formats etc., in the JobConf. It then calls JobClient.runJob (line 55) to submit the job and monitor its progress.
We'll learn more about JobConf, JobClient, Tool and other interfaces and classes a bit later.
Part 2 WordCount for bigger files
Create some input files
-create another input directory called "inputPart2" inside Lab07
-Download (or view source and copy-paste) into your inputPart2 directory the file at http://www.usna.edu/Users/cs/adina/teaching/it462/fall2011/suggestedTopics.txt
-Download (or view source and copy-paste) into your inputPart2 directory the file at http://www.usna.edu/Users/cs/adina/teaching/it462/fall2011/index.html
Copy the files into HDFS to hdfsinputPart2 and execute the WordCount example on these new data files (store the output in outputPart2 directory). Check the content of the outputPart2/part-00000 file.
Take a screenshot of the editor/command window showing the content of the outputPart2/part-00000 file (at least the last 15 lines) and paste it into yourlastname_Lab07.doc
Part 3 - Count Bigrams
Take the word count example created in Part 1 and extend it to count bigrams. Bigrams are simply sequences of two consecutive words. For example, the previous sentence contains the following bigrams: "Bigrams are", "are simply", "simply sequences", "sequences of", etc.
Work with the sample collection created in Part 2. Don't worry about doing anything fancy in terms of tokenization; it's fine to continue using Java's StringTokenizer.
Write the answers to the following questions into your yourlastname_Lab07.doc file:
Turn in (both electronic and paper submissions are required):
Electronic:
Hard-copies: