
I have used a Mapper, a Reducer and a Combiner class, but I am getting the following error: wrong value class: class org.apache.hadoop.io.Text is not class org.apache.hadoop.io.IntWritable

java.io.IOException: wrong value class: class org.apache.hadoop.io.Text is not class org.apache.hadoop.io.IntWritable 
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:199) 
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1307) 
at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1623) 
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89) 
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105) 
at BookPublished1$Combine.reduce(BookPublished1.java:47) 
at BookPublished1$Combine.reduce(BookPublished1.java:1) 
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171) 
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1644) 
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1618) 
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1467) 
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:699) 
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:769) 
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339) 
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162) 
at java.security.AccessController.doPrivileged(Native Method) 
at javax.security.auth.Subject.doAs(Subject.java:415) 
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491) 
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157) 

My whole program looks like this:

import java.io.IOException; 

import org.apache.hadoop.io.FloatWritable; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.fs.Path; 


public class BookPublished1 { 

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String line = value.toString();
            String[] strYear = line.split(";");
            context.write(new Text(strYear[3]), new IntWritable(1));
        }
    }


    public static class Combine extends Reducer<Text, IntWritable, Text, Text> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

            int sum = 0;
            for (IntWritable x : values) {
                sum += x.get();
            }

            context.write(new Text("BookSummary"), new Text(key + "_" + sum));
        }
    }
    public static class Reduce extends Reducer<Text, Text, Text, FloatWritable> {

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            Long publishYear = 0L, max = Long.MAX_VALUE;
            Text publishYear1 = null, maxYear = null;
            Long publishValue = 0L;
            String compositeString;
            String[] compositeStringArray;

            for (Text x : values) {
                compositeString = x.toString();
                compositeStringArray = compositeString.split("_");
                publishYear1 = new Text(compositeStringArray[0]);
                publishValue = new Long(compositeStringArray[1]);
                if (publishValue > max) {
                    max = publishValue;
                    maxYear = publishYear1;
                }
            }

            Text keyText = new Text("max" + " (" + maxYear.toString() + ") : ");
            context.write(keyText, new FloatWritable(max));
        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "BookPublished");

        job.setJarByClass(BookPublished1.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setCombinerClass(Combine.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path outputPath = new Path(args[1]);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        outputPath.getFileSystem(conf).delete(outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

} 

Please help me resolve this.

Answer


A combiner's output types must match the mapper's output types, because Hadoop gives no guarantee on how many times the combiner is applied, or whether it is applied at all. That is exactly what is happening in your case.

Your combiner emits <Text, Text>, but the framework expects it to produce the declared map output types <Text, IntWritable>, which is what the exception during the spill is complaining about. And if the combiner were skipped, the values from map (<Text, IntWritable>) would go directly to reduce, where the types <Text, Text> are expected.
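For illustration, here is a minimal sketch (not the original poster's code) of a combiner whose output types match the mapper's <Text, IntWritable>. It keeps only the per-year counting and drops the "BookSummary" re-keying, which cannot live in a combiner:

// Reuses the imports already present in the question's program
// (java.io.IOException, org.apache.hadoop.io.*, org.apache.hadoop.mapreduce.Reducer).
public static class Combine extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        // Pre-aggregate the 1s emitted by the mapper for this year.
        int sum = 0;
        for (IntWritable x : values) {
            sum += x.get();
        }

        // Emit the same (Text, IntWritable) pair the mapper produces,
        // so the combiner output can safely re-enter the shuffle.
        context.write(key, new IntWritable(sum));
    }
}

With that change the Reduce class would have to consume <Text, IntWritable> instead of <Text, Text>; alternatively, the combiner can simply be dropped by removing the job.setCombinerClass(Combine.class) line.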


Thanks vanekjar. –
