Sending data to a Kerberos-enabled Kafka cluster from a Scala client
I am writing an Akka Kafka producer in Scala. I am trying to send messages to a Kafka broker from the Scala Kafka client, but the broker never receives them; I verified this by starting a Kafka consumer from the command line. The Kafka console producer and consumer work fine from the command prompt. The Kafka cluster has Kerberos and SASL_PLAINTEXT security enabled.
Below are my conf file, client code, and application logs. I suspect the problem lies in how the code connects with Kerberos.
Scala client:
package com.ABC.adds.producer

import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.ABC.adds.models.Models.PPOfaq
import com.ABC.adds.producer.serializer.ModelSerializer
import com.thoughtworks.xstream.XStream
import com.thoughtworks.xstream.io.xml.DomDriver
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.ByteArraySerializer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object faqProducer extends App with LazyLogging {
  val config = ConfigFactory.load()
  implicit val system = ActorSystem.create("adds-faq-producer", config)
  implicit val mat = ActorMaterializer()

  // Producer settings: byte-array keys, PPOfaq values, SASL_PLAINTEXT transport
  val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ModelSerializer[PPOfaq](classOf[PPOfaq]))
    .withBootstrapServers("jbt12324.systems.pfk.ABC:3382")
    .withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
    .withProperty("zookeeper.connect", "jbt234234.systems.pfk.ABC:2341,jbt234.systems.pfk.ABC:234,jbt1234324.systems.pfk.ABC:2234")
    .withProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "1")

  // Build the message from an XML resource on the classpath
  val xstream = new XStream(new DomDriver)
  val personString: String = scala.io.Source.fromInputStream(getClass().getClassLoader().getResourceAsStream("PPOfaq.xml")).mkString
  xstream.alias("faq", classOf[PPOfaq])
  val ppofaq: PPOfaq = xstream.fromXML(personString).asInstanceOf[PPOfaq]

  logger.info("Producer Configuration is : {} ", producerSettings.toString)
  logger.info("Sending message : {}", ppofaq)

  // First attempt: plain KafkaProducer, blocking on the result of send
  logger.info("KafkaProducer Send first fetching Partitions for topics")
  val kafkaProducer = producerSettings.createKafkaProducer()
  kafkaProducer.partitionsFor("asp.adds.ppo.pems")
  val done1 = kafkaProducer.send(new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq))
  val recordMetaData: RecordMetadata = done1.get()
  logger.info("Topic is : " + recordMetaData.topic() + " partition is : " + recordMetaData.partition() + " offset is : " + recordMetaData.offset())
  logger.info("KafkaProducer Send first fetching Partitions for topics end")

  // Second attempt: the same record through the Akka Streams sink
  val done = Source.single(ppofaq)
    .map { elem =>
      new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", elem)
    }
    .runWith(Producer.plainSink(producerSettings))

  done onComplete {
    case Success(s) =>
      logger.info("The producer has sent a message to the topic: asp.adds.ppo.pems!!")
    case Failure(e) =>
      logger.error("Error occurred while producing to the topic", e)
      e.printStackTrace()
  }
}
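The security-related settings in the client code above can also be collected in one small helper, which makes them easier to compare with what the command-line producer uses. This is only a sketch under assumptions: `SaslProducerProps` and `build` are hypothetical names, the service name `kafka` is copied from the JAAS file in this post, and 30000 ms is assumed as a more forgiving `request.timeout.ms` than the 1 ms set above. The keys themselves (`security.protocol`, `sasl.kerberos.service.name`, `request.timeout.ms`) are standard Kafka client configuration names.

```scala
import java.util.Properties

// Hypothetical helper, not part of the original client.
object SaslProducerProps {

  // Collects client properties for a SASL_PLAINTEXT + Kerberos cluster.
  // The values are illustrative assumptions, not settings confirmed for this cluster.
  def build(bootstrapServers: String, requestTimeoutMs: Int = 30000): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("security.protocol", "SASL_PLAINTEXT")
    // Assumed to match serviceName="kafka" in the KafkaClient JAAS entry
    props.setProperty("sasl.kerberos.service.name", "kafka")
    props.setProperty("request.timeout.ms", requestTimeoutMs.toString)
    props
  }

  def main(args: Array[String]): Unit = {
    // Bootstrap address taken from the client code in this post
    val props = build("jbt12324.systems.pfk.ABC:3382")
    println(props)
  }
}
```

Each entry could be passed to the `ProducerSettings` in the listing through `withProperty(key, value)`. Whether any of these settings is related to the metadata timeout in the log is not confirmed.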
This is the Kafka client conf file I am using for Kerberos authentication:
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    doNotPrompt=true
    useTicketCache=false
    useKeyTab=true
    serviceName="kafka"
    principal="[email protected]"
    keyTab="/home/pqr/.pqr.headless.keytab"
    debug=true
    client=true;
};
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    doNotPrompt=true
    useKeyTab=true
    useTicketCache=false
    serviceName="zookeeper"
    principal="[email protected]"
    keyTab="/home/pqr/.pqr.headless.keytab"
    debug=true;
};
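To rule out that the JVM simply does not see this file, the JAAS configuration can be inspected from Scala before the producer is created. This is a minimal diagnostic sketch, not part of the original client: `JaasCheck` and `loginModules` are hypothetical names, and it relies only on the standard `javax.security.auth.login.Configuration` API. It assumes the JVM is started with `-Djava.security.auth.login.config` pointing at the file above.

```scala
import javax.security.auth.login.Configuration

// Hypothetical diagnostic helper, not part of the original client.
object JaasCheck {

  // Returns the login module class names configured under the given JAAS entry.
  // Returns an empty list if the entry is missing or if the JVM throws a
  // SecurityException while loading the JAAS configuration.
  def loginModules(entryName: String): List[String] =
    try {
      val entries = Configuration.getConfiguration.getAppConfigurationEntry(entryName)
      if (entries == null) Nil else entries.toList.map(_.getLoginModuleName)
    } catch {
      case _: SecurityException => Nil
    }

  def main(args: Array[String]): Unit = {
    // Show which JAAS file the JVM was told to use (null if the property is unset)
    println("java.security.auth.login.config = " +
      System.getProperty("java.security.auth.login.config"))
    println("KafkaClient modules: " + loginModules("KafkaClient"))
  }
}
```

An empty list for `KafkaClient` would suggest that the file was not picked up or lacks that entry. A non-empty list only shows that the entry was parsed, not that the Kerberos login succeeds.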
This is the application log I get when I run my jar on the cluster:
[[email protected] ResourceBundle]$ java -Djava.security.auth.login.config=kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -Djava.security.debug=logincontext,gssloginconfig,configfile,configparser, -jar adds-producer.jar
[Policy Parser]: creating policy entry for expanded java.ext.dirs path:
file:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/lib/ext/*
[Policy Parser]: creating policy entry for expanded java.ext.dirs path:
file:/usr/java/packages/lib/ext/*
14:44:56.520 [main] INFO c.h.adds.producer.addsProducer$ - Producer Configuration is : [email protected]
14:44:56.523 [main] INFO c.h.adds.producer.addsProducer$ - Sending message : PPOadds(PIC_EVENT,01/06/2016,26/10/2016,ASSIGNED,asd_asdasd_ERRORED,asd,asdMSR,High,CASE-3,CASE-4,CASE-1,CASE-2,,CustomAttributes(GCAL,PTS Only,,DTF_INT_WORKFLOWS_COMPLETE,16065763,2495921,12,CreditDefaultSwap,CreditDefaultSwap,VERIFIED_asdasd,ABCUSA,asdCDS))
14:44:56.524 [main] INFO c.h.adds.producer.addsProducer$ - KafkaProducer Send first fetching Partitions for topics
configfile: reading file:/home/asdasd-asdasdds-adds-asda/adasd-erer/hfgh/kafka_client_jaas.conf
configparser: Reading next config entry: KafkaClient
configparser: com.sun.security.auth.module.Krb5LoginModule, REQUIRED
configparser: [email protected]
configparser: debug=true
configparser: doNotPrompt=true
configparser: keyTab=/home/asdasd-asdad-adds-rrewr/.sdfsf-sdfsd-adds-sdfsf.headless.keytab
configparser: client=true
configparser: useKeyTab=true
configparser: useTicketCache=false
configparser: serviceName=kafka
configparser: Reading next config entry: Client
configparser: com.sun.security.auth.module.Krb5LoginModule, REQUIRED
configparser: [email protected]
configparser: debug=true
configparser: doNotPrompt=true
configparser: keyTab=/home/sdfdsf-sfds-adds-sdf/.sdff.sdsfs-adds-usdfs.headless.keytab
configparser: useKeyTab=true
configparser: useTicketCache=false
configparser: serviceName=zookeeper
Debug is true storeKey false useTicketCache false useKeyTab true doNotPrompt true ticketCache is null isInitiator true KeyTab is /home/dasda-sasd-adds-asdad/.asdad-asd-adds-adsasd.headless.keytab refreshKrb5Config is false principal is [email protected] tryFirstPass is false useFirstPass is false storePass is false clearPass is false
principal is [email protected]
Will use keytab
[LoginContext]: login success
Commit Succeeded
[LoginContext]: commit success
14:44:56.748 [main] WARN o.a.k.c.producer.ProducerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Please let me know if I am doing something wrong. Thanks, Mahendra Tonape