Tuesday, April 24, 2012

Hadoop Example: Using a custom Java class as Key and Group Comparator

::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyKey.java : CUSTOM KEY CLASS
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
 package com.amandoon.HadoopExample;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

public class MyKey implements WritableComparable<MyKey> {
    String timestamp = null;
    String vrid = null;

    public MyKey() {
    }

    public void set(String timestamp, String vrid) {
        this.timestamp = timestamp;
        this.vrid = vrid;

    }

    ///////////////////////////////////////////// Implement Writable
    @Override
    public void write(DataOutput daout) throws IOException {
        daout.writeUTF(vrid);
        daout.writeUTF(timestamp);

    }

    ///////////////////////////////////////////// Implement Writable
    @Override
    public void readFields(DataInput dain) throws IOException {
        vrid = dain.readUTF();
        timestamp = dain.readUTF();
    }

    //////////////////////////////////////////// Implement Comparable
    @Override
    public int compareTo(MyKey ob) {
        int cmp = this.vrid.compareTo(ob.vrid);
        if (cmp != 0) {
            return cmp;
        }
        return this.timestamp.compareTo(ob.timestamp);
    }

    @Override
    public boolean equals(Object ob) {
        if (ob == null || this.getClass() != ob.getClass()) return false;

        MyKey k = (MyKey) ob;
        if (k.timestamp != null && this.timestamp != null && !k.timestamp.equals(this.timestamp)) return false;
        if (k.vrid != null && this.vrid != null && !k.vrid.equals(this.vrid)) return false;
        return true;
    }

    @Override
    public int hashCode() {
        int result = vrid != null ? vrid.hashCode() : 0;
        return 31 * result;

    }

    @Override
    public String toString() {
        return vrid + "\u0002" + timestamp;
    }
}

::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyGroupComparator.java : CUSTOM KEY CLASS GROUP COMPARATOR
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
 
package com.amandoon.HadoopExample;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

class MyGroupComparator extends WritableComparator {

    protected MyGroupComparator(){
        super(MyKey.class,true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2){
        MyKey k1 = (MyKey) w1;
        MyKey k2 = (MyKey) w2;
        return k1.vrid.compareTo(k2.vrid);
    }
}


::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyMapper.java : MAPPER CLASS
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
 package com.amandoon.HadoopExample;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class MyMapper extends Mapper<LongWritable, Text, MyKey, Text> {

    private MyKey ok = new MyKey();
    private Text ov = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] valarr = value.toString().split("\u0002", -1);
        if (valarr.length > 3) {
            ok.set(valarr[0], valarr[1]);
            context.write(ok, new Text(valarr[2]));
        } else {
            context.getCounter("AMAN'S COUNTERS","!!! Bad Array !!!").increment(1);
        }
    }
}

::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyReducer.java : REDUCER CLASS
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
 
package com.amandoon.HadoopExample;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class MyReducer extends Reducer<MyKey, Text, MyKey, Text> {

    private MyKey ok = new MyKey();

    @Override
    public void reduce(MyKey key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(key, value);
        }
    }
}


::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyMr.java : RUN JOB
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
 
package com.amandoon.HadoopExample;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.kohsuke.args4j.CmdLineParser; import org.kohsuke.args4j.Option;

public class MyMr extends Configured implements Tool {
    @Option(name = "--input", required = true)
    String inputdir;

    @Option(name = "--output", required = true)
    String outputdir;

    @Option(name = "--reducers")
    int reducers;

    public int run(String[] arg) throws Exception {

        Configuration conf = super.getConf();
        conf.set("mapred.input.dir", inputdir);
        conf.set("mapred.output.dir", outputdir);

        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);

        conf.set("mapreduce.map.class", "com.amandoon.HadoopExample.MyMapper");
        conf.set("mapreduce.reduce.class", "com.amandoon.HadoopExample.MyReducer");

        conf.set("mapreduce.inputformat.class", "org.apache.hadoop.mapreduce.lib.input.TextInputFormat");
        conf.set("mapreduce.outputformat.class", "org.apache.hadoop.mapreduce.lib.output.TextOutputFormat");

        conf.setInt("mapred.reduce.tasks", reducers);

        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        conf.set("mapred.output.key.class", "com.amandoon.HadoopExample.MyKey");
        conf.set("mapred.output.value.class", "org.apache.hadoop.io.Text");

        conf.set("mapred.job.name", "CustomKey");
        conf.set("mapred.output.value.groupfn.class", "com.amandoon.HadoopExample.MyGroupComparator");

        Job job = new Job(conf);
        job.setJarByClass(com.amandoon.HadoopExample.MyMr.class);

        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        MyMr fc = new MyMr();
        new CmdLineParser(fc).parseArgument(args);
        ToolRunner.run(fc, args);
    }
}

Which Hadoop property needs to be set to change the key GroupComparator?

If you have a custom key GroupComparator defined for your MR job, then you can set it in your Hadoop config file as:

<property>
<name>mapred.output.value.groupfn.class</name>
<value>com.amandoon.CustomKey.MyGroupComparator</value>
</property>


Or you can set it for a job using the Job object:
jobObject.setGroupingComparatorClass(MyGroupComparator.class);

See hadoop example that uses Custom Key GroupComparator
http://hadoop-blog.blogspot.com/2012/04/hadoop-example-using-custom-java-class.html

Tips & techniques for writing data-intensive text-processing tasks in Hadoop

Stumbled upon a good book by Jimmy Lin and Chris Dyer (University of Maryland, College Park). The book discusses various techniques used to write data-intensive algorithms using Hadoop MapReduce.

https://docs.google.com/viewer?a=v&pid=sites&srcid=ZGVmYXVsdGRvbWFpbnxoYWRvb3BhbmRoaXZlfGd4OjZjNmEzMzlhMzlhNjdkMWQ&pli=1

Got a fairly good starting point on how to approach graph algorithms using Map Reduce.