Kafka integration with Spark Streaming with Use case




What is Kafka?

Kafka is a distributed messaging system and it is publish-subscribe messaging consider as a distributed commit log. Explaining below with brief:

Apache Kafka is a distributed, partitioned, replicated give log service. It provides the functionality of a messaging system, but with a distinctive design.

Use Case – In Integration with Spark Streaming:<

//import all staments from apache like Spark Kafka packages
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import org.apache.spark.SparkConf

import org.apache.spark.streaming._

import org.apache.spark.streaming.kafka._

import org.apache.kafka.common.serailization.StringSeralizer

object KafkaWordCountExample{
// take object as Kafka Word Count Example 
def main(args: Array[String]){
//main method
if(args.length < 6){

System.err.println("SimpleKafkaUsage: KafkaWordCountExample<zkQuorum> <group> <topics><numThreads>")
//print zookeeper quorums, topics 
System.exit(1)

}

KakfkaStreamingExamples.setStreamingLogLevels()

val Array(zkQuorum, group, topics, numThreads ) = args
//pass values
val sparkConf = new SparkConf().setAppName("KafkaWordCountExamples")
//Spark configuration with Kafka Object
val ssc  = new StreamingContext(sparkConf, Seconds(1))
//creat Spark Streaming Context
ssc.checkpoint("checking checkpoint")
//Checking SSC Checkpoint
val topicNew = topics.split(",").map((_,numThreads.toInt)).toMap
//create topics
val lines = KafkaUtils. createStream(ssc,zkQuorum, group, topicNew).map(_._1)

val words = lines.flatMapp(._split(""))
//split into words
val wordCount  = words.map ( x => ( x, L)) . reduceByKeyWithWindowSession(_+_,_-_, Minutes(5), Seconds(1), 1)wordCount.print()

ssc.start()
//Start Spark Streaming Context
ssc.awaitTermination()
}

object KafkaWordCountProducer {
//Create KafkaWordCountProducer object
def main (args: Array[String]){
if (Args.length < 6) {
System.err.println("SimpleKafkaWordCountProducerUsage: KafkaWordCountProducer<metadataBrokerList>  <topic>"+<messagePerSeconds> <wordsPerMessage>")
system.exit(1)
}
val Array(brokers, topic, messagePerSeconds, wordsPerMessage) = args
val properties = new HashMap[String, Object] ()
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
properties.put(ProducerConfig.KEY_SERAILIZER_CLASS_CONFIG)
properties.put(ProducerConfig.VALUE_SERAILIZER_CLASS_CONFIG)
//Confiured Key, Value
val producer = new KafkaProducer[String, String](properties)
while (true){
(1 to messagesPerSeconds.toInt).foreach{messageNum = >
val str = ( 1 to wordsPerMessage.toInt).map(x=> scala.util.Random.nextInt(10).toString).mkString("")
val message = new ProducerRecord [ String, String] (topic, null , str)
producer.send(message)
//Producer send messages
}
Thread.sleep(10)
}
}
}





How to run the above Spark code in Command line:

How to start the Zookeeper server for Kafka

bin/zookeeper - server - start.sh config/zookeeper.properties

Start Kafka Server :

bin/kafka-server - start.sh config/server.properties

create a topic with single partition and single replica

bin/kafka - topics.sh --create --zookeeper localhost : 2181 -- replication -factor 1 -- pratitions 1 -- topic sreekanthtopic

start the kafka producer to send the messages

bin/kafka-console-producer.sh --broker -list localhost : 9092 --topic sreekanthtopic

Run the kafka kafka straming code to process the continuosly comming data from kafka with spark streaming

$SPARK_HOME/bin/run-example

org.apache.spark.examples.streaming.KafkaWordCountExample localhost sreekanthgroup sreekanthtopic 1

KafkaWordCountExample is a program name

localhost is the Zookeeper Quorum name

sreekanthgroup is consumer Group Name

sreekanthtopic is a topic name from where we are sending the data

1 is means for no. of threads





Summary: How to integrate with Kafka with Spark Streaming with a simple use case using Spark Scala coding and how to run the code using Zookeeper. Simple steps to execute this simple Spark Code.

Leave a Reply

Your email address will not be published. Required fields are marked *