 ![alt text](http://www.jkspeaks.com/wordpress/wp-content/uploads/2011/05/mapreduce-logo.jpg "")

####  This practical session is divided into several parts that approach the Map Reduce programming model through simple examples. In the first part of the session, we will focus on writing a simple WordCount program in Map Reduce using Python. We will run it locally, without relying on the Hadoop back-end, to make it clear that "Map Reduce" is simply a programming model, of which Hadoop is merely one implementation.

![alt text](http://www.glennklockwood.com/data-intensive/hadoop/mapreduce-workflow.png "Word Count Execution by Matei Zaharia ")

####  In the second part, you will use a Hadoop cluster to execute your Map Reduce program at a large scale. You will then become familiar with the job submission routine that comes with the use of any cluster, be it Hadoop or HPC.

## What is a Word Count in Map Reduce ?

The Word Count is the canonical example used to illustrate the Map Reduce programming model. The idea is simply to count the number of appearances of each word in a given set of input texts.

----

#### What does the Mapper do ?
One mapper takes a line (i.e., a string of text) as input and breaks it into words. It then outputs one key/value pair per word found in that line: the word as the key and 1 as the value.

#### What does the Reducer do ?
One reducer receives key/value pairs as input, sums the values received for each word, and outputs the final total as a single record per word.
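The two roles described above can be sketched in a few lines of plain Python, without Hadoop. This is only an illustration: the names `map_line`, `reduce_word` and `word_count` are hypothetical helpers, and the in-memory sort stands in for the shuffle and sort that a real framework performs between the two phases.

```python
from itertools import groupby
from operator import itemgetter


def map_line(line):
    """Mapper: break one line into words and emit a (word, 1) pair per word."""
    return [(word, 1) for word in line.split()]


def reduce_word(word, counts):
    """Reducer: sum all the counts received for a single word."""
    return word, sum(counts)


def word_count(lines):
    """Run map, then sort by key, then reduce, all in memory."""
    pairs = [pair for line in lines for pair in map_line(line)]
    pairs.sort(key=itemgetter(0))  # stand-in for the shuffle and sort step
    return [
        reduce_word(word, (count for _, count in group))
        for word, group in groupby(pairs, key=itemgetter(0))
    ]


print(word_count(["fox wolf dog", "wolf cat dog"]))
```

Keeping the map and reduce steps as separate functions mirrors the separation between the mapper and reducer scripts written later in this session.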

----

![alt text](http://slideplayer.com/5003555/16/images/17/Word+Count+Execution+Input+Map+Shuffle+%26+Sort+Reduce+Output+Map+Reduce.jpg "Word Count Execution by Matei Zaharia ")

##### To keep the illustration simple, the input and output will be the standard STDIN and STDOUT streams, and we will run the example locally.

## Material preparation

We first need to import the files required for this practical session.
Simply execute the next cell and wait.

### Downloading Books from the Gutenberg.org website

In [None]:
!mkdir -p ./CECICourse
!mkdir -p ./CECICourse/books

!wget --quiet http://www.gutenberg.org/cache/epub/20417/pg20417.txt -O ./CECICourse/books/pg20417.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20418/pg20418.txt -O ./CECICourse/books/pg20418.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20419/pg20419.txt -O ./CECICourse/books/pg20419.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20420/pg20420.txt -O ./CECICourse/books/pg20420.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20421/pg20421.txt -O ./CECICourse/books/pg20421.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20422/pg20422.txt -O ./CECICourse/books/pg20422.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20423/pg20423.txt -O ./CECICourse/books/pg20423.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20424/pg20424.txt -O ./CECICourse/books/pg20424.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20425/pg20425.txt -O ./CECICourse/books/pg20425.txt
!wget --quiet http://www.gutenberg.org/cache/epub/20426/pg20426.txt -O ./CECICourse/books/pg20426.txt
!echo "Books downloaded in ./CECICourse/books" 

### Creating a Lorem Ipsum excerpt

In [None]:
!echo "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum" > "./CECICourse/loremipsum.txt"
!echo "File created in ./CECICourse" 

## Mapper

In [None]:
%%file ./CECICourse/mapper.py
#!/usr/local/anaconda3/bin/python
import sys

for line in sys.stdin:                              # The input data comes from STDIN (i.e: The standard input)
    line = line.strip()                             # Removal of leading and trailing whitespaces
    words = line.split()                            # Creation of a list containing all words by splitting the line in words
    # emit one key/value pair (word, 1) per word
    for word in words:                              # For each word in the list (i.e: words), do...
        print(word+"\t"+"1")

## Reducer

In [None]:
%%file ./CECICourse/reducer.py
#!/usr/local/anaconda3/bin/python
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:                              # The input data comes from STDIN (i.e: The standard input)
    line = line.strip()                             # Removal of leading and trailing whitespaces
    try:
        word, count = line.split('\t', 1)           # Parsing of the expected key/value pair
        count = int(count)
    except ValueError:                              # If the line is malformed or the value is not a number, we silently discard it
        continue

    if current_word == word:                        # This IF only works because Hadoop sorts map output by key
        current_count += count                      # before it is passed to the reducer
    else:
        if current_word:
            print(current_word+"\t"+str(current_count))           # Output of the result to STDOUT
        current_count = count
        current_word = word

if current_word is not None:                        # Output of the last word (skipped if no valid input was received)
    print(current_word+"\t"+str(current_count))

## Local execution of the Mapper

In [None]:
!echo "fox wolf dog wolf cat moose mouse dog cat." | python ./CECICourse/mapper.py

## Local execution of the Mapper followed by the Reducer; a Word Count application

In [None]:
!echo "fox wolf dog wolf cat moose mouse dog cat." | python ./CECICourse/mapper.py | sort -k1,1 | python ./CECICourse/reducer.py
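
The `sort -k1,1` step in the pipeline above matters: the reducer only compares each word with the previous one, so equal words must arrive next to each other. The sketch below reproduces that comparison logic in a hypothetical function `streaming_reduce` (not part of the course files) and applies it to unsorted and then sorted pairs.

```python
def streaming_reduce(pairs):
    """Sum counts of consecutive equal keys, as the streaming reducer does."""
    results = []
    current_word, current_count = None, 0
    for word, count in pairs:
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                results.append((current_word, current_count))
            current_word, current_count = word, count
    if current_word is not None:
        results.append((current_word, current_count))
    return results


pairs = [("dog", 1), ("wolf", 1), ("dog", 1)]

# Without sorting, "dog" is reported twice, each time with a partial count.
print(streaming_reduce(pairs))          # [('dog', 1), ('wolf', 1), ('dog', 1)]
# With sorting, each word appears once with its full count.
print(streaming_reduce(sorted(pairs)))  # [('dog', 2), ('wolf', 1)]
```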

## Local execution of the WordCount application using files

In [None]:
!cat ./CECICourse/books/pg20417.txt | python ./CECICourse/mapper.py | sort -k1,1 | python ./CECICourse/reducer.py

As you can see, the output contains many things other than plain words. You may improve on this. 
The issue is that the files are raw, with no preprocessing. Any idea how to clean this up ? Go ahead !
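
One possible approach, offered only as a starting point, is to normalise each line before splitting it. The sketch below lowercases the text and keeps runs of ASCII letters (with an optional inner apostrophe), which drops punctuation and digits. The helper name `clean_words` and the regular expression are assumptions made for illustration; other choices (keeping digits, handling accented letters, skipping the Project Gutenberg header and footer) are equally valid.

```python
import re

# ASCII letters, optionally followed by an apostrophe and more letters (e.g. "don't").
WORD_PATTERN = re.compile(r"[a-z]+(?:'[a-z]+)?")


def clean_words(line):
    """Return the lowercase words of a line, without punctuation or digits."""
    return WORD_PATTERN.findall(line.lower())


print(clean_words('The Fox, the "wolf" & the dog\'s cat -- 42 times!'))
```

In `mapper.py`, the call to `line.split()` could then be replaced by a call to such a function, so that `Dog,` and `dog` are counted as the same word.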

## Hadoop (Python Streaming) execution of the WordCount application

In [None]:
!hadoop fs -mkdir /user/$USER/CECICourse
!hadoop fs -copyFromLocal ./CECICourse/books /user/$USER/CECICourse/books
!hadoop fs -copyFromLocal ./CECICourse/*.py /user/$USER/CECICourse
!hadoop fs -chmod u+x ./CECICourse/*.py
!yarn jar /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hadoop-mapreduce/hadoop-streaming-3.0.0-cdh6.2.0.jar -file /home/$USER/CECICourse/mapper.py -mapper /home/$USER/CECICourse/mapper.py -file /home/$USER/CECICourse/reducer.py -reducer /home/$USER/CECICourse/reducer.py -input /user/$USER/CECICourse/books/* -output /user/$USER/CECICourse/books-output-python

Let's now examine the results !

In [None]:
!hadoop fs -ls /user/$USER/CECICourse/books-output-python
!hadoop fs -cat /user/$USER/CECICourse/books-output-python/part-00000
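
The raw listing is long, so it can help to look only at the most frequent words. Here is a minimal sketch, assuming the reducer output format used above (one `word<TAB>count` pair per line). The function name `top_words` is hypothetical, and the sample lines are made up for illustration; they are not actual job output.

```python
import heapq


def top_words(lines, n=3):
    """Return the n (word, count) pairs with the highest counts."""
    pairs = []
    for line in lines:
        word, _, count = line.rstrip("\n").rpartition("\t")
        if word and count.isdigit():  # skip malformed lines
            pairs.append((word, int(count)))
    return heapq.nlargest(n, pairs, key=lambda pair: pair[1])


sample = ["the\t120\n", "of\t75\n", "whale\t12\n", "and\t98\n"]
print(top_words(sample, n=2))
```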

## Hadoop (Java Map Reduce) execution of the WordCount application

In [None]:
%%file ./CECICourse/WordCount.java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

In [None]:
!chmod +x  ./CECICourse/WordCount.java 
%env HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hadoop
!mkdir -p ./CECICourse/WordCount_Classes

!/usr/bin/javac -classpath `hadoop classpath` -d ./CECICourse/WordCount_Classes ./CECICourse/WordCount.java
!/usr/bin/jar -cvf ./CECICourse/WordCount.jar -C ./CECICourse/WordCount_Classes/ .

!yarn jar ./CECICourse/WordCount.jar WordCount /user/$USER/CECICourse/books/* /user/$USER/CECICourse/books_output-java

## Wrap Up

We have seen how to implement a WordCount program in MapReduce. By implementing it in Python, we have also shown how easily it can be scaled up by executing it on the Hadoop cluster rather than locally.

Fine, **BUT** what about **Shuffle and Sort** ?

* **Shuffle phase :** Transfer of the map output from **a** Mapper to **a** Reducer.
* **Sort phase :** Merging and sorting of the map outputs. The data coming from the mappers are grouped by key, automatically split among the reducers (however many there are), and sorted by key.

Interestingly, Shuffling can start before the end of the Mapping phase. (What about "slow starts" ?)
Also, Sorting is done by Key not by Value !
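
To make the sort phase more concrete, here is a small in-memory sketch, which is not Hadoop's actual implementation: each mapper's output is sorted by key, and the reducer side merges those sorted runs into a single key-ordered stream. The sample data and the name `merge_map_outputs` are hypothetical.

```python
import heapq
from operator import itemgetter


def merge_map_outputs(map_outputs):
    """Merge several lists of (key, value) pairs into one key-sorted list."""
    sorted_runs = [sorted(output, key=itemgetter(0)) for output in map_outputs]
    # heapq.merge assumes each input is already sorted and merges them lazily.
    return list(heapq.merge(*sorted_runs, key=itemgetter(0)))


mapper_1 = [("wolf", 1), ("dog", 1)]
mapper_2 = [("dog", 1), ("cat", 1)]

# The merged stream is ordered by key only; values play no role in the order.
print(merge_map_outputs([mapper_1, mapper_2]))
```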

This sorting step helps Hadoop detect where the values of one key end and those of the next key begin, so that the reduce processing for each new key is started accordingly and transparently for the user. 

You may skip the Shuffling and Sorting by specifying zero reducers. You then get a map-only job, which finishes faster because the map output is neither shuffled nor sorted.

What about **Partitions** ?

Well, partitioning is another matter. It determines to which reducer each output record of the map phase will be sent. The default partitioner hashes the keys to distribute the records among the reduce tasks. And yes, you may override it for specific tasks.
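
The idea behind hash partitioning can be sketched as follows. This is an illustration rather than Hadoop's code: `zlib.crc32` is used as a stand-in hash because Python's built-in `hash` of strings changes between runs, and the names `partition_for` and `partition_pairs` are hypothetical.

```python
import zlib


def partition_for(key, num_reducers):
    """Pick a reducer index for a key from a deterministic hash of the key."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def partition_pairs(pairs, num_reducers):
    """Distribute (key, value) pairs into one bucket per reducer."""
    buckets = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        buckets[partition_for(key, num_reducers)].append((key, value))
    return buckets


pairs = [("dog", 1), ("cat", 1), ("dog", 1), ("fox", 1)]
for index, bucket in enumerate(partition_pairs(pairs, 2)):
    print(index, bucket)
```

Whatever the hash function, the important property is that equal keys always land in the same bucket, so a single reducer sees every value for a given word.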

![alt text](https://www.oreilly.com/library/view/distributed-computing-in/9781787126992/assets/fadf32ab-b857-4d22-a334-c989b5bafdea.png "Distributed Computing in Java 9 by Raja Malleswara Rao Pattamsetti")
