
I have a huge number of small files, and I want to use CombineFileInputFormat to merge the files so that each file appears as a single record in my MR job. I followed http://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html and tried to convert it to the new API.

I am facing two problems while converting it:

a) I am testing with just two small files, yet 2 mappers are fired. I expected 1.

b) Each line comes as a single record; I want the whole file as a single record.

It may be painful, but please look at the code below. I am still a novice in Hadoop.

The driver class:

public class MRDriver extends Configured implements Tool { 


@Override 
public int run(String[] args) throws Exception { 
    FileSystem fs = new Path(".").getFileSystem(getConf()); 
    fs.printStatistics(); 
    Job job = new Job(getConf()); 
    job.setJobName("Enron MR"); 
    job.setMapperClass(EnronMailReadMapper.class); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(IntWritable.class); 
    job.setNumReduceTasks(0); 
    job.setJarByClass(EnronMailReadMapper.class); 
    RawCombineFileInputFormat.addInputPath(job, new Path(args[0])); 
    job.setOutputFormatClass(TextOutputFormat.class); 
    TextOutputFormat.setOutputPath(job, new Path(args[1])); 
    return job.waitForCompletion(true) ? 0 :1; 
} 

public static void main(String[] args) throws Exception { 
    int exitCode = ToolRunner.run(new MRDriver(), args); 
    System.exit(exitCode); 
} 

}

The class below is mostly copy-pasted from LineRecordReader, with modifications to the initialize() and nextKeyValue() functions:

public class SingleFileRecordReader extends RecordReader<LongWritable, Text> { 
    private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class); 

    private long start; 
    private long pos; 
    private long end; 
    private LineReader in; 
    private int maxLineLength; 
    private LongWritable key = null; 
    private Text value = null; 

    public void initialize(InputSplit genericSplit, 
         TaskAttemptContext context) throws IOException { 
    FileSplit split = (FileSplit) genericSplit; 
    Configuration job = context.getConfiguration(); 
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", 
            Integer.MAX_VALUE); 
    start = split.getStart(); 
    end = start + split.getLength(); 
    final Path file = split.getPath(); 

    // open the file and seek to the start of the split 
    FileSystem fs = file.getFileSystem(job); 
    FSDataInputStream fileIn = fs.open(split.getPath()); 

     fileIn.seek(start); 
     in = new LineReader(fileIn, job); 
    // If this is not the first split, we always throw away first record 
    // because we always (except the last split) read one extra line in 
    // next() method. 
    if (start != 0) { 
     start += in.readLine(new Text(), 0, maxBytesToConsume(start)); 
    } 
    this.pos = start; 
    } 

    private int maxBytesToConsume(long pos) { 
    return (int) Math.min(Integer.MAX_VALUE, end - pos); 
    } 

    private long getFilePosition() throws IOException { 
    long retVal= pos; 
    return retVal; 
    } 

    public boolean nextKeyValue() throws IOException { 
    if (key == null) { 
     key = new LongWritable(); 
    } 
    key.set(pos); 
    if (value == null) { 
     value = new Text(); 
    } 
    int newSize = 0; 
    StringBuffer totalValue = new StringBuffer(); 
    // We always read one extra line, which lies outside the upper 
    // split limit i.e. (end - 1) 
    while (getFilePosition() <= end) { 
     newSize = in.readLine(value, maxLineLength, 
      Math.max(maxBytesToConsume(pos), maxLineLength)); 
     if (newSize == 0) { 
     break; 
     } 
     totalValue.append(value.toString()+"\n"); 
     pos += newSize; 
     if (newSize < maxLineLength) { 
     break; 
     } 

     // line too long. try again 
     LOG.info("Skipped line of size " + newSize + " at pos " + 
       (pos - newSize)); 
    } 
    if (newSize == 0) { 
     key = null; 
     value = null; 
     return false; 
    } else { 
     value = new Text(totalValue.toString()); 
     return true; 
    } 
    } 

    @Override 
    public LongWritable getCurrentKey() { 
    return key; 
    } 

    @Override 
    public Text getCurrentValue() { 
    return value; 
    } 

    /** 
    * Get the progress within the split 
    */ 
    public float getProgress() throws IOException { 
    if (start == end) { 
     return 0.0f; 
    } else { 
     return Math.min(1.0f, 
     (getFilePosition() - start)/(float)(end - start)); 
    } 
    } 

    public synchronized void close() throws IOException { 
    try { 
     if (in != null) { 
     in.close(); 
     } 
    } finally { 

    } 
    } 

}

The other files:

public class RawCombineFileInputFormat extends CombineFileInputFormat <LongWritable,Text>{ 

@Override 
public RecordReader<LongWritable, Text> createRecordReader(
     InputSplit split, TaskAttemptContext context) throws IOException { 
    return new CombineFileRecordReader< LongWritable, Text >((CombineFileSplit) split, context, MultiFileRecordReader.class); 
} 

}

And:

public class MultiFileRecordReader extends RecordReader < LongWritable, Text > { 

private CombineFileSplit split; 
private TaskAttemptContext context; 
private int index; 
private RecordReader< LongWritable, Text > rr; 

public MultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) { 
    this.split = split; 
    this.context = context; 
    this.index = index; 
    this.rr = new SingleFileRecordReader(); 
} 
@Override 
public void initialize(InputSplit split, TaskAttemptContext context) 
     throws IOException, InterruptedException { 
    this.split = (CombineFileSplit) split; 
     this.context = context; 

     if (null == rr) { 
     rr = new SingleFileRecordReader(); 
     } 

     FileSplit fileSplit = new FileSplit(this.split.getPath(index), 
              this.split.getOffset(index), 
              this.split.getLength(index), 
              this.split.getLocations()); 
     this.rr.initialize(fileSplit, this.context); 

} 

@Override 
public boolean nextKeyValue() throws IOException, InterruptedException { 
    // TODO Auto-generated method stub 
    return this.rr.nextKeyValue(); 
} 

@Override 
public LongWritable getCurrentKey() throws IOException, InterruptedException { 
    // TODO Auto-generated method stub 
    return this.rr.getCurrentKey(); 
} 

@Override 
public Text getCurrentValue() throws IOException, InterruptedException { 
    // TODO Auto-generated method stub 
    return this.rr.getCurrentValue(); 
} 

@Override 
public float getProgress() throws IOException, InterruptedException { 
    // TODO Auto-generated method stub 
    return this.rr.getProgress(); 
} 

@Override 
public void close() throws IOException { 
    if (rr != null) { 
      rr.close(); 
      rr = null; 
    }  
} 

}


You need to set the maximum split size up to which files are combined. You can read about it in the documentation of `CombineFileInputFormat`.
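For illustration, a minimal sketch of how the maximum split size could be set in the driver. The property name and the 128 MB value are assumptions (older Hadoop releases use `mapred.max.split.size` instead of `mapreduce.input.fileinputformat.split.maxsize`):

// Sketch: cap the combined split size so several small files are packed
// into a single split. Property name and size are illustrative only.
Configuration conf = job.getConfiguration();
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 128L * 1024 * 1024); // 128 MB

// Alternatively, via the static helper inherited from FileInputFormat
// (CombineFileInputFormat extends FileInputFormat in the new API):
FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);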

Answer


Take a look at this input format. It is an input format for reading multiple files in a single map task. Exactly one (unsplit) file is read per record passed to the mapper. The WholeFileRecordReader takes care of sending the contents of one file as one value. The key returned is NullWritable and the value is the contents of each file as a whole. Now you can use this, run your MapReduce job, see how many mappers actually run, and check whether the output is correct or not.

Records are constructed by WholeFileRecordReader.

public class WholeFileInputFormat extends CombineFileInputFormat<NullWritable, Text>{ 

     @Override 
     protected boolean isSplitable(JobContext context, Path file) { 
      return false; 
     } 

/** 
    * Creates a CombineFileRecordReader to read each file assigned to this InputSplit. 
    * Note, that unlike ordinary InputSplits, split must be a CombineFileSplit, and therefore 
    * is expected to specify multiple files. 
    * 
    * @param split The InputSplit to read. Throws an IllegalArgumentException if this is 
    *  not a CombineFileSplit. 
    * @param context The context for this task. 
    * @return a CombineFileRecordReader to process each file in split. 
    *   It will read each file with a WholeFileRecordReader. 
    * @throws IOException if there is an error. 
    */ 

    @Override 
    public RecordReader<NullWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException { 

     if (!(split instanceof CombineFileSplit)) { 
       throw new IllegalArgumentException("split must be a CombineFileSplit"); 
      } 
      return new CombineFileRecordReader<NullWritable, Text>((CombineFileSplit) split, context, WholeFileRecordReader.class); 
    } 

    } 

Above you can see that WholeFileRecordReader is used, which looks as follows:

public class WholeFileRecordReader extends RecordReader<NullWritable, Text> { 
    private static final Logger LOG = Logger.getLogger(WholeFileRecordReader.class); 

     /** The path to the file to read. */ 
     private final Path mFileToRead; 
     /** The length of this file. */ 
     private final long mFileLength; 

     /** The Configuration. */ 
     private final Configuration mConf; 

     /** Whether this FileSplit has been processed. */ 
     private boolean mProcessed; 
     /** Single Text to store the file name of the current file. */ 
    // private final Text mFileName; 
     /** Single Text to store the value of this file (the value) when it is read. */ 
     private final Text mFileText; 

     /** 
     * Implementation detail: This constructor is built to be called via 
     * reflection from within CombineFileRecordReader. 
     * 
     * @param fileSplit The CombineFileSplit that this will read from. 
     * @param context The context for this task. 
     * @param pathToProcess The path index from the CombineFileSplit to process in this record. 
     */ 
     public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext context, 
      Integer pathToProcess) { 
     mProcessed = false; 
     mFileToRead = fileSplit.getPath(pathToProcess); 
     mFileLength = fileSplit.getLength(pathToProcess); 
     mConf = context.getConfiguration(); 

     assert 0 == fileSplit.getOffset(pathToProcess); 
     if (LOG.isDebugEnabled()) { 
      LOG.debug("FileToRead is: " + mFileToRead.toString()); 
      LOG.debug("Processing path " + pathToProcess + " out of " + fileSplit.getNumPaths()); 

      try { 
      FileSystem fs = FileSystem.get(mConf); 
      assert fs.getFileStatus(mFileToRead).getLen() == mFileLength; 
      } catch (IOException ioe) { 
      // oh well, I was just testing. 
      } 
     } 

    // mFileName = new Text(); 
     mFileText = new Text(); 
     } 

     /** {@inheritDoc} */ 
     @Override 
     public void close() throws IOException { 
     mFileText.clear(); 
     } 

     /** 
     * Returns the key for the current record; always NullWritable, since 
     * each file is exposed as a single record. 
     * 
     * @return NullWritable. 
     * @throws IOException never. 
     * @throws InterruptedException never. 
     */ 
     @Override 
     public NullWritable getCurrentKey() throws IOException, InterruptedException { 
     return NullWritable.get(); 
     } 

     /** 
     * <p>Returns the current value. If the file has been read with a call to nextKeyValue(), 
     * this returns the contents of the file as a Text. Otherwise, it returns an 
     * empty Text.</p> 
     * 
     * @return A Text containing the contents of the file to read. 
     * @throws IOException never. 
     * @throws InterruptedException never. 
     */ 
     @Override 
     public Text getCurrentValue() throws IOException, InterruptedException { 
     return mFileText; 
     } 

     /** 
     * Returns whether the file has been processed or not. Since only one record 
     * will be generated for a file, progress will be 0.0 if it has not been processed, 
     * and 1.0 if it has. 
     * 
     * @return 0.0 if the file has not been processed. 1.0 if it has. 
     * @throws IOException never. 
     * @throws InterruptedException never. 
     */ 
     @Override 
     public float getProgress() throws IOException, InterruptedException { 
     return (mProcessed) ? (float) 1.0 : (float) 0.0; 
     } 

     /** 
     * All of the internal state is already set on instantiation. This is a no-op. 
     * 
     * @param split The InputSplit to read. Unused. 
     * @param context The context for this task. Unused. 
     * @throws IOException never. 
     * @throws InterruptedException never. 
     */ 
     @Override 
     public void initialize(InputSplit split, TaskAttemptContext context) 
      throws IOException, InterruptedException { 
     // no-op. 
     } 

     /** 
     * <p>If the file has not already been read, this reads it into memory, so that a call 
     * to getCurrentValue() will return the entire contents of this file as Text, 
     * and getCurrentKey() will return NullWritable. Then, returns true. 
     * If it has already been read, then returns false without updating any internal state.</p> 
     * 
     * @return Whether the file was read or not. 
     * @throws IOException if there is an error reading the file. 
     * @throws InterruptedException if there is an error. 
     */ 
     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
     if (!mProcessed) { 
      if (mFileLength > (long) Integer.MAX_VALUE) { 
      throw new IOException("File is longer than Integer.MAX_VALUE."); 
      } 
      byte[] contents = new byte[(int) mFileLength]; 

      FileSystem fs = mFileToRead.getFileSystem(mConf); 
      FSDataInputStream in = null; 
      try { 
      // Set the contents of this file. 
      in = fs.open(mFileToRead); 
      IOUtils.readFully(in, contents, 0, contents.length); 
      mFileText.set(contents, 0, contents.length); 

      } finally { 
      IOUtils.closeStream(in); 
      } 
      mProcessed = true; 
      return true; 
     } 
     return false; 
     } 

} 

The following is your driver code:

public int run(String[] arg) throws Exception { 
    Configuration conf=getConf(); 
    FileSystem fs = FileSystem.get(conf); 
    //estimate reducers 
    Job job = new Job(conf); 
    job.setJarByClass(WholeFileDriver.class); 
    job.setJobName("WholeFile"); 
    job.setOutputKeyClass(Text.class); 
    job.setOutputValueClass(Text.class); 
    job.setInputFormatClass(WholeFileInputFormat.class); 
    job.setMapperClass(WholeFileMapper.class); 
    job.setNumReduceTasks(0); 

    FileInputFormat.addInputPath(job, new Path(arg[0])); 
    Path output=new Path(arg[1]); 
    try { 
     fs.delete(output, true); 
    } catch (IOException e) { 
     LOG.warn("Failed to delete temporary path", e); 
    } 
    FileOutputFormat.setOutputPath(job, output); 

    boolean ret=job.waitForCompletion(true); 
    if(!ret){ 
     throw new Exception("Job Failed"); 
    } 
    return 0; 
}
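The WholeFileMapper referenced in the driver is not shown in the post; below is a minimal sketch of what such a mapper could look like. The class name, the empty output key, and the Text/Text output types are assumptions chosen to match the driver's setOutputKeyClass/setOutputValueClass settings, not code from the original answer:

import java.io.IOException; 
import org.apache.hadoop.io.NullWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

// Hypothetical mapper: with WholeFileInputFormat, each map() call receives
// the contents of one whole file as 'value' and a NullWritable key.
public class WholeFileMapper extends Mapper<NullWritable, Text, Text, Text> { 

    private final Text outKey = new Text(); // empty key, for illustration only 

    @Override 
    protected void map(NullWritable key, Text value, Context context) 
            throws IOException, InterruptedException { 
        // Pass the whole file content through unchanged. 
        context.write(outKey, value); 
    } 
}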

Thanks, Mr. Binary ... I had not added job.setInputFormatClass(WholeFileInputFormat.class). I was wondering: since RawCombineFileInputFormat.addInputPath(job, new Path(args[0])) was added, why wasn't the input format class picked up from that? Any answer?
