Saturday, December 25, 2010

How to dynamically assign reducers to a Hadoop Job in runtime

When we were working on setting our production jobs we came to a point where we needed a way to dynamically assign reducers to a Hadoop Job in runtime. Following is what we have implemented as of today.  If you are reading the blog and have a better idea to share then please leave a comment.
We have a lot of cases where same produciton job works on different sizes of data sets, i.e. for an hour the job can process from 35GB to 400Gb data. We wanted to change the number of reducers depending on the data sets, in runtime, and also we did not want a job to hog all reducers because the grid was shared between jobs from different teams. All jobs were equally important.
This is something that hadoop MR framework cannot do for you. We found that hive came up with a solution by providing a property that limits the maximum number of bytes that will be processed by one reducer hence if this property is set to 1GB and data size is 1.2 GB data then 2 reducers will be assigned to the job in runtime; if data size 10 GB data then 10 reducers will be assigned to the job in runtime.  This works great for hive because it is designed to work agnostic of the dataset. Also a big job in hive can still take all the reducers in the grid. Since we knew our data very well and also did not want a single job to take all reducers hence we decided to implement our own solution that solved our problem.
The solution is very simple. We asked all the job owners to run their jobs in the sandbox on a fixed set of data and provide us either of the following
1.       Required: Input size of the data in Mb
2.       Required: Output size of the data in Mb
3.       Required: Is the reducer calculation CPU bound or I/O bound
4.       Optional: A decimal number, the multiplier,  to fine tune the number of reducers (if not provided then 1 will be used)


1.       Required: Provide fixed number of reducers (they have to be less than TotalReducersOnTheGrid/2)

For CPU bound jobs, total number of reducers were calculated as
Total Reducers = Minimum((InputSizeOfDataInMb/128Mb) * multiplier, TotalReducersOnTheGrid/2)
i.e. total number of reducers should be equal to either the Input data size divided by the HDFS block size multiplied by the multiplier or half the total number of reducers available in the grid, whichever is smaller.

For I/O bound jobs the total number of reducers were calculated using the following formula
Total Reducers = Minimum((OutputDataSize/InputDataSize) * (InputSizeOfDataInMb/128Mb) * multiplier, TotalReducersOnTheGrid/2)
The concept of multiplier was introduced to optimize the jobs when the generic formula was not enough to optimize the number of reducers for the job. We found that some jobs always required an exact number of reducers regardless of the size of data set hence we also provided the job owners a way to specify that.
This pretty much solved most of our problems.

Friday, December 17, 2010

Hadoop cluster at Ebay

I am always curious to know how other companies are installing Hadoop clusters. How are they using its ecosystem. Since Hadoop is still relatively new, there are no best practices. Every company is implementing what they think is the best infrastructure for the Hadoop Cluster.

At Hadoop NYC 2010 conference, ebay showcased there implementation of Hadoop production cluster. Following are some tidbits on ebay's implementation of Hadoop.

- JobTracker, Namenode, Zookeeper, HBase Master are all enterprise nodes running in Sun 64 bit architecture. They are running red hat linux with 72GB Ram and 4TB disks.
- There are 4000 datanodes, each running cent OS with 48 GB RAM and 10TB space
- Ganglia and Nagios are used for monitoring and alerting. Ebay is also building a custom solution to augment them.
- ETL is done using mostly Java Map Reduce programs
- Pig is used to build data pipelines
- Hive is used for AdHoc queries
- Mahout is used for Data Mining

They are toying with the idea of using Oozie to manage work flows but haven't decided to use it yet.

It looks like they are doing all the right things.

Friday, December 10, 2010

ERROR: hdfs.DFSClient: Exception in createBlockOutputStream Bad connect ack with firstBadLink

While running a job once I got the following exception

10/12/10 21:09:05 INFO hdfs.DFSClient: Exception in createBlockOutputStream Bad connect ack with firstBadLink
10/12/10 21:09:05 INFO hdfs.DFSClient: Abandoning block blk_3623545154924652323_87440
10/12/10 21:09:11 INFO hdfs.DFSClient: Exception in createBlockOutputStream Connection refused
10/12/10 21:09:11 INFO hdfs.DFSClient: Abandoning block blk_-4726571439643867938_87441\

The error contains the IP address ( of the tasktracker/datanode machine for which the exception is thrown. The exception is thrown because the datanode daemon is not running on that machine; you can check this by logging into this machine, lets use in the example, and running command
ps -eaf | grep "DataNode" | grep -v "grep"
If no lines are returned then this means that datanode daemon is not running on

What happened is that machine contain a data block that is required for the job that you are trying to run. If this block is replicated on other machines and those machines are running datanode daemons then this is not a problem, Hadoop will get the data block from some other machine and continue the job but if for any reason the data block is not available on any other node then your job will fail.

Logon to and run the following command start datanode
The above command should start the datanode daemon on You can double check this my running command
ps -eaf | grep "DataNode" | grep -v "grep"
It should return 1 line

Thats it. Try running the job again. It should not throw exception anymore

How to see table definition (extended) in Hive

To see table definition in Hive, run command
describe table name;

To see more detailed information about the table, run command
describe extended [tablename];

Thursday, December 9, 2010

ERROR: java.lang.IllegalArgumentException: Name cannot be have a '' char

Sometimes your Hadoop MapReduce job can fail with the following exception
java.lang.IllegalArgumentException: Name cannot be have a '' char
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkTokenName(
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.checkNamedOutputName(
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(
at org.apache.hadoop.mapreduce.lib.output.MultipleOutputs.write(
at org.apache.hadoop.mapred.MapTask.runNewMapper(
at org.apache.hadoop.mapred.Child.main(

This happens when you use MultipleOutputs class in your Hadoop Job and try to name the output file with non-alphanumeric characters (like : or - etc.)
i.e. <MultipleOutput Object>.write(KEY,VALUE,"Fruit::Mango") will throw this error because you are using colons in the output file name

Try to use only alphanumeric characters in the output filename and if you absolutely have to use some kind of delimiter, stick with dot (.)
i.e. <MultipleOutput Object>.write(KEY,VALUE,"Fruit..Mango")  will not throw this error

MultipleOutputs performance issues

MultipleOutputs class in Hadoop API provides a very neat way of separating disparate data but it comes with a performance hit.

I found that some of my production jobs slowed down after I refactored by code to use MultipleOutputs class. I did some benchmarking to ensure that its not the cluster but MultipleOutputs class that slowed my processes down.

I setup a small cluster with just 6 machines and some data
  • 1 machine running JobTracker
  • 1 machine running Namenode
  • 4 machines running Datanodes and tasktracker
  • Input data 8Gb
All machines were of same size and nothing else was running on them during benchmarking. 

Test 1: Mapper without MultipleOutputs
I created a mapper that 
  • Reads a file line by line
  • Creates output file name on the fly by taking first 3 characters of the hash of the input line. This information was not used to write output (because we are not using MultipleOutputs yet).  
  • Write the output key as input line and outputValue as NullWritable
I ran it 5 times and the median runtime was 4m 40s. 

Test 2: MultipleOutputs Mapper
Then I modified the above mapper to use the output file name and write data out using MultipleOutputs. I ran this 5 times and the median runtimes was 5m 48s. 

Based on this benchmark I found that MultipleOutputs slows down a job by almost 20%. 

This happens because more small files are created when you use MultipleOutputs class.
Say you have 50 mappers then assuming that you don't have skewed data, Test1 will always generate exactly 50 files but Test2 will generate somewhere between 50 to 1000 files (50Mappers x 20TotalPartitionsPossible) and this causes a performance hit in I/O. In my benchmark, 199 output files were generated for Test1 and 4569 output files were generated for Test2. 

Wednesday, December 8, 2010

Extended FileUtil class for Hadoop

While writing production jobs in Hadoop I identified following tasks that were required for some MapReduce jobs but were not readily available in Hadoop 0.20 API

  1. Get size of a file or directory in HDFS
    • We require this to dynamically change the number of reducers used for a job by looking at the amount of input data that the job will process
  2. Recursively remove all zero byte files from a directory in HDFS. 
    • This happens a lot when you use MultipleOutput class in reducer (impact is less when used in Mapper). A lot of times the reducer does not gets any record for which a MutipleOutput file needs to be created hence it creates a 0 byte files. These files have no use, its best to remove them after the job is finished. 
  3. Recursively get all subdirectories of a directories 
  4. Recursively get all files within a directory and its sub directories
    • By default, as of now, when Hadoop job is run, it only processes the immediate files under the input directory, any files in the subdirectories of the input path are not processed hence if you want your job to process all files under the subdirectories also then its better to create a comma delimited list of all files within the input path and submit it to the job.  

All the above tasks were implemented in the ExtendedFileUtil class. Source code can be found at

The wrapper class on link contains an example of how to use ExtendedFileUtil class

How to combine small files in Hadoop

Currently Hadoop is not built to work with a lot of small files. The following are the architectural limitations of hadoop that causes this problem

  • In HDFS, all file metadata is stored in memory of the Namenode (which is most often a single big powerful machine). This means "more files=more memory". There is a limitation on the amount of memory you can add to a machine and that limits the amount of files that can be stored in Hadoop. 
  • Namenode is used heavily for all jobs that run on Hadoop. More data in the memory can slow down Namenode and might end of slowing down the job execution time (it might be insignificant for long jobs though)
  • There is a setup time required by Hadoop to run a mapper. By default, Hadoop will start minimum 1 mapper for every file in the input directory. Till Hadoop 0.20, hadoop does not lets us choose the number of mappers you want to run hence if your file is small, say 100K, then more time is wasted in Hadoop setup than actually processing the data.
There are couple different solutions to solve this problem
  • Keep an eye on all data that is entered into HDFS  from other data sources. Try to optimize the processes, that push data to HDFS, to create files of size 128Mb (block size of HDFS). 
  • If you have map reduce pipeline where output of a map reduce job become input of the next map reduce job then try to use reducers wisely in your jobs. If suppose your job uses 100 reducers and outputs files of size 10 MB each, and if the reducer computations are not CPU bound, then try to run the same job with less reducers (7-10). Remember - Hadoop creates one file for every reducer run even if reducer did not output any data. 
  • If all else fails then try to combine small files into bigger files. Media6degrees has come up with a faily good solution to combine small files in Hadoop. You can use their jar straight out. See here for more details

Java templates/stubs for Mapper Reducer and Wrapper classes

A lot of times I want to test a concept in Hadoop that requires me to quickly create a small job and run it. Every job contains minimum 3 components

  • Mapper Class
  • Reducer Class
  • Wrapper Class
The following are the templates I use to generate empty templates, just replace variable  <YOUNAME> with your class name 

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Mapper;

/* In case you are using Multiple outputs */
//import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class <YOUNAME>Mapper extends Mapper<LongWritable, Text, Text, Text> {
    private Configuration conf;
    private Text outputKey = new Text();
    private Text outputValue = new Text();
    private String line = null;

    /* In case you are using Multiple outputs */
    //private NullWritable outputValue = NullWritable.get();
    //private MultipleOutputs<Text, Text> contextMulti = null;

    public void setup(Mapper.Context context) {
        this.conf = context.getConfiguration();

        /* In case you are using Multiple outputs */
        //contextMulti = new MultipleOutputs<Text, Text>(context);

    public void map(LongWritable key, Text values, Context context)
            throws IOException, InterruptedException {

    public void cleanup (Mapper.Context context)throws IOException, InterruptedException {
        /* In case you are using Multiple outputs */

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Reducer;


/* In case you are using Multiple outputs */
//import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class <YOUNAME>Reducer extends Reducer<Text, Text, Text, Text> {
    private Configuration conf;
    private Text outputKey = new Text();
    private Text outputValue = new Text();
    private String line = null;

    /* In case you are using Multiple outputs */
    //private NullWritable outputValue = NullWritable.get();
    //private MultipleOutputs<Text, Text> contextMulti = null;

    public void setup(Reducer.Context context) {
        this.conf = context.getConfiguration();

        /* In case you are using Multiple outputs */
        //contextMulti = new MultipleOutputs<Text, Text>(context);

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

    public void cleanup(Reducer.Context context) {
        /* In case you are using Multiple outputs */
This class uses following 2 classes

import StringUtil;

import ExtendedFileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.text.ParseException;

public class <YOUNAME> extends Configured implements Tool, Constants {
    private Configuration conf = null;
    private Job job = null;
    private String inputDirList = null;
    private String outputDir = null;
    private String[] filesToProcess = null;
    private int totalReducers = 0;
    private int jobRes = 0;
    private ExtendedFileUtil fileUtil = new ExtendedFileUtil();

    public static void main(String[] args) throws Exception {
        <YOUNAME> ob = new <YOUNAME>();
        int jobRes =, args);        

    public int run(String[] args)
            throws ClassNotFoundException, IOException, InterruptedException, ParseException {
        jobRes = readCmdArgs(args);
        if (jobRes == 0) {
            jobRes = readConfig();
        if (jobRes == 0) {
            jobRes = runMrJob();
        return jobRes;

    private int readCmdArgs(String[] args) {
        if (args.length == 2) {
            inputDirList = args[0];
            outputDir = args[1];
        } else {
        return 0;

    private int readConfig() throws IOException, InterruptedException, ClassNotFoundException {
        conf = new Configuration();
        job = new Job(conf);
        if ((job.getJar() == null) || (job.getJar() == "")) {
        return 0;

    private int runMrJob()
            throws IOException, InterruptedException, ClassNotFoundException {
        filesToProcess = fileUtil.getFilesOnly(inputDirList, true);
        TextInputFormat.addInputPaths(job, StringUtil.arrayToString(filesToProcess, ","));
        TextOutputFormat.setOutputPath(job, new Path(outputDir));
        System.out.println("Input Dir: " + inputDirList);
        System.out.println("Output Dir: " + outputDir);


        totalReducers = Math.round((fileUtil.size(inputDirList) / 134217728) * 0.1F);
        totalReducers = Math.max(totalReducers, 1);
        job.setNumReduceTasks(totalReducers );
        jobRes = job.waitForCompletion(true) ? 0 : 1;
        return 0;

    private int deleteOutputDirectory(String outputDir) throws IOException {
        fileUtil.removeHdfsPath(new Path(outputDir).toString());
        return 0;

    private int printUsage() {
        System.out.println("USAGE: <YOUNAME> <inputDirList> <outputDir>");
        return 0;

    private int deleteLogsDirectory()
            throws IOException {
        Path outputLogPath = new Path(new Path(outputDir).toString() + "/" + "_logs");
        return 0;

How configure Secondary namenode on a separate machine

If you have installed cloudera's hadoop distribution (CDH2) then you must have noticed that running command starts an instance of SecondaryNameNode process on all the datanodes. This is happening due to the way SecondaryNameNode startup is defined in file bin/ 

Scenario 1 : If you want to run your SecondaryNameNode on some other server (say instead of the datanodes then do the following 

1. Logon to JobTracker (I am going to JobTracker because I have set variable HADOOP_MASTER in file ${HADOOP_HOME}/conf/ to point to the JobTracker hence any changes made there will be synched to your cluster) 
  • Create a new file ${HADOOP_HOME}/conf/secondarynamenode and add following line
  • In file ${HADOOP_HOME}/bin/, replace line
    "$bin"/ --config $HADOOP_CONF_DIR --hosts masters start secondarynamenode
    ssh $(cat $HADOOP_CONF_DIR/secondarynamenode) "${bin}/ --config $HADOOP_CONF_DIR --hosts secondarynamenode start secondarynamenode;exit"
  • In file ${HADOOP_HOME}/bin/, replace line
    "$bin"/ --config $HADOOP_CONF_DIR --hosts masters stop secondarynamenode
    ssh $(cat $HADOOP_CONF_DIR/secondarynamenode) "${bin}/ --config $HADOOP_CONF_DIR --hosts secondarynamenode stop secondarynamenode;exit"

2.  Logon to Namenode and execute the following commands
  • ${HADOOP_HOME}/bin/; ${HADOOP_HOME}/bin/; ${HADOOP_HOME}/bin/; ${HADOOP_HOME}/bin/
You have to start and stop twice because in the first start, the code will be synched from JobTracker 
Thats! it. You secondary name node process will now start on the designated server, i.e. and not on the datanodes. 

Scenario 2 : If you want to run your SecondaryNameNode on the NameNode (say itself then do the following 
Follow same steps as Scenario 1 except that replace all intances of to

Scenario 3 : If you do not want to run secondary name node at all then do the following
Follow same steps as Scenario 1 except that instead of replacing lines, delete them. 

Tuesday, December 7, 2010

How to control a Hadoop job using the web interfaces provided by the Job Tracker and Name Node

Hadoop provides a great way to manage your jobs and operating on HDFS using the web interface by setting the property webinterface.private.actions to true in file src/core/core-default.xml. 

When set to true, the web interfaces of JobTracker and NameNode may contain  actions, such as kill job, delete file, etc., that should  not be exposed to public.

Note: Enable this option only if the web interfaces for JobTracker and Name node are reachable by those who have the right authorizations. 


How to limit access to Job Tracker and Name node in Hadoop.

Hadoop provides properties that can be used to create an include or exclude list of hosts that are allowed to access Job Tracker and Name node.

Property to to create an include list of hosts for JobTracker in file mapred-site.xml
  <description>Names a file that contains the list of nodes that may
  connect to the jobtracker.  If the value is empty, all hosts are

Property to to create an exclude list of hosts for JobTracker in file mapred-site.xml
  <description>Names a file that contains the list of hosts that
  should be excluded by the jobtracker.  If the value is empty, no
  hosts are excluded.</description>

Property to to create an include list of hosts for NameNode in file hdfs-site.xml
  <description>Names a file that contains a list of hosts that are
  permitted to connect to the namenode. The full pathname of the file
  must be specified.  If the value is empty, all hosts are

Property to to create an exclude list of hosts for NameNode in file hdfs-site.xml
  <description>Names a file that contains a list of hosts that are
  not permitted to connect to the namenode.  The full pathname of the
  file must be specified.  If the value is empty, no hosts are

Hadoop distcp error: java.lang.NumberFormatException: For input string: ""

Hadoop provide distcp command to copy data between clusters.

When running this command I got the following error
java.lang.NumberFormatException: For input string: ""
        at java.lang.NumberFormatException.forInputString(
        at java.lang.Integer.parseInt(
        at java.lang.Integer.parseInt(
        at org.apache.hadoop.hdfs.server.namenode.NameNode.getAddress(
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(
        at org.apache.hadoop.fs.FileSystem.createFileSystem(
        at org.apache.hadoop.fs.FileSystem.access$300(
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(
        at org.apache.hadoop.fs.FileSystem$Cache.get(
        at org.apache.hadoop.fs.FileSystem.get(
        at org.apache.hadoop.fs.Path.getFileSystem(

This error normally happens when the port number in source or destination hdfs URI is missing.

Example: Execution of the following command will give this error
hadoop distcp hdfs:// hdfs://

It happened because port in the destination URL /hdfs:// is missing but the colon that separates 

Add port number 9000 after the colon, i.e. the destination URI will look like hdfs:// 

or remove the colon, distcp by default looks for port 9000, hence the destination URI will look like 

Thursday, December 2, 2010

How to check if network is the bottleneck in your hadoop cluster

We had an issue with our hadoop cluster that after a network upgrade the jobs on the cluster started running very slowly. We knew it had to do something with the network upgrade.

On further investigation we found that all the mapper in the jobs are running fine, all the performance degradation happened on reducers. This pretty much proved the point that network is the bottleneck because most mappers jobs were rack-local (and it requires very limited to almost no use of network) but the reducer phase uses network heavily for sorting and shuffling of data before it can apply the reduce function.

Wednesday, December 1, 2010

Search engine for Hadoop content, you might find it better than google

There is an excellent site to get help online for Hadoop, after the Apache wiki. The website lets you search Hadoop and all its subprojects  like Pig, Hive, Hbase HDFS etc.

The website aggregates, indexes, and makes searchable all content repositories for all Apache Hadoop Top Level Project (TLP) sub-projects. It indexes data from
- User lists
- Dev Lists
- Apache Hadoop Jira
- Apache Wiki
- Hadoop source code

Its a very good website. I compared some search results on this site with google and bing results and found that, for hadoop , its easier to get answers on this site then from google or bing. The search results are very well laid. See it to believe it.

Error in starting Datanode - ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: Incompatible namespaceIDs in

While playing around with Hadoop setup I found that after I did something, the datanodes will not start when I run script bin/

Following were the steps I followed to troubleshoot and fix the issue

  1. Login to any one of the datanodes that is not starting up (say
  2. Get the value of variable HADOOP_LOG_DIR in file conf/ Say the value is /home/jeka/runtime_hadoop_data/logs
  3. Look into datanode log file /home/jeka/runtime_hadoop_data/logs/ Following error was logged
    2010-12-01 18:57:39,115 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: Incompatible namespaceIDs in /home/jeka/runtime_hdfs/datanode: namenode namespaceID = 1509057607; datanode namespaceID = 1781994419
    at org.apache.hadoop.hdfs.server.datanode.DataStorage.doTransition(
    at org.apache.hadoop.hdfs.server.datanode.DataStorage.recoverTransitionRead(
    at org.apache.hadoop.hdfs.server.datanode.DataNode.startDataNode(
    at org.apache.hadoop.hdfs.server.datanode.DataNode.
    at org.apache.hadoop.hdfs.server.datanode.DataNode.makeInstance(
    at org.apache.hadoop.hdfs.server.datanode.DataNode.instantiateDataNode(
    at org.apache.hadoop.hdfs.server.datanode.DataNode.createDataNode(
    at org.apache.hadoop.hdfs.server.datanode.DataNode.main(

  1. This issue happened right after I reformatted the namenode (and did not reformat datanodes). Everytime Namenode is formatted, Hadoop creates a unique namespaceID and places it in a file in the Namenode but since I did not formatted the datanodes hence datanodes still had the old namespaceID and hence the problem. 

The solution is to copy the new namespaceID from Namenode to Datanodes. Following are the steps
  1. Logon to the namenode (say 
  2. Stop DFS by running command bin/
  3. Find the values of property and This can be found in file conf/hdfs-site.xml. If missing then look for it in file src/hdfs/hdfs-default.xml. Say the value of these properties are /home/jeka/runtime_hdfs/namenode and /home/jeka/runtime_hdfs/datanode respectively
  4. Note the value of field namespaceID (this is the new Namenode namespaceID that we need to copy to all datanodesin file /home/jeka/runtime_hdfs/namenode/current/VERSION.

    In our case its 1509057607.

    For your reference, following are all the contents of this file
    #Wed Dec 01 19:05:31 UTC 2010
  5. Now copy the new namespaceID,1509057607 in our case, to file /home/jeka/runtime_hdfs/datanode/current/VERSION on all datanodes by running the following command on shell prompt on Namenode

    for dn in $(cat ~/hadoop-0.20.2/conf/slaves);do ssh $dn "cat /home/jeka/runtime_hdfs/datanode/current/VERSION | sed 's/namespaceID=[0-9]*/namespaceID=1509057607/' > /home/jeka/runtime_hdfs/datanode/current/VERSION.temp; mv /home/jeka/runtime_hdfs/datanode/current/VERSION.temp /home/jeka/runtime_hdfs/datanode/current/VERSION";done

    This command will go to each and every datanode listed in file conf/slaves and change the namespaceID of the datanode to 1509057607
  6. Start DFS by running command bin/ Thats it!, all datanodes should be up and running now.

How to start and stop datanodes and tasktrackers without bringing down the cluster

Assuming that Hadoop and Java is set up on the nodes, following are the commands that can be used to start and stop datanodes and tasktrackers.

cd <hadoop_installation>/bin

- To start datanode start datanode

- To start tasktracker start tasktracker

- To stop datanode (remove the datanode from the cluster) stop datanode

- To stop tasktracker (remove tasktracker from the cluster) stop tasktracker

How to see the HDFS statistics of a Hadoop cluster

When you maintain a cluster then often you want to know the stats of HDFS, things like total capacity of HDFS, percentage of HDFS used and free, list of all data nodes that are active, list of all datanodes that are not active etc.

The following command provides this information
hadoop dfsadmin -report

If your hadoop cluster has a lot of datanodes (which most likely will be the case in a production environment) then this will be a huge report. You will have to create some type of summary report on your own.

Tuesday, November 30, 2010

For beginners, quickly install Hadoop 0.20 on Linux in cluster mode

Its very easy to set up a small Hadoop cluster on Linux for testing and development purposes. In this blog post I will demonstrate how to setup a small Hadoop 0.20 cluster in 10 easy steps. You will be able to set it up in less than an hour.

In the cluster we have 5 machines as follows
3sn.mydomain.comSecondary Namenodesn


1. Download Hadoop and Java to all machines 
See the INSTALL section on to get more details. For the purpose of this document we will assume that hadoop is installed in directory /home/${USER}/hadoop-0.20.2 and java is installed in directory /home/${USER}/jdk1.6.0_22

2. Ensure that machines in the cluster can see each other
 Setup password less ssh between following machines 
  1. jt to nn 
  2. jt to sn
  3. jt to tt1
  4. jt to tt2
  5. nn to jt
  6. nn to sn
  7. nn to tt1
  8. nn to tt2
  9. sn to nn
  10. sn to jt

3. Set up the Namenode
On, overwrite file /home/${USER}/hadoop-0.20.2/conf/core-site.xml with following lines
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->

4. Set up path to Java, Master and Slave directories/files 
In Hadoop, JobTracker and Namenode are called Masters and tasktracker and datanodes are called slaves. Every slave runs both Datanode and Tasktracker.
On, add following 3 lines to file  /home/${USER}/hadoop-0.20.2/conf/
export JAVA_HOME=/home/${USER}/jdk1.6.0_22
export HADOOP_SLAVES=${HADOOP_HOME}/conf/slaves

    5. Set up Jobtracker
    On, overwrite file /home/${USER}/hadoop-0.20.2/conf/mapred-site.xml with following lines
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!-- Put site-specific property overrides in this file. -->

    6. List Masters
    On, overwrite file /home/${USER}/hadoop-0.20.2/conf/masters with following line

    7. List Slaves
    On, overwrite file /home/${USER}/hadoop-0.20.2/conf/slaves with following lines

    8. Format Namenode
    Format the namenode by running the following command on
    /home/${USER}/hadoop-0.20.2/bin/hadoop namenode -format

    9. Start DFS
    On command prompt, run the following command to start HDFS daemon on Name node and data nodes and will also setup secondard name node
    sh /home/${USER}/hadoop-0.20.2/bin/

    10. Start MapReduce
    On command prompt, run the following command to start MapReduce daemon on Jobtracker and tasktrackers.
    sh /home/${USER}/hadoop-0.20.2/bin/

    That's it!  Hadoop cluster is up and running now. 


    • The cluster is defined in files slaves and masters
    • You can use IP addresses instead of host names, i.e. instead of
    • After you execute #8 and #9, you will notice that all files that were updated in steps #3-7 on are also updated on nn, sn, tt1 and tt2 machines. This happened because we set property HADOOP_MASTER  in step #4 to Setting this property means that use the config files on as master files and sync them across all nodes that you find in the cluster.
    • Jobtracker WebUI should be up and running on
    • HDFS WebUI should be up and running on
    I also found another excellent tutorial, better than my blog post for sure. 

    Error in starting jobtracker or namenode: rsync: connection unexpectedly closed (0 bytes received so far) [receiver]

    After setting the hadoop cluster, it is possible that you get error rsync: connection unexpectedly closed (0 bytes received so far) [receiver] or unexplained error (code 255) at io.c(463) [receiver=2.6.8] when you try to start the mapred daemon ( or dfs daemon ( or all daemons ( . 

    This means that for something is wrong in the ssh connections in the cluster. For rsync (or hadoop in cluster) to work, the you should be able to ssh between the following hadoop components without any password or prompts.
    - Jobtracker to Tasktrackers
    Jobtracker to Namenode

    Namenode to DataNodes
    Namenode to Jobtracker
    Datanodes to NameNode
    - Tasktrackers to Jobtracker

    Once ssh is working between the above 6 directions, these errors should go away.

    Supppose a hadoop cluster is composed of the following machines : Jobtracker Name node Datanode and Tasktracker Datanode and Tasktracker

    then from following ssh's should work
    Jobtracker to Tasktrackers >

    Jobtracker to Tasktrackers > >

    Namenode to Jobtracker >

    Namenode to DataNodes > >

    Datanodes to NameNode > >

    Tasktrackers to Jobtracker > >

    Please note that it is not required to be able to ssh from one task tracker to another

    Hadoop Administrator Interview Questions Part 1

    Following are some questions and answers to ask a Hadoop Administrator Interviewee

    Q1. What are the default configuration files that are used in Hadoop 
    As of 0.20 release, Hadoop supported the following read-only default configurations
    - src/core/core-default.xml
    - src/hdfs/hdfs-default.xml
    - src/mapred/mapred-default.xml

    Q2. How will you make changes to the default configuration files 
    Hadoop does not recommends changing the default configuration files, instead it recommends making all site specific changes in the following files
    - conf/core-site.xml
    - conf/hdfs-site.xml
    - conf/mapred-site.xml

    Unless explicitly turned off, Hadoop by default specifies two resources, loaded in-order from the classpath:
    - core-default.xml : Read-only defaults for hadoop.
    - core-site.xml: Site-specific configuration for a given hadoop installation.

    Hence if same configuration is defined in file core-default.xml and src/core/core-default.xml then the values in file core-default.xml (same is true for other 2 file pairs) is used.

    Q3. Consider case scenario where you have set property mapred.output.compress to true to ensure that all output files are compressed for efficient space usage on the cluster.  If a cluster user does not want to compress data for a specific job then what will you recommend him to do ? 
    Ask him to create his own configuration file and specify configuration mapred.output.compress to false and load this file as a resource in his job.

    Q4. In the above case scenario, how can ensure that user cannot override the configuration mapred.output.compress to false in any of his jobs
    This can be done by setting the property final to true in the core-site.xml file

    Q5. What of the following is the only required variable that needs to be set in file conf/ for hadoop to work 
    The only required variable to set is JAVA_HOME that needs to point to <java installation> directory

    Q6. List all the daemons required to run the Hadoop cluster 
    - NameNode
    - DataNode
    - JobTracker
    - TaskTracker

    Q7. Whats the default port that jobtrackers listens to

    Q8. Whats the default  port where the dfs namenode web ui will listen on

    Sunday, November 28, 2010

    Java interview questions for Hadoop developer Part 3

     Q21. Explain difference of Class Variable and Instance Variable and how are they declared in Java 
    Class Variable is a variable which is declared with static modifier.
    Instance variable is a variable in a class without static modifier.
    The main difference between the class variable and Instance variable is, that first time, when class is loaded in to memory, then only memory is allocated for all class variables. That means, class variables do not depend on the Objets of that classes. What ever number of objects are there, only one copy is created at the time of class loding.

    Q22. Since an Abstract class in Java cannot be instantiated then how can you use its non static methods 
    By extending it

    Q23. How would you make a copy of an entire Java object with its state? 
    Have this class implement Cloneable interface and call its method clone().

    Q24. Explain Encapsulation,Inheritance and Polymorphism 
    Encapsulation is a process of binding or wrapping the data and the codes that operates on the data into a single entity. This keeps the data safe from outside interface and misuse. One way to think about encapsulation is as a protective wrapper that prevents code and data from being arbitrarily accessed by other code defined outside the wrapper.
    Inheritance is the process by which one object acquires the properties of another object.
    The meaning of Polymorphism is something like one name many forms. Polymorphism enables one entity to be used as as general category for different types of actions. The specific action is determined by the exact nature of the situation. The concept of polymorphism can be explained as "one interface, multiple methods".

    Q25. Explain garbage collection? 
    Garbage collection is one of the most important feature of Java.
    Garbage collection is also called automatic memory management as JVM automatically removes the unused variables/objects (value is null) from the memory. User program cann't directly free the object from memory, instead it is the job of the garbage collector to automatically free the objects that are no longer referenced by a program. Every class inherits finalize() method from java.lang.Object, the finalize() method is called by garbage collector when it determines no more references to the object exists. In Java, it is good idea to explicitly assign null into a variable when no more in us

    Q26. What is similarities/difference between an Abstract class and Interface? 
    Differences- Interfaces provide a form of multiple inheritance. A class can extend only one other class.
    - Interfaces are limited to public methods and constants with no implementation. Abstract classes can have a partial implementation, protected parts, static methods, etc.
    - A Class may implement several interfaces. But in case of abstract class, a class may extend only one abstract class.
    - Interfaces are slow as it requires extra indirection to find corresponding method in in the actual class. Abstract classes are fast.
    - Neither Abstract classes or Interface can be instantiated

    Q27. What are different ways to make your class multithreaded in Java 
    There are two ways to create new kinds of threads:
    - Define a new class that extends the Thread class
    - Define a new class that implements the Runnable interface, and pass an object of that class to a Thread's constructor.

    Q28. What do you understand by Synchronization? How do synchronize a method call in Java? How do you synchonize a block of code in java ?
    Synchronization is a process of controlling the access of shared resources by the multiple threads in such a manner that only one thread can access one resource at a time. In non synchronized multithreaded application, it is possible for one thread to modify a shared object while another thread is in the process of using or updating the object's value. Synchronization prevents such type of data corruption.
    - Synchronizing a method: Put keyword synchronized as part of the method declaration
    - Synchronizing a block of code inside a method: Put block of code in synchronized (this) { Some Code }

    Q29. What is transient variable? 
    Transient variable can't be serialize. For example if a variable is declared as transient in a Serializable class and the class is written to an ObjectStream, the value of the variable can't be written to the stream instead when the class is retrieved from the ObjectStreamthe value of the variable becomes null.

    Q30. What is Properties class in Java. Which class does it extends? 
    The Properties class represents a persistent set of properties. The Properties can be saved to a stream or loaded from a stream. Each key and its corresponding value in the property list is a string

    Q31. Explain the concept of shallow copy vs deep copy in Java 
    In case of shallow copy, the cloned object also refers to the same object to which the original object refers as only the object references gets copied and not the referred objects themselves.
    In case deep copy, a clone of the class and all all objects referred by that class is made.

    Q32. How can you make a shallow copy of an object in Java 
    Use clone() method inherited by Object class

    Q33. How would you make a copy of an entire Java object (deep copy) with its state? 
    Have this class implement Cloneable interface and call its method clone().

    Java interview questions for Hadoop developer Part 2

    Q11. Which of the following object oriented principal is met with method overloading in java
    - Inheritance
    - Polymorphism
    - Inheritance 


    Q12. Which of the following object oriented principal is met with method overriding in java
    - Inheritance
    - Polymorphism
    - Inheritance 


    Q13. What is the name of collection interface used to maintain unique elements 

    Q14. What access level do you need to specify in the class declaration to ensure that only classes from the same directory can access it? What keyword is used to define this specifier? It has to have default specifier.
    You do not need to specify any access level, and Java will use a default package access level

    Q15. What's the difference between a queue and a stack? 
    Stacks works by last-in-first-out rule (LIFO), while queues use the FIFO rule

    Q16. How can you write user defined exceptions in Java 
    Make your class extend Exception Class

    Q17. What is the difference between checked and Unchecked Exceptions in Java ? Give an example of each type 
    All predefined exceptions in Java are either a checked exception or an unchecked exception. Checked exceptions must be caught using try .. catch() block or we should throw the exception using throws clause. If you dont, compilation of program will fail.
    - Example checked Exception: ParseTextException
    - Example unchecked exception: ArrayIndexOutOfBounds

    Q18. We know that FileNotFoundExceptionis inherited from IOExceptionthen does it matter in what order catch statements for FileNotFoundExceptionand IOExceptipon are written? 
    Yes, it does. The FileNoFoundExceptionis inherited from the IOException. Exception's subclasses have to be caught first.

    Q19. How do we find if two string are same or not in Java. If answer is equals() then why do we have to use equals, why cant we compare string like integers 
    We use method equals() to compare the values of the Strings. We can't use == like we do for primitive types like int because == checks if two variables point at the same instance of a String object.

    Q20. What is "package" keyword 
    This is a way to organize files when a project consists of multiple modules. It also helps resolve naming conflicts when different packages have classes with the same names. Packages access level also allows you to protect data from being used by the non-authorized classes

    Java interview questions for Hadoop developer Part 1

    Since Hadoop and all its eco-system is built in java hence when hiring for a hadoop developer it makes sense to test the core java skills of the interviewee as well. Following are some questions that I have compiled that test the basic java understanding of the candidate. I would expect any decent candidate to answer 90% of these questions

    Q1. What is mutable object and immutable object
    If a object value is changeable then we can call it as Mutable object. (Ex., StringBuffer) If you are not allowed to change the value of an object, it is immutable object. (Ex., String, Integer, Float)

    Q2. What are wrapped classes in Java. Why do they exist. Give examples 
    Wrapped classes are classes that allow primitive types to be accessed as objects, e.g. Integer, Float etc

    Q3. Even though garbage collection cleans memory, why can't it guarantee that a program will run out of memory? Give an example of a case when garbage collection will run out ot memory 
    Because it is possible for programs to use up memory resources faster than they are garbage collected. It is also possible for programs to create objects that are not subject to garbage collection. Once example can be if yuo try to load a very big file into an array.

    Q4. What is the difference between Process and Thread? 
    A process can contain multiple threads. In most multithreading operating systems, a process gets its own memory address space; a thread doesn't. Threads typically share the heap belonging to their parent process. For instance, a JVM runs in a single process in the host O/S. Threads in the JVM share the heap belonging to that process; that's why several threads may access the same object. Typically, even though they share a common heap, threads have their own stack space. This is how one thread's invocation of a method is kept separate from another's

    Q5. How can you write a indefinate loop in java 
    while(true) {
    for ( ; ; ){

    Q6. How can you create singleton class in Java 
    Make the constructor of the class private and provide a static method to get instance of the class

    Q7. What do keywords "this" and "super" do in Java 
    "this" is used to refer to current object. "super" is used to refer to the class extended by the current class

    Q8. What are access specifiers in java. List all of them. Access specifiers are used to define score of variables in Java. There are four levels of access specifiers in java- public
    - private
    - protected
    - default

    Q9. Which of the following 3 object oriented principals does access specifiers implement in java
    - Encapsulation
    - Polymorphism
    - Intheritance 


    Q10. What is method overriding and method overloading 
    With overriding, you change the method behavior for a subclass class. Overloading involves having a method with the same name within the class with different signature

    Hadoop Distributed File Systems (HDFS) Interview Questions

    Q1. What is HDFS  HDFS, the Hadoop Distributed File System, is a distributed file system designed to hold very large amounts of data (terabytes or even petabytes), and provide high-throughput access to this information. Files are stored in a redundant fashion across multiple machines to ensure their durability to failure and high availability to very parallel applications

    Q2. What does the statement "HDFS is block structured file system" means  It means that in HDFS individual files are broken into blocks of a fixed size. These blocks are stored across a cluster of one or more machines with data storage capacity

    Q3. What does the term "Replication factor" mean  Replication factor is the number of times a file needs to be replicated in HDFS

    Q4. What is the default replication factor in HDFS  3

    Q5. What is the typical block size of an HDFS block  64Mb to 128Mb

    Q6. What is the benefit of having such big block size (when compared to block size of linux file system like ext)  It allows HDFS to decrease the amount of metadata storage required per file (the list of blocks per file will be smaller as the size of individual blocks increases). Furthermore, it allows for fast streaming reads of data, by keeping large amounts of data sequentially laid out on the disk

    Q7. Why is it recommended to have few very large files instead of a lot of small files in HDFS  This is because the Name node contains the meta data of each and every file in HDFS and more files means more metadata and since namenode loads all the metadata in memory for speed hence having a lot of files may make the metadata information big enough to exceed the size of the memory on the Name node

    Q8. True/false question. What is the lowest granularity at which you can apply replication factor in HDSF
    - You can choose replication factor per directory
    - You can choose replication factor per file in a directory
    - You can choose replication factor per block of a file

    - True
    - True
    - False

    Q9. What is a datanode in HDFS  ndividual machines in the HDFS cluster that hold blocks of data are called datanodes

    Q10. What is a Namenode in HDSF  The Namenode stores all the metadata for the file system

    Q11. What alternate way does HDFS provides to recover data in case a Namenode, without backup, fails and cannot be recovered  There is no way. If Namenode dies and there is no backup then there is no way to recover data

    Q12. Describe how a HDFS client will read a file in HDFS, like will it talk to data node or namenode ... how will data flow etc  To open a file, a client contacts the Name Node and retrieves a list of locations for the blocks that comprise the file. These locations identify the Data Nodes which hold each block. Clients then read file data directly from the Data Node servers, possibly in parallel. The Name Node is not directly involved in this bulk data transfer, keeping its overhead to a minimum.

    Q13. Using linux command line. how will you - List the the number of files in a HDFS directory
    - Create a directory in HDFS
    - Copy file from your local directory to HDSF
    - hadoop fs -ls
    - hadoop fs -mkdir
    - hadoop fs -put localfile hdfsfile OR hadoop fs -copyFromLocal localfile hdfsfile

    Saturday, November 27, 2010

    How to install standalone Hadoop for development and debugging purposes

    It is very easy to setup a standalone hadoop installation for development and testing purposes. Infact having a standlone hadoop installation on your local linux machine can be of great help in debugging issues.

    The following are the steps involved in setting up the standalone installation of Hadoop 0.20 on Java 5. Please note that setting up a Hadoop cluster is very different than setting up a standalone version.

    - Get a linux machine
       - Suppose your username is jeka and home directory is /home/jeka or ~
       - Create directories ~/java and ~/hadoop

    - Download required software if required
       - Download java release (file jdk-6u22-linux-i586.bin) from here  to directory ~/java
       - Download Hadoop release(file hadoop-0.20.2.tar.gz) from here to directory~/hadoop

    - Install Java
       - Make java file executable
                 chmod a+x java/jdk-6u22-linux-i586.bin
       - Install it

    - Install Hadoop
       - Unzip
              gunzip ~/hadoop/hadoop-0.20.2.tar.gz
        - Untar
               tar -xvf ~/hadoop/hadoop-0.20.2.tar.gz

    Thats it!. The installation is done and hadoop is ready to be used but to make life a little easier we should set up some environment variables.

    Both Java and Hadoop provides command line clients  (or executables) java and hadoop respectively. These executables can found in the bin directory of the installation.
    - Create a file ~/.hadoop_profile and add following lines in it

    export JAVA_HOME="~/java/jdk1.6.0_22"
    export HADOOP_HOME="~/hadoop/hadoop-0.20.2"

    Save this file and source it 
        source ~/.hadoop_profile

    Now instead of running hadoop like ~/hadoop/hadoop-0.20.2/bin/hadoop you can simple use it as hadoop. 

    Note: This job will run on your local machine and not HDFS
    File ~/hadoop/hadoop-0.20.2/hadoop-0.20.2-examples.jar comes with some examples. We can use one of the examples "grep" from that.

    In the following example, we will use one of the map reduce examples to read the number of times the work "copyright" appeared in file  LICENSE.txt. 

     cd ~/hadoop/hadoop-0.20.2
     hadoop jar hadoop-0.20.2-examples.jar grep LICENSE.txt ~/tmp/out "copyright"

    Output: 4
    cat ~/tmp/out/* 

    It's very simple to create your own jar and run it instead of using the examples jar. See blog post for more details