2016-06-30 11 views
1

Um Dateien in Funken laden ich diese integrierten Methoden bin mit:Holen Sie sich die Details einer Datei mit Funken geladen

JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(SOURCE_PATH); 

oder

JavaPairRDD<String, String> miao = jsc.wholeTextFiles(SOURCE_PATH); 

Ich habe ein Byte oder eine String-Darstellung der Dateien, die ich aus dem Ordner abrufe, der im Wert des PairRDD gespeichert ist. Der Schlüssel enthält den Dateinamen.
Wie kann ich die Details dieser Dateien bekommen? Wie

File miao = new File(path); 
//this kind of details 
String date = miao.getLastModified(); 

Soll ich zurückzuverwandeln sie zurück auf Datei und dann lesen und dann von ihnen eine andere bytearray machen? Gibt es einen schnelleren Prozess?

Antwort

1

Sie können ein benutzerdefiniertes Eingabeformat schreiben und diese inputFormatClass an die Methode newApiHadoopFile in SparkContext übergeben. Dieses inputFormat verwendet einen benutzerdefinierten RecordReader, der benutzerdefinierte recordReader liest den fileContent zusammen mit anderen dateibezogenen Informationen (z. B. author, modifiedDate usw.). Sie müssen eine benutzerdefinierte Writable-Klasse schreiben, die die Dateiinformationen und den read-by-record-Reader fileContent enthält.

Der vollständige Funktionscode ist unten angegeben. Dieser Code verwendet eine benutzerdefinierte Eingabeformatklasse mit dem Namen RichFileInputFormat. Das RichFileInputFormat ist ein wholeFileInputFormat, was bedeutet, dass es nur eine Aufteilung pro Eingabedatei gibt. Das bedeutet außerdem, dass die Anzahl der rdd-Partitionen der Anzahl der Eingabedateien entspricht. Wenn Ihr Eingabepfad also 10 Dateien enthält, enthält die resultierende rdd 10 Partitionen, unabhängig von der Größe der Eingabedatei (en).

Dies ist, wie Sie das benutzerdefinierte inputformat aus SparkContext die Datei laden anrufen: -

JavaPairRDD<Text, FileInfoWritable> rdd = sc.newAPIHadoopFile(args[1], RichFileInputFormat.class, Text.class,FileInfoWritable.class, new Configuration()); 

So Ihre rdd Taste wird der filePath und Wert sein wird, eine FileInfoWritable, die sowohl den Dateiinhalt und andere Datei enthält verwandte Informationen.

Voll funktions Code ist unten eingefügt: -

  1. Benutzerdefinierte Eingabeformat Klasse

     package nk.stackoverflow.spark; 
    
         import java.io.IOException; 
    
         import org.apache.hadoop.fs.Path; 
         import org.apache.hadoop.io.Text; 
         import org.apache.hadoop.mapreduce.InputSplit; 
         import org.apache.hadoop.mapreduce.JobContext; 
         import org.apache.hadoop.mapreduce.RecordReader; 
         import org.apache.hadoop.mapreduce.TaskAttemptContext; 
         import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
    
         public class RichFileInputFormat extends FileInputFormat<Text, FileInfoWritable> { 
    
         @Override 
         public RecordReader<Text, FileInfoWritable> createRecordReader(InputSplit split, TaskAttemptContext context) 
           throws IOException, InterruptedException { 
    
          return new RichFileRecordReader(); 
         } 
    
         protected boolean isSplitable(JobContext context, Path filename) { 
          return false; 
         } 
         } 
    
    1. Rekord Leser

    Paket nk.stackoverflow.spark;

    import java.io.IOException; 
    
    import org.apache.hadoop.fs.FSDataInputStream; import 
    org.apache.hadoop.fs.FileStatus; import 
    org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; 
    import org.apache.hadoop.io.Text; import 
    org.apache.hadoop.mapreduce.InputSplit; import 
    org.apache.hadoop.mapreduce.RecordReader; import 
    org.apache.hadoop.mapreduce.TaskAttemptContext; import 
    org.apache.hadoop.mapreduce.lib.input.FileSplit; import 
    org.apache.spark.deploy.SparkHadoopUtil; 
    
    public class RichFileRecordReader extends RecordReader<Text, 
    FileInfoWritable> {  private String author; private String 
    createdDate; private String owner; private String lastModified; 
        private String content;  private boolean processed; 
    
        private Text key; private Path path; private FileSystem fs; 
    
        public RichFileRecordReader() { 
    
        } 
    
        @Override public void initialize(InputSplit split, 
    TaskAttemptContext context) throws IOException, InterruptedException 
    {  // this.recordReader.initialize(split, context);  final 
    FileSplit fileSplit = (FileSplit) split;  final Path path = 
    fileSplit.getPath();  this.fs = 
    path.getFileSystem(SparkHadoopUtil.get().getConfigurationFromJobContext(context)); 
         final FileStatus stat = this.fs.getFileStatus(path);  this.path = 
    path;  this.author = stat.getOwner();  this.createdDate = 
    String.valueOf(stat.getModificationTime());   this.lastModified = 
    String.valueOf(stat.getAccessTime());  this.key = new 
    Text(path.toString()); } 
    
        @Override public boolean nextKeyValue() throws IOException, 
    InterruptedException {  // TODO Auto-generated method stub 
         FSDataInputStream stream = null;  try {   if (!processed) { 
           int len = (int) this.fs.getFileStatus(this.path).getLen(); 
           final byte[] data = new byte[len]; 
    
           stream = this.fs.open(this.path); 
           int read = stream.read(data); 
           String content = new String(data, 0, read); 
           this.content = content; 
           processed = true; 
           return true;   }  } catch (IOException e) {   e.printStackTrace();   if (stream != null) { 
           try { 
            stream.close(); 
           } catch (IOException ie) { 
            ie.printStackTrace(); 
           }   }  }  return false; } 
    
        @Override public Text getCurrentKey() throws IOException, 
    InterruptedException {  // TODO Auto-generated method stub  return 
    this.key; } 
    
        @Override public FileInfoWritable getCurrentValue() throws 
    IOException, InterruptedException {   // TODO Auto-generated method 
    stub 
    
         final FileInfoWritable fileInfo = new FileInfoWritable(); 
         fileInfo.setContent(this.content); 
         fileInfo.setAuthor(this.author); 
         fileInfo.setCreatedDate(this.createdDate); 
         fileInfo.setOwner(this.owner); 
         fileInfo.setPath(this.path.toString());   return fileInfo; } 
    
        @Override public float getProgress() throws IOException, 
    InterruptedException {  // TODO Auto-generated method stub  return 
    processed ? 1.0f : 0.0f; } 
    
        @Override public void close() throws IOException {  // TODO 
    Auto-generated method stub 
    
        } 
    
    } 
    
    1. beschreibbaren Klasse

    Paket nk.stackoverflow.Funke;

    import java.io.DataInput; 
        import java.io.DataOutput; 
        import java.io.IOException; 
        import java.nio.charset.Charset; 
    
        import org.apache.hadoop.io.Writable; 
    
        import com.google.common.base.Charsets; 
    
        public class FileInfoWritable implements Writable { 
         private final static Charset CHARSET = Charsets.UTF_8; 
         private String createdDate; 
         private String owner; 
        // private String lastModified; 
         private String content; 
         private String path; 
         public FileInfoWritable() { 
    
         } 
    
         public void readFields(DataInput in) throws IOException { 
          this.createdDate = readString(in); 
          this.owner = readString(in); 
        //  this.lastModified = readString(in); 
          this.content = readString(in); 
          this.path = readString(in); 
         } 
    
         public void write(DataOutput out) throws IOException { 
          writeString(createdDate, out); 
          writeString(owner, out); 
        //  writeString(lastModified, out); 
          writeString(content, out); 
          writeString(path, out); 
         } 
    
         private String readString(DataInput in) throws IOException { 
          final int n = in.readInt(); 
          final byte[] content = new byte[n]; 
          in.readFully(content); 
          return new String(content, CHARSET); 
         } 
    
         private void writeString(String str, DataOutput out) throws IOException { 
          out.writeInt(str.length()); 
          out.write(str.getBytes(CHARSET)); 
         } 
    
         public String getCreatedDate() { 
          return createdDate; 
         } 
    
         public void setCreatedDate(String createdDate) { 
          this.createdDate = createdDate; 
         } 
    
         public String getAuthor() { 
          return owner; 
         } 
    
         public void setAuthor(String author) { 
          this.owner = author; 
         } 
    
         /*public String getLastModified() { 
          return lastModified; 
         }*/ 
    
         /*public void setLastModified(String lastModified) { 
          this.lastModified = lastModified; 
         }*/ 
    
         public String getOwner() { 
          return owner; 
         } 
    
         public void setOwner(String owner) { 
          this.owner = owner; 
         } 
    
         public String getContent() { 
          return content; 
         } 
    
         public void setContent(String content) { 
          this.content = content; 
         } 
    
         public String getPath() { 
          return path; 
         } 
    
         public void setPath(String path) { 
          this.path = path; 
         } 
    
    
        } 
    
    1. Hauptklasse zeigt, wie

    Paket nk.stackoverflow.spark zu verwenden;

    import org.apache.hadoop.conf.Configuration; import 
    org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import 
    org.apache.spark.api.java.JavaPairRDD; import 
    org.apache.spark.api.java.JavaSparkContext; import 
    org.apache.spark.api.java.function.VoidFunction; 
    
    import scala.Tuple2; 
    
    public class CustomInputFormat { public static void main(String[] 
    args) {   
         SparkConf conf = new SparkConf(); 
    
         conf.setAppName(args[0]); 
         conf.setMaster("local[*]");   
         final String inputPath = args[1]; 
    JavaSparkContext sc = new 
    JavaSparkContext(conf);   
    JavaPairRDD<Text, FileInfoWritable> rdd = sc.newAPIHadoopFile(inputPath, RichFileInputFormat.class, 
    Text.class, 
           FileInfoWritable.class, new Configuration()); 
    
         rdd.foreach(new VoidFunction<Tuple2<Text, FileInfoWritable>>() { 
    
          public void call(Tuple2<Text, FileInfoWritable> t) throws 
    Exception { 
           final Text filePath = t._1(); 
           final String fileContent = t._2().getContent(); 
           System.out.println("file " + filePath + " has contents= " + fileContent);   }  }); 
    
         sc.close();  } } 
    
+0

Was für ein Job! Funktioniert die newAPIHadoopFile-Methode auch für lokale Verzeichnisse? – Vale

+0

Ja, das wird auch mit dem lokalen Dateisystem funktionieren. Geben Sie den absoluten Pfad von inputDirectory in lokalem Dateisystem ein, während Sie die newApiHadoopFile-Methode auf SparkContext aufrufen. So sieht der absolute Pfad des lokalen Dateisystems aus: file: /// users/ram/inputDir – mrnakumar

0

Analysieren Sie diese RDD mithilfe der Kartenumwandlung. Innerhalb Ihrer Map-Funktion rufen Sie eine Funktion auf, die eine Zeichenfolge (d. H. Den Dateinamen) akzeptiert und diese Zeichenfolge verwendet, um die Datei zu öffnen und zu bearbeiten. Es ist also nichts anderes als eine Map-RDD-Transformation, die für jede Zeile dieser RDD eine Funktion aufruft.

+0

Dies ist, was ich für die Antwort tatsächlich, aber dank vermeiden wollte. Und der Grund dafür ist, dass dies zum Fahrer zurückgehen würde, nicht wahr? Wofür würde ich Spark verwenden, wenn jedes Element einfach zu Java zurückgehen sollte. Ich denke, ich sollte ein PairRDD von mir machen ...? – Vale

+0

Ja. Wenn Sie in einem Cluster arbeiten, können Sie diese RDD in viele Executoren partitionieren. Dies würde Ihre Programmresultate befestigen. Wenn du lokal arbeitest, habe ich Angst nein. Da Sie diese Dateien lesen müssen, was "Aktion" bedeutet, würden Sie die Dateien im Speicher des Treibers unweigerlich beibehalten. –

+0

Also, wenn ich in einer Cluster-Umgebung bin, würde diese map-> (Datei laden (Pfad)) sowieso auf den Arbeitern laufen? Würde es dem Fahrer nicht trotzdem zurückgehen? Ich sollte tatsächlich Dateien von hdfs laden und sie entsprechend ihren Details ausarbeiten. Es scheint immer noch zu aufwendig: Ich lade die Dateien, um sie ein zweites Mal zu laden .., – Vale

Verwandte Themen