2017-12-27 4 views
1

Ziel ist es, Daten aus einer Datenbank zu streamen, einige Berechnungen auf diesem Datenblock auszuführen (diese Berechnung gibt eine Zukunft einer bestimmten Fallklasse zurück) und diese Daten als Chunked Response zu senden für den Benutzer. Momentan kann ich Daten streamen und die Antwort senden, ohne irgendeine Berechnung durchzuführen. Ich kann diese Berechnung jedoch nicht durchführen und das Ergebnis dann streamen.Slick Streaming-Daten transformieren und Chunked Response senden mit Akka Http

Dies ist die Route, die ich implementiert habe.

def streamingDB1 = 
path("streaming-db1") { 
    get { 
    val src = Source.fromPublisher(db.stream(getRds)) 
    complete(src) 
    } 
} 

Die Funktion getRds gibt die Zeilen einer Tabelle in einem Fall Klasse gemappt (Slick verwenden). Betrachten wir nun die Funktion compute, die jede Zeile als Eingabe nimmt und eine Future einer anderen Fallklasse zurückgibt. So etwas wie

def compute(x: Tweet) : Future[TweetNew] = ? 

Wie kann ich diese Funktion auf Variable src implementieren und die Chunked Antwort senden (als Stream) diese Berechnung für den Benutzer.

Antwort

1

Sie könnten die Quelle verwandeln mapAsync mit:

val src = 
    Source.fromPublisher(db.stream(getRds)) 
     .mapAsync(parallelism = 3)(compute) 

complete(src) 

den Grad der Parallelität einstellen je nach Bedarf.

+0

Dies funktioniert nicht. Ich führe den curl-Befehl aus, um den Endpunkt zu treffen. Die Verbindung wird jedoch geschlossen. – user3294786