Beispielszenario: Gruppenbytes eines Streams in Blöcke von Größen, die von einem anderen Stream (von Ganzzahlen) bestimmt werden.Wie aggregiert man Elemente eines Akka-Streams basierend auf Elementen eines anderen?
def partition[A, B, C](
first:Source[A, NotUsed],
second:Source[B, NotUsed],
aggregate:(Int => Seq[A], B) => C
):Source[C, NotUsed] = ???
val bytes:Source[Byte, NotUsed] = ???
val sizes:Source[Int, NotUsed] = ???
val chunks:Source[ByteString, NotUsed] =
partition(bytes, sizes, (grab, count) => ByteString(grab(count)))
Mein erster Versuch, eine Kombination aus Flow#scan und Flow#prefixAndTail, aber es fühlt sich nicht ganz richtig (siehe unten). Ich habe auch einen Blick auf Framing geworfen, aber es scheint nicht auf das obige Beispielszenario anwendbar zu sein (und es ist auch nicht allgemein genug, um Bytestring-Streams aufzunehmen). Ich schätze, meine einzige Option ist es, Graphs (oder die allgemeinere FlowOps#transform) zu verwenden, aber ich bin nicht annähernd genug (noch) mit Akka-Streams, um das zu versuchen.
Hier ist, was ich in der Lage war, mit so weit zu kommen (spezifisch für das Beispielszenario):
val chunks:Source[ByteString, NotUsed] = sizes
.scan(bytes prefixAndTail 0) {
(grouped, count) => grouped flatMapConcat {
case (chunk, remainder) => remainder prefixAndTail count
}
}
.flatMapConcat(identity)
.collect { case (chunk, _) if chunk.nonEmpty => ByteString(chunk:_*) }
das Problem bei diesem Ansatz ist, dass die 'size' Anzahl der Zeichen wird * immer * * von der Vorderseite * von' bytes' abgeholt werden, so dass die Elemente des resultierenden Strom wird alle das gleiche Präfix. – Andrey
@Andrey du hast Recht. Ich habe meine Antwort aktualisiert, mit der tatsächlichen Implementierung einer 'GraphStage', die als etwas zwischen dem Zip und Framing funktioniert. – lpiepiora