2017-08-24 6 views
0

Ich verwende den folgenden Code zur Zeit eine Akka Quelle (wie beim Lesen einer Datei mit Akka des FileIO empfangen) konvertieren zu einem RxJava2 Flowable:Konvertieren von Akka-Quellen in RxJava2 Flowable?

private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data, 
     Flow<ByteString, ByteString, NotUsed> compType) { 
    final Publisher<ByteString> uncompressedData = 
     data.via(compType) 
      .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer); 
    return Flowable.fromPublisher(uncompressedData) 
     .map(bytes -> Buffer.buffer(bytes.toArray())); 
} 
ist

Mein Problem mit diesem (in Betrieb) Lösung, dass zumindest so weit ich es derzeit verstehe, führt der Methodenaufruf .runWith() bereits den Code aus, dh sammelt alle Daten von der gegebenen Quelle, puffert sie und legt sie dann in einen Verleger. Gibt es einen Weg um es an dieser Stelle zu führen? Ich möchte einfach die Konvertierung an dieser Stelle ohne den Materializer definieren und erst dann alles ausführen, wenn etwas zu dem Flowable zu einem späteren Zeitpunkt abonniert wird.

Danke!

Antwort

0

Verwenden defer (Nebenbei bemerkt: Ich hatte das schon oft zu tun, weil Akka Quellen ein Schuss sind):

private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data, 
     Flow<ByteString, ByteString, NotUsed> compType) { 

    return Flowable.defer(() -> data.via(compType) 
     .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer) 
    ).map(bytes -> Buffer.buffer(bytes.toArray())); 
} 
Verwandte Themen