2017-12-21 3 views
1

Da ich neu bei Kafka bin, kann ich die Datensätze aus der Datei lesen und diese Nachricht über den Producer an Kafka senden, aber nicht das gleiche Thema über den Konsumenten konsumieren.Kafka-Thema mit kafka consumer nicht lesen?

Hinweis: Sie können die Daten aus einer beliebigen Textdatei lesen und ich habe

kafka_2.11-0.9.0.0 Version verwendet Hier ist mein Code:

package implementation; 
import java.io.BufferedReader; 
//import java.io.BufferedWriter; 
import java.io.File; 
import java.io.FileInputStream; 
import java.io.FileNotFoundException; 
//import java.io.FileOutputStream; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.util.Arrays; 
//import java.io.OutputStreamWriter; 
import java.util.Properties; 
import java.util.Random; 
import java.util.UUID; 

import org.apache.kafka.clients.consumer.Consumer; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.common.errors.WakeupException; 
import org.apache.log4j.BasicConfigurator; 

import kafka.producer.ProducerConfig; 

public class File_to_Kafka extends ProducerConfig{ 

    Properties configProperties; 
    public File_to_Kafka(Properties originalProps) { 
     super(originalProps); 
     configProperties = originalProps; 
     // TODO Auto-generated constructor stub 
    } 

    public String topicName = "temp"+Math.random(); 

    public String groupId = UUID.randomUUID().toString();   



     public void producerKafka(Properties configProperties) throws IOException 
     { 


      FileInputStream fis = new FileInputStream("/home/nick/Desktop/Database-Kafka-ElasticSearch/src/main/java/resources/properties.xml"); 

      configProperties.load(fis); 
      System.out.println(configProperties); 
      org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<String, String>(configProperties); 

      File f1 = new File("/home/niket/Desktop/sample-example.txt"); 
      FileInputStream fis1 = new FileInputStream(f1); 
      BufferedReader br1 = new BufferedReader(new InputStreamReader(fis1)); 
      String str = br1.readLine(); 
      //while(br1.readLine()!=null) 
      while(str != null) 
      { 
       ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, str); 
       producer.send(rec); 
       str = br1.readLine(); 
      } 

      br1.close(); 
      fis.close(); 
      fis1.close(); 
      producer.close(); 

     } 

     public void consumerKafka() throws InterruptedException 
     { 
      ConsumerThread consumerRunnable = new ConsumerThread(topicName, groupId); 
      consumerRunnable.start(); 
      Thread.sleep(100); 
      consumerRunnable.getKafkaConsumer().wakeup(); 
      System.out.println("Stopping consumer ....."); 
      consumerRunnable.join(); 
     } 

     private static class ConsumerThread extends Thread{ 

      private String topicName; 
      private String groupId; 
      private KafkaConsumer<String, String> kafkaConsumer; 


      public ConsumerThread(String topicName, String groupId2) { 
       super(); 
       this.topicName = topicName; 
       this.groupId = groupId2; 
      } 

      public void run() 
      { 
       Properties configProperties = new Properties(); 
       configProperties.put("bootstrap.servers","localhost:9092"); 
       configProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
       configProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
       configProperties.put("group.id", groupId); 
       configProperties.put("CLIENT_ID_CONFIG", "simple"); 

       //Figure out where to tart processing messages from 
       kafkaConsumer = new KafkaConsumer<String, String>(configProperties); 
       kafkaConsumer.subscribe(Arrays.asList(topicName)); 
       int count=0; 



       //Start Processing Messages 
       try { 
         while(true) { 
          ConsumerRecords<String, String> records = kafkaConsumer.poll(100); 
          count = 0; 
          for (ConsumerRecord<String, String> record : records) 
          { 
            System.out.println(record.value()); 
            count++; 
          } 
          kafkaConsumer.commitAsync(); 
          if(count==records.count()) 
           break; 
         } 

        } 
       catch (WakeupException e) { 
        // TODO: handle exception 
        System.out.println("Exception caught : "+ e.getMessage()); 
       } 
       finally { 
        kafkaConsumer.close(); 
        System.out.println("After Closing KafkaConsumer"); 
       } 

      } 
      public KafkaConsumer<String,String> getKafkaConsumer(){ 
       return this.kafkaConsumer; 
       } 


     } 

     public static void main(String [] args) throws IOException, InterruptedException 
     { 
      BasicConfigurator.configure(); 
      Properties configProperties = new Properties(); 
      configProperties.put("bootstrap.servers", "localhost:9092"); 
      configProperties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 
      configProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
      configProperties.put("metadata.broker.list", "localhost:9092"); 
      File_to_Kafka obj = new File_to_Kafka(configProperties); 
      obj.producerKafka(configProperties); 
      obj.consumerKafka(); 

     } 

} 

Hier ist der Ausgang:

0 [main] INFO kafka.utils.VerifiableProperties - Verifying properties 
61 [main] WARN kafka.utils.VerifiableProperties - Property bootstrap.servers is not valid 
62 [main] WARN kafka.utils.VerifiableProperties - Property key.serializer is not valid 
62 [main] INFO kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to localhost:9092 
62 [main] WARN kafka.utils.VerifiableProperties - Property value.serializer is not valid 
{<name>BOOTSTRAP_SERVERS_CONFIG=(bootstrap.servers)</name>, key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer, <value>org.apache.kafka.common.serialization.StringSerializer</value>=, <name>KEY_SERIALIZER_CLASS_CONFIG</name>=, metadata.broker.list=localhost:9092, <configuration>=, <?xml=version="1.0"?>, <name>KEY_DESERIALIZER_CLASS_CONFIG</name>=, <property>=, <value>org.apache.kafka.common.serialization.StringDeserializer</value>=, <value>localhost=9092</value>, bootstrap.servers=localhost:9092, <name>VALUE_DESERIALIZER_CLASS_CONFIG</name>=, <value>org.apache.kafka.common.serialization.ByteArraySerializer</value>=, </property>=, value.serializer=org.apache.kafka.common.serialization.StringSerializer, </configuration>=, <name>VALUE_SERIALIZER_CLASS_CONFIG</name>=} 
86 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
    compression.type = none 
    metric.reporters = [] 
    metadata.max.age.ms = 300000 
    metadata.fetch.timeout.ms = 60000 
    reconnect.backoff.ms = 50 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    bootstrap.servers = [localhost:9092] 
    retry.backoff.ms = 100 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    buffer.memory = 33554432 
    timeout.ms = 30000 
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    ssl.keystore.type = JKS 
    ssl.trustmanager.algorithm = PKIX 
    block.on.buffer.full = false 
    ssl.key.password = null 
    max.block.ms = 60000 
    sasl.kerberos.min.time.before.relogin = 60000 
    connections.max.idle.ms = 540000 
    ssl.truststore.password = null 
    max.in.flight.requests.per.connection = 5 
    metrics.num.samples = 2 
    client.id = 
    ssl.endpoint.identification.algorithm = null 
    ssl.protocol = TLS 
    request.timeout.ms = 30000 
    ssl.provider = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    acks = 1 
    batch.size = 16384 
    ssl.keystore.location = null 
    receive.buffer.bytes = 32768 
    ssl.cipher.suites = null 
    ssl.truststore.type = JKS 
    security.protocol = PLAINTEXT 
    retries = 0 
    max.request.size = 1048576 
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer 
    ssl.truststore.location = null 
    ssl.keystore.password = null 
    ssl.keymanager.algorithm = SunX509 
    metrics.sample.window.ms = 30000 
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 
    send.buffer.bytes = 131072 
    linger.ms = 0 

93 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time 
96 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records 
99 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-1, localhost, 9092)], partitions = []) 
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:client-id-producer-1 
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:client-id-producer-1 
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:client-id-producer-1 
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:client-id-producer-1 
117 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:client-id-producer-1 
117 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:client-id-producer-1 
118 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:client-id-producer-1 
122 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name batch-size 
123 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name compression-rate 
123 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name queue-time 
123 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name request-time 
124 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name produce-throttle-time 
124 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-per-request 
125 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-retries 
125 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name errors 
125 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-size-max 
126 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Starting Kafka producer I/O thread. 
126 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <value>localhost = 9092</value> was supplied but isn't a known config. 
126 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration </configuration> = was supplied but isn't a known config. 
126 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <property> = was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <value>org.apache.kafka.common.serialization.StringDeserializer</value> = was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>VALUE_DESERIALIZER_CLASS_CONFIG</name> = was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <value>org.apache.kafka.common.serialization.StringSerializer</value> = was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <configuration> = was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <value>org.apache.kafka.common.serialization.ByteArraySerializer</value> = was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>KEY_SERIALIZER_CLASS_CONFIG</name> = was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>BOOTSTRAP_SERVERS_CONFIG = (bootstrap.servers)</name> was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration </property> = was supplied but isn't a known config. 
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>VALUE_SERIALIZER_CLASS_CONFIG</name> = was supplied but isn't a known config. 
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>KEY_DESERIALIZER_CLASS_CONFIG</name> = was supplied but isn't a known config. 
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration metadata.broker.list = localhost:9092 was supplied but isn't a known config. 
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <?xml = version="1.0"?> was supplied but isn't a known config. 
130 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0 
131 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a 
131 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - Kafka producer started 
199 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request 
199 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at localhost:9092. 
254 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent 
255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received 
255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency 
255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1 
267 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=producer-1}, body={topics=[temp0.8655521798253616]}), isInitiatedByNetworkClient, createdTimeMs=1513840470088, sendTimeMs=0) to node -1 
502 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 0 : {temp0.8655521798253616=LEADER_NOT_AVAILABLE} 
502 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(nodes = [Node(0, niket-Lenovo-Y50-70, 9092)], partitions = []) 
502 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Adding node 0 to nodes ever seen 
599 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initialize connection to node 0 for sending metadata request 
599 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at niket-Lenovo-Y50-70:9092. 
599 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent 
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-received 
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.latency 
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 0 
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=1,client_id=producer-1}, body={topics=[temp0.8655521798253616]}), isInitiatedByNetworkClient, createdTimeMs=1513840470433, sendTimeMs=0) to node 0 
611 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 3 to Cluster(nodes = [Node(0, niket-Lenovo-Y50-70, 9092)], partitions = [Partition(topic = temp0.8655521798253616, partition = 0, leader = 0, replicas = [0,], isr = [0,]]) 
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.records-per-batch 
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.bytes 
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.compression-rate 
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.record-retries 
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.record-errors 
646 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 
647 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Beginning shutdown of Kafka producer I/O thread, sending remaining records. 
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:client-id-producer-1 
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:client-id-producer-1 
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:client-id-producer-1 
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:client-id-producer-1 
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:client-id-producer-1 
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:client-id-producer-1 
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:client-id-producer-1 
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-sent 
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-received 
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.latency 
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-sent 
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-received 
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.latency 
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Shutdown of Kafka producer I/O thread has completed. 
669 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - The Kafka producer has closed. 
674 [Thread-1] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
    metric.reporters = [] 
    metadata.max.age.ms = 300000 
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
    group.id = 2a4549ce-0e9d-4a66-9573-c5b4c47b3b34 
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] 
    reconnect.backoff.ms = 50 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    max.partition.fetch.bytes = 1048576 
    bootstrap.servers = [localhost:9092] 
    retry.backoff.ms = 100 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    ssl.keystore.type = JKS 
    ssl.trustmanager.algorithm = PKIX 
    enable.auto.commit = true 
    ssl.key.password = null 
    fetch.max.wait.ms = 500 
    sasl.kerberos.min.time.before.relogin = 60000 
    connections.max.idle.ms = 540000 
    ssl.truststore.password = null 
    session.timeout.ms = 30000 
    metrics.num.samples = 2 
    client.id = 
    ssl.endpoint.identification.algorithm = null 
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
    ssl.protocol = TLS 
    check.crcs = true 
    request.timeout.ms = 40000 
    ssl.provider = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    ssl.keystore.location = null 
    heartbeat.interval.ms = 3000 
    auto.commit.interval.ms = 5000 
    receive.buffer.bytes = 32768 
    ssl.cipher.suites = null 
    ssl.truststore.type = JKS 
    security.protocol = PLAINTEXT 
    ssl.truststore.location = null 
    ssl.keystore.password = null 
    ssl.keymanager.algorithm = SunX509 
    metrics.sample.window.ms = 30000 
    fetch.min.bytes = 1024 
    send.buffer.bytes = 131072 
    auto.offset.reset = latest 

675 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Starting the Kafka consumer 
675 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-1, localhost, 9092)], partitions = []) 
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:client-id-consumer-1 
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:client-id-consumer-1 
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:client-id-consumer-1 
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:client-id-consumer-1 
676 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:client-id-consumer-1 
676 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:client-id-consumer-1 
676 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:client-id-consumer-1 
683 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency 
683 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name join-latency 
684 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name sync-latency 
685 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name commit-latency 
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched 
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-fetched 
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-latency 
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-lag 
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time 
688 [Thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration CLIENT_ID_CONFIG = simple was supplied but isn't a known config. 
689 [Thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0 
689 [Thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a 
689 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created 
689 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s): temp0.8655521798253616 
689 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Issuing group metadata request to broker -1 
690 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at localhost:9092. 
691 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent 
691 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received 
691 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency 
691 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1 
Stopping consumer ..... 
Exception caught : null 
772 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:client-id-consumer-1 
772 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:client-id-consumer-1 
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:client-id-consumer-1 
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:client-id-consumer-1 
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:client-id-consumer-1 
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:client-id-consumer-1 
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:client-id-consumer-1 
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-sent 
774 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-received 
774 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.latency 
774 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - The Kafka consumer has closed. 
After Closing KafkaConsumer 
+1

Der gesamte Ausgang wurde hervorgehoben. Machen Sie es für die Menschen leicht verständlich. –

Antwort

0

Scheint, dass es ein Problem mit Ihrer Hauptmethode gibt. Es sieht so aus, als ob dein Flow während des Loopings im Producer hängen bleibt.

Können Sie versuchen, den Verbraucher in separaten Hauptklasse zu starten, sollten Sie in der Lage sein, Consumer Records zu sehen.

+0

Immer noch nicht in der Lage, die Nachrichten von Kafka Thema zu konsumieren. – nick

+0

Können Sie Debug-Protokolle in Consumer-While-Schleife hinzufügen und das gesamte Protokoll freigeben? –

+0

Nach dem Hinzufügen von Debug-Logs habe ich org.apache.kafka.common.errors.WakeupException – nick