Es gibt zwei Tabellen TableA
und TableB
.Kopieren einiger Datensätze von TableA nach TableB mit Slick-Streaming und Akka-Streaming
Ich muss einige Datensätze von TableA
zu TableB
kopieren. Ich benutze slick-3.0
und verwenden Sie die folgende Art und Weise:
import akka.stream._
import akka.stream.scaladsl._
...
//{{ READ DATA FROM TABLE A
val q = TableA.filter(somePredicate).result
val source = Source.fromPublisher {
db.stream(q.result).mapResult { r =>
val record: RecordA = someTransformation(r)
record
}
}.grouped(50) // grouping because I want to write records in batch mode
//}}
//{{ WRITE DATA TO TABLE B
val f:Future[Done] = source.runWith(Sink.foreach { batch: Seq[RecordA] =>
//TODO how to write batch to TableB asynchronously?
val insertAction = TableB ++= batch // insert batch to table
val fInsert: Future[_] = db.run(insertAction)
Await.result(fInsert, ...) // #1 this works only with blocking
})
//}}
Aber ich habe mit einem Problem konfrontiert - wie Charge TableB
schreiben asynchron (siehe TODO). Jetzt funktioniert der obige Code nur mit der Blockierung in die innere Zukunft (siehe # 1 Kommentar). Gibt es einen richtigen Weg, um diese Aufgabe asynchron zu implementieren?
Was passiert, wenn Sie die innere Zukunft nicht blockieren? – thwiegan
@thwiegan, Wenn ich nicht auf die innere Zukunft blockieren und es dann zurückgeben wird es nicht abgeschlossen –
Dies scheint Ihr Anwendungsfall zu sein: https://stackoverflow.com/questions/36400152/how-are-reactive-streams -in Slick-for-Inserting-Daten verwendet sehe ich nichts anderes als Ihr Beispiel – thwiegan