// Spark Streaming Twitter Example





// Scala implementation

package org . apache . spark . demo . streaming

// SparkContext lives in org.apache.spark (there is no org.apache.spark.streaming.SparkContext).
// The two wildcard imports supply the implicit pair-RDD / pair-DStream operations
// (sortByKey, reduceByKeyAndWindow) on Spark releases that do not resolve them automatically.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._

/**
 * Prints the most popular hashtags seen on a live Twitter stream.
 *
 * Hashtags are extracted from the text of each incoming status, counted over
 * trailing windows of 60 and 30 seconds, and the ranking for each window is
 * printed on every batch.
 *
 * Usage:
 * {{{
 * TwitterTags <consumer key> <consumer secret> <access token> <access token secret> [<filters>]
 * }}}
 */
object TwitterTags {

  /** Number of mandatory command-line arguments (the four OAuth credentials). */
  private val RequiredArgs = 4

  /**
   * Program entry point.
   *
   * @param args the four Twitter OAuth credentials (consumer key, consumer secret,
   *             access token, access token secret), followed by any number of
   *             optional filter keywords handed to the Twitter stream
   */
  def main(args: Array[String]): Unit = {
    if (args.length < RequiredArgs) {
      System.err.println(
        "Usage: TwitterTags <consumer key> <consumer secret> " +
          "<access token> <access token secret> [<filters>]")
      System.exit(1)
    }

    // NOTE(review): StreamingExamples is not defined in this file; it has to be
    // available in this package for the call below to resolve - confirm.
    StreamingExamples.setStreamingLogLevels()

    // Exactly four values are taken so the four-name pattern always matches;
    // everything after the credentials is treated as a filter keyword.
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) =
      args.take(RequiredArgs)
    val filters = args.drop(RequiredArgs)

    // Set the system properties so that the Twitter4J library used by the
    // Twitter stream can use them to generate OAuth (Open Authentication)
    // credentials. The keys must not contain any whitespace.
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("TwitterTags")
    // 3-second batches; both window lengths used below (60 s, 30 s) are multiples of it.
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Every whitespace-separated token of a status text that starts with '#'.
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // (count, hashtag) pairs for the trailing window, highest count first.
    def topCountsOver(windowSeconds: Int) =
      hashTags
        .map((_, 1))
        .reduceByKeyAndWindow(_ + _, Seconds(windowSeconds))
        .map { case (topic, count) => (count, topic) }
        .transform(_.sortByKey(ascending = false))

    // Registers an output action that prints up to `limit` top hashtags of the
    // given window each time a batch completes.
    def printTopTags(windowSeconds: Int, limit: Int): Unit =
      topCountsOver(windowSeconds).foreachRDD { rdd =>
        val topList = rdd.take(limit)
        println(
          "\n Popular topics in last %s seconds(%s total): ".format(windowSeconds, rdd.count()))
        topList.foreach { case (count, tag) => println("%s(%s tweets)".format(tag, count)) }
      }

    // Print popular hashtags: each report is driven by its own windowed stream.
    printTopTags(windowSeconds = 60, limit = 30)
    printTopTags(windowSeconds = 30, limit = 60)

    ssc.start()
    ssc.awaitTermination()
  }
}