2017-07-11 2 views
3

Ich habe Probleme bekommen immer den letzten Wert von Kombination erhalten mit einem CombineLatest Operator.Disable Flowable/Observable Buffer

Ich habe 2 heißen fließfähige (a, b) Erzeugen Ereignisse mit hoher Frequenz (ein Ereignis alle 100 ms pro Stück):

Flowable<OrderBook> flowA = sourceA.getObservableOrderBook(value); 
Flowable<OrderBook> flowB = sourceB.getObservableOrderBook(value); 

mit einem combineLatest kombiniert, die fast 300 ms dauert Job es ist zu tun.

Flowable<OrderBookCouple> combined = Flowable.combineLatest(flowA, flowB,  OrderBookCouple::new).observeOn(Schedulers.newThread()); 
combined.subscribe((bookCouple) -> { 
       System.out.println("A timestamp: " + bookCouple.aOrderBook.getTimeStamp()); 
       System.out.println("B timestamp: " + bookCouple.bOrderBook.getTimeStamp()); 
       Thread.sleep(300); 
      } 

Nach einer Ausführung des Kombinierer, würde ich die allerletzte Kombination Prozess wie von Ereignis generiert, das heißt (Lasta, lastb).

Das Standardverhalten des kombinierten Flusses besteht darin, alle Kombinationen von Ereignissen in einem eigenen Puffer zwischenzuspeichern, so dass der kombinierte Fluss Kombinationen erhält, die sehr alt sind und diese Zeitlücke explodiert.

Wie sollte ich meinen Code ändern, um diesen Puffer zu deaktivieren und immer die allerletzte Kombination zu erhalten?

Antwort

4

Sie können onBackpressureLatest beide gelten auf flowA und flowB und die overload of combineLatest benutzen, die wir an, Sie die Prefetch-Menge angeben.

Flowable.combineLatest(
    Arrays.asList(flowA.onBackpressureLatest(), flowB.onBackpressureLatest()), 
    a -> new OrderBookCouple((OrderBook)a[0], (OrderBook)a[1]), 
    1 
) 
.onBackpressureLatest() 
.observeOn(Schedulers.newThread(), false, 1) 

Leider gibt es keine Überlastung, die sowohl einen bifunktionellen und buffer nimmt damit Sie Casting Array-Elemente zurückgreifen müssen.

bearbeiten

eine zweite onBackpressureLatest Anwendung und die Puffergröße auf observeOn Begrenzung sollten Sie näher an das gewünschte Muster erhalten, obwohl combineLatest nicht auf diesen Anwendungsfall richtet. Wahrscheinlich wollten Sie einen Multi-Sample-Operator.

+0

Ich habe Ihre Lösung mit einem vereinfachten Beispiel hier versucht und es scheint, dass es immer noch die Pufferung gibt. Der Code ist hier verfügbar: [link] (https://gist.github.com/Ambros94/06d50869eec77a5758c30c4ed66ab101) –

+2

Meine Antwort aktualisiert. – akarnokd

+0

Dies funktioniert, ich habe den Grund entsprechend aktualisiert. Vielen Dank. –