Ich verwende Kuchenlösung Akka client for scala and Kafka. Während ich einen KafkaProducerActor
Akteur erschaffe und versuche, Nachricht unter Verwendung ask
des Musters zu schicken und Zukunft und einige Operationen zurückzubringen, aber jedes Mal, werde ich ask
Zeitlimitausnahme gegenüberstellen. Unten ist mein Code:Apache Kafka: KafkaProducerActor löst Ausnahme-ASK-Timeout aus.
class SimpleAkkaProducer (config: Config, system: ActorSystem) {
private val producerConf = KafkaProducer.
Conf(config,
keySerializer = new StringSerializer,
valueSerializer = new StringSerializer)
val actorRef = system.actorOf(KafkaProducerActor.props(producerConf))
def sendMessageWayOne(record: ProducerRecords[String, String]) = {
actorRef ! record
}
def sendMessageWayTwo(record: ProducerRecords[String, String]) = {
implicit val timeout = Timeout(100.seconds)
val future = (actorRef ? record).mapTo[String]
future onComplete {
case Success(data) => println(s" >>>>>>>>>>>> ${data}")
case Failure(ex) => ex.printStackTrace()
}
}
}
object SimpleAkkaProducer {
def main(args: Array[String]): Unit = {
val system = ActorSystem("KafkaProducerActor")
val config = ConfigFactory.defaultApplication()
val simpleAkkaProducer = new SimpleAkkaProducer(config, system)
val topic = config.getString("akka.topic")
val messageOne = ProducerRecords.fromKeyValues[String, String](topic,
Seq((Some("Topics"), "First Message")), None, None)
simpleAkkaProducer.sendMessageWayOne(messageOne)
simpleAkkaProducer.sendMessageWayTwo(messageOne)
}
}
Es folgt Ausnahme:
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://KafkaProducerActor/user/$a#-1520717141]] after [100000 ms]. Sender[null] sent message of type "cakesolutions.kafka.akka.ProducerRecords".
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:864)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:862)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:745)
Fügen Sie die Definition von 'KafkaProducerActor'. –
'KafkaProducerActor' ist Design von API und Implementierung wie https://github.com/cakesolutions/scala-kafka-client/blob/1cbeccbb183ca06585f4b9fb1e366048e993ac51/akka/src/main/scala/cakesolutions/kafka/akka/KafkaProducerActor.scala –
Hey @YuvalItzchakov hast du irgendeine Lösung dafür gefunden? –