2017-09-28 2 views
1

Ich bin auf der Suche nach einer effizienten Möglichkeit zur Vorverarbeitung von CSV-Daten vor (oder während des) Dumps in einen Java-Stream.CSV-Daten effizient vor oder während parallelem Streaming verarbeiten

Unter normalen Umständen würde ich so etwas wie dies tun, um die Datei zu verarbeiten:

File input = new File("helloworld.csv"); 
InputStream is = new FileInputStream(input); 
BufferedReader br = new BufferedReader(new InputStreamReader(is)); 
br.lines().parallel().forEach(line -> { 
    System.out.println(line); 
}); 

jedoch in diesem aktuellen Fall ich Vorprozess müssen die Aufzeichnungen vor oder während sie Streaming und jedes Element in meiner Sammlung könnte davon abhängen, der Vorherige. Hier ist ein einfaches Beispiel CSV-Datei das Problem zu demonstrieren:

species, breed, name 
dog, lab, molly 
, greyhound, stella 
, beagle, stanley 
cat, siamese, toby 
, persian, fluffy 

In meinem Beispiel CSV die Art Spalte wird nur gefüllt, wenn es von den Rekord ändert aufzuzeichnen. Ich weiß, die einfache Antwort wäre, meine CSV-Ausgabe zu reparieren, aber in diesem Fall ist das nicht möglich.

Ich bin auf der Suche nach einem vernünftigen und effizienten Weg, die Datensätze von CSV zu verarbeiten, den Spezieswert aus dem vorherigen Datensatz zu kopieren, wenn er leer ist, und ihn nach der Vorverarbeitung in einen parallelen Stream zu übertragen.

Die Downstream-Verarbeitung kann lange dauern, so dass ich nach der Vorverarbeitung letztendlich parallel verarbeiten muss. Meine CSV-Dateien können auch groß sein, daher möchte ich vermeiden, dass zuerst die gesamte Datei in ein Objekt im Speicher geladen wird.

Ich hatte gehofft, es eine Möglichkeit, so etwas wie die folgenden (Warn schlecht Pseudo-Code) zu tun war:

parallelStream.startProcessing 

while read line { 
    if (line.doesntHaveSpecies) { 
     line.setSpecies 
    } 
    parallelStream.add(line) 
} 

Meine aktuelle Lösung ist die gesamte Datei zu verarbeiten und zu „fixieren“ dann streamen. Da die Datei groß sein kann, wäre es schön, die Datensätze sofort nach der "Fixierung" und vor der Verarbeitung der gesamten Datei zu bearbeiten.

Antwort

2

Sie haben den Staat in eine Spliterator verkapseln.

private static Stream<String> getStream(BufferedReader br) { 
    return StreamSupport.stream(
     new Spliterators.AbstractSpliterator<String>(
              100, Spliterator.ORDERED|Spliterator.NONNULL) { 
      String prev; 
      public boolean tryAdvance(Consumer<? super String> action) { 
       try { 
        String next = br.readLine(); 
        if(next==null) return false; 
        final int ix = next.indexOf(','); 
        if(ix==0) { 
         if(prev==null) 
          throw new IllegalStateException("first line without value"); 
         next = prev+next; 
        } 
        else prev=ix<0? next: next.substring(0, ix); 
        action.accept(next); 
        return true; 
       } catch (IOException ex) { 
        throw new UncheckedIOException(ex); 
       } 
      } 
     }, false); 
} 

die als

verwendet werden können
try(Reader r = new FileReader(input); 
    BufferedReader br = new BufferedReader(r)) { 

    getStream(br).forEach(System.out::println); 
} 

Die Spliterator wird immer sequentiell durchlaufen werden. Wenn die parallele Verarbeitung aktiviert ist, versucht die Stream-Implementierung, neue Spliterator Instanzen für andere Threads zu erhalten, indem sie trySplit aufruft. Da wir für diese Operation keine effiziente Strategie anbieten können, erben wir den Standard von AbstractSpliterator, der einige Array-basierte Pufferung durchführen wird. Dies funktioniert immer korrekt, zahlt sich aber nur aus, wenn Sie in der nachfolgenden Stream-Pipeline umfangreiche Berechnungen durchführen. Andernfalls können Sie einfach mit sequenzieller Ausführung bleiben. So

+0

Ich dachte, ich würde diese Frage nehmen und das Wochenende mit einer benutzerdefinierten Antwort auf 'Spliterator', ich denke nicht mehr :) – Eugene

+0

Wollen Sie den gleichen Nutzen für eine zeitaufwändige Operation sehen, die eine REST nicht unbedingt schwer rechnerisch (wie Schlagen ist Endpunkt)? –

+0

Je nach E/A-Vorgang und Umgebung kann ein Vorteil angezeigt werden, aber die Stream-API verwendet eine Konfiguration, die auf die Berechnung zugeschnitten ist, dh die Ziel-Parallelität entspricht der Anzahl der CPU-Kerne, die für einen bestimmten Teil möglicherweise nicht die beste Wahl ist E/A-Betrieb. – Holger

1

man kann es nicht mit parallelem Strom gestartet werden, da es der Reihe nach werden muss, führen Sie die Arten aus dem vorherige Zeile zu erhalten. So konnten wir einige Nebeneffekt Mapper vorstellen:

final String[] species = new String[1]; 
final Function<String, String> speciesAppending = l -> { 
    if (l.startsWith(",")) { 
     return species[0] + l; 
    } else { 
     species[0] = l.substring(0, l.indexOf(',')); 
     return l; 
    } 
}; 

try (Stream<String> stream = Files.lines(new File("helloworld.csv").toPath())) { 
    stream.map(speciesAppending).parallel()... // TODO 
} 
+0

was ist der beste Weg zu verfolgen/speichern die vor Spezies auf nachfolgende Aufrufe der Mapper verwiesen werden? Ich denke, was ich frage ist, wo definierst du Spezies? –

+1

Es ist ein einfaches String-Array, aktualisiert die Antwort, um es in den Code zu setzen. –

Verwandte Themen