Ich habe einen einfachen Builder, der eine Source
und Sink
's von Akka Streams akzeptiert und bei der Kompilierzeit überprüft, ob eine Methode, die auf diesen ausgeführt wird, Typen hat, die Source und Senken entsprechen.Generic stream builder mit formlosen HLists
class EventProcessorTask(config: EventProcessorConfig =
EventProcessorConfig()) {
def source[In, MatIn](source: Source[In, MatIn]): SourcedTask[In, MatIn] = new SourcedTask[In, MatIn](source, config)
}
class SourcedTask[In, MatIn](source: Source[In, MatIn], config: EventProcessorConfig) {
def withPartitioning[Id](partitioningF: In => Id): SourcedTaskWithPartitioning[In, MatIn, Id] =
new SourcedTaskWithPartitioning[In, MatIn, Id](source, partitioningF, config)
}
class SourcedTaskWithPartitioning[In, MatIn, Id](source: Source[In, MatIn], partitioningF: In => Id, config: EventProcessorConfig) {
def withSink[Out, T](sink: Sink[Out, T]): WiredTask[In, MatIn, Out :: HNil, Id, Sink[Out, T] :: HNil] =
new WiredTask[In, MatIn, Out :: HNil, Id, Sink[Out, T] :: HNil](source, sink :: HNil, partitioningF, config)
}
class WiredTask[In, MatIn, L <: HList, Id, SinksTypes <: HList](
source: Source[In, MatIn],
sinks: SinksTypes,
partitioningF: In => Id,
config: EventProcessorConfig
) {
def withSink[Out, T](sink: Sink[Out, T]): WiredTask[In, MatIn, Out :: L, Id, Sink[Out, T] :: SinksTypes] =
new WiredTask[In, MatIn, Out :: L, Id, Sink[Out, T] :: SinksTypes](
source, sink :: sinks, partitioningF, config
)
def execute[N <: Nat, P <: Product, F, R <: HList, SinksRev <: HList]
(executionMethod: In => Future[P])(
implicit generic: Generic.Aux[P, R],
rev: Reverse.Aux[L, R],
sinksRev: Reverse.Aux[SinksTypes, SinksRev],
executionContext: ExecutionContext,
l: Length.Aux[SinksRev, N]
): Unit = {
val sinksReversed = sinksRev(sinks)
// val sinksLength= sinksReversed.length.toInt
}
}
Der obige Code kompiliert, aber wenn ich versuche, eine Broadcast
für die Sinks
bauen Ich kann nicht einmal die Größe der Liste erhalten (auf Kommentar-Code). Der nächste Schritt wäre, alle Sinks
in SinksRev
zu entsprechen Typ von P
, die mir erlauben würde, Nachrichten zu senden, die von executionMethod
erzeugt werden, die ein Tupel an Sinks
zurückgibt, das P
Typen entspricht.
I.e.
new EventProcessorTask()
.source(Source.single("str"))
.withPartitioning(r => 1)
.withSink(Sink.head[Long])
.withSink(Sink.foreach((s: String) =>()))
.execute(
in => Future.successful((null.asInstanceOf[Long], null.asInstanceOf[String]))
)
Long
zum ersten Sink
und String
zu zweiten gehen sollte.
Jede Hilfe würde sehr geschätzt werden. Ich könnte hier etwas sehr falsch machen, aber das Konzept schien nett zu der Zeit, als ich anfing, daran zu arbeiten (jetzt nicht so viel). So oder so würde ich gerne verstehen, was ich hier vermisse.
Zusammenfassend sind die Fragen: 1. Warum kann ich nicht Int
Darstellung von SinksRev
Größe bekommen? 2. Wie passt man Sink
s von SinksRev
zu den entsprechenden Elementen in P
an, um eine Broadcast
basierte GraphShape
zu bauen?