![alt text](https://i.pinimg.com/736x/3e/a4/74/3ea47478593f9f25bfd4ef9223b8bbf3.jpg "")

#### The "Streaming" aspect means that Spark Streaming can process unbounded data sets. Instead of processing well-defined batches of data, this approach lets you handle a continuous influx of information. Spark Streaming is a framework that natively supports both "within-batch" processing (computations on the data of a single batch) and "across-batch" processing (computations that carry state from one batch to the next).


![alt text](https://spark.apache.org/docs/latest/img/streaming-arch.png "")

#### How does this work? In essence, Spark Streaming receives input data streams from a variety of services (e.g. Kafka, Flume, HDFS, Kinesis, Twitter, ...) and divides them into batches. These mini-batches are then processed by Spark to build the final stream of results, also in batches. Why divide the stream into batches? To parallelize the processing across as many machines as we are allowed to use.
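
To make the batching step concrete, here is a minimal plain-Python sketch (no Spark involved) that groups timestamped events into fixed-length intervals, which is roughly how a stream is cut into mini-batches. The function name `split_into_batches`, the `(timestamp, value)` event format and the sample events are assumptions made for this illustration.

```python
from collections import defaultdict


def split_into_batches(events, batch_seconds):
    """Group (timestamp, value) events into consecutive fixed-length intervals.

    Illustration only: intervals without any event are skipped in this sketch.
    """
    batches = defaultdict(list)
    for timestamp, value in events:
        # Events whose timestamps fall in the same interval share a batch index.
        batches[int(timestamp // batch_seconds)].append(value)
    # Return the batches in chronological order.
    return [batches[index] for index in sorted(batches)]


# Hypothetical events: (arrival time in seconds, payload)
events = [(0.5, "a"), (1.9, "b"), (2.1, "c"), (5.0, "d")]
print(split_into_batches(events, batch_seconds=2))
```

With a 2-second interval, "a" and "b" end up in the same batch while "c" and "d" each get their own. Each of these batches can then be handed to a different worker.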


![alt text](https://spark.apache.org/docs/latest/img/streaming-flow.png "")

#### Spark Streaming provides a high-level abstraction to manipulate this influx of data: the 'DStream'. DStreams can be created from an input source such as Kafka.


# Discretized Streams (DStreams)

What is it? The basic abstraction provided by Spark Streaming.
A DStream is represented by a continuous series of RDDs. Each RDD in a DStream contains the data received during a specific time interval.

![alt text](https://spark.apache.org/docs/latest/img/streaming-dstream.png) 

Any operation applied on a DStream translates to operations on the underlying RDDs. Consider a simple example where the input is a stream of lines of text (simple sentences, for instance). A flatMap operation is applied to each RDD of the lines DStream and generates the RDDs of the words DStream, each containing the words found in the corresponding lines.

![alt text](https://spark.apache.org/docs/latest/img/streaming-dstream-ops.png) 
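
The idea that a DStream operation is simply the same RDD operation repeated for every batch can be emulated without Spark. In the sketch below, each inner list stands in for the RDD of one batch interval. The helper `flat_map` and the sample sentences are hypothetical and only serve the illustration.

```python
def flat_map(func, batch):
    """Apply func to every element of one batch and flatten the results."""
    return [item for element in batch for item in func(element)]


# Each inner list stands in for the RDD of one batch interval.
lines_stream = [
    ["to be or", "not to be"],
    ["that is the question"],
]

# The stream-level operation is the same batch-level operation, once per batch.
words_stream = [
    flat_map(lambda line: line.split(" "), batch) for batch in lines_stream
]
print(words_stream)
```

In PySpark, the same relationship shows up in the API: `lines.flatMap(f)` on a DStream can also be written as `lines.transform(lambda rdd: rdd.flatMap(f))`.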

**For Streaming data, you have two options :** 

1) Use a "Basic Source", for instance files read from a file system.

2) Use an "Advanced Source" such as Kafka or another external service from which you can consume data.


#### Do not forget!
1) Once a context has been started, no new streaming computations can be set up or added to it.

2) Once a context has been stopped, it cannot be restarted.

3) Only one StreamingContext can be active in a JVM at the same time.

4) Calling stop() on a StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the "stopSparkContext" parameter of stop() to false.

5) A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next one is created.
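
The rules above can be sketched in PySpark as follows. This is a minimal illustration, not a reference implementation: the function names, the default durations and the use of `queueStream` as a dummy input are assumptions made for the example.

```python
def stop_streaming_only(ssc):
    """Stop a StreamingContext but keep its SparkContext alive (rule 4)."""
    ssc.stop(stopSparkContext=False)


def run_two_contexts_in_sequence(sc, batch_seconds=2, run_seconds=5):
    """Reuse one SparkContext for two successive StreamingContexts (rule 5)."""
    # Imported here so that the helper above can be read without PySpark.
    from pyspark.streaming import StreamingContext

    for round_number in (1, 2):
        ssc = StreamingContext(sc, batch_seconds)
        # All computations must be declared before start() (rule 1).
        ssc.queueStream([sc.parallelize([round_number])]).pprint()
        ssc.start()
        ssc.awaitTerminationOrTimeout(run_seconds)
        # A stopped context cannot be restarted (rule 2), so a new one is
        # created on the next iteration, once this one is stopped (rule 3).
        stop_streaming_only(ssc)
```

With an existing SparkContext `sc`, calling `run_two_contexts_in_sequence(sc)` runs two short streaming jobs one after the other and leaves `sc` usable afterwards.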

## WordCount in a Streaming Context using Spark Streaming

In this session, you won't execute each part of the notebook as such from Zeppelin. Instead, you will create your Python scripts from the command line interface and execute them locally by invoking them from the command line, like ./nameOfYourScript.
###### Is this the only way of submitting a PySpark job? No, there are multiple options.
1) You may start pyspark2 from the command line and write your script interactively.

2) You may start a Python script that uses the PySpark libraries from the command line.

3) You may submit a Python script to spark-submit from the command line.

On top of those submission methods, you may also set the parameters of your job (e.g. whether the program must be executed locally and with how many threads, whether the program must be executed under YARN supervision, ...).
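
As an illustration of the third option and of the job parameters, the sketch below assembles a spark-submit command line for a local run and for a YARN run. The helper `build_submit_command` and the script name `wordcount_stream.py` are hypothetical. The binary name is an assumption too: depending on the installation it may be called `spark2-submit`.

```python
import shlex


def build_submit_command(script, master="local[2]", deploy_mode=None,
                         jars=None, submit_binary="spark-submit"):
    """Return the spark-submit invocation as a list of arguments."""
    command = [submit_binary, "--master", master]
    if deploy_mode is not None:
        command += ["--deploy-mode", deploy_mode]
    if jars:
        # spark-submit expects the extra jars as one comma-separated value.
        command += ["--jars", ",".join(jars)]
    command.append(script)
    return command


# local[2]: run on this machine with two threads.
local_run = build_submit_command("wordcount_stream.py")
# yarn + client mode: the driver stays local, the executors run under YARN.
yarn_run = build_submit_command("wordcount_stream.py", master="yarn",
                                deploy_mode="client")

for command in (local_run, yarn_run):
    # shlex.quote protects characters such as [ ] from the shell.
    print(" ".join(shlex.quote(part) for part in command))
```

The printed lines can be pasted into a terminal, or the lists can be passed directly to `subprocess.run`.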

###### Will all methods work out-of-the-box? Well... They should, but they won't.
The issue here is very simple: Spark is written in Scala, but you can use its core from Scala, Java, or Python, which implies multiple layers of software dependencies. On top of that, submitting a Spark job under YARN's supervision implies that those dependencies must be met on every machine involved in the job AND that YARN's own dependencies are met. And if you want to use Kafka to consume data, then the dependencies of that particular service must be met as well.
The bottom line is simple: Big Data software is complex, and it is of the utmost importance to know, at all times, which services you rely on and how they are architected.

###### For the software dependencies, how can we ensure those are met?
For Spark and Spark Streaming, setting up all the necessary paths and environment variables can be tedious... so much so that a simple library called "findspark" was developed to solve that particular issue. You simply call its init function with the Spark installation path as parameter, and everything is set up automatically. You must do this before building your SparkContext.


**import findspark**

**findspark.init("/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/")**

## Basics

#### In the following exercises, we will see three methods to create a WordCount script using Spark Streaming:
1) Using the local filesystem as input source (not scalable, poor performance, ...)<br>
2) Using HDFS as input source (scalable, ...)<br>
3) Using Kafka, an external service dedicated to data ingestion, as input source (scalable, high performance, ...)


### Do not forget how this works!
By default, a Spark Streaming context only consumes new data arriving in the source you specified. Thus, if you point it to a folder, either on the local filesystem or on HDFS, all existing files are ignored and only the new ones, those created or modified AFTER the start of the Spark Streaming job, are processed. The same holds true for Kafka.
Spark Streaming checks every X seconds (the batch interval) whether there is new data to process. If there is, the mini-batch is fed with that data; if not, nothing is processed!
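
The "only new files" behaviour can be pictured with a small plain-Python sketch that polls a directory and keeps only the files modified after a given start time. It is a conceptual model, not Spark's actual file monitoring. The function `new_files_since` and the file names are hypothetical, and the modification times are forced with `os.utime` so that the demo is deterministic.

```python
import os
import tempfile
import time


def new_files_since(directory, start_time, already_seen):
    """Return files modified at or after start_time that were not seen before."""
    fresh = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if path in already_seen or not os.path.isfile(path):
            continue
        if os.path.getmtime(path) >= start_time:
            fresh.append(path)
            already_seen.add(path)
    return fresh


# Demo on a temporary directory: one file predates the "job", one arrives later.
watched = tempfile.mkdtemp()
old_file = os.path.join(watched, "old.txt")
with open(old_file, "w") as handle:
    handle.write("written before the job started\n")
job_start = time.time()
os.utime(old_file, (job_start - 60, job_start - 60))     # force an older mtime

seen = set()
first_poll = new_files_since(watched, job_start, seen)   # nothing new yet

new_file = os.path.join(watched, "new.txt")
with open(new_file, "w") as handle:
    handle.write("written after the job started\n")
os.utime(new_file, (job_start + 1, job_start + 1))       # force a newer mtime

second_poll = new_files_since(watched, job_start, seen)  # picks up new.txt only
third_poll = new_files_since(watched, job_start, seen)   # already processed
print(first_poll, second_poll, third_poll)
```

The first and third polls return nothing, which corresponds to a batch interval in which there is no new data to process.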

# Prepare stuff

In [None]:
!mkdir -p ./CECICourse/booksStream

# WordCount from Local Filesystem

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

conf = SparkConf().setAppName("Reading files from a directory")
sc   = SparkContext(conf=conf)
#sc.setLogLevel("DEBUG")                # Uncomment to get more information in the output. Be careful, the amount is huge.
ssc  = StreamingContext(sc, 2)          # Batch interval of 2 seconds

lines = ssc.textFileStream('./CECICourse/booksStream')

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))

wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()

In [None]:
ssc.stop()

In [None]:
!cp ./CECICourse/loremipsum.txt ./CECICourse/booksStream
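
The `cp` command writes the file directly inside the monitored directory, so in principle a batch could start while the copy is still in progress. The Spark Streaming programming guide for the 2.2 release asks that files be created in the monitored directory by atomically moving or renaming them into it. A minimal sketch of that pattern is shown below. The function `drop_file_atomically` and the staging directory are hypothetical, and the demo runs on temporary directories rather than on the course folders.

```python
import os
import shutil
import tempfile


def drop_file_atomically(source, watched_dir, staging_dir):
    """Copy source into staging_dir, then rename it into watched_dir.

    The rename is atomic only if both directories are on the same filesystem.
    """
    os.makedirs(staging_dir, exist_ok=True)
    name = os.path.basename(source)
    staged = os.path.join(staging_dir, name)
    shutil.copyfile(source, staged)       # the slow part happens outside watched_dir
    final = os.path.join(watched_dir, name)
    os.replace(staged, final)             # the complete file appears in one step
    return final


# Demo with temporary directories standing in for the course folders.
base = tempfile.mkdtemp()
watched = os.path.join(base, "booksStream")
staging = os.path.join(base, "staging")
os.makedirs(watched)
source = os.path.join(base, "loremipsum.txt")
with open(source, "w") as handle:
    handle.write("lorem ipsum dolor sit amet\n")

dropped = drop_file_atomically(source, watched, staging)
print(os.listdir(watched))
```

The staging directory is kept outside the monitored directory on purpose, so that the partially written copy is never visible to the streaming job.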

# WordCount from HDFS

In [None]:
import os
import pwd
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

username = pwd.getpwuid( os.getuid() )[ 0 ]

if __name__ == "__main__":
    conf = SparkConf().setAppName("Reading files from a HDFS directory")
    sc   = SparkContext(conf=conf)
    #sc.setLogLevel("DEBUG")             # Uncomment to get more information in the output. Be careful, the amount is huge.
    ssc  = StreamingContext(sc, 2)       # Batch interval of 2 seconds

    lines = ssc.textFileStream('hdfs://public00:8020/user/'+username+'/CECICourse/booksStream/')

    # Split each line into words
    words = lines.flatMap(lambda line: line.split(" "))

    # Count each word in each batch
    pairs = words.map(lambda word: (word, 1))

    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    # Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.pprint()

    ssc.start()             # Start the computation
    ssc.awaitTermination()
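
To feed the HDFS version of the job, a file has to land in the monitored HDFS directory while the job is running. One possible way, sketched here under assumptions, is to call the `hdfs dfs` command line tool from Python. The helper names, the `dry_run` switch and the user name `alice` are hypothetical. The target path mirrors the `/user/<username>/CECICourse/booksStream/` layout used in the script above.

```python
import subprocess


def hdfs_upload_commands(local_file, username, base_dir="CECICourse/booksStream"):
    """Return the hdfs commands that create the target folder and upload a file."""
    target = "/user/{}/{}".format(username, base_dir)
    return [
        ["hdfs", "dfs", "-mkdir", "-p", target],
        ["hdfs", "dfs", "-put", local_file, target + "/"],
    ]


def upload(local_file, username, dry_run=True):
    """Print the commands (dry run) or execute them one after the other."""
    for command in hdfs_upload_commands(local_file, username):
        if dry_run:
            print(" ".join(command))
        else:
            # check=True raises an exception if the hdfs command fails.
            subprocess.run(command, check=True)


# Dry run with a hypothetical user name: only prints the two commands.
upload("./CECICourse/loremipsum.txt", "alice")
```

Passing `dry_run=False` executes the commands, which requires the `hdfs` client to be available on the machine.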

# WordCount from Kafka

In [None]:
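import os
# PYSPARK_SUBMIT_ARGS is read when PySpark launches its JVM, so it must be set before the SparkContext is created.
# When the script is started with plain Python, the value must end with "pyspark-shell".
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark/jars/spark-streaming-kafka-0-10_2.11-2.4.0-cdh6.2.0.jar pyspark-shell'
import pwd
import findspark
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
# pyspark.streaming.kafka is the Python API of the Kafka 0.8 integration (deprecated in Spark 2.3, removed in Spark 3.0).
from pyspark.streaming.kafka import KafkaUtils

username = pwd.getpwuid( os.getuid() )[ 0 ]
#findspark.init("/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark")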

if __name__ == "__main__":
    conf = SparkConf().setAppName("Reading data from Kafka")
    sc   = SparkContext(conf=conf)
    #sc.setLogLevel("DEBUG")            # Uncomment to get more information in the output. Be careful, the amount is huge.
    ssc  = StreamingContext(sc, 2)      # Batch interval of 2 seconds

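    # createStream is the receiver-based API: it connects through ZooKeeper (port 2181), not directly to a Kafka broker.
    zk_quorum, topic = "public01:2181", username
    # Arguments: ZooKeeper quorum, consumer group id, {topic: number of partitions to consume, each in its own thread}
    kvs = KafkaUtils.createStream(ssc, zk_quorum, "raw-event-streaming-consumer", {topic: 1})
    # Each record is a (key, value) pair; keep only the message value
    lines = kvs.map(lambda x: x[1])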

    # Split each line into words
    words = lines.flatMap(lambda line: line.split(" "))

    # Count each word in each batch
    pairs = words.map(lambda word: (word, 1))

    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    # Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.pprint()

    ssc.start()             # Start the computation
    ssc.awaitTermination()
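
The three scripts above count words within each batch only: every 2 seconds the counts start again from zero. A running total across batches can be sketched with `updateStateByKey`, which calls an update function with the new values of a key and the state carried over from the previous batches. The function names and the checkpoint directory argument below are assumptions made for the illustration. The short loop at the end walks through the state of a single word in plain Python.

```python
def update_count(new_values, running_count):
    """Add the counts of the current batch to the total carried over so far."""
    # running_count is None the first time a word is seen.
    return sum(new_values) + (running_count or 0)


def add_running_counts(ssc, pairs, checkpoint_dir):
    """Turn per-batch (word, 1) pairs into totals accumulated across batches."""
    # Stateful operations require checkpointing to be enabled.
    ssc.checkpoint(checkpoint_dir)
    return pairs.updateStateByKey(update_count)


# Plain-Python walk-through: one word seen twice, then not at all, then once.
state = None
for batch_values in ([1, 1], [], [1]):
    state = update_count(batch_values, state)
print(state)
```

In any of the scripts above, `add_running_counts(ssc, pairs, "checkpoint").pprint()` would have to be declared before `ssc.start()`, since no computation can be added once the context is started.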