IT462 Lab 7: Introduction to MapReduce

 

The primary purpose of this assignment is to familiarize you with running Hadoop on our machines in MI316. For the first part of the assignment, you will be asked to work through a tutorial to run the Word Count example on Hadoop. For the second part, you will be asked to modify the Word Count example to count bigrams instead of words. The assignment requires a lot of activity on the command line (running Hadoop jobs, copying files around, etc.).  

 

For the MapReduce/Hadoop portion of this class we will work in MI316. We will use Java and the org.apache.hadoop.mapred library, not org.apache.hadoop.mapreduce, as the former is better tested and has more support available.

 

Preliminaries: On your Unix drive, create a folder named "IT462" and another folder inside it called "Lab07" (without quotes). All your work for this lab should be inside the Lab07 directory.

 

Part 1 - Word Count Example

 

This tutorial is based on the MapReduce tutorial available at: http://hadoop.apache.org/common/docs/current/mapred_tutorial.html

 

Overview

Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.

A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file system, usually the Hadoop Distributed File System (HDFS). The framework takes care of scheduling tasks, monitoring them and re-executing the failed tasks.

Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System (see HDFS Architecture Guide) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.

The MapReduce framework consists of a single master JobTracker and one slave TaskTracker per cluster-node. The master is responsible for scheduling the jobs' component tasks on the slaves, monitoring them and re-executing the failed tasks. The slaves execute the tasks as directed by the master.

Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract-classes. These, and other job parameters, comprise the job configuration. The Hadoop job client then submits the job (jar/executable etc.) and configuration to the JobTracker which then assumes the responsibility of distributing the software/configuration to the slaves, scheduling tasks and monitoring them, providing status and diagnostic information to the job-client.

Although the Hadoop framework is implemented in Java™, MapReduce applications need not be written in Java. However, for this class we will use Java.

Inputs and Outputs

The MapReduce framework operates exclusively on <key, value> pairs, that is, the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types.

The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework.
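For WordCount the built-in Text and IntWritable classes already satisfy these interfaces, so you will not need to write your own. Purely for illustration, here is a minimal sketch of what a custom key would have to provide (PairWritable is a hypothetical name, not part of Hadoop or of this lab):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical custom key: a pair of strings that the framework can
// serialize (Writable) and sort during the shuffle (Comparable).
public class PairWritable implements WritableComparable<PairWritable> {
    private String left = "";
    private String right = "";

    public void write(DataOutput out) throws IOException {    // serialize
        out.writeUTF(left);
        out.writeUTF(right);
    }

    public void readFields(DataInput in) throws IOException { // deserialize
        left = in.readUTF();
        right = in.readUTF();
    }

    public int compareTo(PairWritable other) {                 // sort order
        int cmp = left.compareTo(other.left);
        return (cmp != 0) ? cmp : right.compareTo(other.right);
    }
}

In practice such a key should also override hashCode (and equals), since the default partitioner assigns keys to reducers by hash code.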

Input and Output types of a MapReduce job:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

Example: WordCount v1.0

Before we jump into the details, let's walk through an example MapReduce application to get a flavour for how they work.

WordCount is a simple application that counts the number of occurrences of each word in a given input set.

Source Code

WordCount.java

1.
2.
3. import java.io.IOException;
4. import java.util.*;
5.
6. import org.apache.hadoop.fs.Path;
7. import org.apache.hadoop.conf.*;
8. import org.apache.hadoop.io.*;
9. import org.apache.hadoop.mapred.*;
10. import org.apache.hadoop.util.*;
11.
12. public class WordCount {
13.
14.    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
15.      private final static IntWritable one = new IntWritable(1);
16.      private Text word = new Text();
17.
18.      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
19.        String line = value.toString();
20.        StringTokenizer tokenizer = new StringTokenizer(line);
21.        while (tokenizer.hasMoreTokens()) {
22.          word.set(tokenizer.nextToken());
23.          output.collect(word, one);
24.        }
25.      }
26.    }
27.
28.    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
29.      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
30.        int sum = 0;
31.        while (values.hasNext()) {
32.          sum += values.next().get();
33.        }
34.        output.collect(key, new IntWritable(sum));
35.      }
36.    }
37.
38.    public static void main(String[] args) throws Exception {
39.      JobConf conf = new JobConf(WordCount.class);
40.      conf.setJobName("wordcount");
41.
42.      conf.setOutputKeyClass(Text.class);
43.      conf.setOutputValueClass(IntWritable.class);
44.
45.      conf.setMapperClass(Map.class);
46.      conf.setCombinerClass(Reduce.class);
47.      conf.setReducerClass(Reduce.class);
48.
49.      conf.setInputFormat(TextInputFormat.class);
50.      conf.setOutputFormat(TextOutputFormat.class);
51.
52.      FileInputFormat.setInputPaths(conf, new Path(args[0]));
53.      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54.
55.      JobClient.runJob(conf);
56.    }
57. }

Create the WordCount.java file

Copy and paste the code above into a file named WordCount.java in your Lab07 directory. Remove the line numbers so that your code conforms with Java syntax.
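If you would rather not delete the numbers by hand, a one-liner along the following lines should strip the leading line numbers (assuming GNU sed is available on the lab machines; with other sed variants, drop -i and redirect the output to a new file):

sed -i 's/^[0-9]*\.//' WordCount.java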

Usage

To compile the code and generate the jar file, use the following commands (from the command prompt - while logged into one of the Unix machines in MI316):

Assuming you are in your Lab07 directory:

To compile:

javac -classpath /usr/lib/hadoop-0.20/hadoop-core.jar WordCount.java

If you now execute ls from the command prompt, you should see the following files in your Lab07 folder: WordCount.class, WordCount.java, WordCount$Map.class, and WordCount$Reduce.class.

To create the jar file:

jar -cvf count.jar *.class

Create some input files

-create an input directory called "inputPart1" inside Lab07 (mkdir inputPart1)

-create a file01.txt file inside the "inputPart1" directory with content
Hello World Bye World

-create a file02.txt file inside the "inputPart1" directory with content
Hello Hadoop Goodbye Hadoop
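If you prefer, the files can be created straight from the command line, for example (from inside your Lab07 directory):

echo "Hello World Bye World" > inputPart1/file01.txt
echo "Hello Hadoop Goodbye Hadoop" > inputPart1/file02.txt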

Copy files to HDFS

Hadoop expects that the inputs are stored in the Hadoop Distributed File System (HDFS).

-create an HDFS input directory called "hdfsinputPart1" (/usr/bin/hadoop-0.20 dfs -mkdir hdfsinputPart1); note that this directory is created in HDFS, not on your local Unix drive

Copy the files from "inputPart1" directory into HDFS:

 /usr/bin/hadoop-0.20 dfs -copyFromLocal inputPart1/* hdfsinputPart1
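To double-check that the files actually landed in HDFS, you can list the directory with the same dfs shell:

/usr/bin/hadoop-0.20 dfs -ls hdfsinputPart1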

Execute WordCount

The following command will count the words in all files in the hdfsinputPart1 directory and store the results in a newly created "outputPart1" directory:

/usr/bin/hadoop-0.20 jar count.jar WordCount hdfsinputPart1 outputPart1

Output:

Using your favorite editor (or the cat, more, etc. commands), check the content of the outputPart1/part-00000 file. You should have:


Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
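Note that the outputPart1 directory is created in HDFS, not in your local Lab07 directory, so if your editor cannot find the file you may first need to print it or copy it back with the dfs shell:

/usr/bin/hadoop-0.20 dfs -cat outputPart1/part-00000
/usr/bin/hadoop-0.20 dfs -copyToLocal outputPart1 outputPart1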

Take a screenshot of the editor/command window showing the content of the outputPart1/part-00000 file and paste it into yourlastname_Lab07.doc

Walk-through

The WordCount application is quite straightforward.

The Mapper implementation (lines 14-26), via the map method (lines 18-25), processes one line at a time, as provided by the specified TextInputFormat (line 49). It then splits the line into tokens separated by whitespace, via the StringTokenizer, and emits a key-value pair of < <word>, 1>.

For the given sample input the first map emits:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

The second map emits:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>

We'll learn more about the number of maps spawned for a given job, and how to control them in a fine-grained manner, a bit later.

WordCount also specifies a combiner (line 46). Hence, the output of each map is passed through the local combiner (which, per the job configuration, is the same as the Reducer) for local aggregation, after being sorted on the keys.

The output of the first map:
< Bye, 1>
< Hello, 1>
< World, 2>

The output of the second map:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>

The Reducer implementation (lines 28-36), via the reduce method (lines 29-35), just sums up the values, which are the occurrence counts for each key (i.e., words in this example).

Thus the output of the job is:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>

The main method specifies various facets of the job, such as the input/output paths (passed via the command line), key/value types, input/output formats etc., in the JobConf. It then calls JobClient.runJob (line 55) to submit the job and monitor its progress.

We'll learn more about JobConf, JobClient, Tool and other interfaces and classes a bit later.

 

Part 2 - WordCount for Bigger Files

 

Create some input files

-create another input directory called "inputPart2" inside Lab07

-Download (or view source and copy-paste) into your inputPart2 directory the file at http://www.usna.edu/Users/cs/adina/teaching/it462/fall2011/suggestedTopics.txt

-Download (or view source and copy-paste) into your inputPart2 directory the file at http://www.usna.edu/Users/cs/adina/teaching/it462/fall2011/index.html

Copy the files into HDFS to hdfsinputPart2 and execute the WordCount example on these new data files (store the output in the outputPart2 directory). Check the content of the outputPart2/part-00000 file.
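The commands mirror Part 1; assuming the same setup, something along these lines should work:

/usr/bin/hadoop-0.20 dfs -mkdir hdfsinputPart2
/usr/bin/hadoop-0.20 dfs -copyFromLocal inputPart2/* hdfsinputPart2
/usr/bin/hadoop-0.20 jar count.jar WordCount hdfsinputPart2 outputPart2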

 

Take a screenshot of the editor/command window showing the content of the outputPart2/part-00000 file (at least the last 15 lines) and paste it into yourlastname_Lab07.doc

Part 3 - Count Bigrams

Take the word count example created in Part 1 and extend it to count bigrams. Bigrams are simply sequences of two consecutive words. For example, the previous sentence contains the following bigrams: "Bigrams are", "are simply", "simply sequences", "sequences of", etc.

Work with the sample collection created in Part 2. Don't worry about doing anything fancy in terms of tokenization; it's fine to continue using Java's StringTokenizer.
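As a starting point, the only change that is strictly required is in the map method: instead of emitting each token with a count of 1, emit each pair of consecutive tokens. Here is a minimal sketch of a replacement Map class, reusing the imports and structure of Part 1 (one possible approach, not the required solution):

   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
     private final static IntWritable one = new IntWritable(1);
     private Text bigram = new Text();

     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
       StringTokenizer tokenizer = new StringTokenizer(value.toString());
       String previous = null;                    // last token seen on this line
       while (tokenizer.hasMoreTokens()) {
         String current = tokenizer.nextToken();
         if (previous != null) {
           bigram.set(previous + " " + current);  // the pair "previous current"
           output.collect(bigram, one);
         }
         previous = current;
       }
     }
   }

Note that because the mapper sees one line at a time, this sketch does not count bigrams that span line breaks; given the relaxed tokenization rules above, that simplification is acceptable here. The Reduce class from Part 1 works unchanged.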

Write the answer to the following questions into yourlastname_Lab07.doc file:

  1. How many unique bigrams are there?
  2. List the top ten most frequent bigrams and their counts.
  3. What fraction of all bigram occurrences do the top ten bigrams account for? That is, what is the cumulative frequency of the top ten bigrams?
  4. How many bigrams appear only once?
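Since part-00000 is plain text, with each line holding a bigram and its count separated by a tab, standard Unix tools can help answer these questions once you have copied the output out of HDFS (run these in the directory where you copied the output; the file name is the same part-00000 as before):

wc -l part-00000                                                   # unique bigrams (question 1)
awk -F'\t' '{print $2 "\t" $1}' part-00000 | sort -nr | head -10   # top ten (question 2)
awk -F'\t' '{s += $2} END {print s}' part-00000                    # total occurrences (for question 3)
awk -F'\t' '$2 == 1' part-00000 | wc -l                            # bigrams appearing once (question 4)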

 

Turn in (both electronic and paper submissions are required):

Electronic:

  1. Upload the file yourlastname_Lab07.doc with answers to Part 1, Part 2, and Part 3 to Blackboard.
  2. Upload the Java file from Part 3 (Bigrams) to Blackboard.

 

Hard-copies:

  1. The completed assignment coversheet. Your comments will help us improve the course.
  2. Hard copy of yourlastname_Lab07.doc
  3. Hard copy of the Java code in Part 3.