läuft ich einen Strom haben, dassAkka Strom parallel
- Streams für HTTP-Post eine Liste von Ereignissen
- mapconcat der Liste der Ereignisse in Stromelemente empfangen
- Ereignisse in kafka Rekord konvertieren
- produzieren die rekord mit reaktiven kafka (akka strom kafka sinkerzeuger)
Hier ist der vereinfachte code
// flow to split group of lines into lines
val splitLines = Flow[List[Evt]].mapConcat(list=>list)
// sink to produce kafka records in kafka
val kafkaSink: Sink[Evt, Future[Done]] = Flow[Evt]
.map(evt=> new ProducerRecord[Array[Byte], String](evt.eventType, evt.value))
.toMat(Producer.plainSink(kafka))(Keep.right)
val routes = {
path("ingest") {
post {
(entity(as[List[ReactiveEvent]]) & extractMaterializer) { (eventIngestList,mat) =>
val ingest= Source.single(eventIngestList).via(splitLines).runWith(kafkaSink)(mat)
val result = onComplete(ingest){
case Success(value) => complete(s"OK")
case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}"))
}
complete("eventList ingested: " + result)
}
}
}
}
Können Sie mich markieren, was parallel ausgeführt wird und was sequenziell ist?
Ich denke, die mapConcat Sequence die Ereignisse im Stream so wie könnte ich den Strom parallelisieren, so dass nach der mapConcat jeder Schritt parallel verarbeitet werden würde?
Wäre eine einfache mapAsyncUnordered ausreichend? Oder sollte ich das GraphDSL mit einem Balance and Merge verwenden?
das Problem mit extractDataBytes ist, dass ich easly nicht unmarshall JSON kann ... – vgkowski
Hmm ich sehe. Ich brauchte nie einen unendlichen Strom von Json zu analysieren, aber ich hörte, wie Jawn es unterstützt. https://github.com/non/jawn/blob/master/parser/src/main/scala/jawn/AsyncParser.scala – expert
Überprüfen Sie auch http://owlike.github.io/genson/ – expert