What is MapReduce? Twitter example using Java in MapReduce | Big Data | Hadoop | Java




What is MapReduce?

MapReduce is a processing technique and a programming model for distributed computing based on Java. The MapReduce algorithm contains two important jobs, namely Map and Reduce. The map job takes a set of data and converts it into another set of data, where individual elements are broken down into tuples (key/value pairs). The reduce job takes the output from a map as input and combines those data tuples into a smaller set of tuples. As the sequence of the name MapReduce implies, the reduce task is always performed after the map job.

The major advantage of MapReduce is that it is easy to scale data processing over multiple computing nodes. Under the MapReduce model, the data processing primitives are called mappers and reducers. Decomposing a data processing application into mappers and reducers is sometimes nontrivial. But once we write an application in the MapReduce form, scaling the application to run over hundreds, thousands, or even tens of thousands of machines in a cluster is merely a configuration change. This simple scalability is what has attracted many programmers to the MapReduce model.
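As a simple illustration of this model (a hedged example, not part of the program shown later), a word-count job would map the line "to be or not to be" into the pairs (to, 1), (be, 1), (or, 1), (not, 1), (to, 1), (be, 1); the shuffle would group them into (to, [1, 1]), (be, [1, 1]), (or, [1]), (not, [1]); and reduce would sum each list to produce (to, 2), (be, 2), (or, 1), (not, 1).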

The Algorithm

  • Generally, the MapReduce paradigm is based on sending the computation to where the data resides.
  • A MapReduce program executes in three stages, namely the map stage, the shuffle stage, and the reduce stage.

o Map stage − The map or mapper’s job is to process the input data. Generally, the input data is in the form of a file or directory and is stored in the Hadoop Distributed File System (HDFS). The input data is passed to the mapper function line by line. The mapper processes the data and creates several small chunks of data.

o Reduce stage − This stage is the combination of the Shuffle stage and the Reduce stage. The Reducer’s job is to process the data that comes from the mapper. After processing, it produces a new set of output, which is stored in HDFS.

  • During a MapReduce job, Hadoop sends the Map and Reduce tasks to the appropriate servers in the cluster.
  • The framework manages all the details of data passing, such as issuing tasks, verifying task completion, and copying data around the cluster between the nodes.
  • Most of the computing takes place on nodes with data on local disks, which lowers the network traffic.
  • After completion of the given tasks, the cluster collects and reduces the data to form an appropriate result and sends it back to the Hadoop server.

 

Inputs and Outputs (Java Perspective)




The MapReduce framework operates on <key, value> pairs; that is, the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types.

The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework. The input and output types of a MapReduce job are: (Input) <k1, v1> → map → <k2, v2> → reduce → <k3, v3> (Output).

Input-Output

            Input                 Output

Map         <k1, v1>              list(<k2, v2>)

Reduce      <k2, list(v2)>        list(<k3, v3>)
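As a hedged illustration of the Writable and WritableComparable requirements described above (this class is hypothetical and not part of the example program below), a custom key type could be sketched like this:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical key type wrapping a year, usable as a MapReduce key.
public class YearKey implements WritableComparable<YearKey> {

   private int year;

   public YearKey() { }                          // no-arg constructor required by the framework
   public YearKey(int year) { this.year = year; }

   @Override
   public void write(DataOutput out) throws IOException {
      out.writeInt(year);                        // serialization used when keys move between nodes
   }

   @Override
   public void readFields(DataInput in) throws IOException {
      year = in.readInt();                       // deserialization
   }

   @Override
   public int compareTo(YearKey other) {
      return Integer.compare(year, other.year);  // lets the framework sort keys during the shuffle
   }
}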

Terminology

  • PayLoad − Applications implement the Map and Reduce functions, and form the core of the job.
  • Mapper − Maps the input key/value pairs to a set of intermediate key/value pairs.
  • NameNode − Node that manages the Hadoop Distributed File System (HDFS).
  • DataNode − Node where the data resides before any processing takes place.
  • MasterNode − Node where the JobTracker runs and which receives job requests from clients.
  • SlaveNode − Node where the Map and Reduce programs run.
  • JobTracker − Schedules jobs and tracks the assigned jobs to the TaskTracker.
  • TaskTracker − Tracks the tasks and reports status to the JobTracker.
  • Job − An execution of a Mapper and Reducer across a dataset.
  • Task − An execution of a Mapper or a Reducer on a slice of data.
  • Task Attempt − A particular instance of an attempt to execute a task on a SlaveNode.

When we write applications to process such bulk data,

  • They will take a lot of time to execute.
  • There will be heavy network traffic when we move data from the source to the network server, and so on.

To solve these problems, we have the MapReduce framework.

Example Program

Given below is a program that processes the sample data using the MapReduce framework in Java.
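The sample input file itself is not reproduced in this article, so the following is only a hedged illustration of the format the mapper assumes: tab-separated records whose first field is a year and whose last field is the yearly value (for example, electricity units) that the mapper extracts. A hypothetical input line might look like:

1981	30	35	32	34

Only the first and last fields matter to the map function below.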

package hadoop;

import java.util.*;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class ProcessUnits {

   //Mapper class
   public static class E_Mapper extends MapReduceBase implements
   Mapper<LongWritable, /*Input key Type */
   Text,                /*Input value Type*/
   Text,                /*Output key Type*/
   IntWritable>         /*Output value Type*/
   {
      //Map function: emits (year, value of the last field) for each input line
      public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output,
      Reporter reporter) throws IOException {
         String line = value.toString();
         String lasttoken = null;
         StringTokenizer s = new StringTokenizer(line, "\t");
         String year = s.nextToken();

         while (s.hasMoreTokens()) {
            lasttoken = s.nextToken();
         }

         int avgprice = Integer.parseInt(lasttoken);
         output.collect(new Text(year), new IntWritable(avgprice));
      }
   }

   //Reducer class
   public static class E_Reduce extends MapReduceBase implements
   Reducer<Text, IntWritable, Text, IntWritable> {

      //Reduce function: emits only the values above the threshold maxavg
      public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
         int maxavg = 30;
         int val = Integer.MIN_VALUE;

         while (values.hasNext()) {
            if ((val = values.next().get()) > maxavg) {
               output.collect(key, new IntWritable(val));
            }
         }
      }
   }

   //Main function
   public static void main(String args[]) throws Exception {
      JobConf conf = new JobConf(ProcessUnits.class);

      conf.setJobName("max_electricityunits");
      conf.setOutputKeyClass(Text.class);
      conf.setOutputValueClass(IntWritable.class);
      conf.setMapperClass(E_Mapper.class);
      conf.setCombinerClass(E_Reduce.class);
      conf.setReducerClass(E_Reduce.class);
      conf.setInputFormat(TextInputFormat.class);
      conf.setOutputFormat(TextOutputFormat.class);

      FileInputFormat.setInputPaths(conf, new Path(args[0]));
      FileOutputFormat.setOutputPath(conf, new Path(args[1]));

      JobClient.runJob(conf);
   }
}

Save the above program as ProcessUnits.java. The compilation and execution of the program are explained below.

Compilation and Execution of the ProcessUnits Program

Let us assume we are in the home directory of the Hadoop user (e.g. /home/hadoop).

Follow the steps given below to compile and execute the above Java program.




Step 1

The following command creates a directory to store the compiled Java classes.

$ mkdir units

Step 2

Download hadoop-core-1.2.1.jar, which is used to compile and execute the MapReduce program. Visit mvnrepository.com to download the jar. Let us assume the download folder is /home/hadoop/.
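For reference (an assumption about the repository layout, not something stated in the original walkthrough), this is the hadoop-core artifact published under the org.apache.hadoop group, version 1.2.1; any local copy of hadoop-core-1.2.1.jar will work for the compile step below.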

Step 3

The following commands are used for compiling the ProcessUnits.java program and creating a jar for the program.

$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java

$ jar -cvf units.jar -C units/ .

Step 4

The following command is used to create an input directory in HDFS.

$HADOOP_HOME/bin/hadoop fs -mkdir input_dir

Step 5

The following command is used to copy the input file named sample.txt into the input directory of HDFS.

$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir

Step 6

The following command is used to verify the files in the input directory.

$HADOOP_HOME/bin/hadoop fs -ls input_dir/

Step 7

The following command is used to run the Eleunit_max application by taking the input files from the input directory.

$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir

Wait for a while until the job is executed. After execution, as shown below, the output will contain the number of input splits, the number of Map tasks, the number of Reducer tasks, etc.

INFO mapreduce.Job: Job job_1414748220717_0002
completed successfully
14/10/31 06:02:52
INFO mapreduce.Job: Counters: 49
 File System Counters

FILE: Number of bytes read = 61

FILE: Number of bytes written = 279400

FILE: Number of read operations = 0

FILE: Number of large read operations = 0

FILE: Number of write operations = 0

HDFS: Number of bytes read = 546

HDFS: Number of bytes written = 40

HDFS: Number of read operations = 9

HDFS: Number of large read operations = 0

HDFS: Number of write operations = 2

Job Counters

 Launched map tasks = 2

 Launched reduce tasks = 1

 Data-local map tasks = 2

 Total time spent by all maps in occupied slots (ms) = 146137

 Total time spent by all reduces in occupied slots (ms) = 441

 Total time spent by all map tasks (ms) = 14613

 Total time spent by all reduce tasks (ms) = 44120

 Total vcore-seconds taken by all map tasks = 146137

 Total vcore-seconds taken by all reduce tasks = 44120

 Total megabyte-seconds taken by all map tasks = 149644288

 Total megabyte-seconds taken by all reduce tasks = 45178880

Map-Reduce Framework

 Map input records = 5

 Map output records = 5

 Map output bytes = 45

 Map output materialized bytes = 67

 Input split bytes = 208

 Combine input records = 5

 Combine output records = 5

 Reduce input groups = 5

 Reduce shuffle bytes = 6

 Reduce input records = 5

 Reduce output records = 5

 Spilled Records = 10

 Shuffled Maps = 2

 Failed Shuffles = 0

 Merged Map outputs = 2

 GC time elapsed (ms) = 948

 CPU time spent (ms) = 5160

 Physical memory (bytes) snapshot = 47749120

 Virtual memory (bytes) snapshot = 2899349504

 Total committed heap usage (bytes) = 277684224

File Output Format Counters

 Bytes Written = 40

Step 8
The following command is used to verify the resultant files in the output folder.

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

Step 9

The following command is used to see the output in the Part-00000 file. This file is generated by HDFS.

$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000

Below is the output generated by the MapReduce program.

1981 34

1984 40

1985 45

Step 10

The following command is used to copy the output folder from HDFS to the local file system for analysis.

$HADOOP_HOME/bin/hadoop fs -get output_dir /home/hadoop

 

Using MapReduce to find Twitter trends

A few weeks back, while preparing for our presentation for agileNCR 2013, Sanchit and I started working on an interesting problem statement to solve using MapReduce.

We thought of applying the MapReduce algorithm to find the trends on Twitter.

A tweet on Twitter can have hashtags (#helloTwitter), and the hashtag used the most number of times in tweets globally is said to be the top trend. More details can be found here.

This data is large and also keeps on increasing, so processing it in the traditional manner wouldn’t be possible.

Hence we would require Hadoop to help us solve this problem.

Twitter uses Cassandra to store the data in a key-value format. Let’s assume for simplicity that the key-value pair for tweet data looks something like this: <twitterHandle, Tweet>.

So, in order to find the top n trends in a given snapshot, we would need to:

  1. Process all tweets and parse out the tokens with hashtags.
  2. Count all the hashtags.
  3. Determine the top n hashtags by sorting them.

So, the input for our Mapper could be a file generated out of the values of these key-value pairs.

It would look something like this :

I love #agileNCR.

Attenindg sesssion on #hadoop.

.....

....

Click here to see the sample input data.

Assumption: as a tweet can be of a maximum of 140 characters, we can store the data in such a format that every line is a new tweet.

Step 1: Mapper

The mapper, while tokenizing, would collect only the tokens which start with #.

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {

   String line = value.toString();
   StringTokenizer tokenizer = new StringTokenizer(line);

   while (tokenizer.hasMoreTokens()) {
      String token = tokenizer.nextToken();
      if (token.startsWith("#")) {
         // word is a reusable Text field of the mapper; one is an IntWritable(1) field.
         word.set(token.toLowerCase());
         // The context here acts like a multiset, collecting the value "one" for the key "word".
         context.write(word, one);
      }
   }
}

Step 2: Combiner

The combiner would combine all the identical hashtags together on the map side; in practice it can simply reuse the reducer from the next step, as shown in the driver sketch after the reducer output below.

Step 3: Reducer

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {

   int sum = 0;
   // Sum all the counts collected for this hashtag.
   for (IntWritable val : values) {
      sum += val.get();
   }
   context.write(key, new IntWritable(sum));
}

The reducer will generate output something like this:

#agileNCR 21

#hello 4

#xomato 88

#zo 36
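For completeness, here is a minimal sketch of how this first job could be wired together, assuming hypothetical class names (HashTagCountDriver, HashTagMapper, HashTagCountReducer) for the mapper and reducer shown above; the original post does not include a driver, so this is only an illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver for the hashtag-count job.
public class HashTagCountDriver {
   public static void main(String[] args) throws Exception {
      Job job = Job.getInstance(new Configuration(), "hashtag-count");
      job.setJarByClass(HashTagCountDriver.class);
      job.setMapperClass(HashTagMapper.class);
      // The reducer just sums counts, so it can double as the combiner (Step 2).
      job.setCombinerClass(HashTagCountReducer.class);
      job.setReducerClass(HashTagCountReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));    // raw tweets, one per line
      FileOutputFormat.setOutputPath(job, new Path(args[1]));  // hashtag counts
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}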

The problem with this output is that it is sorted by the key values, because in the map phase the shuffle and sort step sorts the records alphabetically on the basis of keys.

To get the desired output, sorted on the basis of the number of occurrences of each hashtag, we would need the records to be sorted on the basis of values.

So we decided to pass this output to a second MapReduce job which would swap the key and the value and then perform the sort.

Hence :

Step 4: Mapper 2

Tokenize the input and put the 2nd token (the number) as the key and the 1st token (the hashtag) as the value.

While mapping, the framework will shuffle and sort on the basis of the key.

However, the sorting of the keys by default is in ascending order, so we would not get the desired list.

So, we would need to use a comparator.

We would need to use LongWritable.DecreasingComparator as the sort comparator (see the driver sketch at the end of this post).

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

   String line = value.toString();              // e.g. "#agilencr 4"
   StringTokenizer tokenizer = new StringTokenizer(line);

   while (tokenizer.hasMoreTokens()) {
      String token = tokenizer.nextToken();      // the hashtag
      // Swap: emit the count as the key and the hashtag as the value,
      // so the shuffle sorts on the number of occurrences.
      context.write(new LongWritable(Long.parseLong(tokenizer.nextToken())), new Text(token));
   }
}

Step 5: Reducer 2

In the reducer, we will swap back the result again.

@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {

   // Swap back: the key is the count and each value is a hashtag with that count,
   // so emit (hashtag, count) pairs, which now arrive in sorted order.
   for (Text hashTag : values) {
      context.write(hashTag, key);
   }
}

 

So we get the desired output like this:

#xomato 88

#zo 36

#agileNCR 21

#hello 4
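Finally, a minimal sketch of how the second (swap-and-sort) job could be configured and chained to the first one, again with hypothetical class names (HashTagSortDriver, SwapMapper, SwapBackReducer); the key point is setting LongWritable.DecreasingComparator as the sort comparator so the counts are ordered from highest to lowest:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver for the second job; its input is the output directory of the count job.
public class HashTagSortDriver {
   public static void main(String[] args) throws Exception {
      Job job = Job.getInstance(new Configuration(), "hashtag-sort");
      job.setJarByClass(HashTagSortDriver.class);
      job.setMapperClass(SwapMapper.class);          // Mapper 2: (hashtag, count) -> (count, hashtag)
      job.setReducerClass(SwapBackReducer.class);    // Reducer 2: (count, hashtag) -> (hashtag, count)
      // Sort the LongWritable keys (the counts) in descending order.
      job.setSortComparatorClass(LongWritable.DecreasingComparator.class);
      job.setMapOutputKeyClass(LongWritable.class);
      job.setMapOutputValueClass(Text.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(LongWritable.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));    // output directory of the count job
      FileOutputFormat.setOutputPath(job, new Path(args[1]));  // final sorted trends
      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}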