Spark Performance Tuning with pictures





Spark Execution with a simple program:

Basically, a Spark program consists of a single driver process and a set of executor processes running across the nodes of the cluster.

Spark performance bottlenecks are identified by measuring cluster metrics, for example with blocked-time analysis. Because Spark caches data in memory, minimizing network and disk I/O plays a key role in performance.

For example, take two clusters: one with 25 machines processing 750 GB of data, and a second with 75 machines processing 4.5 TB of raw data. For these workloads, network communication is rarely the dominant cost; optimizing the network typically shortens job completion time by only about 5%. Serializing and compressing the data usually matters more.
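
As a rough illustration of that last point, the snippet below enables Kryo serialization and RDD compression through standard Spark configuration properties (the application name and values are only examples, not settings from this article):

import org.apache.spark.SparkConf

// Illustrative tuning only; adjust values for your own cluster.
val conf = new SparkConf()
  .setAppName("TunedJob")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // compact, fast serialization
  .set("spark.rdd.compress", "true")                                     // compress serialized RDD partitions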

Apache Spark transformations such as groupByKey and reduceByKey introduce shuffle dependencies: Spark executes a shuffle, which transfers data around the cluster. By contrast, the chain of operations below runs without any shuffle:

sc.textFile("/hdfs/path/sample.txt")
  .map(mapFunc)          // apply the map transformation
  .flatMap(flatMapFunc)  // flatMap is another transformation
  .filter(filterFunc)
  .count()               // action that counts the remaining records

The code above executes in a single stage, because no operation requires redistributing data across partitions; each transformation is applied to the RDD derived from the sample.txt file.

Suppose we want to count how many times each character appears in all the words that occur more than 1,000 times in a given text file. The Scala code below does this:

val tokens = sc.textFile(args(0)).flatMap(_.split(' '))
val wordCounts = tokens.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)
charCounts.collect()

The code above breaks into three stages; each reduceByKey operation results in a stage boundary.

 

 

 

java.lang.ClassNotFoundException: oracle.jdbc.driver.OracleDriver in Spark Scala

When writing a Spark application in Scala or Python (PySpark) to read data from an Oracle database, whether on Linux or on Amazon Web Services, you may see the error below if the Oracle JDBC driver is missing from spark.driver.extraClassPath or spark.executor.extraClassPath:

Caused by: java.lang.ClassNotFoundException: oracle.jdbc.driver.OracleDriver
  at scala.tools.nsc.interpreter.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:83)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:35)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala)
  at scala.Option.foreach(Option.scala:236)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1(JdbcUtils.scala)
  at <init>(<console>:46)
  at .<init>(<console>:52)
  at .<clinit>(<console>)
  at .<init>(<console>:7)
  at .<clinit>(<console>)
  at $print(<console>)

Solution:

The solution below is shown for Spark with Scala; the same error also occurs in Python (PySpark).

Add the Oracle JDBC driver jar to both the driver and executor classpaths. First, edit the Spark defaults configuration file, spark-defaults.conf.

Add the following two entries to spark-defaults.conf:

spark.driver.extraClassPath /home/hadoop/ojdbc7.jar
spark.executor.extraClassPath /home/hadoop/ojdbc7.jar

The jar referenced by these two entries must match your database and Spark versions; otherwise you will run into compatibility issues.

If setting these two classpaths in the configuration file still fails, pass the driver and executor jars when launching the job instead, using --conf (or --driver-class-path and --jars), as in the example at the bottom of the page.

If the same issue appears again, follow the solution below:

Step 1: Download the Oracle JDBC driver jar from the official Maven repository.

Step 2: Copy the downloaded jar into the shared Spark jars location:

/usr/lib/spark/jars

For example, the PySpark code snippet below shows the full flow.

 

# Launch PySpark with the Oracle JDBC driver on both the driver classpath and --jars
pyspark --driver-class-path /home/hadoop/ojdbc7.jar --jars /home/hadoop/ojdbc7.jar

from pyspark import SparkContext, SparkConf   # SparkContext and configuration
from pyspark.sql import SQLContext            # SQL context

sqlContext = SQLContext(sc)

# Database connection to read data into Spark through JDBC
dbconnection = sqlContext.read.format("jdbc") \
    .options(url="Give your jdbc connection url path",
             dbtable="Give your table name",
             driver="oracle.jdbc.driver.OracleDriver") \
    .load()

 

Hadoop and Spark Interview Questions

Cognizant conducted this Hadoop and Spark interview for experienced candidates.

Round 1:

1. What is the Future class in the Scala programming language?
2. Difference between fold, foldLeft, and foldRight in Scala?
3. How does DISTRIBUTE BY work in Hive? Given some data, explain how it will be distributed.
4. df.filter(Id == 3000): how do you pass this condition on DataFrame values dynamically?
5. Have you worked on multithreading in Scala? Explain.
6. On what basis will you increase the number of mappers in Apache Sqoop?
7. What value will you mention as the last value when importing for the first time in Sqoop?
8. How do you specify the date for an incremental lastmodified import in Sqoop?
9. Let's say you created the partition for Bengaluru but loaded Hyderabad data. What validation do we have to do in this case to make sure there won't be any errors?
10. How many reducers will be launched by DISTRIBUTE BY in Spark?
11. How do you delete a Sqoop job with a simple command?
12. In which location is a Sqoop job's last value stored?
13. What are the default input and output formats in Hive?
14. Can you briefly explain distributed cache in Spark with an example?
15. Did you use Kafka/Flume in your project? Explain in detail.
16. Difference between Parquet and ORC file formats?

Round 2:

1. Explain your previous project.
2. How do you handle incremental data in Apache Sqoop?
3. Which optimization techniques are used in Sqoop?
4. What are the different parameters you pass to your Spark job?
5. If one task is taking more time than the others, how will you handle it?
6. What are stages and tasks in Spark? Give a real-time scenario.
7. On what basis do you set the number of mappers in Sqoop?
8. How will you export data to Oracle without putting much load on the table?
9. What is a column family in HBase?
10. Can you create a table without mentioning a column family?
11. Is there a limit on the number of column families for one table?
12. How did you schedule Spark jobs in your previous project?
13. Explain Spark architecture with a real-time scenario.

Resilient Distributed Datasets (RDD) in Spark

RDD:

A Resilient Distributed Dataset represents a collection of partitioned data elements that can be operated on in parallel. RDD is the primary data abstraction in Spark, defined as an abstract class in the Spark library. It is similar to a Scala collection, with the key difference that it supports lazy evaluation.

Characteristics of RDD:

1.Immutable :

An RDD is an immutable data structure. Once created, it cannot be modified in place; an operation that modifies an RDD returns a new RDD instead.

2.Partitioned:

The data in an RDD is split into partitions, which are generally distributed across a cluster of nodes. When Spark runs on a single machine, all the partitions are on that machine.
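
A brief sketch of how partitioning looks in practice (the data and partition count are illustrative):

// Create an RDD with an explicit number of partitions.
val numbers = sc.parallelize(1 to 100, 4)
println(numbers.getNumPartitions)    // 4
println(numbers.partitions.length)   // same information via the partitions array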

 

RDD Operations :

Spark applications process data using the methods defined on the RDD class; these methods are referred to as operations.

RDD operations are two types:

1.Transformations

2.Actions

 1.Transformations :

A transformation method of an RDD creates a new RDD by performing a computation on the source RDD.

RDD transformations are conceptually similar to Scala collection methods.

The key difference is that Scala collection methods operate on data that fits in the memory of a single machine, whereas RDD methods can operate on data distributed across a cluster of nodes. Another difference is that RDD transformations are lazy, while Scala collection methods are strict.

A) Map:

The map method is a higher order method that takes a function as input and applies it to each element in the source RDD to create a new RDD.
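
For example (values are illustrative):

val lengths = sc.parallelize(Seq("spark", "rdd", "map")).map(_.length)
lengths.collect()   // Array(5, 3, 3)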

B) filter:

The filter method is a high order method that takes a Boolean function as input and applies it to each element in the source RDD to create a new RDD. A Boolean function takes an input and returns false or true. It returns a new RDD formed by selecting only those elements for which the input Boolean function returned true. The new RDD contains a subset of the elements in the original RDD.
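
For example:

val evens = sc.parallelize(1 to 10).filter(_ % 2 == 0)
evens.collect()   // Array(2, 4, 6, 8, 10)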

c) flatMap:

flatMap is a higher-order method that takes an input function which returns a sequence for each input element passed to it. The flatMap method returns a new RDD formed by flattening this collection of sequences.
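
For example:

val words = sc.parallelize(Seq("hello world", "hi spark")).flatMap(_.split(" "))
words.collect()   // Array(hello, world, hi, spark)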

D) mapPartitions :

mapPartitions is a higher-order method that allows you to process data at the partition level. Instead of passing one element at a time to its input function, mapPartitions passes a whole partition in the form of an iterator. The input function to mapPartitions takes an iterator as input and returns an iterator as output.
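
A small sketch, useful when per-partition setup (such as opening a connection) should happen only once per partition:

val perPartitionSums = sc.parallelize(1 to 8, 2).mapPartitions { iter =>
  Iterator(iter.sum)   // emit one value per partition
}
perPartitionSums.collect()   // Array(10, 26)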

E)Intersection:

The intersection method takes an RDD as input and returns a new RDD that contains the intersection of the elements in the source RDD and the RDD passed to it as input.

F)Union:

The union method takes an RDD as input and returns a new RDD that contains the union of the elements in the source RDD and the RDD passed to it as input.

G)Subtract:

The subtract method takes an RDD as input and returns a new RDD that contains the elements that are in the source RDD but not in the input RDD. The three set operations are shown side by side below.
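
An illustrative comparison (result order may vary across partitions):

val a = sc.parallelize(Seq(1, 2, 3, 4))
val b = sc.parallelize(Seq(3, 4, 5, 6))
a.intersection(b).collect()   // Array(3, 4)
a.union(b).collect()          // Array(1, 2, 3, 4, 3, 4, 5, 6) -- union keeps duplicates
a.subtract(b).collect()       // Array(1, 2)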

 

H)Parallelize:

Parallelized collections are created by calling SparkContext's parallelize method on an existing collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.
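
For example:

val names = sc.parallelize(Seq("ana", "bob", "carl"))
names.count()   // 3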

 

I)Distinct:

Distinct method of an RDD returns a new RDD containing the distinct elements in the source RDD

 

J)Group By:

groupBy is a higher-order method that groups the elements of an RDD according to a user-specified criterion. It takes as input a function that generates a key for each element in the source RDD, applies it to all the elements, and returns an RDD of pairs.
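
For example (result order may vary):

val grouped = sc.parallelize(1 to 6).groupBy(n => if (n % 2 == 0) "even" else "odd")
grouped.collect()   // Array((even, [2, 4, 6]), (odd, [1, 3, 5]))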

 

K)Sort By:

The sortBy method is a higher-order method that returns an RDD with the elements of the source RDD sorted. It takes two input parameters: the first is a function that generates a key for each element in the source RDD, and the second specifies ascending or descending order for the sort.
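
For example:

val sorted = sc.parallelize(Seq(("a", 3), ("b", 1), ("c", 2))).sortBy(_._2, ascending = false)
sorted.collect()   // Array((a,3), (c,2), (b,1))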

 

L)Coalesce:

The coalesce method reduces the number of partitions in an RDD. It takes an integer as input and returns a new RDD with the specified number of partitions.
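
For example:

val wide = sc.parallelize(1 to 100, 8)
val narrow = wide.coalesce(2)
narrow.getNumPartitions   // 2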

 

M)GroupByKey:

The groupByKey method returns an RDD of pairs, where the first element in a pair is a key from the source RDD and the second element is a collection of all the values that have the same key. It is similar to the groupBy method; the major difference is that groupBy is a higher-order method that takes an input function returning a key for each element, whereas groupByKey operates on an RDD that already consists of key-value pairs.

N)ReduceByKey:

The higher-order reduceByKey method takes an associative binary operator as input and reduces all values with the same key to a single value using that operator.
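
A quick comparison of the two on the same pair RDD (illustrative data); reduceByKey is usually preferred because it combines values within each partition before shuffling:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
pairs.groupByKey().mapValues(_.sum).collect()   // Array((a,2), (b,1))
pairs.reduceByKey(_ + _).collect()              // Array((a,2), (b,1)) -- same result, less shuffle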

2.Actions:

Actions are RDD methods that return a value to a driver program.

A)Collect:

The collect method returns the elements of the source RDD as an array. This method should be used with caution, since it moves data from all the workers to the driver program.

B)Count:

This method returns a count of the elements in the source RDD.

C)Count By Value :

The countByValue method returns a count of each unique element in the source RDD. It returns an instance of the Map class containing each unique element and its count as a key-value pair.

D)First:

The first method returns the first element in the source RDD

E)Max:

The max method returns the largest element in  RDD

F)Min

The min method returns the smallest element in RDD

G)Top

The top method takes an integer N as input and returns an array containing the N largest elements in the source RDD.

H)Reduce

The higher-order reduce method aggregates the elements of the source RDD using an associative and commutative binary operator provided to it.

I)CountByKey

The countByKey method counts the occurrences of each unique key in the source RDD. It returns a Map of key-count pairs.
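
The actions above, shown together on a small RDD (illustrative data):

val nums = sc.parallelize(Seq(3, 1, 3, 2))
nums.collect()        // Array(3, 1, 3, 2)
nums.count()          // 4
nums.countByValue()   // Map(3 -> 2, 1 -> 1, 2 -> 1)
nums.first()          // 3
nums.max()            // 3
nums.min()            // 1
nums.top(2)           // Array(3, 3)
nums.reduce(_ + _)    // 9

val kv = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
kv.countByKey()       // Map(a -> 2, b -> 1)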

Spark Streaming Twitter Example

Spark Streaming Twitter Example:

//Using Scala Program

package org.apache.spark.demo.streaming

import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf

object TwitterTags {

  def main(args: Array[String]) {

    if (args.length < 4) {
      System.err.println("Usage: TwitterTags <consumer key> <consumer secret> " +
        "<access token> <access token secret> [<filters>]")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.takeRight(args.length - 4)

    // Set the system properties so that the Twitter4j library used by the Twitter stream
    // can use them to generate OAuth (Open Authentication) credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val sparkConf = new SparkConf().setAppName("TwitterTags")
    val scc = new StreamingContext(sparkConf, Seconds(3))
    val stream = TwitterUtils.createStream(scc, None, filters)

    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    val topCounts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    val topCounts1 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print popular hashtags
    topCounts.foreachRDD(rdd => {
      val topList = rdd.take(30)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    topCounts1.foreachRDD(rdd => {
      val topList = rdd.take(60)
      println("\nPopular topics in last 30 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    scc.start()
    scc.awaitTermination()
  }
}

Spark Streaming Use Case

Spark Streaming Use Case with Explanation:

Start with the Scala streaming imports:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.Seconds

Spark Streaming Context :

The StreamingContext also sets up the underlying SparkContext that it will use to process the data. It takes as input a batch interval specifying how often to process new data.

socketTextStream()

We use socketTextStream() to create a DStream based on text data received on a port of the local machine.

Then we transform the DStream with filter() to get only the lines that contain "error", and use the output operation print() to print some of the filtered lines.

// Create a StreamingContext with a 1-second batch size from a SparkConf
val scc = new StreamingContext(conf, Seconds(1))

// Create a DStream using data received on port 9000 of the local machine
val lines = scc.socketTextStream("localhost", 9000)

// Filter our DStream for lines containing "error"
val errorLines = lines.filter(_.contains("error"))

// Print out the lines with errors
errorLines.print()

In an example of converting a stream of lines to words, the flatMap operation is applied on each RDD in the lines DStream to generate the RDDs of the words DStream. Input DStreams are DStreams representing the stream of input data received from the streaming source. In the example above, lines is an input DStream, as it represents the stream of data received from the server.

Every input DStream is associated with a Receiver object, which receives the data from a source and stores it in Spark's memory for processing. Spark Streaming provides two categories of sources:

  1. Basic sources: sources directly available in the StreamingContext API, for example file systems and socket connections.
  2. Advanced sources: sources such as Flume, Kafka, Twitter, etc. that are available indirectly through extra utility classes.

Spark Streaming with Pictures

Spark Streaming:

Spark Streaming is Spark's module for real-time applications (Twitter tweets, statistics, page views). It lets users write streaming applications using an API very similar to that of batch jobs. Spark Streaming is a distributed data stream processing framework that makes it easy to develop distributed applications for processing live data streams in real time. It not only provides a simple programming model but also enables an application to process high-velocity stream data. It also allows combining data streams with other data for processing.

Spark Streaming is an extension of the core Spark API that enables scalable, fault-tolerant stream processing of live data streams. Data can be ingested from many sources such as Flume and Kafka, and can be processed using complex algorithms expressed with high-level functions like map and reduce. Processed data can be pushed out to file systems and live dashboards.



Process Flow in Spark Streaming:

Spark Streaming receives live input data streams and divides the data into batches. The Spark engine then processes each batch and generates the final stream of results, also in batches.

 

Streaming Context

StreamingContext, a class defined in the Spark Streaming library, is the main entry point into the library. It allows a Spark Streaming application to connect to a Spark cluster.

Streaming Context provides methods for creating an instance of the data stream abstraction provided by Spark Streaming.

Every Spark Streaming application must create an instance of this class
Example:

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setMaster("spark://host:port").setAppName("Streaming app")
val batch = 20
val ssc = new StreamingContext(conf, Seconds(batch))

The batch size can be as small as 500 milliseconds. The upper bound for the batch size is determined by the latency requirements of your application and the memory available to Spark Streaming.

Spark Most Typical Interview Questions List

Apache SPARK Interview Questions List

    1. Why is RDD called resilient?
    2. Difference between persist and cache?
    3. Difference between lineage and DAG?
    4. What are narrow and wide transformations?
    5. What are shared variables and what are their uses?
    6. How do you define a custom accumulator?
    7. If we have 50 GB of memory and 100 GB of data, how will Spark process it?
    8. How do you create UDFs in Spark?
    9. How do you use Hive UDFs in Spark?
    10. What are accumulators and broadcast variables?
    11. How do you decide the various parameter values in spark-submit?
    12. Difference between coalesce and repartition?
    13. Difference between RDD, DataFrame, and Dataset. When to use each?
    14. What is data skew and how do you fix it?
    15. Why shouldn't we use the groupBy transformation in Spark?
    16. How do you do a map-side join in Spark?
    1. What challenges were faced in your Spark project?
    2. Use of map, flatMap, mapPartitions, foreach, and foreachPartition?
    3. What is a pair RDD? When do you use them?
    4. Performance optimization techniques in Spark?
    5. Difference between cluster and client mode?
    6. How do you capture logs in client mode and cluster mode?
    7. What happens if a worker node dies?
    8. What types of file formats does Spark support? Which of them are most suitable for our organization's needs?
  1. Difference between reduceByKey() and groupByKey()?
  2. Difference between Spark 1 and Spark 2?
  3. How do you debug Spark jobs?
  4. Difference between var and val?
  5. What size of file do you use for development?
  6. How long will your script take to run in production?
  7. How do you perform joins using RDDs?
  8. How do you run your job in Spark?
  9. What is the difference between a Spark DataFrame and a Dataset?
  10. How are Datasets type safe?
  11. What are sink processors?
  12. What is lazy evaluation in Spark and what are its benefits?
  13. After spark-submit, what process runs behind the application?
  14. How do you decide the number of stages in a Spark job?

The questions above are aimed at both experienced and beginner Spark developers.

Spark Coding Test for Spark Developers

Here are two programming questions for Spark developers:

Question 1:

Mr. Bolt is in his 60s and loves traveling. He recently visited a country famous for its pens. He has 'X' grandchildren. He went to a pen shop to purchase pens for them. The shopkeeper showed him 'a' varieties of pens, each variety containing 'b[i]' pens.

He has to select 'c' varieties of pens as a set in such a way that all the 'X' grandchildren get the same number of pens. If there is more than one such set, the one with the minimum number of pens per child should be returned.

Inputs:

input 1: Value of ‘X’

input 2: Value of ‘c’

input 3: Value of ‘a’

input 4: Values in the array ‘b’

Output:

Return the minimum number of pens each grandchild should get. Return -1 if no solution is possible.

Example:

Inputs:

input 1 : 5

input 2 : 3

input 3: 5

input 4 : {1,2,3,4,5}

 

Output: 2

Explanation: He can purchase pens in two possible sets, {2,3,5} and {1,4,5}. The sum of each set is 10. Therefore, he will be able to give 2 pens to each of his grandchildren.
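
A minimal brute-force sketch of one possible solution in Scala (the function and variable names are illustrative, not part of the question):

def minPensPerChild(x: Int, c: Int, b: Array[Int]): Int = {
  // Every possible set of 'c' varieties, keeping only sums divisible equally among 'x' grandchildren.
  val feasibleSums = b.toList.combinations(c).map(_.sum).filter(s => s > 0 && s % x == 0).toList
  if (feasibleSums.isEmpty) -1 else feasibleSums.min / x
}

minPensPerChild(5, 3, Array(1, 2, 3, 4, 5))   // returns 2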

 

Question 2:

You just got a new job, but your new office has a different rule. You are allowed to take breaks between tasks when no task is available, but the problem is that tasks come randomly and sometimes several of them must be done simultaneously.

On your first day, you are given a list of tasks with their starting and ending times. Find the total time you will get for breaks. Assume the ending time is always greater than the starting time.

Input:

Input 1: Number of tasks

Input 2: 2-D array in the form [start, end], representing the starting and ending time of each task

Output:

Your function must return an integer representing the total break time

Example:

input 1: 4

input 2: {(6,8), (1,9), (2,4), (4,7)}

Output: 0
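
A minimal Scala sketch of one way to solve this, by sweeping the intervals in order of start time (names are illustrative):

def totalBreakTime(tasks: Array[(Int, Int)]): Int = {
  val sorted = tasks.sortBy(_._1)
  var breakTime = 0
  var coveredUntil = sorted.head._2
  for ((start, end) <- sorted.tail) {
    if (start > coveredUntil) breakTime += start - coveredUntil   // idle gap with no running task
    coveredUntil = math.max(coveredUntil, end)
  }
  breakTime
}

totalBreakTime(Array((6, 8), (1, 9), (2, 4), (4, 7)))   // returns 0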

The programs above can be solved in Scala, Java, Python, or R, and are aimed at Spark developers.

Spark with Kafka Scenario for Spark Developers

Problems:

This scenario is a real-time example of Spark with Kafka for Spark developers.

Problem Statement:

A reality television game show has 7 players and the game runs for one complete day. The winner of the game is decided by the votes cast by the audience watching the show. At the end of the day, the winner is decided by the criteria detailed below.

Rules to cast vote:

1) Each unique user (assume each has an ID) can cast votes for the players.

2) A user can cast at most one vote every 2 minutes and is free to vote for a different player each time.

3) If a user casts more than one vote within a span of two minutes, the latest vote overwrites the previous one.

Calculation criteria for the winner:

1) For every minute of the day, find the player with the maximum votes; the player with the maximum votes for that minute gets one reward point.

2) At the end of the day, the player with the maximum reward points is the winner.

 

Tasks:

1) Create a system that simulates users voting to a Kafka topic.

2) A Spark Streaming job should process the stream and apply the rules mentioned above.

3) The reward points should be stored in a persistent system.

4) Provide a query to find the winner.

 

Apache Kafka provides the messaging and integration layer for Spark Streaming: it acts as the central hub for real-time streams of data, which Spark Streaming then processes.

Above scenario asked for coding in Spark with Kafka. For Spark Developers will implement in SCALA or Python depends upon your programming knowledge. Nowadays the most important scenario in the IT industry for CCA – 175 also.