What is Kafka?
Kafka is a distributed publish-subscribe messaging system that can be thought of as a distributed commit log. In brief:
Apache Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a distinctive design.
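To make the "partitioned commit log" idea concrete, here is a minimal in-memory sketch in plain Scala. It is only a toy model, not how Kafka is implemented: the ToyLog class and its append and read methods are hypothetical names invented for this illustration, and replication is left out entirely. It shows that records with the same key go to the same partition and that each record gets an increasing offset within its partition.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy in-memory model of a partitioned, append-only log (hypothetical, not Kafka's code).
final class ToyLog(numPartitions: Int) {
  // One independent append-only buffer per partition.
  private val partitions = Vector.fill(numPartitions)(ArrayBuffer.empty[String])

  // Choose a partition from the key's hash, append the value,
  // and return the (partition, offset) where the record was stored.
  def append(key: String, value: String): (Int, Long) = {
    val p = Math.floorMod(key.hashCode, numPartitions)
    partitions(p) += value
    (p, partitions(p).size - 1L)
  }

  // Read all records of one partition starting at the given offset.
  def read(partition: Int, fromOffset: Long): Seq[String] =
    partitions(partition).drop(fromOffset.toInt).toList
}

object ToyLogDemo {
  def main(args: Array[String]): Unit = {
    val log = new ToyLog(numPartitions = 2)
    val (partition, _) = log.append("user-1", "login")
    log.append("user-1", "click")
    // A consumer that already processed offset 0 resumes from offset 1.
    println(log.read(partition, fromOffset = 1L))
  }
}
```

Because a reader only tracks an offset per partition, several consumers can read the same log independently, which is the part of the design that differs from a traditional message queue.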
Use Case: Integration with Spark Streaming
// Package matching the run-example command used later in this post
package org.apache.spark.examples.streaming

// Imports for the Kafka producer client and for Spark Streaming with the Kafka receiver API
import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

// Consumes messages from one or more Kafka topics and prints a sliding-window word count
object KafkaWordCountExample {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountExample <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    // Logging helper from Spark's examples package
    StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCountExample")
    // Spark Streaming context with 1-second batches
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // Checkpoint directory, needed by the windowed reduce with an inverse function
    ssc.checkpoint("checkpoint")

    // Map each topic name to the number of consumer threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // createStream yields (key, message) pairs; keep only the message
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    // Count words over a 5-minute window that slides every second
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(5), Seconds(1), 1)
    wordCounts.print()

    ssc.start() // start the Spark Streaming context
    ssc.awaitTermination()
  }
}

// Publishes messages made of random digits (0 to 9) to a Kafka topic
object KafkaWordCountProducer {
  def main(args: Array[String]): Unit = {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSecond> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSecond, wordsPerMessage) = args

    // Producer configuration: broker list plus String serializers for key and value
    val properties = new HashMap[String, Object]()
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](properties)

    while (true) {
      (1 to messagesPerSecond.toInt).foreach { _ =>
        // Build one message of space-separated random digits
        val str = (1 to wordsPerMessage.toInt)
          .map(_ => scala.util.Random.nextInt(10).toString)
          .mkString(" ")
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message) // send the message to the topic
      }
      Thread.sleep(1000) // wait one second before the next burst
    }
  }
}
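The windowed reduce in the word-count job takes two functions, `_ + _` and `_ - _`. The sketch below, in plain Scala without Spark, illustrates the idea behind that pair: rather than recounting the whole window on every slide, the counts of the batch entering the window are added and the counts of the batch leaving it are subtracted. The names WindowedCountSketch, countBatch and slide are made up for this illustration, and dropping words whose count reaches zero is a simplification chosen here for readability.

```scala
object WindowedCountSketch {
  type Counts = Map[String, Long]

  // Count the words in one batch of lines (split on spaces, ignore empty tokens).
  def countBatch(lines: Seq[String]): Counts =
    lines.flatMap(_.split(" ")).filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size.toLong) }

  // Slide the window by one batch: add the entering batch (the `_ + _` step),
  // subtract the leaving batch (the `_ - _` step), then drop zero counts.
  def slide(window: Counts, entering: Counts, leaving: Counts): Counts = {
    val added = entering.foldLeft(window) { case (acc, (word, n)) =>
      acc.updated(word, acc.getOrElse(word, 0L) + n)
    }
    val subtracted = leaving.foldLeft(added) { case (acc, (word, n)) =>
      acc.updated(word, acc.getOrElse(word, 0L) - n)
    }
    subtracted.filter { case (_, n) => n != 0L }
  }

  def main(args: Array[String]): Unit = {
    val first = countBatch(Seq("1 2 1"))
    val second = countBatch(Seq("2 3"))
    val afterFirst = slide(Map.empty, first, Map.empty)
    val afterSecond = slide(afterFirst, second, Map.empty)
    // The first batch now falls out of the window.
    println(slide(afterSecond, Map.empty, first))
  }
}
```

The subtraction step only works if the previous window state is kept between batches, which is why the streaming job sets a checkpoint directory before using the windowed reduce.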
How to run the above Spark code from the command line:
Start the Zookeeper server for Kafka:
bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka server:
bin/kafka-server-start.sh config/server.properties

Create a topic with a single partition and a single replica:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sreekanthtopic

Start the Kafka console producer to send messages:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sreekanthtopic

Run the Spark Streaming code to process the data continuously arriving from Kafka:
$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.KafkaWordCountExample localhost sreekanthgroup sreekanthtopic 1

The parts of this last command are:
KafkaWordCountExample is the program name
localhost is the Zookeeper quorum
sreekanthgroup is the consumer group name
sreekanthtopic is the topic the data is sent to
1 is the number of consumer threads
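As a small illustration of how the four values passed to the program (Zookeeper quorum, consumer group, topic list, thread count) can be turned into the topic-to-threads map the consumer needs, here is a sketch in plain Scala. ArgsSketch, WordCountArgs and parse are hypothetical names used only for this example. Unlike a bare `toInt`, it returns None instead of throwing when the thread count is not a number or the argument count is wrong.

```scala
import scala.util.Try

object ArgsSketch {
  // Hypothetical holder for the parsed command-line arguments.
  final case class WordCountArgs(zkQuorum: String, group: String, topicThreads: Map[String, Int])

  // Expect exactly four arguments: <zkQuorum> <group> <topics> <numThreads>.
  // <topics> is a comma-separated list; every topic gets the same thread count.
  def parse(args: Array[String]): Option[WordCountArgs] = args match {
    case Array(zkQuorum, group, topics, numThreads) =>
      Try(numThreads.toInt).toOption.map { n =>
        WordCountArgs(zkQuorum, group, topics.split(",").map(t => (t, n)).toMap)
      }
    case _ => None
  }

  def main(args: Array[String]): Unit =
    println(parse(Array("localhost", "sreekanthgroup", "sreekanthtopic", "1")))
}
```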
Summary: This post showed how to integrate Kafka with Spark Streaming through a simple word-count use case written in Scala, and the steps to run that code from the command line with Zookeeper and Kafka.