2013-07-10 1 views
5

Ich führe eine Map-Aufgabe auf einer kleinen Datei (3-4 MB) aus, aber die Kartenausgabe ist relativ groß (150 MB). Nachdem die Karte 100% angezeigt wurde, dauert es lange, bis der Überlauf beendet ist. Bitte schlagen Sie vor, wie ich diesen Zeitraum reduzieren kann. Im Folgenden sind einige Beispiele für Protokolle ..."Starten der Räumung der Kartenausgabe" dauert sehr lange in der Hadoop-Map-Aufgabe

13/07/10 17:45:31 INFO mapred.MapTask: Starting flush of map output 
13/07/10 17:45:32 INFO mapred.JobClient: map 98% reduce 0% 
13/07/10 17:45:34 INFO mapred.LocalJobRunner: 
13/07/10 17:45:35 INFO mapred.JobClient: map 100% reduce 0% 
13/07/10 17:45:37 INFO mapred.LocalJobRunner: 
13/07/10 17:45:40 INFO mapred.LocalJobRunner: 
13/07/10 17:45:43 INFO mapred.LocalJobRunner: 
13/07/10 17:45:46 INFO mapred.LocalJobRunner: 
13/07/10 17:45:49 INFO mapred.LocalJobRunner: 
13/07/10 17:45:52 INFO mapred.LocalJobRunner: 
13/07/10 17:45:55 INFO mapred.LocalJobRunner: 
13/07/10 17:45:58 INFO mapred.LocalJobRunner: 
13/07/10 17:46:01 INFO mapred.LocalJobRunner: 
13/07/10 17:46:04 INFO mapred.LocalJobRunner: 
13/07/10 17:46:07 INFO mapred.LocalJobRunner: 
13/07/10 17:46:10 INFO mapred.LocalJobRunner: 
13/07/10 17:46:13 INFO mapred.LocalJobRunner: 
13/07/10 17:46:16 INFO mapred.LocalJobRunner: 
13/07/10 17:46:19 INFO mapred.LocalJobRunner: 
13/07/10 17:46:22 INFO mapred.LocalJobRunner: 
13/07/10 17:46:25 INFO mapred.LocalJobRunner: 
13/07/10 17:46:28 INFO mapred.LocalJobRunner: 
13/07/10 17:46:31 INFO mapred.LocalJobRunner: 
13/07/10 17:46:34 INFO mapred.LocalJobRunner: 
13/07/10 17:46:37 INFO mapred.LocalJobRunner: 
13/07/10 17:46:40 INFO mapred.LocalJobRunner: 
13/07/10 17:46:43 INFO mapred.LocalJobRunner: 
13/07/10 17:46:46 INFO mapred.LocalJobRunner: 
13/07/10 17:46:49 INFO mapred.LocalJobRunner: 
13/07/10 17:46:52 INFO mapred.LocalJobRunner: 
13/07/10 17:46:55 INFO mapred.LocalJobRunner: 
13/07/10 17:46:58 INFO mapred.LocalJobRunner: 
13/07/10 17:47:01 INFO mapred.LocalJobRunner: 
13/07/10 17:47:04 INFO mapred.LocalJobRunner: 
13/07/10 17:47:07 INFO mapred.LocalJobRunner: 
13/07/10 17:47:10 INFO mapred.LocalJobRunner: 
13/07/10 17:47:13 INFO mapred.LocalJobRunner: 
13/07/10 17:47:16 INFO mapred.LocalJobRunner: 
13/07/10 17:47:19 INFO mapred.LocalJobRunner: 
13/07/10 17:47:22 INFO mapred.LocalJobRunner: 
13/07/10 17:47:25 INFO mapred.LocalJobRunner: 
13/07/10 17:47:28 INFO mapred.LocalJobRunner: 
13/07/10 17:47:31 INFO mapred.LocalJobRunner: 
13/07/10 17:47:34 INFO mapred.LocalJobRunner: 
13/07/10 17:47:37 INFO mapred.LocalJobRunner: 
13/07/10 17:47:40 INFO mapred.LocalJobRunner: 
13/07/10 17:47:43 INFO mapred.LocalJobRunner: 
13/07/10 17:47:45 INFO mapred.MapTask: Finished spill 0 
13/07/10 17:47:45 INFO mapred.Task: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting 
13/07/10 17:47:45 INFO mapred.LocalJobRunner: 
13/07/10 17:47:45 INFO mapred.Task: Task 'attempt_local_0003_m_000000_0' done. 
............................... 
............................... 
............................... 
13/07/10 17:47:52 INFO mapred.JobClient: Counters: 22 
13/07/10 17:47:52 INFO mapred.JobClient: File Output Format Counters 
13/07/10 17:47:52 INFO mapred.JobClient:  Bytes Written=13401245 
13/07/10 17:47:52 INFO mapred.JobClient: FileSystemCounters 
13/07/10 17:47:52 INFO mapred.JobClient:  FILE_BYTES_READ=18871098 
13/07/10 17:47:52 INFO mapred.JobClient:  HDFS_BYTES_READ=7346566 
13/07/10 17:47:52 INFO mapred.JobClient:  FILE_BYTES_WRITTEN=35878426 
13/07/10 17:47:52 INFO mapred.JobClient:  HDFS_BYTES_WRITTEN=18621307 
13/07/10 17:47:52 INFO mapred.JobClient: File Input Format Counters 
13/07/10 17:47:52 INFO mapred.JobClient:  Bytes Read=2558288 
13/07/10 17:47:52 INFO mapred.JobClient: Map-Reduce Framework 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce input groups=740000 
13/07/10 17:47:52 INFO mapred.JobClient:  Map output materialized bytes=13320006 
13/07/10 17:47:52 INFO mapred.JobClient:  Combine output records=740000 
13/07/10 17:47:52 INFO mapred.JobClient:  Map input records=71040 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce shuffle bytes=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Physical memory (bytes) snapshot=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce output records=740000 
13/07/10 17:47:52 INFO mapred.JobClient:  Spilled Records=1480000 
13/07/10 17:47:52 INFO mapred.JobClient:  Map output bytes=119998400 
13/07/10 17:47:52 INFO mapred.JobClient:  CPU time spent (ms)=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Total committed heap usage (bytes)=1178009600 
13/07/10 17:47:52 INFO mapred.JobClient:  Virtual memory (bytes) snapshot=0 
13/07/10 17:47:52 INFO mapred.JobClient:  Combine input records=7499900 
13/07/10 17:47:52 INFO mapred.JobClient:  Map output records=7499900 
13/07/10 17:47:52 INFO mapred.JobClient:  SPLIT_RAW_BYTES=122 
13/07/10 17:47:52 INFO mapred.JobClient:  Reduce input records=740000 

Map Task-Quellcode:

public class GsMR2MapThree extends Mapper<Text, Text, LongWritable,DoubleWritable>{ 

    private DoubleWritable distGexpr = new DoubleWritable(); 
    private LongWritable m2keyOut = new LongWritable(); 
    int trMax,tstMax; 

    protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException { 

     Configuration conf =context.getConfiguration(); 
     tstMax = conf.getInt("mtst", 10); 
     trMax = conf.getInt("mtr", 10); 

    } 

    public void map(Text key, Text values, Context context) throws IOException, InterruptedException { 
     String line = values.toString(); 

     double Tij=0.0,TRij=0.0, dist=0; 
     int i=0,j; 
     long m2key=0; 
     String[] SLl = new String[]{}; 

     Configuration conf =context.getConfiguration(); 

     m2key = Long.parseLong(key.toString()); 
     StringTokenizer tokenizer = new StringTokenizer(line); 
     j=0; 
     while (tokenizer.hasMoreTokens()) { 

      String test = tokenizer.nextToken(); 
      if(j==0){ 
       Tij = Double.parseDouble(test); 
      } 
      else if(j==1){ 
       TRij = Double.parseDouble(test); 
      } 
      else if(j==2){ 
       SLl = StringUtils.split(conf.get(test),","); 
      } 
      j++; 
     } 
     //Map input ends 

     //Distance Measure function 
     dist = (long)Math.pow((Tij - TRij), 2); 

     //remove gid from key 
     m2key = m2key/100000; 
     //Map2 <key,value> emit starts 
     for(i=0; i<SLl.length;i++){ 
       long m2keyNew = (Integer.parseInt(SLl[i])*(trMax*tstMax))+m2key; 
      m2keyOut.set(m2keyNew); 
      distGexpr.set(dist); 
      context.write(m2keyOut,distGexpr); 
     } 
     //<key,value> emit done 
    } 

} 

Sample Map Input: Die letzte Variable in jeder Zeile erhalten eine ganze Reihe von Broadcast-Variablen. Jede Zeile erzeugt ungefähr 100-200 Ausgangsdatensätze.

10100014 1356.3238 1181.63 gs-4-56 
10100026 3263.1167 3192.4131 gs-3-21 
10100043 1852.0 1926.3962 gs-4-76 
10100062 1175.5925 983.47125 gs-3-19 
10100066 606.59125 976.26625 gs-8-23 

Sample Map Ausgang:

10101 8633.0 
10102 1822.0 
10103 13832.0 
10104 2726470.0 
10105 1172991.0 
10107 239367.0 
10109 5410384.0 
10111 7698352.0 
10112 6.417 
+1

Können Sie Ihren Mapper-Code (oder zumindest eine Beschreibung dessen, was Ihr Mapper funktional macht), einen Beispiel-Input-Record und einen Output-Record (s) veröffentlichen? Haben Sie eine Reinigungsmethode? –

+0

Danke für Ihre Antwort. Ich habe den Quellcode für diese Map-Aufgabe hinzugefügt und die Eingabe und Ausgabe der Samples durchgeführt. Ich habe keine Reinigungsmethode benutzt. Tatsächlich gab es früher viele Flecken. Also habe ich io.sort.record.percent und einige andere Einstellungen geändert. Dann werden Überläufe minimiert, aber die gesamte Ausführungszeit blieb gleich. –

Antwort

0

Ich nehme an, Sie gelöst haben, dass (2 Jahre nach der ursprünglichen Nachricht Posting), aber nur für jeden, der in das gleiche Problem tritt, werde ich versuchen, einige Vorschläge machen.

Ausgehend von Ihren Zählern verstehe ich, dass Sie bereits die Komprimierung verwenden (da die Anzahl der materialisierten Bytes der Kartenausgabe von der Anzahl der Kartenausgabebytes abweicht), was eine gute Sache ist. Sie können die Ausgabe des Mappers weiter komprimieren, indem Sie die codierte VLongWritable Klasse mit variabler Länge als Map-Ausgabe-Schlüsseltyp verwenden. (Es gab früher auch eine VDoubleWritable-Klasse, wenn ich mich nicht irre, aber inzwischen muss es veraltet sein).

In der for-Schleife, in der Sie die Ausgabe ausgeben, müssen Sie die Variable distGexpr nicht jedesmal setzen. Es ist immer das Gleiche, also setze es direkt vor der for-Schleife. Sie können auch eine lange mit dem Produkt von trMax*tstMax außerhalb der Schleife speichern und nicht bei jeder Iteration berechnen.

Wenn möglich, machen Sie Ihre Eingabe-Taste LongWritable (aus dem vorherigen Job), so dass Sie die Long.parseLong() und die Aufrufe speichern können.

Wenn möglich (abhängig von Ihrem Reduzierer), verwenden Sie einen Kombinierer, um die Größe der verschütteten Bytes zu reduzieren.

Ich konnte keine Möglichkeit finden, den Integer.parseInt() Aufruf innerhalb der for-Schleife zu überspringen, aber es würde etwas Zeit sparen, wenn Sie zuerst SLl als int[] laden könnten.

Verwandte Themen