2017-09-25 2 views
1

Ich versuche, Big-Data-MapReduce zu üben, indem ich ein Filmempfehlungssystem baue. Mein Code (Thema: MapReduce-Code verstehen):

*imports 



public class MRS { 
    public static class Map extends Mapper<LongWritable, Text, Text, Text> { 
     public void map(LongWritable key, Text value, Context con) 
       throws IOException, InterruptedException { 
      String line = value.toString(); 

      StringTokenizer token = new StringTokenizer(line); 

     while(token.hasMoreTokens()){ 
      String userId = token.nextToken(); 
      String movieId = token.nextToken(); 
      String ratings =token.nextToken(); 
      token.nextToken(); 
      con.write(new Text(userId), new Text(movieId + "," + ratings)); 
     } 

    } 
} 

public static class Reduce extends 
     Reducer<Text, IntWritable, Text, Text> { 
    public void reduce(Text key, Iterable<Text> value,Context con) throws IOException, InterruptedException{ 
     int item_count=0; 
     int item_sum =0; 
     String result="["; 
     for(Text t : value){ 
      String s = t.toString(); 
      StringTokenizer token = new StringTokenizer(s,","); 
      while(token.hasMoreTokens()){ 
      token.nextToken(); 
      item_sum=item_sum+Integer.parseInt(token.nextToken()); 
      item_count++; 
      } 
      result=result+"("+s+"),"; 


     } 
     result=result.substring(0, result.length()-1); 
     result=result+"]"; 
     result=String.valueOf(item_count)+","+String.valueOf(item_sum)+","+result; 

     con.write(key, new Text(result)); 
    } 
} 

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 
    Configuration con = new Configuration(); 
    Job job = new Job(con,"Movie Recommendation"); 

    job.setJarByClass(MRS.class); 


    job.setMapperClass(Map.class); 
    job.setCombinerClass(Reduce.class); 
    job.setReducerClass(Reduce.class); 


    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 


    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 


    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 


    System.exit(job.waitForCompletion(true) ? 0 : 1); 

} 

} 

Ich verwende den MovieLens-Datensatz von hier (here).

Die Eingabedatei daraus ist u.data,

und meine Ausgabe nach der Ausführung dieses Codes sollte so aussehen:

userId item_count, item_sum, [Liste der movieId mit Bewertung]

Stattdessen bekomme ich Folgendes:

99 173,4 
99 288,4 
99 66,3 
99 203,4 
99 105,2 
99 12,5 
99 1,4 
99 741,3 
99 895,3 
99 619,4 
99 742,5 
99 294,4 
99 196,4 
99 328,4 
99 120,2 
99 246,3 
99 232,4 
99 181,5 
99 201,3 
99 978,3 
99 123,3 
99 433,4 
99 345,3 

Das dürfte die Ausgabe der Map-Klasse sein.

Antwort

0

Ich habe einige Anpassungen am Code vorgenommen, und er liefert mir nun genau das erwartete Ergebnis. Hier ist mein neuer Code:

Importe *

public class MRS { 
public static class Map extends 
     Mapper<LongWritable, Text, IntWritable, Text> { 
    public void map(LongWritable key, Text value, Context con) 
      throws IOException, InterruptedException { 
     String line = value.toString(); 
     String[] s = line.split("\t"); 
     StringTokenizer token = new StringTokenizer(line); 

     while (token.hasMoreTokens()) { 
      IntWritable userId = new IntWritable(Integer.parseInt(token 
        .nextToken())); 
      String movieId = token.nextToken(); 
      String ratings = token.nextToken(); 
      token.nextToken(); 
      con.write(userId, new Text(movieId + "," + ratings)); 
     } 

    } 
} 

public static class Reduce extends 
     Reducer<IntWritable, Text, IntWritable, Text> { 
    public void reduce(IntWritable key, Iterable<Text> value, Context con) 
      throws IOException, InterruptedException { 
     int item_count = 0; 
     int item_sum = 0; 
     String result = ""; 
     for (Text t : value) { 
      String s = t.toString(); 
      StringTokenizer token = new StringTokenizer(s, ","); 

      result = result + "[" + s + "],"; 

     } 
     result = result.substring(1, result.length() - 2); 

     System.out.println(result); 
     con.write(key, new Text(result)); 
    } 
} 

public static void main(String[] args) throws IOException, 
     ClassNotFoundException, InterruptedException { 
    Configuration con = new Configuration(); 
    Job job = new Job(con, "Movie Recommendation"); 

    job.setJarByClass(MRS.class); 

    job.setMapperClass(Map.class); 
    job.setCombinerClass(Reduce.class); 
    job.setReducerClass(Reduce.class); 

    job.setOutputKeyClass(IntWritable.class); 
    job.setOutputValueClass(Text.class); 

    job.setInputFormatClass(TextInputFormat.class); 
    job.setOutputFormatClass(TextOutputFormat.class); 

    FileInputFormat.addInputPath(job, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job, new Path(args[1])); 

    System.exit(job.waitForCompletion(true) ? 0 : 1); 

} 

} 

Geändert habe ich Folgendes. Treibercode:

job.setOutputKeyClass(IntWritable.class); 

Mapper Code

Mapper<LongWritable, Text, IntWritable, Text> 

Reducer Code

public static class Reduce extends 
    Reducer<Text, IntWritable, Text, Text> { 
    public void reduce(Text key, Iterable<Text> value,Context con) throws 
IOException, InterruptedException{ 

Ich glaube, das Problem war, dass die Typen von Ausgabeschlüssel und Ausgabewert der Mapper-Klasse nicht zu den Eingabetypen des Reducers passten; deshalb wurde nur die Mapper-Ausgabe geschrieben und der Reducer gar nicht ausgeführt.

Korrigiert mich, wenn ich falsch liege.