Resilient Distributed Datasets (RDD) in Spark



RDD:

A Resilient Distributed Dataset represents a collection of partitioned data elements that can be operated on in parallel. RDD is the primary data abstraction mechanism in Spark, defined as an abstract class in the Spark library. It is similar to a Scala collection, and it supports lazy evaluation.

Characteristics of RDD:

1. Immutable:

An RDD is an immutable data structure. Once created, it cannot be modified in place; an operation that modifies an RDD returns a new RDD.

2. Partitioned:

The data in an RDD is split into partitions. These partitions are generally distributed across a cluster of nodes. When Spark runs on a single machine, all the partitions are on that machine.

 

RDD Operations:

Applications in Spark process data using the methods defined in the RDD class. These methods are referred to as operations.

RDD operations are of two types:

1. Transformations

2. Actions

1. Transformations:

A transformation method of an RDD creates a new RDD by performing a computation on the source RDD.

RDD transformations are conceptually similar to Scala collection methods.

The key difference is that Scala collection methods operate on data that can fit in the memory of a single machine, whereas RDD methods can operate on data distributed across a cluster of nodes. In addition, RDD transformations are lazy, while Scala collection methods are strict.

A) map:

The map method is a higher-order method that takes a function as input and applies it to each element of the source RDD to create a new RDD.
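For example, a minimal sketch in the Spark shell (sc is the SparkContext provided by the shell; the sample data is illustrative only):

val numbers = sc.parallelize(List(1, 2, 3, 4))
val doubled = numbers.map(x => x * 2)   // the function is applied to every element
doubled.collect()                       // Array(2, 4, 6, 8)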

B) filter:

The filter method is a higher-order method that takes a Boolean function as input and applies it to each element in the source RDD. A Boolean function takes an input and returns true or false. filter returns a new RDD formed by selecting only those elements for which the Boolean function returned true, so the new RDD contains a subset of the elements in the original RDD.
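A minimal illustrative sketch, again assuming the Spark shell:

val numbers = sc.parallelize(1 to 10)
val evens = numbers.filter(x => x % 2 == 0)   // keep only elements for which the function returns true
evens.collect()                               // Array(2, 4, 6, 8, 10)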

C) flatMap:

The flatMap method is a higher-order method that takes an input function which returns a sequence for each element passed to it. flatMap returns a new RDD formed by flattening this collection of sequences.
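For instance, a hypothetical word-splitting sketch:

val lines = sc.parallelize(List("hello spark", "hello world"))
val words = lines.flatMap(line => line.split(" "))   // each line produces a sequence of words
words.collect()                                      // flattened: Array(hello, spark, hello, world)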

D) mapPartitions:

mapPartitions is a higher-order method that allows you to process data at the partition level. Instead of passing one element at a time to its input function, mapPartitions passes a whole partition in the form of an iterator. The input function to mapPartitions takes an iterator as input and returns an iterator as output.
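A small sketch (the per-partition sum is just an illustration):

val numbers = sc.parallelize(1 to 6, 2)                                 // an RDD with 2 partitions
val partitionSums = numbers.mapPartitions(iter => Iterator(iter.sum))   // one value per partition
partitionSums.collect()                                                 // Array(6, 15)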

E) intersection:

The intersection method takes an RDD as input and returns a new RDD that contains the intersection of the elements in the source RDD and the RDD passed to it as input.

F) union:

This method takes an RDD as input and returns a new RDD that contains the union of the elements in the source RDD and the RDD passed to it as input.


G) subtract:

The subtract method takes an RDD as input and returns a new RDD that contains the elements that are in the source RDD but not in the input RDD.
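The following sketch exercises the intersection, union and subtract methods described above on two small illustrative RDDs (note that union keeps duplicates):

val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5))
val rdd2 = sc.parallelize(List(4, 5, 6, 7))
rdd1.intersection(rdd2).collect()   // Array(4, 5), order may vary
rdd1.union(rdd2).collect()          // Array(1, 2, 3, 4, 5, 4, 5, 6, 7)
rdd1.subtract(rdd2).collect()       // Array(1, 2, 3), order may vary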

 

H) parallelize:

Parallelized collections are created by calling SparkContext's parallelize method on an existing collection in your driver program. The elements of the collection are copied to form a distributed data set that can be operated on in parallel.
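For example, assuming a local list in the driver program:

val data = List(1, 2, 3, 4, 5)            // a local Scala collection in the driver
val distData = sc.parallelize(data)       // distributed data set
val distData4 = sc.parallelize(data, 4)   // optionally specify the number of partitions
distData.reduce(_ + _)                    // 15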

 

I) distinct:

The distinct method of an RDD returns a new RDD containing the distinct elements of the source RDD.
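A one-line illustration:

val withDuplicates = sc.parallelize(List(1, 2, 2, 3, 3, 3))
withDuplicates.distinct().collect()   // Array(1, 2, 3), order may vary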

 

J) groupBy:

groupBy is a higher-order method that groups the elements of an RDD according to user-specified criteria. It takes as input a function that generates a key for each element in the source RDD, applies it to all the elements in the source RDD, and returns an RDD of key-value pairs.
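A small sketch that groups numbers by parity (the key-generating function is illustrative):

val numbers = sc.parallelize(1 to 10)
val grouped = numbers.groupBy(x => if (x % 2 == 0) "even" else "odd")
grouped.collect()   // Array((even, CompactBuffer(2, 4, 6, 8, 10)), (odd, CompactBuffer(1, 3, 5, 7, 9))), order may vary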

 

K) sortBy:

The sortBy method is a higher-order method that returns an RDD with the elements of the source RDD sorted. It takes two input parameters: the first is a function that generates a key for each element in the source RDD, and the second specifies ascending or descending sort order.
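For example, sorting words by length (the data is illustrative):

val words = sc.parallelize(List("spark", "is", "awesome"))
words.sortBy(w => w.length).collect()                      // ascending: Array(is, spark, awesome)
words.sortBy(w => w.length, ascending = false).collect()   // descending: Array(awesome, spark, is)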

 

L) coalesce:

The coalesce method reduces the number of partitions in an RDD. It takes an integer as input and returns a new RDD with the specified number of partitions.
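A minimal sketch:

val numbers = sc.parallelize(1 to 100, 8)   // an RDD with 8 partitions
val fewer = numbers.coalesce(2)             // new RDD with 2 partitions
fewer.partitions.length                     // 2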

 

M) groupByKey:

The groupByKey method returns an RDD of pairs, where the first element in a pair is a key from the source RDD and the second element is a collection of all the values that have the same key. It is similar to groupBy; the major difference is that groupBy is a higher-order method that takes as input a function that returns a key for each element of the source RDD, whereas groupByKey operates on an RDD of key-value pairs.
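A small sketch on an illustrative RDD of key-value pairs:

val pairs = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3)))
pairs.groupByKey().collect()   // Array((a, CompactBuffer(1, 2)), (b, CompactBuffer(3)))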


N) reduceByKey:

The higher-order reduceByKey method takes an associative binary operator as input and reduces the values with the same key to a single value using that operator.
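For example, summing the values for each key (compare with the groupByKey sketch above):

val pairs = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3)))
pairs.reduceByKey(_ + _).collect()   // Array((a, 3), (b, 3))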

2. Actions:

Actions are RDD methods that return a value to a driver program.

A) collect:

The collect method returns the elements of the source RDD as an array. This method should be used with caution, since it moves data from all the worker nodes to the driver program.

B) count:

This method returns a count of the elements in the source RDD.

C) countByValue:

The countByValue method returns a count of each unique element in the source RDD. It returns an instance of the Map class containing each unique element and its count as a key-value pair.

D) first:

The first method returns the first element in the source RDD.

E) max:

The max method returns the largest element in the RDD.

F) min:

The min method returns the smallest element in the RDD.

G) top:

The top method takes an integer N as input and returns an array containing the N largest elements in the source RDD.

H) reduce:

The higher-order reduce method aggregates the elements of the source RDD using an associative and commutative binary operator provided to it.

I) countByKey:

The countByKey method counts the occurrences of each unique key in the source RDD and returns a Map of key-count pairs.
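The following shell sketch exercises most of the actions described above on a small illustrative RDD:

val numbers = sc.parallelize(List(5, 1, 3, 5, 2))
numbers.collect()        // Array(5, 1, 3, 5, 2) -- moves all elements to the driver
numbers.count()          // 5
numbers.countByValue()   // Map(5 -> 2, 1 -> 1, 3 -> 1, 2 -> 1)
numbers.first()          // 5
numbers.max()            // 5
numbers.min()            // 1
numbers.top(2)           // Array(5, 5)
numbers.reduce(_ + _)    // 16
val pairs = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3)))
pairs.countByKey()       // Map(a -> 2, b -> 1)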


Spark Streaming Twitter Example:

// Scala program using the Spark Streaming Twitter integration

package org.apache.spark.demo.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._

object TwitterTags {

  def main(args: Array[String]) {

    if (args.length < 4) {
      System.err.println("Usage: TwitterTags <consumer key> <consumer secret> " +
        "<access token> <access token secret> [<filters>]")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.takeRight(args.length - 4)

    // Set the system properties so that the Twitter4j library used by the Twitter stream
    // can use them to generate OAuth (Open Authentication) credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("TwitterTags")
    val scc = new StreamingContext(sparkConf, Seconds(3))

    val stream = TwitterUtils.createStream(scc, None, filters)

    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    val topCounts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    val topCounts1 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print popular hashtags
    topCounts.foreachRDD(rdd => {
      val topList = rdd.take(30)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    topCounts1.foreachRDD(rdd => {
      val topList = rdd.take(60)
      println("\nPopular topics in last 30 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    scc.start()

    scc.awaitTermination()
  }
}

 

 

Spark Streaming Use Case

Spark Streaming Use Case with Explanation:

The Scala imports for Spark Streaming:

import org.apache.spark.streaming.StreamingContext

import org.apache.spark.streaming.StreamingContext._

import org.apache.spark.streaming.dstream.DStream

import org.apache.spark.streaming.Duration

import org.apache.spark.streaming.Seconds

Spark StreamingContext:

The StreamingContext also sets up the underlying SparkContext that it will use to process the data. It takes as input a batch interval specifying how often to process new data.

socketTextStream()

We use socketTextStream() to create a DStream based on text data received on a port of the local machine.

Then we transform the DStream with filter() to get only the lines that contain "error", and use the output operation print() to print some of the filtered lines.

// Create a StreamingContext with a 1-second batch size from a SparkConf

val scc = new StreamingContext(conf, Seconds(1))

// Create a DStream using data received on port 9000 of the local machine

val lines = scc.socketTextStream("localhost", 9000)

// Filter our DStream for lines with "error"

val errorLines = lines.filter(_.contains("error"))

// Print out the lines with errors

errorLines.print()



In the above example of converting a stream of lines into words, the flatMap operation is applied to each RDD in the lines DStream to generate the RDDs of the words DStream. Input DStreams are DStreams representing the stream of input data received from a streaming source; in the example of converting a stream of lines into words, lines was an input DStream because it represented the stream of data received from the server.

Every input DStream (whether created from Java, Scala, etc.) is associated with a Receiver object, which receives the data from a source and stores it in Spark's memory for processing. Spark Streaming provides two categories of sources:



  1. Basic sources: sources directly available in the StreamingContext API, for example file systems and socket connections.
  2. Advanced sources: sources such as Flume, Kafka, Twitter, etc., which are available through extra utility classes.

 

Spark Streaming with Pictures

Spark Streaming:

Spark Streaming is Spark's module for real-time applications (Twitter tweets, statistics, page views). It lets users write streaming applications using an API very similar to that of batch jobs. Spark Streaming is a distributed data stream processing framework that makes it easy to develop distributed applications for processing live data streams in real time. It not only provides a simple programming model but also enables an application to process high-velocity stream data. It also allows the combining of data streams and historical data for processing.

Spark Streaming is an extension of the core Spark API that enables scalable, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Flume, Kafka, etc. and processed using complex algorithms expressed with high-level functions like map and reduce. Processed data can be pushed out to file systems and live dashboards.



Process Flow in Spark Streaming:

Spark Streaming receives live input data streams and divides the data into batches. The Spark engine then processes those batches, and once processing is done it generates the final stream of outputs in batches.

 

Streaming Context

StreamingContext, a class defined in the Spark Streaming library, is the main entry point into the Spark Streaming library. It allows a Spark Streaming application to connect to a Spark cluster.

StreamingContext provides methods for creating an instance of the data stream abstraction (DStream) provided by Spark Streaming.

Every Spark Streaming application must create an instance of this class.



Example:

import org.apache.spark._

import org.apache.spark.streaming._

val conf = new SparkConf().setMaster("spark://host:port").setAppName("Streaming app")

val batch = 20

val ssc = new StreamingContext(conf, Seconds(batch))

The batch size can be as small as 500 milliseconds. The upper bound for the batch size is determined by the latency requirements of your application and the memory available to Spark Streaming.

 

Spark Most Typical Interview Questions List

Apache SPARK Interview Questions List

    1. Why is an RDD resilient?
    2. Difference between persist and cache?
    3. Difference between lineage and DAG?
    4. What are narrow and wide transformations?
    5. What are shared variables and their uses?
    6. How to define a custom accumulator?
    7. If we have 50 GB of memory and 100 GB of data, how will Spark process it?
    8. How to create UDFs in Spark?
    9. How to use Hive UDFs in Spark?
    10. What are accumulators and broadcast variables?
    11. How to decide the various parameter values in spark-submit?
    12. Difference between coalesce and repartition?
    13. Difference between RDD, DataFrame and Dataset? When to use which?
    14. What is data skew and how to fix it?
    15. Why shouldn't we use the groupBy transformation in Spark?
    16. How to do a map-side join in Spark?
    17. What challenges are faced in a Spark project?
    18. Use of map, flatMap, mapPartitions, foreach and foreachPartition?
    19. What is a pair RDD? When to use them?
    20. Performance optimization techniques in Spark?
    21. Difference between cluster and client mode?
    22. How to capture logs in client mode and cluster mode?
    23. What happens if a worker node dies?
    24. What types of file formats does Spark support? Which of them are most suitable for our organization's needs?
    25. Difference between reduceByKey() and groupByKey()?
    26. Difference between Spark 1 and Spark 2?
    27. How do you debug Spark jobs?
    28. Difference between var and val?
    29. What size of file do you use for development?
    30. How long will it take to run your script in production?
    31. How to perform joins using RDDs?
    32. How do you run your job in Spark?
    33. What is the difference between a Spark DataFrame and a Dataset?
    34. How are Datasets type safe?
    35. What are sink processors?
    36. Lazy evaluation in Spark and its benefits?
    37. After spark-submit, what process runs behind the application?
    38. How to decide the number of stages in a Spark job?

The above questions are relevant to both experienced and beginner Spark developers.

Spark Coding Test for Spark Developers

Here are two programs for Spark developers:

Question 1:

Mr. Bolt is in his 60s and loves travelling. He recently visited a country famous for its pens. He has 'A' grandchildren. He went to a pen shop to purchase pens for them. The shopkeeper showed him 'a' varieties of pens, each variety containing 'b[i]' pens.

He has to select 'c' varieties of pens in a set in such a way that all the 'A' grandchildren get the same number of pens. If there is more than one such set, the one with the minimum number of pens per child should be returned.

Inputs:

input 1: Value of 'A'

input 2: Value of 'c'

input 3: Value of 'a'

input 4: Values in the array 'b'

Output:

Return the minimum number of pens each grandchild should get. Return -1 if no solution is possible.

Example:

Inputs:

input 1 : 5

input 2 : 3

input 3: 5

input 4 : {1,2,3,4,5}

 



Output: 2

Explanation: He can purchase pens in two possible sets, {2,3,5} and {1,4,5}. The sum of each set is 10. Therefore, he will be able to give 2 pens to each of his grandchildren.

 

Question 2:

You just got a new job, but your new office has different rules. They allow you to take breaks between tasks if no task is available, but the problem is that the tasks come randomly and sometimes you may be required to do them simultaneously.

On your first day, you are given a list of tasks with their starting and ending times. Find out the total break time you will get. Assume the ending time is always greater than the starting time.

Input:

Input 1: Number of tasks

Input 2: A 2-D array in the form [start, end] representing the starting and ending time of each task

Output:

Your function must return an integer representing the total break time

Example:

input 1: 4

input 2: {(6,8), (1,9), (2,4), (4,7)}




Output: 0

The above programs can be implemented by Spark developers in Scala, Java, Python or R.

Spark with Kafka Scenario for Spark Developers

Problems:

This scenario is a real-time example of Spark with Kafka for Spark developers.

Problem Statement:

A reality-television game show has 7 players and runs for one complete day. The winner of the game is decided by the votes cast by the audience watching the show, based on the criteria detailed below.

Rules for casting votes:

1) Each unique user (assume each has an ID) can cast votes for the players.

2) A user can cast at most one vote every 2 minutes and is free to vote for a different player each time.

3) If a user casts more than one vote within a span of two minutes, the latest vote overwrites the previous vote.

Calculation criteria for the winner:

1) Find the player who has the maximum votes for every minute of the day; the player with the maximum votes for that minute gets one reward point.

2) At the end of the day, the player with the maximum reward points is the winner.

 



Tasks:

1) Create a system which simulates user voting to a Kafka topic.

2) A Spark Streaming job should process the stream data based on the rules mentioned above.

3) The reward points for the players should be stored in a persistent system.

4) Provide a query to find the winner.

 

Apache Kafka is a messaging and integration platform for Spark Streaming. Kafka acts as the central hub for real-time streams of data, which are then processed by Spark Streaming.
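As a starting point for the tasks above, here is a minimal, illustrative sketch of the consuming side using the spark-streaming-kafka-0-10 integration. The topic name "votes", the broker address and the message format "userId,player" are assumptions made for illustration only; the two-minute de-duplication rule and the persistence of reward points are left out.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

val conf = new SparkConf().setAppName("VotingStream")
val ssc = new StreamingContext(conf, Seconds(60))   // one batch per minute, matching the reward-point rule

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",          // assumed broker address
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "vote-counter",
  "auto.offset.reset" -> "latest"
)

// assumed message format: "userId,player"
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("votes"), kafkaParams)
)

val votesPerPlayer = stream.map(record => record.value.split(","))
                           .map(fields => (fields(1), 1))   // (player, 1)
                           .reduceByKey(_ + _)              // votes per player in this one-minute batch
votesPerPlayer.print()

ssc.start()
ssc.awaitTermination()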

The above scenario is asked as a coding exercise in Spark with Kafka. Spark developers can implement it in Scala or Python, depending on their programming background. Nowadays it is also a common scenario in the IT industry and for the CCA-175 certification.

Spark SQL with Example (Pictures)

HiveContext provides a superset of the functionality provided by SQLContext. The parser that comes with HiveContext is more powerful than the SQLContext parser: it can execute both HiveQL (Hive Query Language) and SQL queries, and it can read data from Hive tables. It also allows applications to access Hive UDFs (User Defined Functions). If we want to process existing Hive tables, we add the hive-site.xml file to Spark's classpath; HiveContext reads the Hive configuration from the hive-site.xml file.

DataFrames: The DataFrame is Spark SQL's primary data abstraction. It represents a distributed collection of rows organized into named columns, similar to a table in a relational database.

Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine.

A few points about Spark SQL:

1. A DataFrame is a distributed collection of data organized into named columns.

2. It is conceptually equivalent to a table in a relational database.

3. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs (by converting an RDD to a DataFrame).

Spark SQL provides an implicit conversion method named toDF, which creates a DataFrame in a simple way from an RDD of objects represented by a case class. When this technique is used, Spark SQL infers the schema of the data set. The toDF method is not defined in the RDD class, but it is available through an implicit conversion. To convert an RDD to a DataFrame using toDF, we need to import the implicit methods.



Example:

scala> import sqlContext.implicits._

scala> val data = sc.parallelize(1 to 100)

scala> val newData = data.map(l => (l, l - 10))

scala> val resultData = newData.toDF("normal", "transformed")

scala> resultData.printSchema

scala> resultData.show

The above example is a very simple one for Spark beginners.

 

 

Interview Questions on Spark Core

Spark Core interview questions:

1) What is Spark? Explain briefly.

Spark is an in-memory cluster computing framework for processing and analyzing large amounts of data. Spark provides a simple programming interface, which enables application developers to easily use the memory, CPU and storage resources across a cluster of servers for processing large data sets.

2) What is an RDD? Explain its properties.

A Resilient Distributed Dataset represents a collection of partitioned data elements that can be operated on in parallel. RDD is the primary data abstraction mechanism in Spark, defined as an abstract class in the Spark library. It is similar to a Scala collection and supports lazy evaluation.

3) What is lazy evaluation? Why is Spark lazily evaluated?

Spark is a "lazily evaluated" system in the way it computes RDDs: although you can define new RDDs at any time, Spark computes them only lazily, that is, the first time they are used in an action. This approach might seem unusual at first, but it makes a lot of sense when you are working with Big Data.
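A tiny illustration (the file name data.txt is only an assumed example):

val data = sc.textFile("data.txt")    // nothing is read yet; only the lineage is recorded
val upper = data.map(_.toUpperCase)   // still nothing is computed
upper.count()                         // the action triggers the actual computation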



4) What is the SparkContext?

SparkContext is a class defined in the Spark library and is the main entry point into the Spark library. The SparkContext runs in a program called the "driver program", which is the main program in Spark.

5) What are narrow and wide dependencies in an RDD?

Narrow dependencies:

Each parent partition contributes data to a single child partition. A sequence of operations involving only narrow dependencies can be pipelined.

Wide dependencies:

Each parent partition contributes data to multiple child partitions. This requires a shuffle, which is an expensive operation in a distributed system.
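As a rough illustration (not from the source text), map creates only a narrow dependency, while reduceByKey requires a shuffle and therefore a wide dependency:

val words = sc.parallelize(List("a", "b", "a", "c"))
val pairs = words.map(w => (w, 1))        // narrow: each child partition depends on a single parent partition
val counts = pairs.reduceByKey(_ + _)     // wide: data is shuffled across partitions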

6) What are the components of the Spark compute engine?

The Spark compute engine is a data-parallel engine for data processing. It is divided into three components:

1. Driver

2. Cluster manager

3. Executor

Why Spark Uses Lazy Evaluation and How RDDs are Fault Tolerant

Why does Spark use lazy evaluation?

Spark is a "lazily evaluated" system in the way it computes RDDs: although you can define new RDDs at any time, Spark computes them only lazily, that is, the first time they are used in an action. This approach might seem unusual at first, but it makes a lot of sense when you are working with Big Data.

How are RDDs fault tolerant?

RDD is designed to be fault tolerant and represents data distributed across a cluster of nodes. The probability of a node failing is proportional to the number of nodes in the cluster: the larger the cluster, the higher the probability that some node will fail on any given day. RDDs automatically handle node failures; when a node fails and the partitions stored on that node become inaccessible, Spark reconstructs the lost RDD partitions on another node.

Spark stores lineage information for each RDD. Using this lineage information, it can recover parts of an RDD, or even an entire RDD, in the event of node failures.



Spark's RDDs are by default recomputed each time you run an action on them. If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist().

In the Spark shell, you can call the persist method on an RDD after creating it, for example:

scala> filteredName.persist()

Every Spark program and shell session will work as follows:

Create some input RDDs from external data, then transform them to define new RDDs using transformations like filter().

Ask Spark to persist() any intermediate RDDs that will need to be reused, then launch actions such as count() and first() to kick off a parallel computation, which Spark then optimizes and executes.

In Spark, the cache() method is the same as calling persist() with the default storage level.
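A minimal sketch of this flow, assuming an example input file input.txt:

val lines = sc.textFile("input.txt")               // "input.txt" is an assumed example path
val errorLines = lines.filter(_.contains("error")) // transformation: defines a new RDD
errorLines.persist()                               // ask Spark to keep the filtered RDD in memory
errorLines.count()                                 // first action computes and caches the RDD
errorLines.first()                                 // reuses the cached data instead of recomputing
// errorLines.cache() would be equivalent to persist() with the default MEMORY_ONLY storage level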