2017-06-28 2 views
2

Ich möchte mehrere (in der Regel 2-10) Kafka-Themen nach außen verbinden, idealerweise mithilfe der Streaming-API. Alle Themen haben denselben Schlüssel und dieselben Partitionen. Eine Möglichkeit, dies zu tun beizutreten, ist ein KStream für jedes Thema und Ketten Anrufe KStream.outerJoin zu erstellen:Gibt es eine effiziente Möglichkeit, mehrere (mehr als zwei) kafka-Themen zu verbinden?

stream1 
    .outerJoin(stream2, ...) 
    .outerJoin(stream3, ...) 
    .outerJoin(stream4, ...) 

jedoch die documentation von KStream.outerJoin schlägt vor, dass jeder Aufruf outerJoin seine beiden Eingänge materialisieren Ströme so das obige Beispiel würde materialisieren nicht nur Ströme 1 bis 4, sondern auch stream1.outerJoin(stream2, ...) und stream1.outerJoin(stream2, ...).outerJoin(stream3, ...). Es würde viel unnötige Serialisierung, Deserialisierung und I/O im Vergleich zu dem direkten Verbinden der 4 Ströme geben. Ein weiteres Problem mit dem obigen Ansatz ist, dass die JoinWindow nicht über alle 4 Eingangsströme konsistent wäre: ein JoinWindow würde verwendet werden, um die Ströme 1 und 2 zu verbinden, aber dann würde ein separates Verbindungsfenster verwendet werden, um diesen Strom zu verbinden Stream 3 usw. Ich gebe zum Beispiel ein Join-Fenster von 10 Sekunden für jeden Join an und Einträge mit einem bestimmten Schlüssel erscheinen in Stream 1 bei 0 Sekunden, Stream 2 bei 6 Sekunden, Stream 3 bei 12 Sekunden und Stream 4 bei 18 Sekunden, würde das verbundene Objekt nach 18 Sekunden ausgegeben werden, was zu einer zu hohen Verzögerung führen würde. Die Ergebnisse hängen von der Reihenfolge der Joins ab, was unnatürlich erscheint.

Gibt es einen besseren Ansatz für Mehrwegeverbindungen mit Kafka?

Antwort

0

Letztendlich entschied ich mich, eine benutzerdefinierte leichte Joiner zu erstellen, die Materialisierung vermeidet und strikt die Ablaufzeit einhält. Es sollte im Durchschnitt O (1) sein. Es passt besser zur Consumer-API als zur Stream-API: Für jeden Verbraucher wird der Joiner wiederholt mit den empfangenen Daten abgefragt und aktualisiert. Wenn der Tischler einen vollständigen Attributsatz zurückgibt, leiten Sie ihn weiter. Hier ist der Code:

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.Iterator; 
import java.util.LinkedHashMap; 
import java.util.Map; 
import java.util.Optional; 

/** 
* Inner joins multiple streams of data by key into one stream. It is assumed 
* that a key will appear in a stream exactly once. The values associated with 
* each key are collected and if all values are received within a certain 
* maximum wait time, the joiner returns all values corresponding to that key. 
* If not all values are received in time, the joiner never returns any values 
* corresponding to that key. 
* <p> 
* This class is not thread safe: all calls to 
* {@link #update(Object, Object, long)} must be synchronized. 
* @param <K> The type of key. 
* @param <V> The type of value. 
*/ 
class StreamInnerJoiner<K, V> { 

    private final Map<K, Vals<V>> idToVals = new LinkedHashMap<>(); 
    private final int joinCount; 
    private final long maxWait; 

    /** 
    * Creates a stream inner joiner. 
    * @param joinCount The number of streams being joined. 
    * @param maxWait The maximum amount of time after an item has been seen in 
    * one stream to wait for it to be seen in the remaining streams. 
    */ 
    StreamInnerJoiner(final int joinCount, final long maxWait) { 
     this.joinCount = joinCount; 
     this.maxWait = maxWait; 
    } 

    private static class Vals<A> { 
     final long firstSeen; 
     final Collection<A> vals = new ArrayList<>(); 
     private Vals(final long firstSeen) { 
      this.firstSeen = firstSeen; 
     } 
    } 

    /** 
    * Updates this joiner with a value corresponding to a key. 
    * @param key The key. 
    * @param val The value. 
    * @param now The current time. 
    * @return If all values for the specified key have been received, the 
    * complete collection of values for thaht key; otherwise 
    * {@link Optional#empty()}. 
    */ 
    Optional<Collection<V>> update(final K key, final V val, final long now) { 
     expireOld(now - maxWait); 
     final Vals<V> curVals = getOrCreate(key, now); 
     curVals.vals.add(val); 
     return expireAndGetIffFull(key, curVals); 
    } 

    private Vals<V> getOrCreate(final K key, final long now) { 
     final Vals<V> existingVals = idToVals.get(key); 
     if (existingVals != null) 
      return existingVals; 
     else { 
      /* 
      Note: we assume that the item with the specified ID has not already 
      been seen and timed out, and therefore that its first seen time is 
      now. If the item has in fact already timed out, it is doomed and 
      will time out again with no ill effect. 
      */ 
      final Vals<V> curVals = new Vals<>(now); 
      idToVals.put(key, curVals); 
      return curVals; 
     } 
    } 

    private void expireOld(final long expireBefore) { 
     final Iterator<Vals<V>> i = idToVals.values().iterator(); 
     while (i.hasNext() && i.next().firstSeen < expireBefore) 
      i.remove(); 
    } 

    private Optional<Collection<V>> expireAndGetIffFull(final K key, final Vals<V> vals) { 
     if (vals.vals.size() == joinCount) { 
      // as all expired entries were already removed, this entry is valid 
      idToVals.remove(key); 
      return Optional.of(vals.vals); 
     } else 
      return Optional.empty(); 
    } 
} 
Verwandte Themen