2013-09-05 7 views
6

Ich möchte ein benutzerdefiniertes InputFormat erstellen können, das Sequenzdateien liest, aber zusätzlich den Dateipfad und den Offset innerhalb der Datei anzeigt, in der sich der Datensatz befindet.SequenceFileInputFormat erweitern, um Dateiname + Offset einzubeziehen

Um einen Schritt zurückzukommen, hier ist der Anwendungsfall: Ich habe eine Sequenzdatei mit Daten unterschiedlicher Größe. Die Schlüssel sind meistens irrelevant, und die Werte sind bis zu ein paar Megabyte, die eine Vielzahl von verschiedenen Feldern enthalten. Ich möchte einige dieser Felder in elasticsearch zusammen mit dem Dateinamen und dem Offset indizieren. Auf diese Weise kann ich diese Felder von elasticsearch abfragen und dann den Dateinamen und den Versatz verwenden, um zur Sequenzdatei zurückzukehren und den ursprünglichen Datensatz zu erhalten, anstatt das Ganze in ES zu speichern.

Ich habe diesen ganzen Prozess als ein einzelnes Java-Programm arbeiten. Die SequenceFile.Reader-Klasse gibt praktischerweise getPosition und seek Methoden, um dies zu ermöglichen.

Es wird jedoch wahrscheinlich viele Terabytes an Daten geben, daher muss ich dies in einen MapReduce-Job konvertieren (wahrscheinlich nur Map-only). Da die eigentlichen Schlüssel in der Sequenzdatei irrelevant sind, würde ich mit dem Ansatz ein benutzerdefiniertes InputFormat erstellen, das das SequenceFileInputFormat erweitert oder irgendwie verwendet, aber anstelle der eigentlichen Schlüssel einen zusammengesetzten Schlüssel zurückgibt, der aus der Datei besteht und Offset.

Dies erweist sich jedoch in der Praxis als schwieriger. Es scheint, als ob es möglich sein sollte, aber angesichts der tatsächlichen APIs und was ausgesetzt ist, ist es schwierig. Irgendwelche Ideen? Vielleicht sollte ich einen alternativen Ansatz wählen?

Antwort

5

Falls jemand auf ein ähnliches Problem stößt, hier ist die Lösung, die ich gefunden habe. Ich habe einfach einen Teil des Codes in SequenceFileInputFormat/RecordReader dupliziert und modifiziert. Ich hatte gehofft, entweder eine Unterklasse oder ein Dekorateur oder etwas zu schreiben ... diese Art und Weise ist nicht schön, aber es funktioniert:

SequenceFileOffsetInputFormat.java:

import java.io.IOException; 
import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.SequenceFile; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.mapreduce.InputSplit; 
import org.apache.hadoop.mapreduce.JobContext; 
import org.apache.hadoop.mapreduce.RecordReader; 
import org.apache.hadoop.mapreduce.TaskAttemptContext; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; 

public class SequenceFileOffsetInputFormat<V extends Writable> extends FileInputFormat<PathOffsetWritable, V> { 

    private static class SequenceFileOffsetRecordReader<V extends Writable> extends RecordReader<PathOffsetWritable, V> { 

     private SequenceFile.Reader in; 
     private long start; 
     private long end; 
     private boolean more = true; 
     private PathOffsetWritable key = null; 
     private Writable k = null; 
     private V value = null; 
     private Configuration conf; 

     @Override 
     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 
      FileSplit fileSplit = (FileSplit) split; 
      conf = context.getConfiguration(); 
      Path path = fileSplit.getPath(); 
      FileSystem fs = path.getFileSystem(conf); 
      this.in = new SequenceFile.Reader(fs, path, conf); 
      try { 
       this.k = (Writable) in.getKeyClass().newInstance(); 
       this.value = (V) in.getValueClass().newInstance(); 
      } catch (InstantiationException e) { 
       throw new IOException(e); 
      } catch (IllegalAccessException e) { 
       throw new IOException(e); 
      } 
      this.end = fileSplit.getStart() + fileSplit.getLength(); 

      if (fileSplit.getStart() > in.getPosition()) { 
       in.sync(fileSplit.getStart()); 
      } 

      this.start = in.getPosition(); 
      more = start < end; 

      key = new PathOffsetWritable(path, start); 
     } 

     @Override 
     public boolean nextKeyValue() throws IOException, InterruptedException { 
      if (!more) { 
       return false; 
      } 
      long pos = in.getPosition(); 

      more = in.next(k, value); 
      if (!more || (pos >= end && in.syncSeen())) { 
       key = null; 
       value = null; 
       more = false; 
      } else { 
       key.setOffset(pos); 
      } 
      return more; 
     } 

     @Override 
     public PathOffsetWritable getCurrentKey() { 
      return key; 
     } 

     @Override 
     public V getCurrentValue() { 
      return value; 
     } 

     @Override 
     public float getProgress() throws IOException, InterruptedException { 
      if (end == start) { 
       return 0.0f; 
      } else { 
       return Math.min(1.0f, (in.getPosition() - start)/(float)(end - start)); 
      } 
     } 

     @Override 
     public void close() throws IOException { 
      in.close(); 
     } 

    } 

    @Override 
    public RecordReader<PathOffsetWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { 
     return new SequenceFileOffsetRecordReader<V>(); 
    } 

    @Override 
    public List<InputSplit> getSplits(JobContext context) throws IOException { 
     return new SequenceFileInputFormat<PathOffsetWritable, V>().getSplits(context); 
    } 

    @Override 
    public long getFormatMinSplitSize() { 
     return SequenceFile.SYNC_INTERVAL; 
    } 


} 

PathOffsetWritable.java:

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 

public class PathOffsetWritable implements WritableComparable<PathOffsetWritable> { 

    private Text t = new Text(); 
    private Path path; 
    private long offset; 

    public PathOffsetWritable(Path path, long offset) { 
     this.path = path; 
     this.offset = offset; 
    } 

    public Path getPath() { 
     return path; 
    } 

    public long getOffset() { 
     return offset; 
    } 

    public void setPath(Path path) { 
     this.path = path; 
    } 

    public void setOffset(long offset) { 
     this.offset = offset; 
    } 

    @Override 
    public void readFields(DataInput in) throws IOException { 
     t.readFields(in); 
     path = new Path(t.toString()); 
     offset = in.readLong(); 
    } 

    @Override 
    public void write(DataOutput out) throws IOException { 
     t.set(path.toString()); 
     t.write(out); 
     out.writeLong(offset); 
    } 

    @Override 
    public int compareTo(PathOffsetWritable o) { 
     int x = path.compareTo(o.path); 
     if (x != 0) { 
      return x; 
     } else { 
      return Long.valueOf(offset).compareTo(Long.valueOf(o.offset)); 
     } 
    } 


} 
Verwandte Themen