2016-07-17 2 views
2

Ich entwickelte ein mapReduce-Programm zum Zählen und Anmelden in einer Anfrage-Datei die Anzahl der Anfrage von 30 Minuten und das meist gesuchte Wort in diesem Zeitraum.MapReduce: zwei Splits (Ausführung der Kartenmethode) für eine Zeile Eingabedatei

Meine Eingabedatei:

01_11_2012 12_02_10 132.227.045.028 life 
02_11_2012 02_52_10 132.227.045.028 restaurent+kitchen 
03_11_2012 12_32_10 132.227.045.028 guitar+music 
04_11_2012 13_52_10 132.227.045.028 book+music 
05_11_2012 12_22_10 132.227.045.028 animal+life 
05_11_2012 12_22_10 132.227.045.028 history 

DD_MM_YYYY | HH_MM_SS | ip |

between 02h30 and 2h59 restaurent 1 
between 13h30 and 13h59 book 1 
between 12h00 and 12h29 life 3 
between 12h30 and 12h59 guitar 1 

Erste Zeile:: gesuchte Worte

sollen meine Ausgabedatei so etwas wie Diplay sucht Wort für den Zeitraum zwischen 02h30 restaurent die meisten ist und 2h59 und 1 repräsentieren die Anzahl der Anfrage.

Mein Problem ist, dass ich Redundent Map-Ausführung für die gleiche Zeile bekomme. Also teste ich das Programm mit folgender Eingabe (1 Zeile in meiner Datei).

01_11_2012 12_02_10 132.227.045.028 life

Wenn ich mit Eclipse Linie debuggen pro Zeile, einen Haltepunkt auf der folgenden Karte Linie setzen.

context.write(key, result); 

Mein Programm zweimal in dieser Zeile durchlaufen und zwei Mal die gleiche Information für die eindeutige Eingabezeile schreiben.

Ich bin an diesem Punkt fest und ich weiß nicht, warum ich 2 Kartenaufgabe bekomme, da ich nur eine Aufteilung in Bezug auf meine Eingabe haben sollte.

Das Programm ist wie folgt. (sorry für mein Englisch)

package fitec.lab.booble; 

import java.io.IOException; 
import java.util.Comparator; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.TreeMap; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

public class BoobleByMinutes { 

    public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> { 

     private final int TIME_INDEX = 1; 
     private final int WORDS_INDEX = 3; 

     @Override 
     public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 

      String[] attributesTab = value.toString().split(" "); 

      Text reduceKey = new Text(); 
      Text words = new Text(); 

      String time = attributesTab[TIME_INDEX]; 
      String[] timeSplitted = time.split("_"); 

      String heures = timeSplitted[0]; 
      String minutes = timeSplitted[1]; 

      if (29 < Integer.parseInt(minutes)) { 
       reduceKey.set("entre " + heures + "h30 et " + heures + "h59"); 
      } else { 
       reduceKey.set("entre " + heures + "h00 et " + heures + "h29"); 
      } 
      words.set(attributesTab[WORDS_INDEX]); 
      context.write(reduceKey, words); 
     } 
    } 

    public static class PriceSumReducer extends Reducer<Text, Text, Text, Text> { 

     public void reduce(Text key, Iterable<Text> groupedWords, Context context) 
       throws IOException, InterruptedException { 
      Text result = new Text(); 
      int requestCount = 0; 
      Map<String, Integer> firstWordAndRequestCount = new HashMap<String, Integer>(); 
      for (Text words : groupedWords) { 
       ++requestCount; 
       String wordsString = words.toString().replace("+", "--"); 
       System.out.println(wordsString.toString()); 
       String[] wordTab = wordsString.split("--"); 
       for (String word : wordTab) { 

        if (firstWordAndRequestCount.containsKey(word)) { 
         Integer integer = firstWordAndRequestCount.get(word) + 1; 
         firstWordAndRequestCount.put(word, integer); 
        } else { 
         firstWordAndRequestCount.put(word, new Integer(1)); 
        } 
       } 
      } 

      ValueComparator valueComparator = new ValueComparator(firstWordAndRequestCount); 
      TreeMap<String, Integer> sortedProductsSale = new TreeMap<String, Integer>(valueComparator); 
      sortedProductsSale.putAll(firstWordAndRequestCount); 
      result.set(sortedProductsSale.firstKey() + "__" + requestCount); 
      context.write(key, result); 
     } 

     class ValueComparator implements Comparator<String> { 
      Map<String, Integer> base; 

      public ValueComparator(Map<String, Integer> base) { 
       this.base = base; 
      } 

      public int compare(String a, String b) { 
       if (base.get(a) >= base.get(b)) { 
        return -1; 
       } else { 
        return 1; 
       } 
      } 
     } 
    } 

    public static void main(String[] args) throws Exception { 

     Job job = new org.apache.hadoop.mapreduce.Job(); 
     job.setJarByClass(BoobleByMinutes.class); 
     job.setJobName("Booble mot le plus recherché et somme de requete par tranche de 30 minutes"); 

     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 
     job.setJarByClass(BoobleByMinutes.class); 
     job.setMapperClass(TokenizerMapper.class); 
//  job.setCombinerClass(PriceSumReducer.class); 
     job.setReducerClass(PriceSumReducer.class); 

     job.setNumReduceTasks(1); 
     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(Text.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 

@Radim whene i das Glas in reale hadoop mit Garn starten i bekommen Anzahl der Split = 2

i das Protokoll unter

setzen
16/07/18 02:56:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
16/07/18 02:56:40 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032 
16/07/18 02:56:42 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 
16/07/18 02:56:42 INFO input.FileInputFormat: Total input paths to process : 2 
16/07/18 02:56:43 INFO mapreduce.JobSubmitter: number of splits:2 
16/07/18 02:56:43 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1468802929497_0002 
16/07/18 02:56:44 INFO impl.YarnClientImpl: Submitted application application_1468802929497_0002 
16/07/18 02:56:44 INFO mapreduce.Job: The url to track the job: http://moussa:8088/proxy/application_1468802929497_0002/ 
16/07/18 02:56:44 INFO mapreduce.Job: Running job: job_1468802929497_0002 
16/07/18 02:56:56 INFO mapreduce.Job: Job job_1468802929497_0002 running in uber mode : false 
16/07/18 02:56:56 INFO mapreduce.Job: map 0% reduce 0% 
16/07/18 02:57:14 INFO mapreduce.Job: map 100% reduce 0% 
16/07/18 02:57:23 INFO mapreduce.Job: map 100% reduce 100% 
16/07/18 02:57:25 INFO mapreduce.Job: Job job_1468802929497_0002 completed successfully 
16/07/18 02:57:25 INFO mapreduce.Job: Counters: 49 
    File System Counters 
     FILE: Number of bytes read=66 
     FILE: Number of bytes written=352628 
     FILE: Number of read operations=0 
     FILE: Number of large read operations=0 
     FILE: Number of write operations=0 
     HDFS: Number of bytes read=278 
     HDFS: Number of bytes written=31 
     HDFS: Number of read operations=9 
     HDFS: Number of large read operations=0 
     HDFS: Number of write operations=2 
    Job Counters 
     Launched map tasks=2 
     Launched reduce tasks=1 
     Data-local map tasks=2 
     Total time spent by all maps in occupied slots (ms)=29431 
     Total time spent by all reduces in occupied slots (ms)=6783 
     Total time spent by all map tasks (ms)=29431 
     Total time spent by all reduce tasks (ms)=6783 
     Total vcore-milliseconds taken by all map tasks=29431 
     Total vcore-milliseconds taken by all reduce tasks=6783 
     Total megabyte-milliseconds taken by all map tasks=30137344 
     Total megabyte-milliseconds taken by all reduce tasks=6945792 
    Map-Reduce Framework 
     Map input records=2 
     Map output records=2 
     Map output bytes=56 
     Map output materialized bytes=72 
     Input split bytes=194 
     Combine input records=0 
     Combine output records=0 
     Reduce input groups=1 
     Reduce shuffle bytes=72 
     Reduce input records=2 
     Reduce output records=1 
     Spilled Records=4 
     Shuffled Maps =2 
     Failed Shuffles=0 
     Merged Map outputs=2 
     GC time elapsed (ms)=460 
     CPU time spent (ms)=2240 
     Physical memory (bytes) snapshot=675127296 
     Virtual memory (bytes) snapshot=5682606080 
     Total committed heap usage (bytes)=529465344 
    Shuffle Errors 
     BAD_ID=0 
     CONNECTION=0 
     IO_ERROR=0 
     WRONG_LENGTH=0 
     WRONG_MAP=0 
     WRONG_REDUCE=0 
    File Input Format Counters 
     Bytes Read=84 
    File Output Format Counters 
     Bytes Written=31 
+0

Sie in der Datei überprüfen können, ob Datei zwei Zeilen mit gleichzeitig 'grep hat‚12_02_10‘inputFile' ? –

+0

Versuchen Sie, es in echten Hadoop-Setup statt Eclipse ausführen, die einige unerwartete Dinge tun können. Ich würde etwas wie Sicherungskopie einer Datei vom Herausgeber verdächtigen, die Ihre Eingabe duplizieren kann. – Radim

+0

Ich habe den Befehl grep '12_02_10' myFile ausprobiert und gebe eine Zeile zurück. –

Antwort

0

Ich bekomme die Lösung von diesem Link: why is my sequence file being read twice in my hadoop mapper class?

Ich habe nicht gesehen, dass ich Gesamteingabe Pfade zu verarbeiten: 2 in meinem Protokoll. , wie sie in der Verbindung sagen, dass ich die Linie

FileInputFormat.addInputPath(job, new Path(args[0])); 

Ich habe nicht den Kommentar verstehen Kommentar brauchte nur „diese Zeile einfach Eingabe config anhängt zurück“ ist kann jemand erklären, bitte irgendwelche Ideen

geschätzt
1

In Ihrem main (Job) -Methode werden diese Zeilen dupliziert:

FileInputFormat.addInputPath(job, new Path(args[0])); 
FileOutputFormat.setOutputPath(job, new Path(args[1])); 

auch: job.setJarByClass(BoobleByMinutes.class);

aber diese Zeile sollte verursacht die doppelte eingegeben werden: FileInputFormat.addInputPath(job, new Path(args[0]));

so sollte Ihre Hauptmethode sein:

public static void main(String[] args) throws Exception { 

     Job job = new org.apache.hadoop.mapreduce.Job(); 
     job.setJarByClass(BoobleByMinutes.class); 
     job.setJobName("Booble mot le plus recherché et somme de requete par tranche de 30 minutes"); 

     job.setMapperClass(TokenizerMapper.class); 
//  job.setCombinerClass(PriceSumReducer.class); 
     job.setReducerClass(PriceSumReducer.class); 

     job.setNumReduceTasks(1); 
     job.setMapOutputKeyClass(Text.class); 
     job.setMapOutputValueClass(Text.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     FileInputFormat.addInputPath(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
Verwandte Themen