2016-03-20 2 views
3

Beispielszenario: Gruppenbytes eines Streams in Blöcke von Größen, die von einem anderen Stream (von Ganzzahlen) bestimmt werden.Wie aggregiert man Elemente eines Akka-Streams basierend auf Elementen eines anderen?

def partition[A, B, C](
    first:Source[A, NotUsed], 
    second:Source[B, NotUsed], 
    aggregate:(Int => Seq[A], B) => C 
):Source[C, NotUsed] = ??? 

val bytes:Source[Byte, NotUsed] = ??? 
val sizes:Source[Int, NotUsed] = ??? 

val chunks:Source[ByteString, NotUsed] = 
    partition(bytes, sizes, (grab, count) => ByteString(grab(count))) 

Mein erster Versuch, eine Kombination aus Flow#scan und Flow#prefixAndTail, aber es fühlt sich nicht ganz richtig (siehe unten). Ich habe auch einen Blick auf Framing geworfen, aber es scheint nicht auf das obige Beispielszenario anwendbar zu sein (und es ist auch nicht allgemein genug, um Bytestring-Streams aufzunehmen). Ich schätze, meine einzige Option ist es, Graphs (oder die allgemeinere FlowOps#transform) zu verwenden, aber ich bin nicht annähernd genug (noch) mit Akka-Streams, um das zu versuchen.


Hier ist, was ich in der Lage war, mit so weit zu kommen (spezifisch für das Beispielszenario):

val chunks:Source[ByteString, NotUsed] = sizes 
    .scan(bytes prefixAndTail 0) { 
    (grouped, count) => grouped flatMapConcat { 
     case (chunk, remainder) => remainder prefixAndTail count 
    } 
    } 
    .flatMapConcat(identity) 
    .collect { case (chunk, _) if chunk.nonEmpty => ByteString(chunk:_*) } 

Antwort

4

Ich glaube, Sie GraphStage die Verarbeitung als benutzerdefinierte implementieren können. Die Bühne hätte zwei Inlet Elemente. Einer nimmt die Bytes und der andere nimmt die Größen. Es würde ein Outlet Element haben, das die Werte produziert.

Betrachten Sie die folgenden Eingabeströme.

def randomChars = Iterator.continually(Random.nextPrintableChar()) 
def randomNumbers = Iterator.continually(math.abs(Random.nextInt() % 50)) 

val bytes: Source[Char, NotUsed] = 
    Source.fromIterator(() => randomChars) 

val sizes: Source[Int, NotUsed] = 
    Source.fromIterator(() => randomNumbers).filter(_ != 0) 

mit dann Informationen zur Beschreibung individuelle Stream Processing (http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html) Sie GraphStage konstruieren können.

case class ZipFraming() extends GraphStage[FanInShape2[Int, Char, (Int, ByteString)]] { 

    override def initialAttributes = Attributes.name("ZipFraming") 

    override val shape: FanInShape2[Int, Char, (Int, ByteString)] = 
    new FanInShape2[Int, Char, (Int, ByteString)]("ZipFraming") 

    val inFrameSize: Inlet[Int] = shape.in0 
    val inElements: Inlet[Char] = shape.in1 

    def out: Outlet[(Int, ByteString)] = shape.out 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    new GraphStageLogic(shape) { 
     // we will buffer as much as 512 characters from the input 
     val MaxBufferSize = 512 
     // the buffer for the received chars 
     var buffer = Vector.empty[Char] 
     // the needed number of elements 
     var needed: Int = -1 
     // if the downstream is waiting 
     var isDemanding = false 

     override def preStart(): Unit = { 
     pull(inFrameSize) 
     pull(inElements) 
     } 

     setHandler(inElements, new InHandler { 
     override def onPush(): Unit = { 
      // we buffer elements as long as we can 
      if (buffer.size < MaxBufferSize) { 
      buffer = buffer :+ grab(inElements) 
      pull(inElements) 
      } 
      emit() 
     } 
     }) 

     setHandler(inFrameSize, new InHandler { 
     override def onPush(): Unit = { 
      needed = grab(inFrameSize) 
      emit() 
     } 
     }) 

     setHandler(out, new OutHandler { 
     override def onPull(): Unit = { 
      isDemanding = true 
      emit() 
     } 
     }) 

     def emit(): Unit = { 
     if (needed > 0 && buffer.length >= needed && isDemanding) { 
      val (emit, reminder) = buffer.splitAt(needed) 
      push(out, (needed, ByteString(emit.map(_.toByte).toArray))) 
      buffer = reminder 
      needed = -1 
      isDemanding = false 
      pull(inFrameSize) 
      if (!hasBeenPulled(inElements)) pull(inElements) 
     } 
     } 
    } 
} 

Und das ist, wie Sie es ausführen.

RunnableGraph.fromGraph(GraphDSL.create(bytes, sizes)(Keep.none) { implicit b => 
    (bs, ss) => 
    import GraphDSL.Implicits._ 

    val zipFraming = b.add(ZipFraming()) 

    ss ~> zipFraming.in0 
    bs ~> zipFraming.in1 

    zipFraming.out ~> Sink.foreach[(Int, ByteString)](e => println((e._1, e._2.utf8String))) 

    ClosedShape 
}).run() 
+0

das Problem bei diesem Ansatz ist, dass die 'size' Anzahl der Zeichen wird * immer * * von der Vorderseite * von' bytes' abgeholt werden, so dass die Elemente des resultierenden Strom wird alle das gleiche Präfix. – Andrey

+0

@Andrey du hast Recht. Ich habe meine Antwort aktualisiert, mit der tatsächlichen Implementierung einer 'GraphStage', die als etwas zwischen dem Zip und Framing funktioniert. – lpiepiora

Verwandte Themen