2016-04-10 5 views
2

In meiner Anwendung verwende ich mehrere Streams, die Elemente des Formulars (ID, Wert) bereitstellen. Ein Element wird durch die folgende Klasse definiert:So führen Sie einen äußeren Join bei zwei oder mehr Streams durch

static final class Element<T> implements Comparable<Element<T>> { 
    final long id; 
    final T value; 

    Element(int id, T value) { 
     this.id = id; 
     this.value = value; 
    } 

    @Override 
    public int compareTo(Element o) { 
     return Long.compare(id, o.id); 
    } 
} 

Mein Ziel ist es Streams zwei oder mehr der durch das Element-IDs zu verbinden (in jedem Strom werden die IDs sortiert und streng monotone), zB:

Stream <Element> colour = Arrays.stream(new Element[]{new Element(1, "red"), new Element(2, "green"), new Element(4, "red"), new Element(6, "blue")}); 
    Stream <Element> length = Arrays.stream(new Element[]{new Element(2, 28), new Element(3, 9), new Element(4, 17), new Element(6, 11)}); 
    Stream <Element> mass = Arrays.stream(new Element[]{new Element(1, 87.9f), new Element(2, 21.0f), new Element(3, 107f)}); 

in einen einzigen Strom, der Elemente der Form (ID, [T1, T2, T3]) enthält:

Stream<Element<Object[]>> allProps = joinStreams(colour, length, mass); 

indem irgendein Verfahren wie folgt aus:

public Stream<Element<Object[]>> joinStreams(Stream<Element>... streams) { 
    return ...; 
} 

Der resultierende Strom sollte ein FULL OUTER JOIN, das heißt für das obige Beispiel liefern: für solche Aufgaben

1, "red", null, 87.9 
2, "green", 28, 21.0 
3, null, 9, 107 
4, "red" 17, null 
6, "blue", 11, null 

Seit meiner Erfahrung mit API Java-Streaming recht einfach ist bisher normalerweise Iteratoren ich verwende.

Gibt es eine idiomatische (und effiziente) Möglichkeit, diese Art von Join mit Streams durchzuführen? Gibt es irgendwelche Dienstprogrammbibliotheken, die ich verwenden könnte?

Seitliche Anmerkung: Das Beispiel ist vereinfacht. Die Anwendung empfängt die Daten von etwas wie einem spaltenorientierten Datenspeicher (kein echtes DMBS), der mehrere Gigabyte groß ist und nicht leicht in den Speicher passt. Es gibt auch keine integrierte Unterstützung für diese Art von Join-Operation.

+0

'myElementsStream.collect (Collectors.groupingBy (e -> e.id))'? – fge

+0

Ich habe hier drei Streams - wie definierst du myElementsStream? – Matthias

Antwort

1

Um eine vollständige äußere Join Stream-Implementierung zu erstellen, verwende ich zwei blockierende Warteschlangen. Jedem Datenstrom ist eine Warteschlange zugeordnet, und eine Filler-Klasse (eine ausführbare Implementierung) liest Daten aus einem Datenstrom und schreibt sie in die Warteschlange. Wenn der Füllklasse keine Daten mehr zur Verfügung stehen, wird eine Ende-von-Stream-Markierung in die Warteschlange geschrieben. Ich konstruiere dann einen Splitterator von AbstractSpliterator. Die tryAdvance-Methodenimplementierung verwendet einen Wert aus der linken und der rechten Warteschlange und verwendet diese Werte abhängig vom Vergleichsergebnis. Ich verwende eine Variation deiner Element-Klasse. Siehe den folgenden Code:

import java.util.ArrayList; 
import java.util.Collection; 

public final class Element<T> implements Comparable<Element<T>> { 
    final long id; 
    final Collection<T> value; 

    public Element(int id, T value) { 
     this.id = id; 
     // Order preserving 
     this.value = new ArrayList<T>(); 
     this.value.add(value); 
    } 

    Element(long id, Element<T> e1, Element<T> e2) { 
     this.id = id; 
     this.value = new ArrayList<T>(); 
     add(e1); 
     add(e2); 
    } 

    private void add(Element<T> e1) { 
     if(e1 == null) { 
      this.value.add(null);   
     } else { 
      this.value.addAll(e1.value); 
     } 
    } 

    /** 
    * Used as End-of-Stream marker 
    */ 
    Element() { 
     id = -1; 
     value = null; 
    } 

    @Override 
    public int compareTo(Element<T> o) { 
     return Long.compare(id, o.id); 
    } 
} 

Implementierung Registriert

import java.util.Comparator; 
import java.util.Spliterator; 
import java.util.Spliterators; 
import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.BlockingQueue; 
import java.util.function.Consumer; 
import java.util.stream.Stream; 
import java.util.stream.StreamSupport; 

public class OuterJoinSpliterator<T> extends Spliterators.AbstractSpliterator<Element<T>> { 

    private final class Filler implements Runnable { 
     private final Stream<Element<T>> stream; 
     private final BlockingQueue<Element<T>> queue; 

     private Filler(Stream<Element<T>> stream, BlockingQueue<Element<T>> queue) { 
      this.stream = stream; 
      this.queue = queue; 
     } 

     @Override 
     public void run() { 
      stream.forEach(x -> { 
       try { 
        queue.put(x); 
       } catch (final InterruptedException e) { 
        e.printStackTrace(); 
       } 
      }); 
      try { 
       queue.put(EOS); 
      } catch (final InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 

    public final Element<T> EOS = new Element<T>(); 
    private final int queueSize; 
    private final BlockingQueue<Element<T>> leftQueue; 
    private final BlockingQueue<Element<T>> rightQueue; 
    protected Element<T> leftValue; 
    protected Element<T> rightValue; 

    private OuterJoinSpliterator(long estSize, int characteristics, int queueSize, 
      Stream<Element<T>> leftStream, Stream<Element<T>> rightStream) { 
     super(estSize, characteristics); 
     this.queueSize = queueSize; 
     leftQueue = createQueue(); 
     rightQueue = createQueue(); 
     createFillerThread(leftStream, leftQueue).start(); 
     createFillerThread(rightStream, rightQueue).start(); 
    } 

    private Element<T> acceptBoth(long id, Element<T> left, Element<T> right) { 
     return new Element<T>(id, left, right); 
    } 

    private final Element<T> acceptLeft(Element<T> left) { 
     return acceptBoth(left.id, left, null); 
    } 

    private final Element<T> acceptRight(Element<T> right) { 
     return acceptBoth(right.id, null, right); 
    } 

    private final Thread createFillerThread(Stream<Element<T>> leftStream, BlockingQueue<Element<T>> queue) { 
     return new Thread(new Filler(leftStream, queue)); 
    } 

    private final ArrayBlockingQueue<Element<T>> createQueue() { 
     return new ArrayBlockingQueue<>(queueSize); 
    } 

    @Override 
    public Comparator<? super Element<T>> getComparator() { 
     return null; 
    } 

    private final boolean isFinished() { 
     return leftValue == EOS && rightValue == EOS; 
    } 

    @Override 
    public final boolean tryAdvance(Consumer<? super Element<T>> action) { 
     try { 
      updateLeft(); 

      updateRight(); 

      if (isFinished()) { 
       return false; 
      } 

      if (leftValue == EOS) { 
       action.accept(acceptRight(rightValue)); 
       rightValue = null; 
      } else if (rightValue == EOS) { 
       action.accept(acceptLeft(leftValue)); 
       leftValue = null; 
      } else { 
       switch (leftValue.compareTo(rightValue)) { 
       case -1: 
        action.accept(acceptLeft(leftValue)); 
        leftValue = null; 
        break; 
       case 1: 
        action.accept(acceptRight(rightValue)); 
        rightValue = null; 
        break; 
       default: 
        action.accept(acceptBoth(leftValue.id, leftValue, rightValue)); 
        leftValue = null; 
        rightValue = null; 
       } 
      } 
     } catch (final InterruptedException e) { 
      return false; 
     } 
     return true; 
    } 

    private final void updateLeft() throws InterruptedException { 
     if (leftValue == null) { 
      leftValue = leftQueue.take(); 
     } 
    } 

    private final void updateRight() throws InterruptedException { 
     if (rightValue == null) { 
      rightValue = rightQueue.take(); 
     } 
    } 

    public static <T> Stream<Element<T>> join(long estSize, int characteristics, int queueSize, boolean parallel, Stream<Element<T>> leftStream, Stream<Element<T>> rightStream) { 
     Spliterator<Element<T>> spliterator = new OuterJoinSpliterator<>(estSize, characteristics, queueSize, leftStream, rightStream); 
     return StreamSupport.stream(spliterator, parallel); 
    } 
} 

Sie Long.MAX_VALUE als geschätzte Größe verwenden können. Eine Beschreibung der verschiedenen Stream-Eigenschaften finden Sie auf der Spliterator-Oberfläche. Siehe Kommentare für AbstractSpliterator für weitere Informationen.

-1

Die einfachste Lösung besteht darin, einen Iterator zu schreiben und dann StreamSupport :: stream zu verwenden, um einen Stream vom Iterator zu erstellen. Aber Sie können einige Probleme mit der Leistung finden, wenn Sie den parallelen Strom verwenden werden.

Verwandte Themen