2017-04-10 8 views
0

startet ich ein Source Verbraucher Aufzeichnungen mit Reactive Kafka wie folgt erstellen:Wie Kafka Quelle wieder zu machen, wenn Kafka

val settings = ConsumerSettings(system, keyDeserializer, valueDeserializer) 
.withBootstrapServers(bootstrapServers) 
.withGroupId(groupName) 
// what offset to begin with if there's no offset for this group 
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
// do we want to automatically commit offsets? 
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") 
// auto-commit offsets every 1 minute, in the background 
.withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000") 
// reconnect every 1 second, when disconnected 
.withProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000") 
// every session lasts 30 seconds 
.withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") 
// send heartbeat every 10 seconds i.e. 1/3 * session.timeout.ms 
.withProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000") 
// how many records to fetch in each poll() 
.withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100") 

Consumer.atMostOnceSource(settings, Subscriptions.topics(topic)).map(_.value) 

Ich habe 1 Instanz von Kafka auf meinem lokalen Rechner ausgeführt wird. Ich drehe über den Konsolenhersteller Werte in das Thema und sehe sie ausgedruckt. Dann töte ich Kafka und starte es neu, um zu sehen, ob die Quelle sich wieder verbindet.

Diese sind wie meine Protokolle gehen:

* Connection with /192.168.0.1 disconnected 
    java.net.ConnectException: Connection refused 
* Give up sending metadata request since no node is available 
* Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds 
* Resuming partition test-events-0 
* Error while fetching metadata with correlation id 139 : {test-events=INVALID_REPLICATION_FACTOR} 
* Sending metadata request (type=MetadataRequest, topics=test-events) to node 0 
* Sending GroupCoordinator request for group mytestgroup to broker 192.168.0.1:9092 (id: 0 rack: null) 
* Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds 
* Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797713078, latencyMs=70, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=166,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup 
* Error while fetching metadata with correlation id 169 : {test-events=INVALID_REPLICATION_FACTOR} 
* Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797716169, latencyMs=72, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=196,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup 
09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Group coordinator lookup for group mytestgroup failed: The group coordinator is not available. 
09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Coordinator discovery failed for group mytestgroup, refreshing metadata 
* Initiating API versions fetch from node 2147483647 
* Offset commit for group mytestgroup failed: This is not the correct coordinator for this group. 
* Marking the coordinator 192.168.43.25:9092 (id: 2147483647 rack: null) dead for group mytestgroup 
* The Kafka consumer has closed. 

Wie kann ich sicherstellen, dass diese Quelle wieder verbindet und die Protokolle geht die Verarbeitung?

Antwort

1

Ich denke, Sie müssen mindestens 2 Makler haben. Wenn einer fehlschlägt, kann der andere die Arbeit machen, und Sie könnten den anderen neu starten.