2016-04-26 2 views
2

Ich habe einen einfachen Builder, der eine Source und Sink 's von Akka Streams akzeptiert und bei der Kompilierzeit überprüft, ob eine Methode, die auf diesen ausgeführt wird, Typen hat, die Source und Senken entsprechen.Generic stream builder mit formlosen HLists

class EventProcessorTask(config: EventProcessorConfig =  
    EventProcessorConfig()) { 
    def source[In, MatIn](source: Source[In, MatIn]): SourcedTask[In, MatIn] = new SourcedTask[In, MatIn](source, config) 
} 

class SourcedTask[In, MatIn](source: Source[In, MatIn], config: EventProcessorConfig) { 
    def withPartitioning[Id](partitioningF: In => Id): SourcedTaskWithPartitioning[In, MatIn, Id] = 
    new SourcedTaskWithPartitioning[In, MatIn, Id](source, partitioningF, config) 
} 

class SourcedTaskWithPartitioning[In, MatIn, Id](source: Source[In, MatIn], partitioningF: In => Id, config: EventProcessorConfig) { 
    def withSink[Out, T](sink: Sink[Out, T]): WiredTask[In, MatIn, Out :: HNil, Id, Sink[Out, T] :: HNil] = 
    new WiredTask[In, MatIn, Out :: HNil, Id, Sink[Out, T] :: HNil](source, sink :: HNil, partitioningF, config) 
} 

class WiredTask[In, MatIn, L <: HList, Id, SinksTypes <: HList](
                  source: Source[In, MatIn], 
                  sinks: SinksTypes, 
                  partitioningF: In => Id, 
                  config: EventProcessorConfig 
                  ) { 
    def withSink[Out, T](sink: Sink[Out, T]): WiredTask[In, MatIn, Out :: L, Id, Sink[Out, T] :: SinksTypes] = 
new WiredTask[In, MatIn, Out :: L, Id, Sink[Out, T] :: SinksTypes](
    source, sink :: sinks, partitioningF, config 
) 

    def execute[N <: Nat, P <: Product, F, R <: HList, SinksRev <: HList] 
    (executionMethod: In => Future[P])(
    implicit generic: Generic.Aux[P, R], 
    rev: Reverse.Aux[L, R], 
    sinksRev: Reverse.Aux[SinksTypes, SinksRev], 
    executionContext: ExecutionContext, 
    l: Length.Aux[SinksRev, N] 
    ): Unit = { 
    val sinksReversed = sinksRev(sinks) 

// val sinksLength= sinksReversed.length.toInt 
} 
} 

Der obige Code kompiliert, aber wenn ich versuche, eine Broadcast für die Sinks bauen Ich kann nicht einmal die Größe der Liste erhalten (auf Kommentar-Code). Der nächste Schritt wäre, alle Sinks in SinksRev zu entsprechen Typ von P, die mir erlauben würde, Nachrichten zu senden, die von executionMethod erzeugt werden, die ein Tupel an Sinks zurückgibt, das P Typen entspricht.

I.e.

new EventProcessorTask() 
    .source(Source.single("str")) 
    .withPartitioning(r => 1) 
    .withSink(Sink.head[Long]) 
    .withSink(Sink.foreach((s: String) =>())) 
    .execute(
    in => Future.successful((null.asInstanceOf[Long], null.asInstanceOf[String])) 
) 

Long zum ersten Sink und String zu zweiten gehen sollte.

Jede Hilfe würde sehr geschätzt werden. Ich könnte hier etwas sehr falsch machen, aber das Konzept schien nett zu der Zeit, als ich anfing, daran zu arbeiten (jetzt nicht so viel). So oder so würde ich gerne verstehen, was ich hier vermisse.

Zusammenfassend sind die Fragen: 1. Warum kann ich nicht Int Darstellung von SinksRev Größe bekommen? 2. Wie passt man Sink s von SinksRev zu den entsprechenden Elementen in P an, um eine Broadcast basierte GraphShape zu bauen?

Antwort

1

Ich bin in formlos und akka Streams vor kurzem selbst, also habe ich beschlossen, diese Frage zu versuchen. Ihr Fall scheint ziemlich kompliziert zu sein, also nahm ich, was ich verstand, vereinfachte es ein bisschen und kam mit Code, der etwas zu tun scheint, was dem ähnlich ist, was Sie wahrscheinlich wollen. Ich kann immer noch nicht die Länge berechnen, aber da es ein Builder ist, könnte += 1 ausreichen.

Hier ist das Ergebnis mit Optionen statt Senken. Es nimmt eine Liste von Optionen und wendet eine Funktion auf den Inhalt von Optionen an. Wie ich bereits erwähnte, habe ich den Fall vereinfacht.

import shapeless._ 

object Shapes extends App { 

    import ops.function._ 
    import syntax.std.function._ 

    case class Thing[Types <: HList, Out] private(sources: List[Option[_]]) { 

    def withOption[T](o: Option[T]) = Thing[T :: Types, Out](o :: sources) 

    def withOutput[T] = Thing[Types, T](sources) 

    def apply[F](f: F)(implicit fp: FnToProduct.Aux[F, Types => Out]) = { 
     val a: Types = sources.foldLeft[HList](HNil)((m, v) ⇒ v.get :: m).asInstanceOf[Types] 
     f.toProduct(a) 
    } 
    } 

    object Thing { 
    def withOption[T](o: Option[T]) = Thing[T :: HNil, AnyVal](o :: Nil) 
    } 

    val r = Thing 
    .withOption(Some(1)) 
    .withOption(Some(2)) 
    .withOption(Some(3)) 
    .withOutput[Unit] 
    .apply { 
     (x: Int, y: Int, z: Int) ⇒ println(x + y + z) 
    } 

    println(r) 
}