2016-04-16 23 views
1

Ich habe in den letzten Tagen über Akka Streams gelesen und arbeite seit ein paar Monaten mit Rx Bibliotheken in Scala. Für mich scheint es einige Überschneidungen zu geben, was diese beiden Bibliotheken zu bieten haben. RxScala war ein bisschen einfacher zu beginnen, zu verstehen und zu verwenden. Zum Beispiel, hier ist ein einfacher Anwendungsfall, in dem ich Scala's Rx-Bibliothek verwende, um eine Verbindung zum Kafka-Thema herzustellen, um es in ein Observable zu verpacken, damit ich Abonnenten bekommen kann, die diese Nachrichten bekommen.Erste Schritte mit Akka Streams

val consumerStream = consumer.createMessageStreamsByFilter(topicFilter(topics), 1, keyDecoder, valueDecoder).head 
val observableConsumer = Observable.fromIterator(consumerStream).map(_.message()) 

Dies ist ziemlich einfach und ordentlich. Irgendwelche Hinweise, wie ich mit Akka-Streams anfangen sollte? Ich möchte das gleiche Beispiel oben verwenden, in dem ich Ereignisse von der Quelle ausgeben möchte. Ich werde später einen Flow und einen Sink haben. Dann, in meiner Hauptklasse, werde ich diese 3 kombinieren, um den Anwendungsdatenfluss zu starten.

Irgendwelche Vorschläge?

Antwort

2

Also hier ist, was ich kam mit:

val kafkaStreamItr = consumer.createMessageStreamsByFilter(topicFilter(topics), 1, keyDecoder, valueDecoder).head 
Source.fromIterator(() => kafkaStreamItr).map(_.message)