2017-10-23 1 views
2

Ich spiele gerade mit der Java-API Apache Arrow (obwohl ich es von Scala für die Codebeispiele verwende), um etwas Vertrautheit mit diesem Werkzeug zu bekommen .Wie man eine CSV-Datei in Apache Arrow-Vektoren lädt und eine Pfeildatei auf Platte speichert

Als Übung habe ich gewählt, eine CSV-Datei in Pfeilvektoren zu laden und diese dann in einer Pfeildatei zu speichern. Der erste Teil schien leicht genug, und ich versuchte, es wie folgt aus:

val csvLines: Stream[Array[String]] = <open stream from CSV parser> 

// There are other types of allocator, but things work with this one... 
val allocator = new RootAllocator(Int.MaxValue) 

// Initialize the vectors 
val vectors = initVectors(csvLines.head, allocator) 
// Put their mutators into an array for easy access 
val mutators = vectors.map(_.getMutator) 

// Work on the data, zipping it with its index 
Stream.from(0) 
    .zip(csvLines.tail) // Work on the tail (head contains the headers) 
    .foreach(rowTup => // rowTup = (index, csvRow as an Array[String]) 
    Range(0, rowTup._2.size) // Iterate on each column... 
     .foreach(columnNumber => 
     writeToMutator(
      mutators(columnNumber), // get that column's mutator 
      idx=rowTup._1,   // pass the current row number 
      data=rowTup._2(columnNumber) // pass the entry of the curernt column 
     ) 
    ) 
) 

Mit initVectors() und writeToMutator() wie folgt definiert:

def initVectors(
    columns: Array[String], 
    alloc: RootAllocator): Array[NullableVarCharVector] = { 

    // Initialize a vector for each column 
    val vectors = columns.map(colName => 
    new NullableVarCharVector(colName, alloc)) 
    // 4096 size, for 1024 values initially. This is arbitrary 
    vectors.foreach(_.allocateNew(2^12,1024)) 
    vectors 
} 

def writeToMutator(
    mutator: NullableVarCharVector#Mutator, 
    idx: Int, 
    data: String): Unit = { 

    // The CSV may contain null values 
    if (data != null) { 
    val bytes = data.getBytes() 
    mutator.setSafe(idx, bytes, 0, bytes.length) 
    } 
    mutator.setNull(idx) 
} 

(ich zur Zeit nicht über die Verwendung des richtigen Typs ist es egal, und speichert alles als Strings oder VarChar in Pfeil der Seeschwalben)

Deshalb an dieser Stelle ich eine Sammlung von NullableVarCharVector habe und von/zu ihnen lesen und schreiben kann. Alles super an diesem Punkt. Nun, für den nächsten Schritt habe ich mich jedoch gefragt, wie ich sie zusammenpacken und in eine Pfeildatei serialisieren könnte. Ich stolperte über eine abstrakte Klasse AbstractFieldWriter, aber wie man die Implementierungen verwendet, ist unklar.

Also, die Frage in erster Linie ist:

  • was das ist - Art und Weise eine Reihe von Vektoren zu einer Pfeil-Datei zu speichern (am besten scheint es mehrere, zu sein?).
  • gibt es andere Möglichkeiten zum Laden von CSV-Spalten in Pfeilvektoren?

Edited hinzufügen: Die metadata description page bietet einen guten Überblick zu diesem Thema.

Die Testklassen der API scheinen ein paar Dinge zu enthalten, die helfen könnten, ich werde eine Antwort mit einem Beispiel veröffentlichen, sobald ich es ausprobiert habe.

+0

Halb verwandten, aber dennoch interessant zu lesen: https://mapr.com/ Blog/Apache-Pfeil-und-Werte-Vektoren / – Shastick

Antwort

4

Mit Blick auf TestArrowFile.java und BaseFileTest.java ich gefunden:

  • Wie man eine einzelne Pfeil-Datei auf der Festplatte
  • Eine alternative Art und Weise schreiben Vektoren Füllung, wie mein erster Versuch, mich daran gehindert, eine einzigen Pfeil Datei Zusammenbauen (oder zumindest auf einfache Weise).

So Vektoren füllen sieht nun wie:

// Open stream of rows 
val csvLines: Stream[Array[String]] = <open stream from CSV parser> 
// Define a parent to hold the vectors 
val parent = MapVector.empty("parent", allocator) 
// Create a new writer. VarCharWriterImpl would probably do as well? 
val writer = new ComplexWriterImpl("root", parent) 

// Initialise a writer for each column, using the header as the name 
val rootWriter = writer.rootAsMap() 
val writers = csvLines.head.map(colName => 
            rootWriter.varChar(colName)) 

Stream.from(0) 
    .zip(csvLines.tail) // Zip the rows with their index 
    .foreach(rowTup => { // Iterate on each (index, row) tuple 
    val (idx, row) = rowTup 
     Range(0, row.size) // Iterate on each field of the row 
     .foreach(column => 
      Option(row(column)) // row(column) may be null, 
      .foreach(str => // use the option as a null check 
       write(writers(column), idx, allocator, str) 
      ) 
    ) 
    } 
) 

toFile(parent.getChild("root"), "csv.arrow") // Save everything to a file 

mit write wie folgt definiert:

def write(writer: VarCharWriter, idx: Int, 
    allocator: BufferAllocator, data: String): Unit = { 
    // Set the position to the correct index 
    writer.setPosition(idx) 
    val bytes = data.getBytes() 
    // Apparently the allocator is required again to build a new buffer 
    val varchar = allocator.buffer(bytes.length) 
    varchar.setBytes(0, data.getBytes()) 
    writer.writeVarChar(0, bytes.length, varchar) 
} 

def toFile(parent: FieldVector, fName: String): Unit = { 
    // Extract a schema from the parent: that's the part I struggled with in the original question 
    val rootSchema = new VectorSchemaRoot(parent) 
    val stream = new FileOutputStream(fName) 
    val fileWriter = new ArrowFileWriter(
         rootSchema, 
         null, // We don't use dictionary encoding. 
         stream.getChannel) 
    // Write everything to file... 
    fileWriter.start() 
    fileWriter.writeBatch() 
    fileWriter.end() 
    stream.close() 
} 

Mit dem oben Ich bin in der Lage eine CSV-Datei zu speichern. Ich habe überprüft, dass alles gut gelaufen ist, indem ich es gelesen und wieder in eine CSV konvertiert habe, und der Inhalt ist unverändert.

Beachten Sie, dass die ComplexWriterImpl ermöglicht, Spalten verschiedener Typen zu schreiben, etwas, das nützlich sein wird, um zu vermeiden, Zahlenspalten als Zeichenfolgen zu speichern.

(Ich bin jetzt mit der Leseseite der Dinge zu spielen, werden diese Dinge verdienen wahrscheinlich ihre eigenen SO Fragen.)

Verwandte Themen