Format of Mapper and Reducer

October 29, 2014

The general form of a MapReduce job can be described as follows, together with the Job configuration calls that correspond to each pair of types.
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)

(K1, V1):
jobConf.setInputKeyClass(K1.class);
jobConf.setInputValueClass(V1.class);
(with the newer org.apache.hadoop.mapreduce API used in the code below, (K1, V1) are actually determined by the InputFormat, e.g. TextInputFormat, rather than set explicitly)

list(K2, V2):
job.setMapOutputKeyClass(K2.class);
job.setMapOutputValueClass(V2.class);

list(K3, V3):
jobConf.setOutputKeyClass(K3.class);
jobConf.setOutputValueClass(V3.class);
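
As a concrete illustration of how these generic types line up (a standard word-count layout, not the deduplication job from this post; the class names here are mine), a sketch might look like this:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// (K1, V1) = (LongWritable, Text): byte offset and line of the input file (from TextInputFormat)
// (K2, V2) = (Text, IntWritable): word and partial count emitted by the map
// (K3, V3) = (Text, IntWritable): word and total count written by the reduce
public class TypeMappingExample {

    public static class TokenMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) return;
            for (String word : line.split("\\s+")) {
                context.write(new Text(word), new IntWritable(1));   // list(K2, V2)
            }
        }
    }

    public static class SumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));                // list(K3, V3)
        }
    }

    static void configure(Job job) {
        job.setMapOutputKeyClass(Text.class);           // K2
        job.setMapOutputValueClass(IntWritable.class);  // V2
        job.setOutputKeyClass(Text.class);              // K3
        job.setOutputValueClass(IntWritable.class);     // V3
    }
}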

Normally, (K2, V2) is the same as (K3, V3). Here is the first version of the code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Dedup2 {
    public static class Map extends Mapper<Object, Text, Text, Text> {
        private static Text line = new Text();
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            line = value;
            context.write(line, new Text(""));   // emit each line as the key; the value carries no information
        }
    }
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(""));    // duplicate keys are grouped, so each distinct line is written once
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "root");
        String[] otherArgs = {"hdfs://centmaster:9000/input", "hdfs://centmaster:9000/output/debup1"};
        Job job = new Job(conf, "Data Deduplication");
        job.setJarByClass(Dedup2.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
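
A side note on the driver: in Hadoop 2.x the Job constructor used above is deprecated in favor of the Job.getInstance factory. If you are on a newer release, the equivalent line would be:

// Hadoop 2.x+: preferred over the deprecated new Job(conf, name)
Job job = Job.getInstance(conf, "Data Deduplication");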

But I wanted (K2, V2) to be different from (K3, V3), so I made the following change:

// Same imports as above, plus org.apache.hadoop.io.IntWritable.
public class Dedup {
    public static class Map extends Mapper<Object, Text, Text, IntWritable> {   // map output value changed from Text to IntWritable
        private static Text line = new Text();
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            line = value;
            context.write(line, new IntWritable(0));    // changed new Text("") to new IntWritable(0)
        }
    }
    public static class Reduce extends Reducer<Text, IntWritable, Text, Text> {     // reduce input value changed from Text to IntWritable
        public void reduce(Text key, Iterable<IntWritable> values, Context context)   // changed Text to IntWritable
                throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "root");
        String[] otherArgs = {"hdfs://centmaster:9000/input", "hdfs://centmaster:9000/output/debup2"};
        Job job = new Job(conf, "Data Deduplication");
        job.setJarByClass(Dedup.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);    // newly added: the map output value is no longer the default (Text) but IntWritable
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
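
Note that only the map output value class changed here, so only setMapOutputValueClass is needed; the map output key is still Text, which already matches the job output key class (the map output classes default to the job output classes when not set). If the map output key type had changed as well, it would need its own declaration. Roughly:

// Both calls are needed whenever the map output types differ from the job output types.
job.setMapOutputKeyClass(Text.class);          // K2 (unchanged here, so this call is optional)
job.setMapOutputValueClass(IntWritable.class); // V2 (required here, since it differs from the output value class Text)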

After the change, I ran the code, but it generated no output. I found that the Reducer was not running, and there was no error information in Eclipse. So I added a try...catch block to the Reducer code:

    public static class Reduce extends Reducer<Text, IntWritable, Text, Text> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            try {
                context.write(key, new Text(""));
                System.out.println("in reducer");
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }

The error information showed:
java.io.IOException: wrong value class: class org.apache.hadoop.io.Text is not class org.apache.hadoop.io.IntWritable

After Googling it, I found the cause: the Combiner runs on the map output, so it must emit the same (K2, V2) types as the map, but here the Reducer used as the Combiner emits (Text, Text) while the map output value class is IntWritable. In other words, the Combiner shouldn't be set in this job, so I commented out this line:
//job.setCombinerClass(Reduce.class);

Ding... the output appeared. This bug took me a whole day to track down.

Summary:
Coming back to the type rules for the Mapper and Reducer:

(K1, V1):
jobConf.setInputKeyClass(K1.class);
jobConf.setInputValueClass(V1.class);

list(K2, V2):
job.setMapOutputKeyClass(K2.class);
job.setMapOutputValueClass(V2.class);

list(K3, V3):
jobConf.setOutputKeyClass(K3.class);
jobConf.setOutputValueClass(V3.class);

But if you want (K2, V2) to differ from (K3, V3), the Reducer can no longer double as the Combiner, so don't set the CombinerClass; comment that line out.
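
Alternatively, if you still want a combiner for a job like this, it has to be a separate class whose input and output types are both (K2, V2), because the combiner's output replaces the map output in the shuffle. A minimal sketch for the modified dedup job above (the class name Combine is mine, not from the original code):

    // Add inside the Dedup class: a combiner that consumes and produces the
    // map output types (K2, V2) = (Text, IntWritable).
    public static class Combine extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // The value carries no information, so emitting the key once with a
            // single 0 is enough to shrink the map output before the shuffle.
            context.write(key, new IntWritable(0));
        }
    }

    // In main(), instead of commenting the combiner out:
    // job.setCombinerClass(Combine.class);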