Huge Data Processing Applying Hadoop Cluster – Part 3

MapReduce
In the previous posts we deployed a Hadoop cluster and developed a Java class that can be used as an additional Hadoop command for gathering tweets from different log files and merging them all into a single file in HDFS.
Now we need to process that huge file using the MapReduce algorithm. MapReduce is about manipulating key/value pairs. In this post we try to understand how MapReduce works, and we will develop a new Hadoop command for detecting the most frequently repeated tweets. We assume that spam tweets tend to occur frequently, so we develop a MapReduce job for detecting spam. Essentially we have defined two approaches for detecting spam: this is one of them, and the other is looking for reserved words.

Actually, MapReduce is just a data processing method. The most interesting thing about MapReduce is the ability to run it across multiple computers (nodes).

MapReduce has two basic building blocks called “mappers” and “reducers”. It is possible to distribute the processing across nodes just by changing the configuration, which gives the MapReduce model brilliant scalability. Let's consider how MapReduce works on a simple example.

How MapReduce Works
MapReduce uses “lists” and key/value pairs as its data primitives. The keys and values are usually strings. As discussed, the input to our application is tweets, and tweets map naturally onto key/value pairs: for example, the user name of whoever tweeted can be used as the key. This gives us a great opportunity to choose the key that best fits our business policy.

1. In our project the tweets are structured as a list of (key/value) pairs, list(<k1, v1>). The input format for processing the tweets from the large file we created is list(<line offset, tweet record>).

2. The list of (key/value) pairs is broken up and each individual pair, <k1, v1>, is processed by calling the map function of the mapper. The mapper we will develop here transforms each <k1, v1> into a list of <k2, v2> pairs. For spam detection, our mapper takes <line offset, tweet record>. Its output should be a list of <tweet, count>. The output can also be simpler: the counts will be aggregated in a later stage, so we can output a list of <tweet, 1> pairs, with repeated pairs, and let them be aggregated later. So either of the following results is possible for the first step:
<"Hey, I am a just an annoying spam", 3>
OR
<"Hey, I am a just an annoying spam", 1>
<"Hey, I am a just an annoying spam", 1>
<"Hey, I am a just an annoying spam", 1>

The second approach is much easier to develop, while the first approach has some performance benefits. Regardless of the approach we choose, we are able to calculate how often any tweet occurs, and that is what we were looking for.

3. The output of all the mappers is aggregated into one giant list of <k2, v2> pairs. All pairs sharing the same k2 are grouped together into a new (key/value) pair, <k2, list(v2)>.
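For example, continuing the pairs above, the shuffle stage groups the repeated <tweet, 1> pairs by key, and the reduce step then sums the values:

<"Hey, I am a just an annoying spam", list(1, 1, 1)>  becomes  <"Hey, I am a just an annoying spam", 3>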

The following pseudo-code shows the map and reduce functions of a word counter:

map(String filename, String document) {
    // Emit a <token, 1> pair for every token in the document.
    List<String> tokenizedList = tokenize(document);
    for (String token : tokenizedList) {
        emit(token, 1);
    }
}

reduce(String token, List<Integer> values) {
    // Sum up all the counts emitted for this token.
    Integer sum = 0;
    for (Integer value : values) {
        sum = sum + value;
    }
    emit(token, sum);
}

How Could We Recognize Spam Tweets?
The following image is a snapshot of a Twitter log file. Such files can be obtained using the Twitter API.
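For illustration only, a single record in such a log might look roughly like the line below. The exact layout is a hypothetical assumption; the mapper code further down relies only on each record being one line with a fixed-width metadata prefix of about 60 characters followed by the tweet text.

2012-05-14 22:31:07 +0000   @hypothetical_user            Hey, I am a just an annoying spam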

The tweet logs I am going to process are two huge text files. It is really difficult to tell what is spam and what is not by traversing them; the log files are really bizarre. As I mentioned, I assume that tweets containing certain special words, such as Viagra, are spam. But that is not enough: we also receive a lot of advertising tweets with no special words. I use a simple rule for finding this second group: I believe that a tweet which has been sent very frequently is likely to be spam. The following code is a simple sample that detects spam by processing the giant log file we created by merging the Twitter log files.

package com.hexican.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TweetCount {

  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

      StringTokenizer itr = new StringTokenizer(value.toString(), "\n\r\f"); // Tokenize the input into lines (one tweet record per line)

      while (itr.hasMoreTokens()) {
        String nextToken = itr.nextToken(); // Read the next tweet record.
        if (nextToken.trim().length() <= 80) { // Skip records too short to contain a tweet body.
          continue;
        }

        word.set(nextToken.substring(60, nextToken.length() - 1)); // Keep only the tweet text: drop the fixed-width metadata prefix and the trailing character.
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      if (sum > 10) { // Report any tweet repeated more than 10 times as spam.
        result.set(sum);
        context.write(key, result);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: tweetcount [inFile] [outFile]");
      System.exit(2);
    }
    Job job = new Job(conf, "Tweet (Spam) count");
    job.setJarByClass(TweetCount.class);
    job.setMapperClass(TokenizerMapper.class);
    // Note: the reducer is not reused as a combiner here, because it drops sums of 10 or less; a combiner working on partial counts could discard tweets that are spam overall.
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

This is just a modified version of WordCount, one of the Hadoop sample programs.

To use it, go to the Hadoop folder on the Alpha machine (the NameNode) and follow these steps:
Create the TweetCount, TweetCount/src and TweetCount/classes folders, just the same as for the GetLogs code.
Then create TweetCount/src/TweetCount.java with the code above.
Now we need to compile TweetCount.
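The exact commands depend on your Hadoop installation; a minimal sketch, assuming the folder layout above and using hadoop classpath to pull in the Hadoop jars, could look like this:

mkdir -p TweetCount/src TweetCount/classes
javac -classpath "$(hadoop classpath)" -d TweetCount/classes TweetCount/src/TweetCount.java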

The following command creates a jar file from the compiled TweetCount classes.
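A sketch of that command, assuming the classes were compiled into TweetCount/classes as above (the jar name tweetcount.jar is just a choice):

jar -cvf TweetCount/tweetcount.jar -C TweetCount/classes/ .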

And finally we can run TweetCount over the giant log file from the previous post.
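Assuming the merged log file sits in HDFS under a path such as /user/hadoop/tweets/merged-tweets.txt (both the input path and the output directory below are placeholders), the run might look like this:

hadoop jar TweetCount/tweetcount.jar com.hexican.hadoop.TweetCount /user/hadoop/tweets/merged-tweets.txt /user/hadoop/tweets/spam-output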

The result is interesting! It seems our code catches spam tweets very well.

This can still be optimized to work better. In the next post we will add a reserved-word based spam detection method and make use of even more of Hadoop's data processing power.
