2017-02-12 3 views
1

Ich verwende Kuchenlösung Akka client for scala and Kafka. Während ich einen KafkaProducerActor Akteur erschaffe und versuche, Nachricht unter Verwendung ask des Musters zu schicken und Zukunft und einige Operationen zurückzubringen, aber jedes Mal, werde ich ask Zeitlimitausnahme gegenüberstellen. Unten ist mein Code:Apache Kafka: KafkaProducerActor löst Ausnahme-ASK-Timeout aus.

class SimpleAkkaProducer (config: Config, system: ActorSystem) { 

    private val producerConf = KafkaProducer. 
    Conf(config, 
     keySerializer = new StringSerializer, 
     valueSerializer = new StringSerializer) 

    val actorRef = system.actorOf(KafkaProducerActor.props(producerConf)) 

    def sendMessageWayOne(record: ProducerRecords[String, String]) = { 
    actorRef ! record 
    } 

    def sendMessageWayTwo(record: ProducerRecords[String, String]) = { 
    implicit val timeout = Timeout(100.seconds) 
    val future = (actorRef ? record).mapTo[String] 
    future onComplete { 
     case Success(data) => println(s" >>>>>>>>>>>> ${data}") 
     case Failure(ex) => ex.printStackTrace() 
    } 
    } 
} 

object SimpleAkkaProducer { 
    def main(args: Array[String]): Unit = { 
    val system = ActorSystem("KafkaProducerActor") 
    val config = ConfigFactory.defaultApplication() 
    val simpleAkkaProducer = new SimpleAkkaProducer(config, system) 

    val topic = config.getString("akka.topic") 
    val messageOne = ProducerRecords.fromKeyValues[String, String](topic, 
     Seq((Some("Topics"), "First Message")), None, None) 

    simpleAkkaProducer.sendMessageWayOne(messageOne) 
    simpleAkkaProducer.sendMessageWayTwo(messageOne) 
    } 
} 

Es folgt Ausnahme:

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://KafkaProducerActor/user/$a#-1520717141]] after [100000 ms]. Sender[null] sent message of type "cakesolutions.kafka.akka.ProducerRecords". 
    at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604) 
    at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) 
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:864) 
    at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109) 
    at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103) 
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:862) 
    at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) 
    at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) 
    at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) 
    at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) 
    at java.lang.Thread.run(Thread.java:745) 
+0

Fügen Sie die Definition von 'KafkaProducerActor'. –

+0

'KafkaProducerActor' ist Design von API und Implementierung wie https://github.com/cakesolutions/scala-kafka-client/blob/1cbeccbb183ca06585f4b9fb1e366048e993ac51/akka/src/main/scala/cakesolutions/kafka/akka/KafkaProducerActor.scala –

+0

Hey @YuvalItzchakov hast du irgendeine Lösung dafür gefunden? –

Antwort

2

Der Produzent Schauspieler reagiert nur auf den Absender, wenn Sie die successResponse und failureResponse Werte in der ProducerRecords geben etwas anderes als None zu sein. Der Wert successResponse wird zurück an den Absender gesendet, wenn der Kafka-Schreibvorgang erfolgreich ist, und der Wert failureResponse wird zurückgeschickt, wenn der Kafka-Schreibvorgang fehlschlägt.

Beispiel:

val record = ProducerRecords.fromKeyValues[String, String](
    topic = topic, 
    keyValues = Seq((Some("Topics"), "First Message")), 
    successResponse = Some("success"), 
    failureResponse = Some("failure") 
) 

val future = (actorRef ? record).mapTo[String] 
future onComplete { 
    case Success("success") => println("Send succeeded!") 
    case Success("failure") => println("Send failed!") 
    case Success(data) => println(s"Send result: $data") 
    case Failure(ex) => ex.printStackTrace() 
} 
+0

Okay @Jaakko, das macht Sinn, Das funktioniert, aber wie bekommen wir 'RrecordMetaData' nach dem Senden der Nachricht? –

+0

Leider gibt es keine Möglichkeit, die 'RecordMetaData' mit der aktuellen Version des Producer Actors zu erhalten. Patches und Vorschläge sind auf der Projektseite willkommen. :) –

+0

Okay danke @Jaakko für Ihre Hilfe. –