![alt text](https://kafka.apache.org/images/logo.png "")

#### So, the term "Big Data" implies the manipulation of large amounts of information. This leads to multiple challenges, among which *Data Storage*, *Data Collection* and *Data Analysis*. With HDFS we have a means of storing information in large quantities, and with MapReduce we have at least one way of analysing data. However, storing data and collecting it are different problems: HDFS gives us somewhere to put the data, not a way to ingest it as it is continuously produced.

#### Kafka is a messaging system designed for high-throughput distributed systems. With it, we can ingest large amounts of information generated as a continuous flow. It typically offers higher throughput than traditional messaging systems, along with replication, fault tolerance and built-in partitioning, which makes it a good candidate for large-scale message processing applications.

![alt text](https://kafka.apache.org/images/kafka_diagram.png "Kafka Streaming Platform")

#### OK, Kafka is a messaging system, but what does that mean? Ever heard of MQTT, for instance? The idea behind such systems is to transfer data from one application to another. Because Kafka can run distributed across several machines, it also makes the queuing of the messages itself reliable.

# Types of messaging systems

**Point-to-Point Messaging System:** In such a system, messages are persisted in a queue. Several consumers can read from the same queue, but a given message can be consumed by only one of them: once a consumer has read a message, that message is removed from the queue.

![alt text](https://docs.oracle.com/cd/E19575-01/820-6424/images/to_simpleQ.gif "Point-to-Point Messaging System - Simple")

![alt text](https://docs.oracle.com/cd/E19575-01/820-6424/images/to_complexQ.gif "Point-to-Point Messaging System - Complex")
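To make the "consumed once, then removed" rule concrete, here is a minimal in-memory sketch. The `PointToPointQueue` class is hypothetical and only models the delivery semantics, not a real broker or any Kafka API.

```python
from collections import deque


class PointToPointQueue:
    """Toy in-memory queue: each message is delivered to exactly one consumer."""

    def __init__(self):
        self._messages = deque()

    def send(self, message):
        self._messages.append(message)

    def receive(self):
        # popleft() removes the message, so no other consumer can read it again
        return self._messages.popleft() if self._messages else None


queue = PointToPointQueue()
for m in ["order-1", "order-2", "order-3"]:
    queue.send(m)

# Two consumers polling in turn share the work instead of each getting a copy
consumer_a = [queue.receive(), queue.receive()]
consumer_b = [queue.receive(), queue.receive()]
print(consumer_a, consumer_b)  # ['order-1', 'order-2'] ['order-3', None]
```

Adding consumers here spreads the load; it never duplicates a message.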


**Publish-Subscribe Messaging System:** Here, messages are persisted in a topic. Consumers subscribe to one or more topics, and each of them receives ALL the messages published to those topics.

![alt text](https://docs.oracle.com/cd/E19575-01/820-6424/images/to_simpleTopic.gif "Publish-Subscribe Messaging System - Simple")

![alt text](https://docs.oracle.com/cd/E19575-01/820-6424/images/to_complexTopic.gif "Publish-Subscribe Messaging System - Complex")
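The publish-subscribe behaviour can be sketched the same way. In this hypothetical in-memory `Topic`, messages are never removed on read; instead each subscriber keeps its own position in the list of messages, so every subscriber sees every message.

```python
class Topic:
    """Toy in-memory topic: messages are kept, and every subscriber reads all of them."""

    def __init__(self):
        self._log = []        # messages are persisted, never removed on read
        self._positions = {}  # subscriber name -> index of the next message to read

    def publish(self, message):
        self._log.append(message)

    def subscribe(self, name):
        self._positions[name] = 0

    def poll(self, name):
        # Return every message this subscriber has not seen yet and advance its position
        start = self._positions[name]
        self._positions[name] = len(self._log)
        return self._log[start:]


topic = Topic()
topic.subscribe("billing")
topic.subscribe("audit")
topic.publish("order-1")
topic.publish("order-2")

print(topic.poll("billing"))  # ['order-1', 'order-2']
print(topic.poll("audit"))    # ['order-1', 'order-2']
print(topic.poll("billing"))  # []
```

Keeping a per-subscriber position over a persisted log is, at a high level, how Kafka consumers track what they have read (their offsets).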

# Why use Kafka?

**Kafka** is a distributed publish-subscribe messaging system focused on high throughput. All messages ingested by Kafka are persisted on disk and replicated within the cluster to guarantee fault tolerance and thus prevent data loss. However, in the versions used in this notebook, Kafka relies on another service, **ZooKeeper**, which provides the synchronisation information Kafka needs to run properly in a distributed way.

The benefits of Kafka are thus:

**- Reliability** as it is distributed, partitioned, replicated and offers fault tolerance.<br>
**- Scalability** as we can increase or decrease the size of a Kafka cluster on demand, without downtime, to best fit the actual load.<br>
**- Durability** as all messages are persisted on disk.<br>
**- Performance** as it is designed for high throughput.

What can Kafka be used for?

**- Log aggregation**, even though other systems may now be more appropriate for that purpose.<br>
**- Metrics**: since it can ingest vast amounts of data, collecting metrics from many servers was a strong point for monitoring systems, even though, again, other systems now exist for such scenarios.<br>
**- Stream Processing**: now we are talking. Frameworks such as Storm and Spark Streaming can take advantage of Kafka to process information in a streaming fashion, giving us a different way of analysing data. In MapReduce you only do batch processing: you cannot handle a continuous influx of information without investing a lot of time in automating your job submissions, whereas in Spark Streaming, for instance, you can keep an application running permanently and analyse data as it arrives.
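The difference with batch processing can be illustrated without any framework. In this hypothetical sketch, results are updated each time a message arrives instead of once the whole dataset is available. It does not use the Storm or Spark Streaming APIs; `fake_stream` is a stand-in for messages read from a Kafka topic.

```python
from collections import Counter
from typing import Iterable, Iterator


def running_word_count(lines: Iterable[str]) -> Iterator[Counter]:
    """Update word counts incrementally, one incoming message at a time."""
    counts = Counter()
    for line in lines:
        counts.update(line.lower().split())
        # Emit a snapshot after each message instead of waiting for the whole dataset
        yield Counter(counts)


# A generator stands in for an endless stream of messages (e.g. read from a Kafka topic)
def fake_stream():
    yield "to be or not"
    yield "to be"
    yield "that is the question"


for snapshot in running_word_count(fake_stream()):
    print(snapshot.most_common(2))
```

A batch job would produce only the last snapshot, after all the input had been collected.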

# Kafka concepts

To use Kafka, you need to be familiar with the concepts it relies on, such as *topics*, *brokers*, *producers* and *consumers*.

![alt text](https://www.tutorialspoint.com/apache_kafka/images/fundamentals.jpg "Kafka Architecture with a replication factor of 3")

**Kafka standalone** Instance of Kafka using a single Broker.<br>
**Kafka cluster** Once Kafka uses more than a single Broker, it becomes a cluster.<br>
**Topics** Streams of messages belonging to a particular category. Each topic is split into Partitions.<br>
**Partitions** store messages in an immutable, ordered sequence.<br>
**Partition offset** Each message in a partition gets a unique sequential identifier called its offset. Consumers use it to locate the messages they consume.<br>
**Replicas** of a partition are backup copies of it. Clients normally do not read from or write to them; they exist to prevent data loss when a broker fails.<br>
**Brokers** are the servers responsible for maintaining the published data. Each broker may hold zero or more partitions of each topic.<br>
**Producers** publish messages to topics.<br>
**Consumers** read messages from the topics they subscribe to.<br>
**Leader** Node responsible for all reads and writes for a given partition; each partition has exactly one server acting as its leader.<br>
**Follower** Node that follows the **Leader**'s instructions. A Follower may become the Leader if the node holding that role fails. In practice, a Follower acts just like a consumer, fetching data from its Leader to keep its own copy up to date.
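The relationship between partitions, offsets, leaders and followers can be sketched in a few lines. This is a simplified in-memory model of a single partition; `PartitionLog` and `Follower` are hypothetical names. Real Kafka stores the log in segment files on disk and tracks which replicas are in sync, which this sketch ignores.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PartitionLog:
    """Append-only, ordered log: a message's offset is simply its position."""
    messages: List[bytes] = field(default_factory=list)

    def append(self, message: bytes) -> int:
        self.messages.append(message)
        return len(self.messages) - 1  # offset assigned to the new message

    def read(self, offset: int, max_messages: int = 10) -> List[bytes]:
        # Consumers locate messages by offset; reading does not remove anything
        return self.messages[offset:offset + max_messages]


@dataclass
class Follower:
    """Keeps a copy of the leader's log by fetching from it, much like a consumer."""
    replica: PartitionLog = field(default_factory=PartitionLog)

    def fetch_from(self, leader: PartitionLog) -> None:
        next_offset = len(self.replica.messages)  # first offset we do not have yet
        for message in leader.read(next_offset, max_messages=len(leader.messages)):
            self.replica.append(message)


leader = PartitionLog()
follower = Follower()
print(leader.append(b"first"), leader.append(b"second"))  # 0 1
follower.fetch_from(leader)
print(leader.read(1))             # [b'second']
print(follower.replica.messages)  # [b'first', b'second']
```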


### Let's not forget the role of ZooKeeper...


# Using Kafka from the command line

## Listing Topics

```
!kafka-topics --list --zookeeper public00:2181
```

## Creating a topic

```
!kafka-topics --create --zookeeper public00:2181 --replication-factor 3 --partitions 5 --topic $USER
```

```
!kafka-topics --list --zookeeper public00:2181
```

## Removing a topic

```
!kafka-topics --delete --zookeeper public00:2181 --topic $USER
```

## Starting a producer - With file input

```
!kafka-console-producer --broker-list public01:9092 --topic $USER < ./CECICourse/books/pg20417.txt
```

## Starting a consumer - From the beginning

```
!kafka-console-consumer --bootstrap-server public01:9092 --topic $USER --from-beginning
```

## Describing a topic

```
!kafka-topics --describe --zookeeper public00:2181 --topic $USER
```
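The describe output lists, for each partition, a leader and a set of replicas spread over the brokers. The sketch below shows one simple way 5 partitions with a replication factor of 3 (the values used when creating the topic) could be spread over 5 brokers. It is a simplified round-robin placement, not Kafka's exact algorithm (which, among other things, can randomise the starting broker and take racks into account), and it uses hostnames where Kafka reports numeric broker ids.

```python
from typing import Dict, List


def assign_replicas(num_partitions: int, replication_factor: int,
                    brokers: List[str]) -> Dict[int, List[str]]:
    """Simplified round-robin placement: partition p starts on broker p mod n,
    and its extra replicas go to the following brokers. The first replica
    listed is treated as the preferred leader."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor cannot exceed the number of brokers")
    n = len(brokers)
    return {
        p: [brokers[(p + i) % n] for i in range(replication_factor)]
        for p in range(num_partitions)
    }


brokers = [f"public0{i}" for i in range(1, 6)]  # hypothetical 5-broker cluster
for partition, replicas in assign_replicas(5, 3, brokers).items():
    print(f"Partition: {partition}  Leader: {replicas[0]}  Replicas: {', '.join(replicas)}")
```

Spreading leaders over different brokers balances the read/write load, and placing replicas on distinct brokers is what lets a partition survive the loss of one machine.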

## Consuming a topic from a given offset

```
!kafka-console-consumer --bootstrap-server public01:9092 --topic $USER --partition 0 --offset 10
```

## Cluster mode!

```
!kafka-topics --create --zookeeper public00:2181 --replication-factor 3 --partitions 5 --topic $USER
!kafka-console-producer --broker-list public01:9092,public02:9092,public03:9092,public04:9092,public05:9092,public06:9092,public07:9092,public08:9092,public09:9092 --topic $USER < ./CECICourse/books/pg20417.txt
```
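With several brokers and 5 partitions, the producer has to decide which partition each line of the book goes to. The sketch below illustrates the usual idea under stated assumptions: messages with a key are hashed so the same key always lands in the same partition, and keyless messages (as sent by the console producer by default) are spread out. `choose_partition` is hypothetical; `zlib.crc32` stands in for the murmur2 hash used by Kafka's default partitioner, and real clients spread keyless messages with round-robin, random or "sticky" strategies depending on the client and its version.

```python
import zlib
from typing import Optional


def choose_partition(key: Optional[bytes], num_partitions: int, counter: int) -> int:
    """Pick a partition for a message (simplified illustration).

    Keyed messages: hash the key so the same key always maps to the same partition.
    NOTE: zlib.crc32 is a stand-in; Kafka's default partitioner uses murmur2.
    Keyless messages: spread them round-robin using a running counter.
    """
    if key is None:
        return counter % num_partitions
    return zlib.crc32(key) % num_partitions


for i, key in enumerate([b"user-42", None, b"user-7", b"user-42", None]):
    print(key, "->", choose_partition(key, num_partitions=5, counter=i))
```

Because ordering is only guaranteed within a partition, hashing keys this way keeps all messages for one key in order.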

## Let's get out of the command line

```
!kafka-console-consumer --bootstrap-server public01:9092 --topic $USER --from-beginning
```

## Producing through Python

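```python
import os
import pwd

from kafka import KafkaProducer

# Use the current Unix user name as the topic name (same as $USER above)
topic = pwd.getpwuid(os.getuid()).pw_name
producer = KafkaProducer(bootstrap_servers=["public01:9092", "public02:9092", "public03:9092",
                                            "public04:9092", "public05:9092"])

# send() is asynchronous: it only queues the message in a local buffer
producer.send(topic, b'Hello world !!!')
producer.send(topic, b'Ok, go check the consumer !')
producer.send(topic, b'Bye!')

# Block until every buffered message has actually been delivered to the brokers
producer.flush()
print("Messages sent to topic: " + topic)
```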

## Consuming through Python

```python
import os
import pwd

from kafka import KafkaConsumer

topic = pwd.getpwuid(os.getuid()).pw_name
# auto_offset_reset='earliest': with no committed offset, start from the oldest message
consumer = KafkaConsumer(topic, bootstrap_servers=["public01:9092"], auto_offset_reset='earliest')

print("Waiting for data to consume...")

# This loop blocks forever; interrupt the kernel to stop it
for message in consumer:
    print(message.topic, message.partition, message.offset, message.key, message.value)

# To consume JSON messages, deserialize each value on the fly (requires `import json`):
# consumer = KafkaConsumer(topic, bootstrap_servers=["public01:9092"],
#                          value_deserializer=lambda m: json.loads(m.decode('utf-8')))

# To subscribe to every topic whose name matches a regular expression:
# consumer = KafkaConsumer(bootstrap_servers=["public01:9092"])
# consumer.subscribe(pattern='^awesome.*')
```
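Serializing values is just a pair of functions, which can be checked without a broker. This sketch assumes JSON payloads; `serialize_json` and `deserialize_json` are hypothetical helper names, written in the shape kafka-python expects for its `value_serializer` (producer) and `value_deserializer` (consumer) parameters: one value in, one value out.

```python
import json


# Functions in the shape expected by kafka-python's value_serializer /
# value_deserializer parameters: they receive one value and return one value.
def serialize_json(value) -> bytes:
    return json.dumps(value).encode("utf-8")


def deserialize_json(raw: bytes):
    return json.loads(raw.decode("utf-8"))


# Offline round trip; with a broker available you would pass them as, e.g.
#   KafkaProducer(bootstrap_servers=[...], value_serializer=serialize_json)
#   KafkaConsumer(topic, bootstrap_servers=[...], value_deserializer=deserialize_json)
payload = {"user": "alice", "clicks": 3}
raw = serialize_json(payload)
print(raw)                    # b'{"user": "alice", "clicks": 3}'
print(deserialize_json(raw))  # {'user': 'alice', 'clicks': 3}
```

Using the same pair on both sides guarantees that what the producer writes is exactly what the consumer can decode.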