::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyKey.java : CUSTOM KEY CLASS
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
package com.amandoon.HadoopExample;
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Composite key made of a {@code vrid} and a {@code timestamp}, serialized as two
 * UTF strings ({@code vrid} first, then {@code timestamp}).
 *
 * <p>Natural ordering is by {@code vrid}, then by {@code timestamp}; both are compared as
 * plain strings. {@link #hashCode()} depends on {@code vrid} only, so keys that share a
 * {@code vrid} always hash alike regardless of timestamp (presumably so a hash-based
 * partitioner keeps one {@code vrid} on one reducer - confirm against the job setup).
 *
 * <p>Both fields start out {@code null}; {@link #write(DataOutput)} and
 * {@link #compareTo(MyKey)} dereference them and therefore require {@link #set} (or
 * {@link #readFields(DataInput)}) to have populated the key first.
 */
public class MyKey implements WritableComparable<MyKey> {
    // Package-private on purpose: MyGroupComparator reads vrid directly.
    String timestamp = null;
    String vrid = null;

    /** No-arg constructor; leaves both fields {@code null} until populated. */
    public MyKey() {
    }

    /**
     * Overwrites both components of this key, allowing one instance to be reused.
     *
     * @param timestamp secondary sort component
     * @param vrid      primary sort component
     */
    public void set(String timestamp, String vrid) {
        this.timestamp = timestamp;
        this.vrid = vrid;
    }

    ///////////////////////////////////////////// Implement Writable
    /**
     * Serializes the key as {@code vrid} followed by {@code timestamp}.
     *
     * @param daout sink to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput daout) throws IOException {
        daout.writeUTF(vrid);
        daout.writeUTF(timestamp);
    }

    ///////////////////////////////////////////// Implement Writable
    /**
     * Restores the key from the layout produced by {@link #write(DataOutput)}.
     *
     * @param dain source to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput dain) throws IOException {
        vrid = dain.readUTF();
        timestamp = dain.readUTF();
    }

    //////////////////////////////////////////// Implement Comparable
    /**
     * Orders by {@code vrid}, breaking ties on {@code timestamp}.
     *
     * @param ob key to compare against
     * @return negative, zero or positive per the {@link Comparable} contract
     */
    @Override
    public int compareTo(MyKey ob) {
        int cmp = this.vrid.compareTo(ob.vrid);
        if (cmp != 0) {
            return cmp;
        }
        return this.timestamp.compareTo(ob.timestamp);
    }

    /**
     * Two keys are equal when both {@code vrid} and {@code timestamp} match.
     * A {@code null} field only matches another {@code null} field.
     *
     * @param ob object to compare with
     * @return {@code true} if {@code ob} is a {@code MyKey} with the same components
     */
    @Override
    public boolean equals(Object ob) {
        if (this == ob) {
            return true;
        }
        if (ob == null || this.getClass() != ob.getClass()) {
            return false;
        }
        MyKey k = (MyKey) ob;
        return sameText(this.vrid, k.vrid) && sameText(this.timestamp, k.timestamp);
    }

    /** Null-safe string equality: two nulls are equal, null never equals non-null. */
    private static boolean sameText(String a, String b) {
        return a == null ? b == null : a.equals(b);
    }

    /**
     * Hash derived from {@code vrid} alone; {@code timestamp} is deliberately excluded.
     * Equal keys have equal {@code vrid}, so this stays consistent with {@link #equals}.
     *
     * @return {@code 31 * vrid.hashCode()}, or 0 when {@code vrid} is {@code null}
     */
    @Override
    public int hashCode() {
        int result = vrid != null ? vrid.hashCode() : 0;
        return 31 * result;
    }

    /**
     * Renders the key as {@code vrid}, the U+0002 control character, then {@code timestamp}.
     *
     * @return delimited text form of the key
     */
    @Override
    public String toString() {
        return vrid + "\u0002" + timestamp;
    }
}
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyGroupComparator.java : CUSTOM KEY CLASS GROUP COMPARATOR
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
package com.amandoon.HadoopExample;
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Comparator over {@link MyKey} that looks at {@code vrid} only, so keys differing
 * solely in {@code timestamp} compare as equal.
 */
class MyGroupComparator extends WritableComparator {

    /**
     * Registers {@link MyKey} as the key class with the parent comparator and asks it
     * (second argument {@code true}) to create key instances.
     */
    protected MyGroupComparator() {
        super(MyKey.class, true);
    }

    /**
     * Compares two keys by their {@code vrid} strings.
     *
     * @param w1 left key; must be a {@link MyKey}
     * @param w2 right key; must be a {@link MyKey}
     * @return result of {@code String.compareTo} on the two {@code vrid} values
     */
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        final MyKey left = (MyKey) w1;
        final MyKey right = (MyKey) w2;
        final String leftVrid = left.vrid;
        return leftVrid.compareTo(right.vrid);
    }
}
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyMapper.java : MAPPER CLASS
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
package com.amandoon.HadoopExample;
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Splits each input line on the U+0002 control character and emits
 * {@code (MyKey(timestamp = field 0, vrid = field 1), Text(field 2))}.
 * Lines that do not have enough fields are counted and dropped.
 */
public class MyMapper extends Mapper<LongWritable, Text, MyKey, Text> {
    // Reused across map() calls to avoid allocating a key/value pair per record.
    private final MyKey ok = new MyKey();
    private final Text ov = new Text();

    /**
     * Parses one line and writes a single key/value pair, or bumps the bad-record counter.
     *
     * @param key     input key supplied by the input format; not used
     * @param value   one line of input
     * @param context task context used for output and counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Limit -1 keeps trailing empty fields so the field count reflects the raw line.
        String[] valarr = value.toString().split("\u0002", -1);
        // NOTE(review): only indices 0..2 are read, yet at least four fields are required.
        // Presumably the input format carries extra columns - confirm before relaxing.
        if (valarr.length > 3) {
            ok.set(valarr[0], valarr[1]);
            ov.set(valarr[2]);
            context.write(ok, ov);
        } else {
            context.getCounter("AMAN'S COUNTERS","!!! Bad Array !!!").increment(1);
        }
    }
}
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyReducer.java : REDUCER CLASS
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
package com.amandoon.HadoopExample;
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Pass-through reducer: writes every value it receives together with the key object
 * handed to {@link #reduce}.
 */
public class MyReducer extends Reducer<MyKey, Text, MyKey, Text> {

    /**
     * Emits one output record per incoming value.
     *
     * @param key     key object for the current group
     * @param values  values belonging to the group
     * @param context task context used for output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void reduce(MyKey key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // NOTE(review): key is written inside the loop rather than captured once up front;
        // presumably so each record carries the key state current for that value - confirm.
        for (Text value : values) {
            context.write(key, value);
        }
    }
}
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
: FILE: MyMr.java : RUN JOB
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
package com.amandoon.HadoopExample;
::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.kohsuke.args4j.CmdLineParser; import org.kohsuke.args4j.Option;
public class MyMr extends Configured implements Tool {
@Option(name = "--input", required = true)
String inputdir;
@Option(name = "--output", required = true)
String outputdir;
@Option(name = "--reducers")
int reducers;
public int run(String[] arg) throws Exception {
Configuration conf = super.getConf();
conf.set("mapred.input.dir", inputdir);
conf.set("mapred.output.dir", outputdir);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.set("mapreduce.map.class", "com.amandoon.HadoopExample.MyMapper");
conf.set("mapreduce.reduce.class", "com.amandoon.HadoopExample.MyReducer");
conf.set("mapreduce.inputformat.class", "org.apache.hadoop.mapreduce.lib.input.TextInputFormat");
conf.set("mapreduce.outputformat.class", "org.apache.hadoop.mapreduce.lib.output.TextOutputFormat");
conf.setInt("mapred.reduce.tasks", reducers);
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
conf.set("mapred.output.key.class", "com.amandoon.HadoopExample.MyKey");
conf.set("mapred.output.value.class", "org.apache.hadoop.io.Text");
conf.set("mapred.job.name", "CustomKey");
conf.set("mapred.output.value.groupfn.class", "com.amandoon.HadoopExample.MyGroupComparator");
Job job = new Job(conf);
job.setJarByClass(com.amandoon.HadoopExample.MyMr.class);
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
MyMr fc = new MyMr();
new CmdLineParser(fc).parseArgument(args);
ToolRunner.run(fc, args);
}
}