2017-06-12 6 views
0

Ich schreibe akka, Kafka Producer in scala, ich versuche, Nachrichten an kafka broker von scala kafka client zu senden und der Broker bekommt diese Nachricht nicht, ich habe es verifiziert Starten von Kafka Consumer über die Befehlszeile. Kafka Producer und Consumer funktionieren gut über die Eingabeaufforderung. Kafka ist Kerberos und SASL_PlainText Sicherheit aktiviert.Senden von Daten an Kerberos Enabled Kafka-Cluster von scala Client

Hier finden Sie meine conf-Datei, Client-Code und Anwendungsprotokolle. Ich denke, es muss ein Problem bei der Verbindung mit Kerberos aus dem Code sein.

Scala Auftraggeber:

package com.ABC.adds.producer 

import akka.actor.ActorSystem 
import akka.kafka.ProducerSettings 
import akka.kafka.scaladsl.Producer 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Source 
import com.ABC.adds.models.Models.GMMOfaq 
import com.ABC.adds.producer.serializer.ModelSerializer 
import com.thoughtworks.xstream.XStream 
import com.thoughtworks.xstream.io.xml.DomDriver 
import com.typesafe.config.ConfigFactory 
import com.typesafe.scalalogging.LazyLogging 
import org.apache.kafka.clients.CommonClientConfigs 
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} 
import org.apache.kafka.common.serialization.ByteArraySerializer 

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.util.{Failure, Success} 

object faqProducer extends App with LazyLogging{ 

    val config = ConfigFactory.load() 
    implicit val system = ActorSystem.create("adds-faq-producer", config) 
    implicit val mat = ActorMaterializer() 

    val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ModelSerializer[PPOfaq](classOf[PPOfaq])) 
    .withBootstrapServers("jbt12324.systems.pfk.ABC:3382") 
    .withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") 
     .withProperty("zookeeper.connect","jbt234234.systems.pfk.ABC:2341,jbt234.systems.pfk.ABC:234,jbt1234324.systems.pfk.ABC:2234") 
     .withProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "1") 

    val xstream = new XStream(new DomDriver) 
    val personString: String = scala.io.Source.fromInputStream(getClass().getClassLoader().getResourceAsStream("PPOfaq.xml")).mkString 
    xstream.alias("faq", classOf[PPOfaq]) 
    val ppofaq: PPOfaq = xstream.fromXML(personString).asInstanceOf[PPOfaq] 

    logger.info(s"Producer Configuration is : {} ", producerSettings.toString) 
    logger.info(s"Sending message : {}", ppofaq) 

    logger.info("KafkaProducer Send first fetching Partitions for topics") 
    val kafkaProducer = producerSettings.createKafkaProducer() 
    kafkaProducer.partitionsFor("asp.adds.ppo.pems") 
    val done1 = kafkaProducer.send(new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq)) 
    val recordMetaData : RecordMetadata = done1.get() 

    logger.info("Topic is : " + recordMetaData.topic() +" partition is : "+ recordMetaData.partition() +" offset is : "+ recordMetaData.offset()) 

    logger.info("KafkaProdcuer Send first fetching Partitions for topics end") 

    val done = Source.single(ppofaq) 
    .map { elem => 
     new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq) 
    } 
    .runWith(Producer.plainSink(producerSettings)) 

    done onComplete { 
    case Success(s) => { 
    logger.info(s"The producer has sent a message to the topic: asp.adds.ppo.pems!!") 
    } 
    case Failure(e) => { 
    logger.error("Erorr occured while producing Topic", e) 
    e.printStackTrace() 
    e.fillInStackTrace() 
    e.getCause 
    e.getMessage 
    } 
} 
} 

Dies ist Kafka_Client Conf Datei-I für die Kerberos-Authentifizierung bin mit:

KafkaClient { 
com.sun.security.auth.module.Krb5LoginModule required 
doNotPrompt=true 
useTicketCache=false 
useKeyTab=true 
serviceName="kafka" 
principal="[email protected]" 
keyTab="/home/pqr/.pqr.headless.keytab" 
debug=true 
client=true; 
}; 
Client { 
    com.sun.security.auth.module.Krb5LoginModule required 
    doNotPrompt=true 
    useKeyTab=true 
    useTicketCache=false 
    serviceName="zookeeper" 
    principal="[email protected]" 
    keyTab="/home/pqr/.pqr.headless.keytab" 
    debug=true; 
}; 

Dies ist das Anwendungsprotokoll ich immer bin, wenn ich mein Glas auf Cluster ausgeführt: Anwendungsprotokolle:

[[email protected] ResourceBundle]$ java -Djava.security.auth.login.config=kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -Djava.security.debug=logincontext,gssloginconfig,configfile,configparser, -jar adds-producer.jar 
      [Policy Parser]: creating policy entry for expanded java.ext.dirs path: 
        file:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/lib/ext/* 
      [Policy Parser]: creating policy entry for expanded java.ext.dirs path: 
        file:/usr/java/packages/lib/ext/* 
    14:44:56.520 [main] INFO c.h.adds.producer.addsProducer$ - Producer Configuration is : [email protected] 
    14:44:56.523 [main] INFO c.h.adds.producer.addsProducer$ - Sending message : PPOadds(PIC_EVENT,01/06/2016,26/10/2016,ASSIGNED,asd_asdasd_ERRORED,asd,asdMSR,High,CASE-3,CASE-4,CASE-1,CASE-2,,CustomAttributes(GCAL,PTS Only,,DTF_INT_WORKFLOWS_COMPLETE,16065763,2495921,12,CreditDefaultSwap,CreditDefaultSwap,VERIFIED_asdasd,ABCUSA,asdCDS)) 
    14:44:56.524 [main] INFO c.h.adds.producer.addsProducer$ - KafkaProducer Send first fetching Partitions for topics 
    configfile: reading file:/home/asdasd-asdasdds-adds-asda/adasd-erer/hfgh/kafka_client_jaas.conf 
    configparser: Reading next config entry: KafkaClient 
    configparser:   com.sun.security.auth.module.Krb5LoginModule, REQUIRED 
    configparser:     [email protected] 
    configparser:     debug=true 
    configparser:     doNotPrompt=true 
    configparser:     keyTab=/home/asdasd-asdad-adds-rrewr/.sdfsf-sdfsd-adds-sdfsf.headless.keytab 
    configparser:     client=true 
    configparser:     useKeyTab=true 
    configparser:     useTicketCache=false 
    configparser:     serviceName=kafka 
    configparser: Reading next config entry: Client 
    configparser:   com.sun.security.auth.module.Krb5LoginModule, REQUIRED 
    configparser:     [email protected] 
    configparser:     debug=true 
    configparser:     doNotPrompt=true 
    configparser:     keyTab=/home/sdfdsf-sfds-adds-sdf/.sdff.sdsfs-adds-usdfs.headless.keytab 
    configparser:     useKeyTab=true 
    configparser:     useTicketCache=false 
    configparser:     serviceName=zookeeper 
    Debug is true storeKey false useTicketCache false useKeyTab true doNotPrompt true ticketCache is null isInitiator true KeyTab is /home/dasda-sasd-adds-asdad/.asdad-asd-adds-adsasd.headless.keytab refreshKrb5Config is false principal is [email protected] tryFirstPass is false useFirstPass is false storePass is false clearPass is false 
    principal is [email protected] 
    Will use keytab 
      [LoginContext]: login success 
    Commit Succeeded 

      [LoginContext]: commit success 
    14:44:56.748 [main] WARN o.a.k.c.producer.ProducerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config. 
    adds in thread "main" org.apache.kafka.common.errors.Timeoutadds: Failed to update metadata after 60000 ms. 

Bitte lassen Sie mich wissen, wenn ia m etwas falsch machen. Danke, Mahendra Tonape

Antwort

0

Wir konnten Nachricht von Verbraucherseite in unserem Cluster konsumieren, aber wir können Nachrichten an unserer lokalen Maschine verbrauchen, das ist so, weil wir unsere Anwendung api mit Kafka 0,10 und unserem Cluster mit schrieben kafka Version 0.9.If Sie Unterschiede zwischen diesen 2 Kafka-Version überprüft werden Sie feststellen, dass es erhebliche Unterschiede zwischen diesen 2-Version API.

Aktivieren Sie außerdem Kerberos-Debug-Protokolle, um zu überprüfen, ob der Benutzer mit dem Kerberos-aktivierten Cluster

authentifiziert wurde