2017-06-08 4 views
0

Es gibt zwei Tabellen TableA und TableB.Kopieren einiger Datensätze von TableA nach TableB mit Slick-Streaming und Akka-Streaming

Ich muss einige Datensätze von TableA zu TableB kopieren. Ich benutze slick-3.0 und verwenden Sie die folgende Art und Weise:

import akka.stream._ 
import akka.stream.scaladsl._ 
... 

//{{ READ DATA FROM TABLE A 
val q = TableA.filter(somePredicate).result 
val source = Source.fromPublisher { 
     db.stream(q.result).mapResult { r => 
     val record: RecordA = someTransformation(r) 
     record 
     } 
    }.grouped(50) // grouping because I want to write records in batch mode 
//}} 

//{{ WRITE DATA TO TABLE B 
val f:Future[Done] = source.runWith(Sink.foreach { batch: Seq[RecordA] => 
     //TODO how to write batch to TableB asynchronously? 
     val insertAction = TableB ++= batch // insert batch to table 
     val fInsert: Future[_] = db.run(insertAction) 
     Await.result(fInsert, ...)   // #1 this works only with blocking 
}) 
//}} 

Aber ich habe mit einem Problem konfrontiert - wie Charge TableB schreiben asynchron (siehe TODO). Jetzt funktioniert der obige Code nur mit der Blockierung in die innere Zukunft (siehe # 1 Kommentar). Gibt es einen richtigen Weg, um diese Aufgabe asynchron zu implementieren?

+0

Was passiert, wenn Sie die innere Zukunft nicht blockieren? – thwiegan

+0

@thwiegan, Wenn ich nicht auf die innere Zukunft blockieren und es dann zurückgeben wird es nicht abgeschlossen –

+1

Dies scheint Ihr Anwendungsfall zu sein: https://stackoverflow.com/questions/36400152/how-are-reactive-streams -in Slick-for-Inserting-Daten verwendet sehe ich nichts anderes als Ihr Beispiel – thwiegan

Antwort

2

Verwenden Sie mapAsync es erwartet eine Zukunft zurückgegeben werden und legt das "unwrapped" Ergebnis in der nächsten Stufe.

source.mapAsync(4){batch: Seq[RecordA] => 
     val insertAction = TableB ++= batch // insert batch to table 
     db.run(insertAction) 
}).to(Sink.ignore).run 
Verwandte Themen