Da ich neu bei Kafka bin, kann ich die Datensätze aus der Datei lesen und diese Nachricht über den Producer an Kafka senden, aber nicht das gleiche Thema über den Konsumenten konsumieren.Kafka-Thema mit kafka consumer nicht lesen?
Hinweis: Sie können die Daten aus einer beliebigen Textdatei lesen und ich habe
kafka_2.11-0.9.0.0 Version verwendet Hier ist mein Code:
package implementation;
import java.io.BufferedReader;
//import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
//import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
//import java.io.OutputStreamWriter;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.log4j.BasicConfigurator;
import kafka.producer.ProducerConfig;
public class File_to_Kafka extends ProducerConfig{
Properties configProperties;
public File_to_Kafka(Properties originalProps) {
super(originalProps);
configProperties = originalProps;
// TODO Auto-generated constructor stub
}
public String topicName = "temp"+Math.random();
public String groupId = UUID.randomUUID().toString();
public void producerKafka(Properties configProperties) throws IOException
{
FileInputStream fis = new FileInputStream("/home/nick/Desktop/Database-Kafka-ElasticSearch/src/main/java/resources/properties.xml");
configProperties.load(fis);
System.out.println(configProperties);
org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<String, String>(configProperties);
File f1 = new File("/home/niket/Desktop/sample-example.txt");
FileInputStream fis1 = new FileInputStream(f1);
BufferedReader br1 = new BufferedReader(new InputStreamReader(fis1));
String str = br1.readLine();
//while(br1.readLine()!=null)
while(str != null)
{
ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, str);
producer.send(rec);
str = br1.readLine();
}
br1.close();
fis.close();
fis1.close();
producer.close();
}
public void consumerKafka() throws InterruptedException
{
ConsumerThread consumerRunnable = new ConsumerThread(topicName, groupId);
consumerRunnable.start();
Thread.sleep(100);
consumerRunnable.getKafkaConsumer().wakeup();
System.out.println("Stopping consumer .....");
consumerRunnable.join();
}
private static class ConsumerThread extends Thread{
private String topicName;
private String groupId;
private KafkaConsumer<String, String> kafkaConsumer;
public ConsumerThread(String topicName, String groupId2) {
super();
this.topicName = topicName;
this.groupId = groupId2;
}
public void run()
{
Properties configProperties = new Properties();
configProperties.put("bootstrap.servers","localhost:9092");
configProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put("group.id", groupId);
configProperties.put("CLIENT_ID_CONFIG", "simple");
//Figure out where to tart processing messages from
kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
kafkaConsumer.subscribe(Arrays.asList(topicName));
int count=0;
//Start Processing Messages
try {
while(true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
count = 0;
for (ConsumerRecord<String, String> record : records)
{
System.out.println(record.value());
count++;
}
kafkaConsumer.commitAsync();
if(count==records.count())
break;
}
}
catch (WakeupException e) {
// TODO: handle exception
System.out.println("Exception caught : "+ e.getMessage());
}
finally {
kafkaConsumer.close();
System.out.println("After Closing KafkaConsumer");
}
}
public KafkaConsumer<String,String> getKafkaConsumer(){
return this.kafkaConsumer;
}
}
public static void main(String [] args) throws IOException, InterruptedException
{
BasicConfigurator.configure();
Properties configProperties = new Properties();
configProperties.put("bootstrap.servers", "localhost:9092");
configProperties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
configProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configProperties.put("metadata.broker.list", "localhost:9092");
File_to_Kafka obj = new File_to_Kafka(configProperties);
obj.producerKafka(configProperties);
obj.consumerKafka();
}
}
Hier ist der Ausgang:
0 [main] INFO kafka.utils.VerifiableProperties - Verifying properties
61 [main] WARN kafka.utils.VerifiableProperties - Property bootstrap.servers is not valid
62 [main] WARN kafka.utils.VerifiableProperties - Property key.serializer is not valid
62 [main] INFO kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to localhost:9092
62 [main] WARN kafka.utils.VerifiableProperties - Property value.serializer is not valid
{<name>BOOTSTRAP_SERVERS_CONFIG=(bootstrap.servers)</name>, key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer, <value>org.apache.kafka.common.serialization.StringSerializer</value>=, <name>KEY_SERIALIZER_CLASS_CONFIG</name>=, metadata.broker.list=localhost:9092, <configuration>=, <?xml=version="1.0"?>, <name>KEY_DESERIALIZER_CLASS_CONFIG</name>=, <property>=, <value>org.apache.kafka.common.serialization.StringDeserializer</value>=, <value>localhost=9092</value>, bootstrap.servers=localhost:9092, <name>VALUE_DESERIALIZER_CLASS_CONFIG</name>=, <value>org.apache.kafka.common.serialization.ByteArraySerializer</value>=, </property>=, value.serializer=org.apache.kafka.common.serialization.StringSerializer, </configuration>=, <name>VALUE_SERIALIZER_CLASS_CONFIG</name>=}
86 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [localhost:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 60000
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
ssl.protocol = TLS
request.timeout.ms = 30000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
acks = 1
batch.size = 16384
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
retries = 0
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
send.buffer.bytes = 131072
linger.ms = 0
93 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time
96 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records
99 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-1, localhost, 9092)], partitions = [])
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:client-id-producer-1
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:client-id-producer-1
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:client-id-producer-1
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:client-id-producer-1
117 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:client-id-producer-1
117 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:client-id-producer-1
118 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:client-id-producer-1
122 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name batch-size
123 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name compression-rate
123 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name queue-time
123 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name request-time
124 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name produce-throttle-time
124 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-per-request
125 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-retries
125 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name errors
125 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-size-max
126 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Starting Kafka producer I/O thread.
126 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <value>localhost = 9092</value> was supplied but isn't a known config.
126 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration </configuration> = was supplied but isn't a known config.
126 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <property> = was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <value>org.apache.kafka.common.serialization.StringDeserializer</value> = was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>VALUE_DESERIALIZER_CLASS_CONFIG</name> = was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <value>org.apache.kafka.common.serialization.StringSerializer</value> = was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <configuration> = was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <value>org.apache.kafka.common.serialization.ByteArraySerializer</value> = was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>KEY_SERIALIZER_CLASS_CONFIG</name> = was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>BOOTSTRAP_SERVERS_CONFIG = (bootstrap.servers)</name> was supplied but isn't a known config.
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration </property> = was supplied but isn't a known config.
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>VALUE_SERIALIZER_CLASS_CONFIG</name> = was supplied but isn't a known config.
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>KEY_DESERIALIZER_CLASS_CONFIG</name> = was supplied but isn't a known config.
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration metadata.broker.list = localhost:9092 was supplied but isn't a known config.
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <?xml = version="1.0"?> was supplied but isn't a known config.
130 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
131 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
131 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - Kafka producer started
199 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request
199 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at localhost:9092.
254 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1
267 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=producer-1}, body={topics=[temp0.8655521798253616]}), isInitiatedByNetworkClient, createdTimeMs=1513840470088, sendTimeMs=0) to node -1
502 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 0 : {temp0.8655521798253616=LEADER_NOT_AVAILABLE}
502 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(nodes = [Node(0, niket-Lenovo-Y50-70, 9092)], partitions = [])
502 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Adding node 0 to nodes ever seen
599 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initialize connection to node 0 for sending metadata request
599 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at niket-Lenovo-Y50-70:9092.
599 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-received
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.latency
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 0
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=1,client_id=producer-1}, body={topics=[temp0.8655521798253616]}), isInitiatedByNetworkClient, createdTimeMs=1513840470433, sendTimeMs=0) to node 0
611 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 3 to Cluster(nodes = [Node(0, niket-Lenovo-Y50-70, 9092)], partitions = [Partition(topic = temp0.8655521798253616, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.records-per-batch
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.bytes
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.compression-rate
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.record-retries
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.record-errors
646 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
647 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Beginning shutdown of Kafka producer I/O thread, sending remaining records.
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:client-id-producer-1
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:client-id-producer-1
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:client-id-producer-1
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:client-id-producer-1
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:client-id-producer-1
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:client-id-producer-1
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:client-id-producer-1
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-sent
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-received
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.latency
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-sent
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-received
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.latency
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Shutdown of Kafka producer I/O thread has completed.
669 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - The Kafka producer has closed.
674 [Thread-1] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = 2a4549ce-0e9d-4a66-9573-c5b4c47b3b34
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [localhost:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest
675 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Starting the Kafka consumer
675 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-1, localhost, 9092)], partitions = [])
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:client-id-consumer-1
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:client-id-consumer-1
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:client-id-consumer-1
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:client-id-consumer-1
676 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:client-id-consumer-1
676 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:client-id-consumer-1
676 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:client-id-consumer-1
683 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency
683 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name join-latency
684 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name sync-latency
685 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name commit-latency
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-fetched
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-latency
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-lag
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time
688 [Thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration CLIENT_ID_CONFIG = simple was supplied but isn't a known config.
689 [Thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0
689 [Thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a
689 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
689 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s): temp0.8655521798253616
689 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Issuing group metadata request to broker -1
690 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at localhost:9092.
691 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
691 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
691 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
691 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1
Stopping consumer .....
Exception caught : null
772 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:client-id-consumer-1
772 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:client-id-consumer-1
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:client-id-consumer-1
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:client-id-consumer-1
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:client-id-consumer-1
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:client-id-consumer-1
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:client-id-consumer-1
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-sent
774 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-received
774 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.latency
774 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - The Kafka consumer has closed.
After Closing KafkaConsumer
Der gesamte Ausgang wurde hervorgehoben. Machen Sie es für die Menschen leicht verständlich. –