![alt text](https://spark.apache.org/images/spark-logo-trademark.png "")


#### The Hadoop framework is based on a simple programming model called MapReduce. It makes a computing solution scalable (i.e. more machines can be used for a single job), fault-tolerant (i.e. if a failure occurs, the part of the job that was lost is re-executed; YARN takes care of this) and cost-effective (first, because many commodity machines are usually less expensive than high-density servers, and second, because the cluster can run on heterogeneous hardware, seamlessly mixing old and new equipment).

#### Spark is not an alternative to Hadoop; it builds on it. On a Hadoop cluster, Spark uses HDFS to store your datasets and results, and relies on YARN for resource reservations. (Spark can also run without Hadoop, for instance in its standalone or local modes.)

#### Spark is designed for fast computation. It extends the MapReduce model to other types of computation, including interactive queries, stream processing, machine learning, SQL queries, graph algorithms and so on.

## Spark Components

**Apache Spark Core :** Underlying general execution engine for Spark. All other functionalities are built upon it.<br>
**Spark SQL :** This component is built on top of Spark Core and introduces a data abstraction called SchemaRDD (renamed DataFrame in Spark 1.3) that provides support for structured and semi-structured data.<br>
**Spark Streaming :** As its name indicates, it allows performing streaming analytics by ingesting data in mini-batches while performing RDD transformations on those batches.<br>
**MLlib :** This is a distributed machine learning framework built on top of Spark Core. The equivalent in Hadoop would be Apache Mahout. At one stage, Mahout did not rely on Apache Spark: it relied solely on Hadoop MapReduce, reading and writing all data to disk instead of using shared in-memory storage, and MLlib was then up to nine times faster than Mahout.<br>
**GraphX :** A distributed graph-processing framework built on top of Spark Core. 

![alt text](http://datadotz.com/wp-content/uploads/2014/09/websiteIcon-training-1024x406.png "Spark components and their integration with other services. ")



## Operations in MapReduce

#### The main feature of Spark is in-memory cluster computing. Why is this so important?

MapReduce is convenient as it allows users to write parallel computations without having to worry about how the work is distributed or how failures are handled. However, in MapReduce, the only way to reuse data between computations (e.g. between multiple jobs, or between the steps of a single job) is to write it to a storage system, which in the case of Hadoop is HDFS.

![alt text](https://www.tutorialspoint.com/apache_spark/images/iterative_operations_on_mapreduce.jpg "Iterative Operations on MapReduce ")

The problem is that disk access is slow, and this hurts both iterative and interactive operations.

![alt text](https://www.tutorialspoint.com/apache_spark/images/interactive_operations_on_mapreduce.jpg "Interactive Operations on MapReduce ")

# Operations in Spark using RDD

#### Spark does better in this area, but how?

As explained above, sharing data is slow in the canonical MapReduce paradigm because of replication, serialization and disk I/O. By commonly cited estimates, most Hadoop applications spend more than 90% of their time doing HDFS reads and writes.

Spark, on the other hand, uses **R**esilient **D**istributed **D**atasets (RDDs), which support in-memory computation. The idea is to keep intermediate state in memory as an object that is shared across jobs. How much faster is this? Sharing data in memory is roughly 10 to 100 times faster than going through the network and disk.

![alt text](https://www.tutorialspoint.com/apache_spark/images/iterative_operations_on_spark_rdd.jpg "Iterative Operations on Spark RDD ")

In essence, intermediate results are kept in RDDs in memory instead of in files on disk. However, if the amount of memory available (i.e. the memory actually reserved for the job) is insufficient, Spark will fall back on disk nonetheless (or recompute the missing data), losing part of its advantage. It is therefore important to estimate the amount of memory required before submitting a job to a cluster whose resources you share with other users.

![alt text](https://www.tutorialspoint.com/apache_spark/images/interactive_operations_on_spark_rdd.jpg "Interactive Operations on Spark RDD ")

Be aware that, by default, each RDD may be recomputed every time you run an action on it. You can persist an RDD in memory for fast access, on disk, or even replicated across multiple nodes (using `persist()` with a storage level such as `MEMORY_ONLY`, `MEMORY_AND_DISK`, `DISK_ONLY` or `MEMORY_ONLY_2`, or `cache()` for the in-memory default). Depending on your use case and on the amount of memory you can access, you may have to use one of these options.
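To make the recompute-versus-persist distinction concrete, here is a toy, single-machine model written in plain Python (no Spark needed). `ToyRDD`, `expensive` and `calls` are hypothetical names made up for this sketch; real Spark caches per partition, may evict cached data, and offers several storage levels, none of which is modelled here.

```python
class ToyRDD:
    """Hypothetical, single-machine model of an RDD: it stores a recipe
    (how to compute its elements), not the elements themselves."""

    def __init__(self, compute):
        self._compute = compute      # zero-argument function returning a list
        self._persisted = False
        self._cache = None

    def map(self, f):
        # Transformation: returns a new ToyRDD and computes nothing yet
        return ToyRDD(lambda: [f(x) for x in self._materialize()])

    def persist(self):
        # Ask for the elements to be kept after their first computation
        self._persisted = True
        return self

    def _materialize(self):
        if not self._persisted:
            return self._compute()           # recomputed from scratch every time
        if self._cache is None:
            self._cache = self._compute()    # computed once, then reused
        return self._cache

    # Actions: these trigger the computation
    def count(self):
        return len(self._materialize())

    def collect(self):
        return list(self._materialize())


calls = {"n": 0}


def expensive(x):
    calls["n"] += 1    # count how many times the function really runs
    return x * x


base = ToyRDD(lambda: list(range(5)))

squares = base.map(expensive)
squares.count()
squares.collect()
print("without persist:", calls["n"])   # 10: the map ran once per action

calls["n"] = 0
cached = base.map(expensive).persist()
cached.count()
cached.collect()
print("with persist:", calls["n"])      # 5: computed once, then served from the cache
```

With PySpark itself, the corresponding step would be calling `persist()` (or `cache()`) on an RDD before running several actions on it.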

# Exercises

## WordCount in Spark - Single File
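For reference, the cell below chains `flatMap`, `map` and `reduceByKey`. The plain-Python sketch that follows (no Spark involved; `word_count` and `sample` are hypothetical names) computes the same kind of result on a small in-memory list, which can help when checking what the Spark job should output. It splits on any whitespace with `str.split()`, which, unlike `split(" ")`, never produces empty strings when words are separated by several spaces.

```python
from itertools import chain


def word_count(lines):
    """Plain-Python equivalent of flatMap(split) -> map((word, 1)) -> reduceByKey(+)."""
    words = chain.from_iterable(line.split() for line in lines)   # flatMap: words of all lines, flattened
    pairs = ((word, 1) for word in words)                         # map: each word becomes (word, 1)
    counts = {}
    for word, one in pairs:                                       # reduceByKey: add up the 1s per word
        counts[word] = counts.get(word, 0) + one
    return counts


sample = ["the quick brown fox", "the lazy dog"]   # hypothetical input lines
print(word_count(sample))   # {'the': 2, 'quick': 1, 'brown': 1, 'fox': 1, 'lazy': 1, 'dog': 1}
```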

In [None]:
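import os
import pwd
import getpass
from pyspark import SparkContext, SparkConf

userName = getpass.getuser()
appName = pwd.getpwuid(os.getuid())[0]   # login name of the current user, reused as the application name

# "local" runs Spark on this machine only; getOrCreate() reuses an existing
# context instead of failing when this cell is executed a second time
conf = SparkConf().setMaster("local").setAppName(appName)
sc = SparkContext.getOrCreate(conf)

text_file = sc.textFile("hdfs://public00/user/" + userName + "/CECICourse/books/pg20417.txt")
# split() without an argument splits on any whitespace and drops empty strings
counts = (text_file.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
# saveAsTextFile() fails if the output directory already exists
counts.saveAsTextFile("hdfs://public00/user/" + userName + "/" + appName + "_testing_single")
print("Done !")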

In [None]:
!hadoop fs -ls /user/{userName}/{appName}_testing_single/
!hadoop fs -cat /user/{userName}/{appName}_testing_single/part-00000

## WordCount in Spark - Multiple Files

In [None]:
# A glob pattern loads every matching file into a single RDD
text_file = sc.textFile("hdfs://public00/user/" + userName + "/CECICourse/books/*.txt")
counts = (text_file.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
counts.saveAsTextFile("hdfs://public00/user/" + userName + "/" + appName + "_testing_multiple")
print("Done !")

In [None]:
!hadoop fs -ls /user/{userName}/{appName}_testing_multiple/
!hadoop fs -cat /user/{userName}/{appName}_testing_multiple/part-00000

## RDD Operations - Transformations and Actions

**Transformations** and **Actions** are the two types of operations you can perform on RDDs. A transformation is a function that produces a new RDD from one or more existing RDDs. An action, on the other hand, actually works with the loaded dataset: it runs a computation and either returns a result to the driver program (e.g. `collect()`, `count()`, `reduce()`) or writes data out (e.g. `saveAsTextFile()`). Why create new RDDs each time? Because RDDs are immutable. It is important to note that transformations are lazy: they are not executed immediately, but only when an action is called.
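Laziness can be illustrated, by analogy only, with Python generator expressions: building them records a computation without running it, and consuming them plays the role of an action. This is a plain-Python sketch with hypothetical names (`parse`, `log`); unlike an RDD, a generator can only be consumed once, whereas Spark can re-run the same lineage for every action.

```python
log = []


def parse(line):
    log.append("parse " + line)    # record when the work actually happens
    return line.split(" ")


lines = ["2017-02-28 01:10:17.273 1-0 19.1456 2.71964",
         "2017-02-28 01:10:17.003 1-1 38.8379 2.70964"]

# "Transformations": generator expressions only describe the computation
parsed = (parse(line) for line in lines)
ids = (fields[2] for fields in parsed)
print(log)          # []: nothing has run yet

# "Action": consuming the pipeline forces the work to happen
result = list(ids)
print(result)       # ['1-0', '1-1']
print(len(log))     # 2
```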

**There are two types of transformations:**


**Narrow**: all the elements required to compute the records of a single partition live in a single partition of the parent RDD, so each partition can be computed locally, without moving data between nodes.
    (e.g. **map()**, **flatMap()**, **mapPartitions()**, **union()**, **filter()**)

**Wide**: the records required to compute a single partition may live in many partitions of the parent RDD, so data has to be shuffled across the cluster.
    (e.g. **groupByKey()**, **reduceByKey()**, **distinct()**, **intersection()**, **join()**, **cartesian()**, **repartition()**, **coalesce()**; note that **coalesce()** only performs a full shuffle when called with `shuffle=True`)
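The difference between narrow and wide transformations can be sketched in plain Python by modelling an RDD as a list of partitions (lists). `narrow_map` and `shuffle_by_key` are hypothetical helpers, not Spark APIs; routing records by `hash(key) % num_partitions` is only similar in spirit to Spark's default hash partitioning.

```python
def narrow_map(partitions, f):
    """Narrow transformation: output partition i is computed from input partition i only."""
    return [[f(record) for record in part] for part in partitions]


def shuffle_by_key(partitions, num_partitions):
    """Wide transformation: a key's records may sit in any input partition, so every
    record is routed to output partition hash(key) % num_partitions (a shuffle)."""
    out = [[] for _ in range(num_partitions)]
    for part in partitions:
        for key, value in part:
            out[hash(key) % num_partitions].append((key, value))
    return out


parts = [[("1-0", 1), ("1-1", 1)], [("1-0", 1), ("1-2", 1)]]   # 2 input partitions
doubled = narrow_map(parts, lambda kv: (kv[0], kv[1] * 2))     # no data moves
shuffled = shuffle_by_key(doubled, 2)                          # records move between partitions
print(doubled)
print(shuffled)   # all records of a given key now sit in the same partition
```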

![alt text](https://i.pinimg.com/564x/03/ed/05/03ed05a3b48605de48eed79a0406c92a--data-processing-big-data.jpg "RDD Transformations. ")

PySpark documentation: http://spark.apache.org/docs/2.1.0/api/python/pyspark.html

# The Dataset

In [None]:
%%file ./CECICourse/data.conv.short.txt
2017-03-31 03:38:15.951 1-0 122.153 2.07397
2017-03-31 03:38:16.684 1-1 -3.91901 2.03397
2017-03-31 03:38:16.238 1-2 11.04 2.09397
2017-02-28 00:59:16.987 1-0 19.9884 2.75964
2017-02-28 00:59:16.892 1-1 37.0933 2.74964
2017-02-28 00:59:16.764 1-2 45.08 2.75964
2017-02-28 01:03:16.668 1-0 19.3024 2.69742
2017-02-28 01:03:16.864 1-1 38.4629 2.69742
2017-02-28 01:03:17.110 1-2 45.08 2.75742
2017-02-28 01:06:16.102 1-0 19.1652 2.69742
2017-02-28 01:06:16.908 1-1 38.8039 2.69742
2017-02-28 01:06:16.599 1-2 45.08 2.77742
2017-02-28 01:06:46.950 1-0 19.175 2.75964
2017-02-28 01:06:47.728 1-1 38.8379 2.78964
2017-02-28 01:06:47.413 1-2 45.08 2.77964
2017-02-28 01:08:46.002 1-0 19.1456 2.69742
2017-02-28 01:08:46.558 1-1 38.9401 2.76742
2017-02-28 01:08:46.918 1-2 45.08 2.72742
2017-02-28 01:09:22.868 1-0 19.1652 2.71742
2017-02-28 01:09:22.366 1-1 38.872 2.70742
2017-02-28 01:09:22.584 1-2 45.08 2.77742
2017-02-28 01:09:46.703 1-0 19.1652 2.68742
2017-02-28 01:09:46.158 1-1 38.8039 2.77742
2017-02-28 01:09:46.141 1-2 45.08 2.77742
2017-02-28 01:10:17.273 1-0 19.1456 2.71964
2017-02-28 01:10:17.003 1-1 38.8379 2.70964
2017-02-28 01:10:17.450 1-2 45.08 2.73964
2017-02-28 01:10:46.630 1-0 19.1456 2.75742
2017-02-28 01:10:46.588 1-1 38.872 2.75742
2017-02-28 01:10:46.943 1-2 45.08 2.72742
2017-02-28 01:11:47.436 1-0 19.1456 2.71964
2017-02-28 01:11:47.931 1-1 38.9401 2.71964
2017-02-28 01:11:47.571 1-2 45.08 2.72964
2017-02-28 01:12:46.691 1-0 19.1358 2.71742
2017-02-28 01:12:47.009 1-1 38.9061 2.76742
2017-02-28 01:12:47.248 1-2 45.08 2.69742
2017-02-28 01:14:17.069 1-0 19.1162 2.71964
2017-02-28 01:14:16.991 1-1 38.8039 2.73964
2017-02-28 01:14:17.119 1-2 45.08 2.71964
2017-02-28 01:14:47.126 1-0 19.1162 2.70964
2017-02-28 01:14:47.436 1-1 38.872 2.75964
2017-02-28 01:14:47.344 1-2 45.08 2.76964
2017-02-28 01:15:17.490 1-0 19.1064 2.73964
2017-02-28 01:15:17.249 1-1 39.0082 2.69964
2017-02-28 01:15:16.856 1-2 45.08 2.77964
2017-02-28 01:16:17.202 1-0 19.1064 2.72964
2017-02-28 01:16:16.618 1-1 38.872 2.78964
2017-02-28 01:16:16.572 1-2 43.24 2.74964
2017-02-28 01:16:46.659 1-0 19.0966 2.70964
2017-02-28 01:16:46.750 1-1 38.8039 2.76964
2017-02-28 01:16:47.135 1-2 43.24 2.76964
2017-02-28 01:17:46.815 1-0 19.0966 2.75964
2017-02-28 01:17:47.260 1-1 38.7357 2.69964
2017-02-28 01:17:46.810 1-2 43.24 2.73964

In [None]:
!hadoop fs -copyFromLocal ./CECICourse/data.conv.short.txt ./CECICourse/data.conv.short.txt

In [None]:
text_file = sc.textFile("hdfs://public00/user/"+userName+"/CECICourse/data.conv.short.txt")

## Map

**MAP :** Return a new RDD by applying a function to each element of this RDD.
`textFile()` loads the file into an RDD with one element per line; `map()` then turns each line into exactly one new element.
Here, each line is split into multiple values using the " " separator of the original file.
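A plain-Python equivalent of this step on two lines of the dataset (no Spark involved): `splitlines()` stands in for `textFile()`, and the built-in `map` produces exactly one list of fields per line.

```python
sample = """2017-03-31 03:38:15.951 1-0 122.153 2.07397
2017-03-31 03:38:16.684 1-1 -3.91901 2.03397"""

text = sample.splitlines()                                # like textFile(): one element per line
fields = list(map(lambda line: line.split(" "), text))    # like map(): one list of fields per line
for record in fields:
    print(*record)
```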

In [None]:
lines = text_file.map(lambda line: line.split(" "))
print(lines)    # prints the RDD object itself: nothing has been computed yet
# collect() is an action: it brings all the elements back to the driver
for fields in lines.collect():
    print(*fields)
print("Done !")

## FlatMap

**FLATMAP :** Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
Here, the operation resembles map, but the result is flattened: instead of one list of fields per line, all the fields of all the lines end up in a single list.
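Comparing the two side by side in plain Python (a sketch, using `itertools.chain.from_iterable` for the flattening step) on two lines of the dataset:

```python
from itertools import chain

text = ["2017-03-31 03:38:15.951 1-0 122.153 2.07397",
        "2017-03-31 03:38:16.684 1-1 -3.91901 2.03397"]

mapped = [line.split(" ") for line in text]                          # map: 2 lists of 5 fields
flat = list(chain.from_iterable(line.split(" ") for line in text))   # flatMap: 10 strings
print(len(mapped), len(flat))   # 2 10
print(flat[:6])
```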

In [None]:
flatmap = text_file.flatMap(lambda line: line.split(" ")).collect()
print(flatmap)

## Reduce

**REDUCE :** Reduces the elements of this RDD using the specified commutative and associative binary operator. Currently reduces partitions locally.
Here, we keep only the last value of each line and compute the sum of all those values.
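"Reduces partitions locally" means that each partition is reduced on its own before the partial results are combined, which is why the operator must be associative and commutative (Spark may combine the partial results in any order). The plain-Python model below (`rdd_style_reduce` is a hypothetical helper) shows this two-step process, and what goes wrong with subtraction, which is neither associative nor commutative.

```python
from functools import reduce


def rdd_style_reduce(partitions, op):
    """Model of RDD.reduce: reduce inside each partition,
    then reduce the per-partition results (empty partitions are skipped)."""
    partials = [reduce(op, part) for part in partitions if part]
    return reduce(op, partials)


values = [2.07397, 2.03397, 2.09397, 2.75964]
total = rdd_style_reduce([values[:2], values[2:]], lambda x, y: x + y)
print(total)

# Subtraction: the partitioned result differs from a left-to-right reduction
data = [10, 1, 2, 3]
print(reduce(lambda x, y: x - y, data))                             # 4
print(rdd_style_reduce([data[:2], data[2:]], lambda x, y: x - y))   # 10
```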

In [None]:
# Keep only the last field of each line and convert it to a float before reducing
values = text_file.map(lambda line: float(line.split(" ")[-1]))
result = values.reduce(lambda x, y: x + y)
print(result)

## Filter

**FILTER :** Return a new RDD containing only the elements that satisfy a predicate.
A filter discards the records we are not interested in. In the following example, we only keep the records whose time falls in the minute "01:10" (i.e. whose time field starts with "01:10").
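A substring test such as `"01:10" in fields[1]` can also match a time like `00:01:10.000`, where "01:10" are the minutes and seconds. The plain-Python sketch below compares a substring test with `str.startswith()`; the last record is hypothetical and not part of the dataset.

```python
records = [line.split(" ") for line in [
    "2017-02-28 01:10:17.273 1-0 19.1456 2.71964",
    "2017-02-28 01:10:46.630 1-0 19.1456 2.75742",
    "2017-02-28 01:11:47.436 1-0 19.1456 2.71964",
    "2017-02-28 00:01:10.000 1-0 19.0000 2.70000",   # hypothetical record
]]

substring_match = [r for r in records if "01:10" in r[1]]
prefix_match = [r for r in records if r[1].startswith("01:10")]
print(len(substring_match), len(prefix_match))   # 3 2
```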

In [None]:
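lines = text_file.map(lambda line: line.split(" "))
# startswith() avoids matching "01:10" elsewhere in the time string (e.g. minutes and seconds)
filtered = lines.filter(lambda fields: fields[1].startswith("01:10")).collect()
print(filtered)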

## MapValues

**MAPVALUES :** Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD’s partitioning.
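In plain Python (a sketch on two (date, time) pairs taken from the dataset), `mapValues` amounts to the comprehension below. Writing the same thing with `map(lambda kv: (kv[0], f(kv[1])))` would give the same elements, but in PySpark `map()` defaults to `preservesPartitioning=False`, so Spark would forget the RDD's partitioner.

```python
pairs = [("2017-03-31", "03:38:15.951"), ("2017-02-28", "00:59:16.987")]

# mapValues(len): apply len to the value only; the key passes through untouched
lengths = [(key, len(value)) for key, value in pairs]
print(lengths)   # [('2017-03-31', 12), ('2017-02-28', 12)]
```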

In [None]:
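lines = text_file.map(lambda line: line.split(" "))
# mapValues() expects (key, value) pairs: here the date is the key and the time the value
pairs = lines.map(lambda fields: (fields[0], fields[1]))
print(pairs.collect())

def length(x):
    return len(x)

result = pairs.mapValues(length).collect()   # (date, number of characters in the time string)
print(result)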

## FlatMapValues

**FLATMAPVALUES :** Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD’s partitioning.
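A plain-Python sketch of the same idea: each item yielded by the function becomes its own (key, item) pair. It also shows a pitfall: since a Python string is iterable, a function that returns a string produces one pair per character.

```python
pairs = [("2017-03-31", ["03:38:15.951", "1-0", "122.153", "2.07397"])]

# flatMapValues(f): f returns an iterable for each value; every item it yields
# becomes its own (key, item) pair, and the key is left untouched
flattened = [(key, item) for key, value in pairs for item in value]
print(flattened)

# Pitfall: a string value is iterated character by character
chars = [(key, item) for key, value in [("2017-03-31", "03:38")] for item in value]
print(chars)   # [('2017-03-31', '0'), ('2017-03-31', '3'), ('2017-03-31', ':'), ('2017-03-31', '3'), ('2017-03-31', '8')]
```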

In [None]:
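lines = text_file.map(lambda line: line.split(" "))
# Key: the date; value: the list of the remaining fields
pairs = lines.map(lambda fields: (fields[0], fields[1:]))
print(pairs.collect())

def identity(x):
    return x    # the value is already a list, so each field becomes its own (date, field) pair

result = pairs.flatMapValues(identity).collect()
print(result)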

## Some actions besides collect()

**COUNTBYKEY :** Count the number of elements for each key, and return the result to the master as a dictionary.<br>
**TAKE :** Take the first num elements of the RDD. It works by first scanning one partition, and uses the results from that partition to estimate the number of additional partitions needed to satisfy the limit.<br>
**LOOKUP :** Return the list of values in the RDD for the given key. This operation is done efficiently if the RDD has a known partitioner, by only searching the partition that the key maps to.
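To our understanding of PySpark's implementation, `countByKey()` only looks at `element[0]`, and `lookup()` at `element[0]` and `element[1]`, which is why both work directly on the split lines. A plain-Python sketch of what the three actions return (the records are shortened copies of dataset lines):

```python
from collections import Counter

records = [["2017-03-31", "03:38:15.951", "1-0"],
           ["2017-03-31", "03:38:16.684", "1-1"],
           ["2017-02-28", "00:59:16.987", "1-0"]]

count_by_key = Counter(r[0] for r in records)                  # countByKey: key = element[0]
first = records[:1]                                            # take(1)
looked_up = [r[1] for r in records if r[0] == "2017-03-31"]    # lookup: value = element[1]
print(dict(count_by_key), first, looked_up)
```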

In [None]:
lines = text_file.map(lambda line: line.split(" "))

# Pair-RDD actions treat element[0] as the key (the date) and element[1] as the value (the time)
countByKey = lines.countByKey()
print("Result of countByKey : ", countByKey)

take = lines.take(1)
print("First element of RDD lines : ", take)

lookup = lines.lookup('2017-03-31')
print("Result of lookup for 2017-03-31 in RDD lines : ", lookup)

## Let's play with RDDs

**GROUPBYKEY :** Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.<br>
**REDUCEBYKEY :** Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce. Output will be partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified. The default partitioner is hash-partitioning.
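The "combiner" remark is the key practical difference between the two. The plain-Python model below (hypothetical helpers `group_by_key` and `reduce_by_key`, with the data already split into two partitions) also counts how many records would cross the network in each case; the exact numbers only hold for this toy input.

```python
def group_by_key(partitions):
    """groupByKey model: every (key, value) record is shipped, then grouped.
    Returns (groups, number of records shipped)."""
    groups = {}
    shipped = 0
    for part in partitions:
        for key, value in part:
            groups.setdefault(key, []).append(value)
            shipped += 1
    return groups, shipped


def reduce_by_key(partitions, op):
    """reduceByKey model: combine locally in each partition first (the combiner),
    ship one partial result per key and partition, then merge them.
    Returns (result, number of records shipped)."""
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = op(local[key], value) if key in local else value
        partials.append(local)
    result = {}
    for local in partials:
        for key, value in local.items():
            result[key] = op(result[key], value) if key in result else value
    return result, sum(len(local) for local in partials)


parts = [[("a", 1), ("a", 1), ("a", 1), ("b", 1)],
         [("a", 1), ("b", 1), ("b", 1)]]
groups, shipped_g = group_by_key(parts)
sums, shipped_r = reduce_by_key(parts, lambda x, y: x + y)
print(groups, shipped_g)   # {'a': [1, 1, 1, 1], 'b': [1, 1, 1]} 7
print(sums, shipped_r)     # {'a': 4, 'b': 3} 4
```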

In [None]:
# [::4] keeps fields 0 and 4 of each line: (date, last value) pairs
lines = text_file.map(lambda line: line.split(" ")[::4])

result = lines.groupByKey().collect()
print("GroupByKey example with pair RDD : ")
for key, values in result:
    print(key, ":", " - ".join(values))

print("ReduceByKey example with pair RDD : ")
# Convert the values to float first, so that the reduce function only ever sees numbers
result = lines.mapValues(float).reduceByKey(lambda v1, v2: v1 + v2).collect()
print(result)

## Union

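**PARALLELIZE :** Distribute a local Python collection to form an RDD. Using `range` (`xrange` in Python 2) is recommended for performance if the input represents a range.<br>
**UNION :** Return the union of this RDD and another one. Any identical elements will appear multiple times (use `distinct()` to eliminate them).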
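Unlike a mathematical set union, `union()` simply concatenates its inputs. A plain-Python sketch with two overlapping ranges (the overlap is hypothetical; the notebook cell that follows uses disjoint ranges):

```python
one = list(range(1, 10))     # 1..9
two = list(range(5, 15))     # 5..14, overlapping on 5..9
union = one + two            # union: concatenation, so 5..9 appear twice
print(len(one), len(two), len(union), len(set(union)))   # 9 10 19 14
```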

In [None]:
one = sc.parallelize(range(1,10))
two = sc.parallelize(range(10,21))
print("One : ",one.collect())
print("Two : ",two.collect())
print("One union Two : ",one.union(two).collect())

## Intersection

**INTERSECTION :** Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did.
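In plain Python, the same semantics correspond to a set intersection. A sketch with hypothetical inputs containing duplicates, followed by the ranges used in the next cell:

```python
one = [1, 2, 2, 3, 4]
two = [2, 2, 4, 5]
# intersection: elements present in both inputs, duplicates collapse to one copy
common = sorted(set(one) & set(two))    # sorted only for display: Spark gives no order guarantee
print(common)   # [2, 4]

print(sorted(set(range(1, 10)) & set(range(5, 15))))   # [5, 6, 7, 8, 9]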

In [None]:
one = sc.parallelize(range(1,10))
two = sc.parallelize(range(5,15))
print("One : ",one.collect())
print("Two : ",two.collect())
print("One intersection Two : ",one.intersection(two).collect())

## Distinct

**DISTINCT :** Return a new RDD containing the distinct elements in this RDD.
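In plain Python, `distinct()` corresponds to converting to a set; the sketch below reproduces the union-then-distinct combination of the next cell. In Spark, `distinct()` is a wide transformation that needs a shuffle, and the order of its output is not guaranteed.

```python
one = list(range(1, 9))      # 1..8
two = list(range(5, 15))     # 5..14
combined = one + two                   # union keeps 5, 6, 7 and 8 twice
unique = sorted(set(combined))         # distinct (sorted only for display)
print(len(combined), len(unique))      # 18 14
```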

In [None]:
one = sc.parallelize(range(1,9))
two = sc.parallelize(range(5,15))
print("One : ",one.collect())
print("Two : ",two.collect())
print("distinct(One union Two) : ", one.union(two).distinct().collect())

## Repartitioning

**GLOM :** Return an RDD created by coalescing all elements within each partition into a list.<br>
**REPARTITION :** Return a new RDD that has exactly numPartitions partitions. It can increase or decrease the level of parallelism in this RDD. Internally, it uses a shuffle to redistribute data. If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.
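To see where elements land before any repartitioning, the sketch below slices a list into contiguous chunks. `slice_evenly` is a hypothetical helper; to our understanding it mirrors the index arithmetic `parallelize()` uses to assign elements to partitions, so its output should resemble what `glom()` shows in the next cell, but treat that as an assumption. It does not model `repartition()`, which redistributes data through a shuffle. It also shows that asking for more partitions than there are elements leaves some of them empty.

```python
def slice_evenly(data, num_slices):
    """Split data into num_slices contiguous chunks whose sizes differ by at most 1.
    Slice i covers indices [i * n // num_slices, (i + 1) * n // num_slices)."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices]
            for i in range(num_slices)]


print(slice_evenly(list(range(50)), 5))          # 5 chunks of 10 elements
print(slice_evenly([1, 2, 3, 4, 5, 6, 7], 4))    # [[1], [2, 3], [4, 5], [6, 7]]
print(slice_evenly([1, 2, 3, 4, 5, 6, 7], 10))   # [[], [1], [2], [], [3], [4], [], [5], [6], [7]]
```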

In [None]:
# parallelize(range(50), 5) spreads the 50 elements over 5 partitions
rdd = sc.parallelize(range(50), 5).map(str)
glomed = rdd.glom()                      # one list per partition
print("Initial RDD : ", rdd.collect())
print("Glomed RDD : ", glomed.collect())
print("# of partitions : ", rdd.getNumPartitions())
print("-----------------------------")

# RDDs are immutable: repartition() does not modify rdd, it returns a new RDD
# with exactly the number of partitions requested.
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7], 4)   # we specify 4 partitions
print("# of partitions before : ", rdd.getNumPartitions())
print("Data in partitions : ", rdd.collect())
print("Glomed RDD : ", rdd.glom().collect())

print("-----------------------------")
fewer = rdd.repartition(2)               # shuffle down to 2 partitions
print("# of partitions after repartition(2) : ", fewer.getNumPartitions())
print("Glomed RDD : ", fewer.glom().collect())

more = rdd.repartition(10)               # shuffle up to 10 partitions, some may stay empty
print("# of partitions after repartition(10) : ", more.getNumPartitions())
print("Glomed RDD : ", more.glom().collect())

# To reduce the number of partitions, coalesce() avoids a full shuffle
merged = rdd.coalesce(2)
print("# of partitions after coalesce(2) : ", merged.getNumPartitions())
print("Glomed RDD : ", merged.glom().collect())