2017-05-02 3 views
0

I einen akka Strom wie akka stream consume web socket von einem Web-Buchse und möchte eine wiederverwendbare Graph Stufe bauen (inlet: Bach, FlowShape: Zum JSON Spezifizierungs Ursprung ein zusätzliches Feld dhakka Strom benutzerdefinierte Graph Stufe

hinzuzufügen
{ 
..., 
"origin":"blockchain.info" 
} 

und ein outlet zu kafka

ich die folgenden drei Problemen konfrontiert.

  • nicht in der Lage, meinen Kopf zu wickeln um die Schaffung fließt ein benutzerdefinierte Inlet aus dem Web-Buchse
  • nicht in der Lage zu integrieren kafka direkt in den Strom (den Code unten)
  • nicht sicher, ob der Transformator das zusätzliche Feld hinzuzufügen erforderlich wäre, um die json deserialisiert den Ursprung hinzufügen

Das Beispielprojekt (nur fließen) wie folgt aussieht:

import system.dispatcher 
implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 

val incoming: Sink[Message, Future[Done]] = 
    Flow[Message].mapAsync(4) { 
     case message: TextMessage.Strict => 
     println(message.text) 
     Future.successful(Done) 
     case message: TextMessage.Streamed => 
     message.textStream.runForeach(println) 
     case message: BinaryMessage => 
     message.dataStream.runWith(Sink.ignore) 
    }.toMat(Sink.last)(Keep.right) 

val producerSettings = ProducerSettings(system, new ByteArraySerializer, new StringSerializer) 
    .withBootstrapServers("localhost:9092") 

val outgoing = Source.single(TextMessage("{\"op\":\"unconfirmed_sub\"}")).concatMat(Source.maybe)(Keep.right) 

val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("wss://ws.blockchain.info/inv")) 

val ((completionPromise, upgradeResponse), closed) = 
    outgoing 
     .viaMat(webSocketFlow)(Keep.both) 
     .toMat(incoming)(Keep.both) 
     // TODO not working integrating kafka here 
     // .map(_.toString) 
     // .map { elem => 
     //  println(s"PlainSinkProducer produce: ${elem}") 
     //  new ProducerRecord[Array[Byte], String]("topic1", elem) 
     // } 
     // .runWith(Producer.plainSink(producerSettings)) 
     .run() 

val connected = upgradeResponse.flatMap { upgrade => 
    if (upgrade.response.status == StatusCodes.SwitchingProtocols) { 
     Future.successful(Done) 
    } else { 
     throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") 
     system.terminate 
    } 
    } 

// kafka that works/writes dummy data 
val done1 = Source(1 to 100) 
    .map(_.toString) 
    .map { elem => 
     println(s"PlainSinkProducer produce: ${elem}") 
     new ProducerRecord[Array[Byte], String]("topic1", elem) 
    } 
    .runWith(Producer.plainSink(producerSettings)) 

Antwort

1

Ein Thema, um die incoming Bühne, die als modelliert a Sink. wo sollte es als Flow modelliert werden. um anschließend Nachrichten an Kafka zu übermitteln.

Da eingehende Textnachrichten Streamed sein können. Sie können flatMapMerge combinator verwenden, da die Notwendigkeit zu vermeiden, folgt gesamte (potenziell große) Nachrichten im Speicher zu speichern:

val incoming: Flow[Message, String, NotUsed] = Flow[Message].mapAsync(4) { 
    case msg: BinaryMessage => 
     msg.dataStream.runWith(Sink.ignore) 
     Future.successful(None) 
    case TextMessage.Streamed(src) => 
     src.runFold("")(_ + _).map { msg => Some(msg) } 
    }.collect { 
    case Some(msg) => msg 
    } 

An dieser Stelle Sie etwas, das Strings bekam produziert und kann auf Kafka angeschlossen werden:

val addOrigin: Flow[String, String, NotUsed] = ??? 

    val ((completionPromise, upgradeResponse), closed) = 
    outgoing 
     .viaMat(webSocketFlow)(Keep.both) 
     .via(incoming) 
     .via(addOrigin) 
     .map { elem => 
     println(s"PlainSinkProducer produce: ${elem}") 
     new ProducerRecord[Array[Byte], String]("topic1", elem) 
     } 
     .toMat(Producer.plainSink(producerSettings))(Keep.both) 
     .run() 
+0

Das funktioniert super als Starter! Aber wie kann ich 1) einen Transformator erstellen, der '" origin "hinzufügt:" blockchain.info "' ein Feld und 2) daraus eine Grafikbühne erstellen? (hier ein lauffähiger Beispielcode https://github.com/geoHeil/akkaStreamsIngest) –

+0

1) fügen Sie einfach einen Fluss nach 'incoming' hinzu, der der Nachricht den gewünschten Inhalt hinzufügt (siehe geänderten Code) 2) was möchten Sie? Erstellen Sie eine Grafikbühne aus? –

Verwandte Themen