// Spark Streaming Twitter example
// (written in Scala)
package org . apache . spark . demo . streaming
import org . apache . spark . streaming . SparkContext._
import org . apache . spark . streaming . twitter._
import org . apache . spark . streaming . {Seconds, StreamingContext}
import org . apache . spark . SparkConf
/**
 * Prints the most frequent hashtags seen on a Twitter stream over the last
 * 60 seconds and the last 30 seconds, using a 3-second batch interval.
 *
 * Usage:
 * {{{
 * TwitterTags <consumer key> <consumer secret> <access token> <access token secret> [<filters>]
 * }}}
 *
 * The first four arguments are the OAuth credentials; any remaining arguments
 * are passed to `TwitterUtils.createStream` as filters.
 *
 * NOTE(review): `StreamingExamples` is neither defined nor imported in this
 * file; presumably it lives in the same package -- confirm.
 * NOTE(review): the file-level import `org.apache.spark.streaming.SparkContext._`
 * does not appear to match any class shipped with Spark (the usual import is
 * `org.apache.spark.SparkContext._`) -- confirm against the Spark version in use.
 */
object TwitterTags {

  /**
   * Program entry point.
   *
   * @param args four OAuth credentials followed by zero or more filter terms;
   *             prints a usage message and exits with status 1 when fewer than
   *             four arguments are supplied
   */
  def main(args: Array[String]): Unit = {
    // Exactly four credentials are required; the filters are optional.
    if (args.length < 4) {
      System.err.println(
        "Usage: TwitterTags <consumer key> <consumer secret> " +
          "<access token> <access token secret> [<filters>]")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.drop(4)

    // Set the system properties so that the Twitter4J library used by the
    // Twitter stream can use them to generate OAuth (Open Authentication)
    // credentials. The keys must not contain any whitespace.
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("TwitterTags")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Every whitespace-separated token of a tweet that starts with '#'.
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // (count, tag) pairs over the last 60 seconds, highest count first.
    val topCounts60 = hashTags
      .map((_, 1))
      .reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // (count, tag) pairs over the last 30 seconds, highest count first.
    val topCounts30 = hashTags
      .map((_, 1))
      .reduceByKeyAndWindow(_ + _, Seconds(30))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print popular hashtags for the 60-second window.
    topCounts60.foreachRDD { rdd =>
      val topList = rdd.take(30)
      println("\n Popular topics in last 60 seconds(%s total): ".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s(%s tweets)".format(tag, count)) }
    }

    // Print popular hashtags for the 30-second window (reads the 30-second
    // stream, not the 60-second one).
    topCounts30.foreachRDD { rdd =>
      val topList = rdd.take(60)
      println("\n Popular topics in last 30 seconds(%s total): ".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s(%s tweets)".format(tag, count)) }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}