2017-07-29 2 views
0

Ich versuche, meinen Kopf um reaktive Programmierung zu bekommen, damit ich es möchte fragen, ob jeder Vorteil hier mit einem Flux ist:Mit einem Flux anstelle einer For-Schleife, irgendwelche Vorteile?

override fun notifyObserversOnMessage(message: Message) { 
     Flux.fromStream(observers.stream()) 
       .map { observer -> Mono.just(observer.reactOnMessage(message)) } 
       .subscribe()   
    } 

statt:

override fun notifyObserversOnMessage(message: Message) { 
     for (observer in observers) { 
      observer.reactOnMessage(message) 
     }  
    } 

es auf Hängt die Arbeit, die jeder Beobachter macht und ob das IO ist oder nicht?

Antwort

1

Kommt drauf an.

Wenn es sinnvoll ist observers sequentiell zu verarbeiten, gibt es keinen Vorteil durch die Verwendung hier Flux und

override fun notifyObserversOnMessage(message: Message) { 
    observers.forEach { observer -> 
     observer.reactOnMessage(message) 
    } 
} 

oder einfach

override fun notifyObserversOnMessage(message: Message) { 
    observers.forEach { it.reactOnMessage(message) } 
} 

ist in Ordnung.

Für naive Parallelität,

override fun notifyObserversOnMessage(message: Message) { 
    observers.parallelStream().forEach { it.reactOnMessage(message) } 
} 

verwendet werden kann, aber an diesem Punkt ist es eher wahrscheinlich, dass Sie zusätzliche Anforderungen wie Arbeiter-Pools oder Timeouts. In diesem Fall ist die Aussagekraft des Reaktors nützlich.

+0

Wenn jeder Beobachter die Nachricht in einen DB schreibt, wird es asynchron mit einem Flux gemacht oder die Schleife wird immer noch sequenziell laufen? – Orestis

+0

@Orestis Wie Sie es geschrieben haben, ist es sequenziell. Eine Möglichkeit, um Parallelität zu erhalten, ist etwas wie 'beobachter.zuFlux(). FlatMap {Beobachter -> Mono.fromCallable {observer.reactOnMessage (Nachricht)} .subscribeOn (Schedulers.parallel())} .subscribe()'. – ephemient

Verwandte Themen