2017-03-02 2 views
0

läuft ich einen Strom haben, dassAkka Strom parallel

  1. Streams für HTTP-Post eine Liste von Ereignissen
  2. mapconcat der Liste der Ereignisse in Stromelemente empfangen
  3. Ereignisse in kafka Rekord konvertieren
  4. produzieren die rekord mit reaktiven kafka (akka strom kafka sinkerzeuger)

Hier ist der vereinfachte code

// flow to split group of lines into lines 
    val splitLines = Flow[List[Evt]].mapConcat(list=>list) 

// sink to produce kafka records in kafka 
val kafkaSink: Sink[Evt, Future[Done]] = Flow[Evt] 
    .map(evt=> new ProducerRecord[Array[Byte], String](evt.eventType, evt.value)) 
    .toMat(Producer.plainSink(kafka))(Keep.right) 

val routes = { 
    path("ingest") { 
     post { 
     (entity(as[List[ReactiveEvent]]) & extractMaterializer) { (eventIngestList,mat) => 
      val ingest= Source.single(eventIngestList).via(splitLines).runWith(kafkaSink)(mat) 
      val result = onComplete(ingest){ 
       case Success(value) => complete(s"OK") 
       case Failure(ex) => complete((StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")) 
      } 
      complete("eventList ingested: " + result) 
      } 
     } 
    } 
    } 

Können Sie mich markieren, was parallel ausgeführt wird und was sequenziell ist?

Ich denke, die mapConcat Sequence die Ereignisse im Stream so wie könnte ich den Strom parallelisieren, so dass nach der mapConcat jeder Schritt parallel verarbeitet werden würde?

Wäre eine einfache mapAsyncUnordered ausreichend? Oder sollte ich das GraphDSL mit einem Balance and Merge verwenden?

Antwort

2

In Ihrem Fall wird es sequentiell sein, denke ich. Außerdem erhalten Sie eine vollständige Anfrage, bevor Sie beginnen, Daten an Kafka zu senden. Ich würde extractDataBytes Direktive verwenden, die Ihnen src: Source[ByteString, Any] gibt. Dann würde ich es verarbeiten wie

src 
    .via(Framing.delimiter(ByteString("\n"), 1024 /* Max size of line */ , allowTruncation = true).map(_.utf8String)) 
    .mapConcat { line => 
    line.split(",") 
    }.async 
    .runWith(kafkaSink)(mat) 
+0

das Problem mit extractDataBytes ist, dass ich easly nicht unmarshall JSON kann ... – vgkowski

+0

Hmm ich sehe. Ich brauchte nie einen unendlichen Strom von Json zu analysieren, aber ich hörte, wie Jawn es unterstützt. https://github.com/non/jawn/blob/master/parser/src/main/scala/jawn/AsyncParser.scala – expert

+0

Überprüfen Sie auch http://owlike.github.io/genson/ – expert