Huge Data Processing Applying Hadoop Cluster – Part 2

The previous post showed how to set up and deploy a real Hadoop cluster. As I mentioned, the goal of this little project is to load a number of huge log files, such as Twitter logs, into the cluster for later processing. So the next step is to put those large files into the HDFS we created previously.

The following code lets us pull the source files from a given local folder and merge them all into one file while putting them into HDFS:

package com.hexican.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GetLogs {
    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.out.println("Usage: GetLogs [FolderName] [MergedFile]");
            System.exit(1);
        }

        // One FileSystem handle for HDFS (from the cluster configuration)
        // and one for the local file system that holds the source logs.
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        FileSystem local = FileSystem.getLocal(conf);
        int filesProcessed = 0;

        Path inputDir = new Path(args[0]);   // local folder with the source log files
        Path hdfsFile = new Path(args[1]);   // target merged file in HDFS

        try {
            FileStatus[] inputFiles = local.listStatus(inputDir);
            FSDataOutputStream out = hdfs.create(hdfsFile);

            // Append every regular file in the input folder to the HDFS output stream.
            for (int i = 0; i < inputFiles.length; i++) {
                if (!inputFiles[i].isDir()) {
                    System.out.println("\t Adding and merging... <" + inputFiles[i].getPath().getName() + ">");
                    FSDataInputStream in = local.open(inputFiles[i].getPath());

                    byte[] buffer = new byte[256];
                    int bytesRead = 0;
                    while ((bytesRead = in.read(buffer)) > 0) {
                        out.write(buffer, 0, bytesRead);
                    }
                    filesProcessed++;
                    in.close();
                }
            }
            out.close();
            System.out.println("\n " + filesProcessed + " file(s) successfully added and merged into [" + hdfsFile.getName() + "].");
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }
    }
}

Create GetLogs.java and save it into the GetLogs/src folder. You have to create the following folders first:
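For example, a minimal layout (the classes output folder name is my assumption; it is reused by the compile step below):

# "classes" is an assumed name for the compiled output folder.
mkdir -p GetLogs/src GetLogs/classes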

Now you can compile it and create the getlogs.jar file. Just run the following commands:
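A sketch of the compile and packaging commands, assuming an older Hadoop 1.x installation where the core classes ship in $HADOOP_HOME/hadoop-core-*.jar (adjust the classpath for your Hadoop version):

cd GetLogs
# Compile against the Hadoop core jar (the jar path is an assumption for Hadoop 1.x).
javac -classpath $HADOOP_HOME/hadoop-core-*.jar -d classes src/GetLogs.java
# Package the compiled classes into getlogs.jar.
jar -cvf getlogs.jar -C classes/ .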

The following is the command you need to run GetLogs:

./hadoop jar ../getlogs.jar com.hexican.hadoop.GetLogs ~/twitter /twitterLogs/allTweets.log

[FolderName] is a folder located on your local machine; this is where you put the source files to be processed. [MergedFile] is the huge file that ends up in HDFS, and it is the input for the MapReduce step.

The following shows how to use the GetLogs class to load files into HDFS and merge them at the same time.

I’ve added about 1.6 MB of log files to the local Twitter folder:
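For example, listing the local source folder (using the ~/twitter path from the command above):

ls -lh ~/twitter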

After running GetLogs, all the log files are merged into a single file in HDFS:
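You can confirm the merged file is in place with a plain HDFS listing (same /twitterLogs target as above):

./hadoop fs -ls /twitterLogs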

In the next step we will detect spam by running MapReduce over the merged log file.

2 Responses to Huge Data Processing Applying Hadoop Cluster – Part 2

  1. Ara says:

    Or you could simply do a hadoop fs -copyFromLocal and then a map/reduce job with no mapper (-mapper /bin/cat) and then a reducer with the number of reducers set to 1 (-D mapred.reduce.tasks=1) so that it creates just one output file. Very simple one-liner (a sketch of this appears after the comments).

    But creating 1 file from multiple files is a bad idea. Setting the reducer count to 1 means that if you have 100 machines you only use 1 machine for your processing, and you fetch all the files from your 100-node cluster to this single machine. Your application does the same thing too. The correct and scalable way to do this is to use something like Cloudera Flume to pump data continuously into Hadoop. Flume has a hierarchical architecture: you can have 100 Flume source nodes that create 100 files, but it can sink them all into one stream. But of course this is for real-time, data-intensive applications, and doing a copyFromLocal and then reducer=1 should be good enough for 90% of tasks 🙂 It’s very important to remember whether you send data to code or code to data. Hadoop is all about sending code to the data, not fetching all the data from the cluster to one node. This is the opposite of what’s common in J2EE and other such server-side frameworks.

  2. admin says:

    It’s a great privilege that I still receive your guidance.
    This project is supposed to be a spam detector that applies Hadoop and its MapReduce jobs to find spam tweets or the most common subjects. First, I have to collect tweets from multiple machines and store them in HDFS; that is why I’ve collected and merged the files into a single huge file.
    Actually, I didn’t know about the approach you’ve mentioned. You’re right, my solution sends data to code just like conventional methods. I am working on your idea.
    Thank you and best wishes.
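
For reference, a rough sketch of the copyFromLocal plus single-reducer streaming job Ara describes above. The streaming jar path and the /twitterLogs/raw and /twitterLogs/merged paths are assumptions, and the streaming output comes out key-sorted rather than in the original line order:

# Copy the raw log files into HDFS as-is (paths are illustrative).
./hadoop fs -copyFromLocal ~/twitter /twitterLogs/raw
# Pass-through mapper and a single reducer, so the job writes exactly one output file.
# The streaming jar location assumes a Hadoop 1.x contrib layout.
./hadoop jar ../contrib/streaming/hadoop-streaming-*.jar \
    -D mapred.reduce.tasks=1 \
    -input /twitterLogs/raw \
    -output /twitterLogs/merged \
    -mapper /bin/cat \
    -reducer /bin/cat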
