2014-09-03 8 views
5

Ich habe einige große Textdateien, die ich verarbeiten möchte, indem ich ihre Zeilen gruppiere.Ist es möglich, eine faule groupby, einen Stream, in Java 8 zu tun?

Ich versuchte, die neuen Streaming-Funktionen zu benutzen, wie

return FileUtils.readLines(...) 
      .parallelStream() 
      .map(...) 
      .collect(groupingBy(pair -> pair[0])); 

Das Problem ist, dass, AFAIK, das eine Karte erzeugt.

Gibt es eine Möglichkeit, einen High-Level-Code wie den obigen zu haben, der zum Beispiel einen Stream von Einträgen generiert?

UPDATE: Was ich suche ist etwas wie Pythons itertools.groupby. Meine Dateien sind bereits sortiert (nach Paar [0]), ich möchte nur die Gruppen nacheinander laden.

Ich habe bereits eine iterative Lösung. Ich frage mich nur, ob es eine deklarative Art gibt, das zu tun. BTW, Guava oder eine andere 3rd-Party-Bibliothek verwenden wäre kein großes Problem.

+2

Wie kann du machst ein faules gruppiere nach? Um nach einer Eigenschaft des im Stream enthaltenen Objekts zu gruppieren, müssen Sie über alle Elemente im Stream iterieren. – Eran

+0

Was meinen Sie mit "Gruppierung seiner Linien?" meinst du binning wie die stream 'groupBy' Methode oder meinst du das Lesen mehrerer Zeilen gleichzeitig? – dkatzel

+0

Danke für die Kommentare, ein Update zur Frage hinzugefügt. –

Antwort

3

Die Aufgabe, die Sie erreichen möchten, unterscheidet sich erheblich von der Gruppierung. groupingBy verlässt sich nicht auf die Reihenfolge der Stream Elemente, sondern auf den Map 's-Algorithmus auf den Klassifikator Function Ergebnis angewendet.

Sie möchten benachbarte Elemente mit einem gemeinsamen Eigenschaftswert in einen List Artikel falten. Es ist nicht einmal notwendig, die Stream nach dieser Eigenschaft sortiert zu haben, solange Sie garantieren können, dass alle Elemente mit demselben Eigenschaftswert gruppiert sind.

Vielleicht ist es möglich, diese Aufgabe als eine Reduktion zu formulieren, aber für mich sieht die resultierende Struktur zu kompliziert aus.

Also, es sei denn, direkte Unterstützung für diese Funktion zu den Stream s hinzugefügt wird, ein Iterator basierten Ansatz pragmatischsten mir aussieht:

class Folding<T,G> implements Spliterator<Map.Entry<G,List<T>>> { 
    static <T,G> Stream<Map.Entry<G,List<T>>> foldBy(
      Stream<? extends T> s, Function<? super T, ? extends G> f) { 
     return StreamSupport.stream(new Folding<>(s.spliterator(), f), false); 
    } 
    private final Spliterator<? extends T> source; 
    private final Function<? super T, ? extends G> pf; 
    private final Consumer<T> c=this::addItem; 
    private List<T> pending, result; 
    private G pendingGroup, resultGroup; 

    Folding(Spliterator<? extends T> s, Function<? super T, ? extends G> f) { 
     source=s; 
     pf=f; 
    } 
    private void addItem(T item) { 
     G group=pf.apply(item); 
     if(pending==null) pending=new ArrayList<>(); 
     else if(!pending.isEmpty()) { 
      if(!Objects.equals(group, pendingGroup)) { 
       if(pending.size()==1) 
        result=Collections.singletonList(pending.remove(0)); 
       else { 
        result=pending; 
        pending=new ArrayList<>(); 
       } 
       resultGroup=pendingGroup; 
      } 
     } 
     pendingGroup=group; 
     pending.add(item); 
    } 
    public boolean tryAdvance(Consumer<? super Map.Entry<G, List<T>>> action) { 
     while(source.tryAdvance(c)) { 
      if(result!=null) { 
       action.accept(entry(resultGroup, result)); 
       result=null; 
       return true; 
      } 
     } 
     if(pending!=null) { 
      action.accept(entry(pendingGroup, pending)); 
      pending=null; 
      return true; 
     } 
     return false; 
    } 
    private Map.Entry<G,List<T>> entry(G g, List<T> l) { 
     return new AbstractMap.SimpleImmutableEntry<>(g, l); 
    } 
    public int characteristics() { return 0; } 
    public long estimateSize() { return Long.MAX_VALUE; } 
    public Spliterator<Map.Entry<G, List<T>>> trySplit() { return null; } 
} 

Der faule Art des Stream gefalteten resultierenden besten nachgewiesen werden kann, durch die Anwendung es zu einem unendlichen Strom:

Folding.foldBy(Stream.iterate(0, i->i+1), i->i>>4) 
     .filter(e -> e.getKey()>5) 
     .findFirst().ifPresent(e -> System.out.println(e.getValue())); 
+1

leichte Korrektur: 'groupingBy' behält tatsächlich die Reihenfolge des ursprünglichen Streams bei, wenn der Downstream-Kollektor kooperiert (die meisten, außer denen mit der UNORDERED-Eigenschaft); Die Teilmenge von Elementen in einem gegebenen Bucket wird dem nachgeschalteten Kollektor in derselben Reihenfolge präsentiert, in der sie in der Eingabe vorhanden waren. –

+1

@Brian Goetz: Ja, es * behält * die Reihenfolge bei, aber alles, was ich in meiner Antwort gesagt habe, ist, dass es sich nicht * auf die Reihenfolge für die Bildung der Gruppen verlässt. Übrigens. Das war einer der Testfälle, die ich für meine Lösung gemacht habe: das Sammeln eines Streams, der von meiner Lösung in eine "Map" zurückgegeben wurde, muss genau die gleiche "Map" wie "groupingBy" mit dem gleichen Klassifikator erzeugen. – Holger

1

cyclops-react, I-Bibliothek eines beizutragen, bietet sowohl sharding und Gruppierungs funcitonality die das tun könnte, was Sie wollen.

ReactiveSeq<ListX<TYPE>> grouped = ReactiveSeq.fromCollection(FileUtils.readLines(...)) 
      .groupedStatefullyWhile((batch,next) -> batch.size()==0 ? true : next.equals(batch.get(0))); 

Der groupedStatefullyWhile Operator ermöglicht Elemente basierend auf dem aktuellen Stand der Batch zusammengefasst werden. ReactiveSeq ist ein sequenzieller Stream mit einem einzigen Thread.

Map<Key, Stream<Value> sharded = 
        new LazyReact() 
       .fromCollection(FileUtils.readLines(...)) 
       .map(..) 
       .shard(shards, pair -> pair[0]); 

Dies wird eine LazyFutureStream erstellen (die java.util.stream.Stream implementiert), die die Daten in der Datei asynchron und parallel verarbeitet. Es ist faul und beginnt nicht mit der Verarbeitung, bis die Daten durchgezogen sind.

Der einzige Vorbehalt ist, dass Sie die Shards vorher definieren müssen. I.e. der Parameter 'shards', über dem eine Karte von async.Queue's ist, die durch den Schlüssel mit dem Shard verschlüsselt ist (möglicherweise welches Paar [0] ist?).

z.B.

Map<Integer,Queue<String>> shards; 

There is a sharding example with video here und test code here

0

Es kann durch collapse mit StreamEx

final int[][] aa = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 4, 4 } }; 

StreamEx.of(aa) 
     .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0])) 
     .forEach(System.out::println); 

Wir können peek und limit zu überprüfen, fügen Sie tun, wenn es faul Berechnung ist:

StreamEx.of(aa) 
     .peek(System.out::println) 
     .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0])) 
     .limit(1) 
     .forEach(System.out::println); 
Verwandte Themen