2016-04-23 13 views
0

Ich bin neu bei AKKA Streams. (Mit Akka v 2.4.4) Ich versuche, einen Websocket zu erstellen, der neue Benachrichtigungen an abonnierte Clients senden kann. Meine Strategie besteht darin, einen ActorPublisher zu implementieren, an den ich später eine Nachricht senden kann, und ihn dann an die Kunden weiterleiten zu lassen.Actorpublisher als Quelle in handleMessagesWithSinkSource

case class Tick() 

class TickActor extends ActorPublisher[Int] { 
    import scala.concurrent.duration._ 

    implicit val ec = context.dispatcher 

    val tick = context.system.scheduler.schedule(1 second, 1 second, self, `Tick())` 

    var cnt = 0 
    var buffer = Vector.empty[Int] 

    override def receive: Receive = { 
    case Tick() => { 
     cnt = cnt + 1 
     if (buffer.isEmpty && totalDemand > 0) { 
     onNext(cnt) 
     } 
     else { 
     buffer :+= cnt 
     if (totalDemand > 0) { 
      val (use,keep) = buffer.splitAt(totalDemand.toInt) 
      buffer = keep 
      use foreach onNext 
     } 
     } 
    } 
    } 

    override def postStop() = tick.cancel() 
} 

Mein Problem ist, dass ich weiß nicht, wie es als Quelle zu verwenden:

Um Ich kopierte ein Beispiel eines ActorPublisher loszulegen.

Ich habe versucht, die folgenden:

val source: Source[Strict, ActorRef] = Source.actorPublisher(Props[TickActor]).map(i => TextMessage(i.toString)) 
    optionalHeaderValueByType[akka.http.scaladsl.model.ws.UpgradeToWebSocket]() { 
    case Some(upgrade) => 
     complete(
     upgrade.handleMessagesWithSinkSource(Sink.ignore,source)) 
    case None => 
     reject(akka.http.scaladsl.server.ExpectedWebSocketRequestRejection) 
    } 

Aber wenn ich mit einem Klienten ich folgendes Classcast erhalten verbinden: java.lang.ClassCastException: java.lang.Integer kann nicht auf scala.runtime.Nothing gegossen werden $

Wenn ich die Quelle zu ändern:

val src: Source[Strict, NotUsed] = Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current.nextInt())) 
     .filter(i => i > 0 && i % 2 == 0).map(i => TextMessage(i.toString)) 

Es läuft ganz gut.

Ich kämpfe ein bisschen die Punkte verbinden, so hoffentlich können Sie mich in die richtige Richtung führen.

Antwort

0

Ich habe Ihr Beispiel versucht und konnte das Problem reproduzieren. Ich habe nur eine Änderung vorgenommen, um das Problem zu beheben. Das ist der Typ-Parameter hinzugefügt und es macht jetzt Sinn, weil irgendwo im Akka-Stream gibt es einen Code wie elem.asInstanceOf[T]. Wenn der Typ im actorPublisher fehlt, wird der Typ als

val source = Source.actorPublisher[Int](Props[TickActor]).map(i => TextMessage(i.toString)) 
gefolgert
Verwandte Themen