2017-01-16 2 views
0

Ich habe bereits eine Source[T], aber ich muss es an eine Funktion übergeben, die eine Stream[T] erfordert.Wie man eine Akka-Quelle in eine scala.stream konvertiert

Ich könnte .run die Quelle und materialisieren alles zu einer Liste und dann eine .toStream auf das Ergebnis, aber das entfernt die Lazy/Stream-Aspekt, den ich behalten möchte.

Ist dies der einzige Weg dies zu erreichen oder fehlt mir etwas?

EDIT:

Nach Wladimirs Kommentar zu lesen, ich glaube, ich bin in der falschen Art und Weise meine Frage nähern. Hier ist ein einfaches Beispiel dafür, was ich habe und was ich will, erstellen:

 // Given a start index, returns a list from startIndex to startIndex+10. Stops at 50. 
     def getMoreData(startIndex: Int)(implicit ec: ExecutionContext): Future[List[Int]] = { 
      println(s"f running with $startIndex") 
      val result: List[Int] = (startIndex until Math.min(startIndex + 10, 50)).toList 
      Future.successful(result) 
     } 

So getMoreData nur einen Dienst emuliert, die Daten, die von der Seite zu mir zurück. Mein erstes Ziel ihm die folgende Funktion zu erstellen:

def getStream(startIndex: Int)(implicit ec: ExecutionContext): Stream[Future[List[Int]]] 

, wo das nächste Future[List[Int]] Element in dem Strom auf dem vorhergehenden abhängt, den letzten Index von dem vorherigen Future Wert im Stream gelesen Einnahme. ZB mit einem Startindex 0:

getStream(0)(0) would return Future[List[0 until 10]] 
getStream(0)(1) would return Future[List[10 until 20]] 
... etc 

Sobald ich diese Funktion haben, möchte ich dann eine zweite Funktion erstellen sie weiter zur Karte auf eine einfache Stream[Int] nach unten. ZB:

def getFlattenedStream(stream: Stream[Future[List[Int]]]): Stream[Int] 

Streams beginnen wie das falsche Werkzeug für den Job zu fühlen, und ich soll nur eine einfache Schleife stattdessen schreiben. Mir gefiel die Idee von Streams, weil der Konsument den Stream so abbilden/modifizieren/konsumieren kann, wie er es für richtig hält, ohne dass der Produzent etwas davon wissen muss.

+0

Ich bin nicht sicher, ob dies eine gute Idee ist. Was denkst du, wie sich dein 'Stream' verhalten sollte, wenn du das nächste Element daraus erhalten willst und der materialisierte reaktive Stream es (noch) nicht liefern kann? Wie sollten Fehler behandelt werden? Wie sollte der Konsument des "Stream" den materialisierten reaktiven Strom schließen, wenn er damit fertig ist? Ich denke, dass es möglich ist, diese Fragen zu beantworten, aber die resultierende Abstraktion wäre undicht und unhandlich. –

+0

Hmm, ich bin neu in beiden Scala-Streams und Akka-Streams, also nähere ich mich vielleicht dem falschen Weg.Ich habe die 'Source [T]' erstellt, weil ich das 'T' von einer URL holen muss, was bedeutet, dass ich stattdessen eine' Future [T] 'habe. Ich dachte, ich würde 'mapAsync' verwenden, um dieses Problem zu umgehen. Ich denke, was ich wirklich will ist, einen 'Stream [Future [T]]' in einen 'Stream [T]' zu konvertieren, nur dann die URL aufzurufen, wenn sie tatsächlich mehr benötigt. Bin ich in die völlig falsche Richtung gegangen? – JBY

+0

Einige Codebeispiele in der Frage konnten nicht schaden ... –

Antwort

0

Scala Streams sind eine gute Möglichkeit, Ihre Aufgabe innerhalb getStream zu erfüllen; hier ist eine grundlegende Art und Weise zu konstruieren, was Sie suchen:

def getStream(startIndex : Int) 
      (implicit ec: ExecutionContext): Stream[Future[List[Int]]] = 
    Stream 
    .from(startIndex, 10) 
    .map(getMoreData) 

Wo es kompliziert ist, mit Ihrer getFlattenedStream Funktion. Es ist möglich, die Future Wrapper um Ihre List[Int] Werte zu beseitigen, aber es wird eine Await.result Funktionsaufruf, der in der Regel ein Fehler ist erforderlich.

In den meisten Fällen ist es am besten, mit den Futures zu arbeiten und asynchrone Operationen allein zuzulassen. Wenn Sie Ihre ultimative Anforderung/Ziel analysieren, ist es normalerweise nicht notwendig, auf eine Zukunft zu warten.

Aber genau dann, wenn Sie unbedingt die Zukunft fällt dann hier ist der Code, kann es erreichen:

val awaitDuration = 10 Seconds 

def getFlattenedStream(stream: Stream[Future[List[Int]]]): Stream[Int] = 
    stream 
    .map(f => Await.result(f, awaitDuration)) 
    .flatten 
+0

Der Gotcha-Teil ist, dass '.from (startIndex, 10)' nur funktioniert, wenn wir im Voraus wissen, was die Indizes sein werden. Was ich jedoch will ist, dass der nächste Index (10 in diesem Fall) durch das Ergebnis der vorherigen Zukunft bestimmt werden muss. Jeder Schritt im Stream muss warten, bis die vorherige Zukunft abgeschlossen ist, bevor er berechnet werden kann. – JBY

Verwandte Themen