The type formula of MapReduce can be described as follows, together with the corresponding Job configuration calls:
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
(K1, V1): determined by the job's InputFormat (the old JobConf API also had setInputKeyClass/setInputValueClass)
list(K2, V2):
job.setMapOutputKeyClass(K2.class);
job.setMapOutputValueClass(V2.class);
list(K3, V3):
job.setOutputKeyClass(K3.class);
job.setOutputValueClass(V3.class);
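To make the formula concrete (my own annotation, not part of the original rule): the generic parameters of the new-API Mapper and Reducer classes line up with it directly.

Mapper<K1, V1, K2, V2>   // e.g. Mapper<Object, Text, Text, Text> in the code below
Reducer<K2, V2, K3, V3>  // e.g. Reducer<Text, Text, Text, Text>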
Normally, (K2, V2) equals (K3, V3). Here is the code:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Dedup2 {
    // (K2, V2) = (Text, Text): each input line becomes a key with an empty value
    public static class Map extends Mapper<Object,Text,Text,Text>{
        private static Text line = new Text();
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            line = value;
            context.write(line, new Text(""));
        }
    }
    // (K3, V3) = (Text, Text): the shuffle groups duplicate keys, so writing
    // each key once deduplicates the input
    public static class Reduce extends Reducer<Text,Text,Text,Text>{
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "root");
        String[] otherArgs = {"hdfs://centmaster:9000/input", "hdfs://centmaster:9000/output/debup1"};
        Job job = new Job(conf, "Data Deduplication");
        job.setJarByClass(Dedup2.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class); // OK here: Reduce's output (Text, Text) matches the map output
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
But I wanted (K2, V2) to differ from (K3, V3), so I made the following changes:
import org.apache.hadoop.io.IntWritable; // new import, in addition to those above

public class Dedup {
    public static class Map extends Mapper<Object,Text,Text,IntWritable>{ // changed V2 from Text to IntWritable
        private static Text line = new Text();
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            line = value;
            context.write(line, new IntWritable(0)); // changed Text("") to IntWritable(0)
        }
    }
    public static class Reduce extends Reducer<Text,IntWritable,Text,Text>{ // changed input value type to IntWritable
        public void reduce(Text key, Iterable<IntWritable> values, Context context) // changed Text to IntWritable
                throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "root");
        String[] otherArgs = {"hdfs://centmaster:9000/input", "hdfs://centmaster:9000/output/debup2"};
        Job job = new Job(conf, "Data Deduplication");
        job.setJarByClass(Dedup.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class); // newly added: the map output value is IntWritable, not Text (which it would otherwise inherit from setOutputValueClass)
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
After the change, I ran the code, but it generated no output. I found that the reducer was not running, and there was no error information in Eclipse. So I added a try…catch block to the reducer:
public static class Reduce extends Reducer<Text,IntWritable,Text,Text>{
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        try {
            context.write(key, new Text(""));
            System.out.println("in reducer");
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
The error message shows:
java.io.IOException: wrong value class: class org.apache.hadoop.io.Text is not class org.apache.hadoop.io.IntWritable
After Googling it, I found the cause: the combiner runs between map and reduce, so its output types must match the map output types (K2, V2); my Reduce class emits Text values, but the map output value class is IntWritable. The combiner shouldn't be set here, so I commented out this line:
//job.setCombinerClass(Reduce.class);
Ding… the output came out. This bug took me a whole day to debug.
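For the record, an alternative I did not try: keep a combiner, but make it a separate class whose output types match the map output (K2, V2) = (Text, IntWritable). A minimal sketch; the class name Combine is my own, not from the original code:

// A combiner runs between map and reduce, so it must consume AND emit (K2, V2).
public static class Combine extends Reducer<Text,IntWritable,Text,IntWritable>{
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Collapse duplicate keys early; the value stays an IntWritable,
        // so the shuffle still sees (Text, IntWritable).
        context.write(key, new IntWritable(0));
    }
}
// and in main(): job.setCombinerClass(Combine.class);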
Summary:
Back to the original rule for Mapper and Reducer:
(K1, V1): determined by the job's InputFormat (the old JobConf API also had setInputKeyClass/setInputValueClass)
list(K2, V2):
job.setMapOutputKeyClass(K2.class);
job.setMapOutputValueClass(V2.class);
list(K3, V3):
job.setOutputKeyClass(K3.class);
job.setOutputValueClass(V3.class);
But if you want (K2, V2) to differ from (K3, V3), the reducer can no longer double as the combiner, because a combiner's output must be exactly (K2, V2). Comment out setCombinerClass, or supply a separate combiner class with matching types.
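In other words, a minimal configuration sketch for the (K2, V2) ≠ (K3, V3) case, using the types from the job above:

job.setMapOutputKeyClass(Text.class);          // K2
job.setMapOutputValueClass(IntWritable.class); // V2
job.setOutputKeyClass(Text.class);             // K3
job.setOutputValueClass(Text.class);           // V3
// no job.setCombinerClass(...) unless the combiner's output is exactly (K2, V2)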