Wednesday, December 8, 2010

Java templates/stubs for Mapper Reducer and Wrapper classes

A lot of times I want to test a concept in Hadoop that requires me to quickly create a small job and run it. Every job contains minimum 3 components

  • Mapper Class
  • Reducer Class
  • Wrapper Class
The following are the templates I use to generate empty templates, just replace variable  <YOUNAME> with your class name 


-----------------------------------------------------------------------------------------------------------------------------------
MAPPER
-----------------------------------------------------------------------------------------------------------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;

/* In case you are using Multiple outputs */
//import org.apache.hadoop.io.NullWritable;
//import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class <YOUNAME>Mapper extends Mapper<LongWritable, Text, Text, Text> {
    private Configuration conf;
    private Text outputKey = new Text();
    private Text outputValue = new Text();
    private String line = null;

    /* In case you are using Multiple outputs */
    //private NullWritable outputValue = NullWritable.get();
    //private MultipleOutputs<Text, Text> contextMulti = null;

    @Override
    public void setup(Mapper.Context context) {
        this.conf = context.getConfiguration();

        /* In case you are using Multiple outputs */
        //contextMulti = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void map(LongWritable key, Text values, Context context)
            throws IOException, InterruptedException {
    }

    @Override
    public void cleanup (Mapper.Context context)throws IOException, InterruptedException {
        
        /* In case you are using Multiple outputs */
        //contextMulti.close();
    }
}

-----------------------------------------------------------------------------------------------------------------------------------
REDUCER
-----------------------------------------------------------------------------------------------------------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/* In case you are using Multiple outputs */
//import org.apache.hadoop.io.NullWritable;
//import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class <YOUNAME>Reducer extends Reducer<Text, Text, Text, Text> {
    private Configuration conf;
    private Text outputKey = new Text();
    private Text outputValue = new Text();
    private String line = null;

    /* In case you are using Multiple outputs */
    //private NullWritable outputValue = NullWritable.get();
    //private MultipleOutputs<Text, Text> contextMulti = null;

    @Override
    public void setup(Reducer.Context context) {
        this.conf = context.getConfiguration();

        /* In case you are using Multiple outputs */
        //contextMulti = new MultipleOutputs<Text, Text>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
    }

    @Override
    public void cleanup(Reducer.Context context) {
        /* In case you are using Multiple outputs */
        //contextMulti.close();
    }
}
-----------------------------------------------------------------------------------------------------------------------------------
WRAPPER
This class uses following 2 classes 
https://sites.google.com/site/hadoopandhive/home/ExtendedFileUtil.java?attredirects=0&d=1
https://sites.google.com/site/hadoopandhive/home/StringUtil.java?attredirects=0&d=1
-----------------------------------------------------------------------------------------------------------------------------------



import StringUtil;

import ExtendedFileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


import java.io.IOException;
import java.text.ParseException;


public class <YOUNAME> extends Configured implements Tool, Constants {
    private Configuration conf = null;
    private Job job = null;
    private String inputDirList = null;
    private String outputDir = null;
    private String[] filesToProcess = null;
    private int totalReducers = 0;
    private int jobRes = 0;
    private ExtendedFileUtil fileUtil = new ExtendedFileUtil();


    public static void main(String[] args) throws Exception {
        <YOUNAME> ob = new <YOUNAME>();
        int jobRes = ToolRunner.run(ob, args);        
    }


    public int run(String[] args)
            throws ClassNotFoundException, IOException, InterruptedException, ParseException {
        jobRes = readCmdArgs(args);
        if (jobRes == 0) {
            jobRes = readConfig();
        }
        if (jobRes == 0) {
            jobRes = runMrJob();
        }
        return jobRes;
    }


    private int readCmdArgs(String[] args) {
        if (args.length == 2) {
            inputDirList = args[0];
            outputDir = args[1];
        } else {
            printUsage();
            System.exit(1);
        }
        return 0;
    }


    private int readConfig() throws IOException, InterruptedException, ClassNotFoundException {
        conf = new Configuration();
        //conf.set("SET_NEW_CONFIG_NAME", SET_NEW_CONFIG_VALUE);
        job = new Job(conf);
        if ((job.getJar() == null) || (job.getJar() == "")) {
            job.setJarByClass(<YOUNAME>.class);
        }
        return 0;
    }


    private int runMrJob()
            throws IOException, InterruptedException, ClassNotFoundException {
        filesToProcess = fileUtil.getFilesOnly(inputDirList, true);
        job.setJobName("<YOUNAME>");
        TextInputFormat.addInputPaths(job, StringUtil.arrayToString(filesToProcess, ","));
        TextOutputFormat.setOutputPath(job, new Path(outputDir));
        System.out.println("Input Dir: " + inputDirList);
        System.out.println("Output Dir: " + outputDir);


        job.setMapperClass(<YOUNAME>Mapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);


        job.setReducerClass(<YOUNAME>Reducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        totalReducers = Math.round((fileUtil.size(inputDirList) / 134217728) * 0.1F);
        totalReducers = Math.max(totalReducers, 1);
        job.setNumReduceTasks(totalReducers );
        deleteOutputDirectory(outputDir);
        jobRes = job.waitForCompletion(true) ? 0 : 1;
        deleteLogsDirectory();
        fileUtil.removeAllZeroByteFiles(outputDir);
        return 0;
    }




    private int deleteOutputDirectory(String outputDir) throws IOException {
        fileUtil.removeHdfsPath(new Path(outputDir).toString());
        return 0;
    }


    private int printUsage() {
        System.out.println("USAGE: <YOUNAME> <inputDirList> <outputDir>");
        return 0;
    }


    private int deleteLogsDirectory()
            throws IOException {
        Path outputLogPath = new Path(new Path(outputDir).toString() + "/" + "_logs");
        fileUtil.removeHdfsPath(outputLogPath.toString());
        return 0;
    }
}


How configure Secondary namenode on a separate machine

If you have installed cloudera's hadoop distribution (CDH2) then you must have noticed that running command start-dfs.sh starts an instance of SecondaryNameNode process on all the datanodes. This is happening due to the way SecondaryNameNode startup is defined in file bin/start-dfs.sh. 


Scenario 1 : If you want to run your SecondaryNameNode on some other server (say sn.jeka.com) instead of the datanodes then do the following 

1. Logon to JobTracker (I am going to JobTracker because I have set variable HADOOP_MASTER in file ${HADOOP_HOME}/conf/hadoop-env.sh to point to the JobTracker hence any changes made there will be synched to your cluster) 
  • Create a new file ${HADOOP_HOME}/conf/secondarynamenode and add following line
    sn.jeka.com
  • In file ${HADOOP_HOME}/bin/start-dfs.sh, replace line
    "$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR --hosts masters start secondarynamenode
    with
    ssh $(cat $HADOOP_CONF_DIR/secondarynamenode) "${bin}/hadoop-daemon.sh --config $HADOOP_CONF_DIR --hosts secondarynamenode start secondarynamenode;exit"
  • In file ${HADOOP_HOME}/bin/stop-dfs.sh, replace line
    "$bin"/hadoop-daemons.sh --config $HADOOP_CONF_DIR --hosts masters stop secondarynamenode
    with
    ssh $(cat $HADOOP_CONF_DIR/secondarynamenode) "${bin}/hadoop-daemon.sh --config $HADOOP_CONF_DIR --hosts secondarynamenode stop secondarynamenode;exit"

2.  Logon to Namenode and execute the following commands
  • ${HADOOP_HOME}/bin/stop-dfs.sh; ${HADOOP_HOME}/bin/start-dfs.sh; ${HADOOP_HOME}/bin/stop-dfs.sh; ${HADOOP_HOME}/bin/start-dfs.sh
You have to start and stop twice because in the first start, the code will be synched from JobTracker 
Thats! it. You secondary name node process will now start on the designated server, i.e. sn.jeka.com and not on the datanodes. 

Scenario 2 : If you want to run your SecondaryNameNode on the NameNode (say nn.jeka.com) itself then do the following 
Follow same steps as Scenario 1 except that replace all intances of sn.jeka.com to nn.jeka.com

Scenario 3 : If you do not want to run secondary name node at all then do the following
Follow same steps as Scenario 1 except that instead of replacing lines, delete them. 


Tuesday, December 7, 2010

How to control a Hadoop job using the web interfaces provided by the Job Tracker and Name Node

Hadoop provides a great way to manage your jobs and operating on HDFS using the web interface by setting the property webinterface.private.actions to true in file src/core/core-default.xml. 


When set to true, the web interfaces of JobTracker and NameNode may contain  actions, such as kill job, delete file, etc., that should  not be exposed to public.


Note: Enable this option only if the web interfaces for JobTracker and Name node are reachable by those who have the right authorizations. 

<property>
  <name>webinterface.private.actions</name>
  <value>false</value>
</property>

How to limit access to Job Tracker and Name node in Hadoop.

Hadoop provides properties that can be used to create an include or exclude list of hosts that are allowed to access Job Tracker and Name node.

Property to to create an include list of hosts for JobTracker in file mapred-site.xml
<property>
  <name>mapred.hosts</name>
  <value></value>
  <description>Names a file that contains the list of nodes that may
  connect to the jobtracker.  If the value is empty, all hosts are
  permitted.</description>
</property>

Property to to create an exclude list of hosts for JobTracker in file mapred-site.xml
<property>
  <name>mapred.hosts.exclude</name>
  <value></value>
  <description>Names a file that contains the list of hosts that
  should be excluded by the jobtracker.  If the value is empty, no
  hosts are excluded.</description>
</property>


Property to to create an include list of hosts for NameNode in file hdfs-site.xml
<property>
  <name>dfs.hosts</name>
  <value></value>
  <description>Names a file that contains a list of hosts that are
  permitted to connect to the namenode. The full pathname of the file
  must be specified.  If the value is empty, all hosts are
  permitted.</description>
</property>

Property to to create an exclude list of hosts for NameNode in file hdfs-site.xml
<property>
  <name>dfs.hosts.exclude</name>
  <value></value>
  <description>Names a file that contains a list of hosts that are
  not permitted to connect to the namenode.  The full pathname of the
  file must be specified.  If the value is empty, no hosts are
  excluded.</description>
</property>

Hadoop distcp error: java.lang.NumberFormatException: For input string: ""

Hadoop provide distcp command to copy data between clusters.


ERROR
When running this command I got the following error
java.lang.NumberFormatException: For input string: ""
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
        at java.lang.Integer.parseInt(Integer.java:470)
        at java.lang.Integer.parseInt(Integer.java:499)
        at org.apache.hadoop.net.NetUtils.createSocketAddr(NetUtils.java:149)
        at org.apache.hadoop.hdfs.server.namenode.NameNode.getAddress(NameNode.java:164)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:81)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1448)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:67)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:1476)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1464)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:197)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:175)
        at org.apache.hadoop.tools.DistCp.setup(DistCp.java:997)
        at org.apache.hadoop.tools.DistCp.copy(DistCp.java:650)
        at org.apache.hadoop.tools.DistCp.run(DistCp.java:857)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
        at org.apache.hadoop.tools.DistCp.main(DistCp.java:884)

REASON
This error normally happens when the port number in source or destination hdfs URI is missing.


Example: Execution of the following command will give this error
hadoop distcp hdfs://hadoop1.jeka.com:9000/user/hadoop/jeka hdfs://hadoop2.jeka.com:/user/hadoop/jeka

It happened because port in the destination URL /hdfs://hadoop2.jeka.com:/user/hadoop/jeka is missing but the colon that separates 

RESOLUTION
Add port number 9000 after the colon, i.e. the destination URI will look like hdfs://hadoop2.jeka.com:9000/user/hadoop/jeka 

or remove the colon, distcp by default looks for port 9000, hence the destination URI will look like 
hdfs://hadoop2.jeka.com/user/hadoop/jeka 

Thursday, December 2, 2010

How to check if network is the bottleneck in your hadoop cluster

We had an issue with our hadoop cluster that after a network upgrade the jobs on the cluster started running very slowly. We knew it had to do something with the network upgrade.

On further investigation we found that all the mapper in the jobs are running fine, all the performance degradation happened on reducers. This pretty much proved the point that network is the bottleneck because most mappers jobs were rack-local (and it requires very limited to almost no use of network) but the reducer phase uses network heavily for sorting and shuffling of data before it can apply the reduce function.

Wednesday, December 1, 2010

Search engine for Hadoop content, you might find it better than google

There is an excellent site to get help online for Hadoop, after the Apache wiki. The website http://search-hadoop.com lets you search Hadoop and all its subprojects  like Pig, Hive, Hbase HDFS etc.

The website aggregates, indexes, and makes searchable all content repositories for all Apache Hadoop Top Level Project (TLP) sub-projects. It indexes data from
- User lists
- Dev Lists
- Apache Hadoop Jira
- Apache Wiki
- Hadoop source code

Its a very good website. I compared some search results on this site with google and bing results and found that, for hadoop , its easier to get answers on this site then from google or bing. The search results are very well laid. See it to believe it.

Error in starting Datanode - ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: java.io.IOException: Incompatible namespaceIDs in

While playing around with Hadoop setup I found that after I did something, the datanodes will not start when I run script bin/start-dfs.sh.

Following were the steps I followed to troubleshoot and fix the issue

TO FIND THE PROBLEM
  1. Login to any one of the datanodes that is not starting up (say t1.mydomain.com)
  2. Get the value of variable HADOOP_LOG_DIR in file conf/hadoop-env.sh. Say the value is /home/jeka/runtime_hadoop_data/logs
  3. Look into datanode log file /home/jeka/runtime_hadoop_data/logs/hadoop-jeka-datanode-t1.mydomain.com.log. Following error was logged
    2010-12-01 18:57:39,115 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: java.io.IOException: Incompatible namespaceIDs in /home/jeka/runtime_hdfs/datanode: namenode namespaceID = 1509057607; datanode namespaceID = 1781994419
    at org.apache.hadoop.hdfs.server.datanode.DataStorage.doTransition(DataStorage.java:233)
    at org.apache.hadoop.hdfs.server.datanode.DataStorage.recoverTransitionRead(DataStorage.java:148)
    at org.apache.hadoop.hdfs.server.datanode.DataNode.startDataNode(DataNode.java:298)
    at org.apache.hadoop.hdfs.server.datanode.DataNode.
    (DataNode.java:216)
    at org.apache.hadoop.hdfs.server.datanode.DataNode.makeInstance(DataNode.java:1283)
    at org.apache.hadoop.hdfs.server.datanode.DataNode.instantiateDataNode(DataNode.java:1238)
    at org.apache.hadoop.hdfs.server.datanode.DataNode.createDataNode(DataNode.java:1246)
    at org.apache.hadoop.hdfs.server.datanode.DataNode.main(DataNode.java:1368)

REASON
  1. This issue happened right after I reformatted the namenode (and did not reformat datanodes). Everytime Namenode is formatted, Hadoop creates a unique namespaceID and places it in a file in the Namenode but since I did not formatted the datanodes hence datanodes still had the old namespaceID and hence the problem. 


SOLUTION 
The solution is to copy the new namespaceID from Namenode to Datanodes. Following are the steps
  1. Logon to the namenode (say nn.mydomain.com) 
  2. Stop DFS by running command bin/stop-dfs.sh
  3. Find the values of property dfs.name.dir and dfs.data.dir. This can be found in file conf/hdfs-site.xml. If missing then look for it in file src/hdfs/hdfs-default.xml. Say the value of these properties are /home/jeka/runtime_hdfs/namenode and /home/jeka/runtime_hdfs/datanode respectively
  4. Note the value of field namespaceID (this is the new Namenode namespaceID that we need to copy to all datanodesin file /home/jeka/runtime_hdfs/namenode/current/VERSION.

    In our case its 1509057607.

    For your reference, following are all the contents of this file
    #Wed Dec 01 19:05:31 UTC 2010
    namespaceID=1509057607
    cTime=0
    storageType=NAME_NODE
    layoutVersion=-18
  5. Now copy the new namespaceID,1509057607 in our case, to file /home/jeka/runtime_hdfs/datanode/current/VERSION on all datanodes by running the following command on shell prompt on Namenode

    for dn in $(cat ~/hadoop-0.20.2/conf/slaves);do ssh $dn "cat /home/jeka/runtime_hdfs/datanode/current/VERSION | sed 's/namespaceID=[0-9]*/namespaceID=1509057607/' > /home/jeka/runtime_hdfs/datanode/current/VERSION.temp; mv /home/jeka/runtime_hdfs/datanode/current/VERSION.temp /home/jeka/runtime_hdfs/datanode/current/VERSION";done

    This command will go to each and every datanode listed in file conf/slaves and change the namespaceID of the datanode to 1509057607
  6. Start DFS by running command bin/start-dfs.sh. Thats it!, all datanodes should be up and running now.


How to start and stop datanodes and tasktrackers without bringing down the cluster

Assuming that Hadoop and Java is set up on the nodes, following are the commands that can be used to start and stop datanodes and tasktrackers.

cd <hadoop_installation>/bin

- To start datanode
hadoop-daemon.sh start datanode

- To start tasktracker
hadoop-daemon.sh start tasktracker



- To stop datanode (remove the datanode from the cluster)
hadoop-daemon.sh stop datanode

- To stop tasktracker (remove tasktracker from the cluster)
hadoop-daemon.sh stop tasktracker

How to see the HDFS statistics of a Hadoop cluster

When you maintain a cluster then often you want to know the stats of HDFS, things like total capacity of HDFS, percentage of HDFS used and free, list of all data nodes that are active, list of all datanodes that are not active etc.

The following command provides this information
hadoop dfsadmin -report


If your hadoop cluster has a lot of datanodes (which most likely will be the case in a production environment) then this will be a huge report. You will have to create some type of summary report on your own.