Huge Data Processing Applying Hadoop Cluster – Part 4

Finally we are getting close to the project target, which was finding spam tweets in Twitter logs and determining whether spam occurs more often in advertisements or not.

In the previous posts of this series we learned how to set up and deploy a Hadoop cluster. Then we developed a GetLogs command for Hadoop that imports files from the nodes and creates a merged file in HDFS. We also learned how to count the occurrences of each tweet in the log file. In this post we will learn how to count spam words, such as the ‘F’ words. We will also need to count all the words in the documents, for an overall comparison across different situations.

Preparation
First of all we need a prepared HDFS. Run the following commands to import all the tweets placed in the ‘input’ folder into HDFS. You can skip this step if you already did it in the previous posts.

bin/hadoop fs -mkdir /tweets
bin/hadoop jar GetLogs/getlogs.jar com.hexican.hadoop.GetLogs
                             input /tweets/giantTweet.txt
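
If you want to confirm the merged file landed in HDFS, a quick listing will show it:

bin/hadoop fs -ls /tweets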

WordCount
WordCount is probably the simplest Hadoop job. It is also the base of all the MapReduce classes we use in this series.

package com.hexican.hadoop;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class WordCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {


            Set<String> nullWords = new HashSet<String>();
            nullWords.add("null");
            // add whatever you want to be rejected

            StringTokenizer itr = new StringTokenizer(value.toString()); // Tokenizing 

            while (itr.hasMoreTokens()) {
                String nextToken = itr.nextToken(); // Get the next token as a String.
                if (nullWords.contains(nextToken.toLowerCase())){ // ignore null words.
                    continue;
                }
                word.set(nextToken); // Copy the word into the reusable Text object
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {


        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {


            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }

            result.set(sum);
            context.write(key, result); // Output count of each token
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <inFile> <outFile>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Just as with GetLogs and TweetCount, prepare it by creating the WordCount, WordCount/src and WordCount/classes folders. Then run the following commands to build the jar file:

javac -classpath hadoop-core-0.20.203.0.jar:lib/commons-cli-1.2.jar -d WordCount/classes/ WordCount/src/WordCount.java
jar -cvf WordCount/wordcount.jar -C WordCount/classes/ .

Note: I’ve assumed you run everything from inside the Hadoop folder.
Now you should have wordcount.jar in the WordCount folder.
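
If you want to double-check the jar, listing its contents is a quick sanity check; it should show the compiled classes under the package path:

jar -tf WordCount/wordcount.jar

You should see entries such as com/hexican/hadoop/WordCount.class and com/hexican/hadoop/WordCount$TokenizerMapper.class.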

We need another class for counting spam words. I called it SpamCount. This is just another simple piece of MapReduce code.

package com.hexican.hadoop;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class SpamCount {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();



        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString()); /* Tokenizing */
            Set<String> spams = new HashSet<String>(); // spam word list, rebuilt for every input record

            // Spam words. Add whatever you want to treat as spam.
            spams.add("-online");
            spams.add("4u");
            spams.add("adipex");
            spams.add("advicer");
            spams.add("ass");
            spams.add("baccarrat");
            spams.add("blackjack");
            spams.add("bllogspot");
            spams.add("booker");
            spams.add("byob");
            spams.add("car-rental-e-site");
            spams.add("car-rentals-e-site");
            spams.add("carisoprodol");
            spams.add("casino");
            spams.add("casinos");
            spams.add("celebrity");
 	    spams.add("chatroom");
            spams.add("cialis");
            spams.add("coolcoolhu");
            spams.add("coolhu");
            spams.add("credit-card-debt");
            spams.add("credit-report-4u");
            spams.add("cute");
            spams.add("cutes");
	    spams.add("cwas");
            spams.add("cyclen");
            spams.add("cyclobenzaprine");
            spams.add("dating-e-site");
            spams.add("dating");
            spams.add("date");
            spams.add("day-trading");
            spams.add("debt-consolidation");
            spams.add("debt-consolidation-consultant");
            spams.add("discreetordering");
            spams.add("duty-free");
            spams.add("dutyfree");
            spams.add("equityloans");
            spams.add("fioricet");
            spams.add("flowers-leading-site");
            spams.add("freenet-shopping");
            spams.add("freenet");
            spams.add("fuc");
            spams.add("fuck");
            spams.add("gambling");
            spams.add("girl");
            spams.add("girls");
            spams.add("hair-loss");
            spams.add("health-insurancedeals-4u");
            spams.add("homeequityloans");
            spams.add("homefinance");
            spams.add("holdem");
            spams.add("holdempoker");
            spams.add("holdemsoftware");
            spams.add("holdemtexasturbowilson");
            spams.add("hotel-dealse-site");
            spams.add("hotele-site");
            spams.add("hotelse-site");
            spams.add("incest");
            spams.add("insurancedeals-4u");
            spams.add("jrcreations");
            spams.add("levitra");
            spams.add("macinstruct");
            spams.add("mortgage-4-u");
            spams.add("mortgagequotes");
            spams.add("online-gambling");
            spams.add("onlinegambling-4u");
            spams.add("ottawavalleyag");
            spams.add("ownsthis");
            spams.add("palm-texas-holdem-game");
            spams.add("paxil");
            spams.add("penis");
            spams.add("pharmacy");
            spams.add("phentermine");
            spams.add("poker-chip");
            spams.add("poze");
            spams.add("pussy");
            spams.add("punk");
            spams.add("rental-car-e-site");
            spams.add("ringtones");
            spams.add("roulette");
            spams.add("shemale");
            spams.add("shoes");
            spams.add("slot-machine");
            spams.add("strip");
            spams.add("strios");
            spams.add("texas-holdem");
            spams.add("thorcarlson");
            spams.add("top-site");
            spams.add("top-e-site");
            spams.add("tramadol");
            spams.add("trim-spa");
            spams.add("ultram");
            spams.add("valeofglamorganconservatives");
            spams.add("viagra");
            spams.add("vioxx");
            spams.add("xanax");
            spams.add("zolus");
            spams.add("کنسرت");
            spams.add("missing-and-abused-kids");
            spams.add("s p a n k e");
            spams.add("fook");
            long totalWords = 0L; // total token count (not used further in this job)
            while (itr.hasMoreTokens()) {
                totalWords++;
                String nextToken = itr.nextToken(); // Get the next token as a String.

                if (spams.contains(nextToken.toLowerCase())){  // collect spams.
                    word.set(nextToken.toLowerCase()); // Moving spam word into a Text object
                    context.write(word, one);
                }
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {


        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {


            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }

            result.set(sum);
            context.write(key, result); // Output count of each token
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: spamcount <inFile> <outFile>");
            System.exit(2);
        }
        Job job = new Job(conf, "spam count");
        job.setJarByClass(SpamCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Do the same as for the previous classes and build spamcount.jar:

javac -classpath hadoop-core-0.20.203.0.jar:lib/commons-cli-1.2.jar 
                    -d SpamCount/classes/ SpamCount/src/SpamCount.java
jar -cvf SpamCount/spamcount.jar -C SpamCount/classes/ .
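
One thing worth noting about the mapper above: the spam set is rebuilt for every single input record. Below is a minimal sketch of moving that work into setup(), which Hadoop calls once per map task; it assumes the same 0.20 mapreduce API used above and shows only a couple of list entries, with the rest of the list staying unchanged:

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private final Set<String> spams = new HashSet<String>();

        protected void setup(Context context) {
            // Runs once per map task, before any map() calls.
            spams.add("casino");
            spams.add("viagra");
            // ... add the rest of the spam list here ...
        }

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String nextToken = itr.nextToken();
                if (spams.contains(nextToken.toLowerCase())) { // collect spams
                    word.set(nextToken.toLowerCase());
                    context.write(word, one);
                }
            }
        }
    }

The output is identical; only the per-record overhead goes away. The same idea applies to the nullWords set in WordCount.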

Running MapReduce Commands
We’ve created a big merged file from all the tweets. The file is placed at the following path:

/tweets/giantTweet.txt

We use it as the input file for the two MapReduce jobs we just built (note that each job’s output directory must not already exist in HDFS):

bin/hadoop jar WordCount/wordcount.jar com.hexican.hadoop.WordCount 
                          /tweets/giantTweet.txt /tweets/giantTweet_wordCounted
bin/hadoop jar SpamCount/spamcount.jar com.hexican.hadoop.SpamCount 
                         /tweets/giantTweet.txt /tweets/giantTweet_spamCounted

The results can be found at the following paths in HDFS:

/tweets/giantTweet_wordCounted/part-r-00000
/tweets/giantTweet_spamCounted/part-r-00000
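
If you want a quick look before fetching the files, you can stream the first lines straight from HDFS:

bin/hadoop fs -cat /tweets/giantTweet_spamCounted/part-r-00000 | head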

At this point I need to count the lines of each result. So I fetch them with the following Hadoop file system commands:

bin/hadoop fs -get /tweets/giantTweet_wordCounted/part-r-00000 result/giantTweet_wc.txt
bin/hadoop fs -get /tweets/giantTweet_spamCounted/part-r-00000 result/giantTweet_sc.txt
bin/hadoop fs -get /tweets/giantTweet.txt result/.

I’ve assumed you created the ‘result’ folder inside the Hadoop folder. That is the place where we put the MapReduce results.

Have a look at them. giantTweet_wc.txt contains every word, each with its count. giantTweet_sc.txt contains every spam word that occurs in giantTweet.txt, each with its count. And giantTweet.txt itself contains all the tweets merged together.

Count their lines using the following Linux command:

wc -l [fileName]
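
For example, for the three files we just fetched:

wc -l result/giantTweet_wc.txt
wc -l result/giantTweet_sc.txt
wc -l result/giantTweet.txt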

Now you can tune TweetCount by choosing a proper threshold for spotting advertisements. I tried 3, 10, 15 and 20 as the minimum occurrence of each tweet, on the assumption that repeated tweets are some kind of advertisement.

bin/hadoop jar TweetCount/tweetcount.jar com.hexican.hadoop.TweetCount /tweets/giantTweet.txt /tweets/ads
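
For reference, the occurrence threshold inside TweetCount’s reducer might look roughly like the sketch below. This is only an assumption: TweetCount was built in an earlier post, its actual structure (and whether the comparison is > or >=) may differ, and ThresholdReducer and MIN_OCCURRENCE are names made up here for illustration.

    public static class ThresholdReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private static final int MIN_OCCURRENCE = 20; // the value being tuned: 3, 10, 15, 20, ...
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get(); // how many times this exact tweet appeared
            }
            if (sum > MIN_OCCURRENCE) {     // keep only heavily repeated tweets
                result.set(sum);
                context.write(key, result); // key is the tweet text, value is its repeat count
            }
        }
    }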

Also repeat the word and spam counts on the TweetCount results with the following commands:

bin/hadoop jar WordCount/wordcount.jar com.hexican.hadoop.WordCount 
                             /tweets/ads/part-r-00000 /tweets/ads/wordCounted
bin/hadoop jar SpamCount/spamcount.jar com.hexican.hadoop.SpamCount
                           /tweets/ads/part-r-00000 /tweets/ads/spamCounted

Just fetch the results and count the lines, as you did for giantTweet.txt.

 
bin/hadoop fs -get /tweets/ads/part-r-00000 result/ads.txt
bin/hadoop fs -get /tweets/ads/wordCounted/part-r-00000 result/ads_wc.txt
bin/hadoop fs -get /tweets/ads/spamCounted/part-r-00000 result/ads_sc.txt
wc -l result/ads.txt
wc -l result/ads_wc.txt
wc -l result/ads_sc.txt

The following table shows that spam words are concentrated in the massively repeated tweets, i.e. the advertisements.

             Occ=1     Occ>3    Occ>10    Occ>15    Occ>20
Tweets No.   54431     1164     354       230       159
Words No.    203010    8034     2674      1857      1422
Spams No.    1984      1289     1265      1152      1051

The following chart is drawn from the data in the table above:

Note: the Spams No. row contains the total number of spam-word occurrences. The following is the result of counting spam words within the tweets that were each repeated more than 20 times (the red cell):

Date	1  
Fuc	1
PUSSY	1
ass	1
cute	3
date	2
girl	1
missing-and-abused-kids	1
punk	2

Then I counted how many times each spam word really occurs, using the repeat count of the tweet it appears in from the TweetCount result.

So the real occurrence of each one is:

Spam word                  Count within tweet   Tweet repeat count
Date                       1                    24
Fuc                        1                    188
PUSSY                      1                    30
ass                        1                    21
cute                       3                    96
date                       1                    21
date                       1                    22
girl                       1                    191
missing-and-abused-kids    1                    213
punk                       1                    26
punk                       1                    27
SUM                                             1051

(The SUM, 1051, is each word’s count within the tweet multiplied by the tweet’s repeat count, summed over all rows; it matches the Spams No. figure for Occ>20 in the first table.)

The word “cute” appears 3 times within a single tweet (the trailing number in the grep output is the tweet’s repeat count):

cat ads.txt | grep ' cute '
       The saying "U cute 2 be" is soooooooo funny 2 me! U cute 2 be big, or u cute 2 be dark skin....lma	96

As you can see, “punk” appears in two different tweets:

cat ads.txt | grep ' punk '
	Get to it! LOL RT @cesleyb: Im about to show this phony punk a fook!! RT @mrsdarian: @cesleyb you're a FOOL! Smh..	26
	Get to it! LOL RT @cesleyb: Im about to show this phony punk a fook!! RT @mrsdarian: @cesleyb you're a FOOL! Smh..	27

It seems the more a tweet is repeated, the more spam words it carries. We assumed repeated tweets are mostly advertisements.

OK, I am going to optimize the procedures and make them all more automated.
