2017-01-03 1 views
1

Ich verwende konfluente kafka Connector 3.0.1 version.Ich erstelle eine neue Gruppe mit dem Namen new-group und es gibt etwa 20 Themen auf it.most dieser Themen beschäftigt ist.Aber es ist schade, dass, wenn ich das Connector-Framework starten, kann das System nicht neu zu balancieren, etwa 2 Minuten eine Neuverteilung für alle Themen. Ich kenne den Grund nicht. Einige der Fehlermeldung lautet:Konfluente Kafka Connector- kann nicht stoppen Rebalancing

[2017-01-03 21:43:57,718] ERROR Commit of WorkerSinkTask{id=new-connector-0} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:180) 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) 
     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) 
     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) 
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:247) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:293) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:54) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:465) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:283) 
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212) 
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) 
     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) 
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) 
     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
: 

Ich weiß nicht, ob es etwas mit der kontinuierlichen Neuverteilung zu tun hat.

Ich weiß, dass, wenn die KafkaConsumer.poll() ist länger als die konfigurierte Zeitüberschreitung, der Kafka wird die Partition widerrufen und damit die Rebalance ausgelöst wird, aber ich bin mir ziemlich sicher, dass die Umfrage jedes Mal nicht so lange ist . Jeder kann mir ein paar Hinweise geben?

Antwort

1

Ich denke, max.poll.records kann dies lösen.Es ist, die Anzahl der Datensätze, die auf jeder Schleife Iteration behandelt werden müssen stimmen. In 0.10 gibt es max.poll.records, die eine Obergrenze für die Anzahl der von jedem Aufruf zurückgegebenen Datensätze setzt.

Auch wie Confluent sollte consumer.poll() ziemlich hohe Sitzung Zeitüberschreitung für beispielsweise 30 bis 60 Sekunden haben.

Sie könnten auch zur Feinabstimmung wollen:

session.timeout.ms 
heartbeat.interval.ms 
max.partition.fetch.bytes 
+0

Ja geändert worden ist, wenn ich zu viel nehmen Zeit, um das Umfrageergebnis zu hdfs, dann eine Neugewichtung kommen.Ich habe meinen Code optimiert und rebalance selten geworden. – wuchang

0

Betrachten auf 0.10.1 oder höher aktualisieren, weil der Verbraucher in diesen Versionen verbessert wurde besser längere Zeiträume zwischen den Anrufen zu behandeln abzufragen().

Sie können den neuen Parameter max.poll.interval.ms erhöhen, wenn Sie mehr als 5 Minuten benötigen, um die Ergebnisse in HDFS zu speichern. Dadurch wird verhindert, dass Ihr Kunde aus der Verbrauchergruppe ausgeschlossen wird, weil er keine Fortschritte macht.

Im 0.10.1 Release Notes heißt es

Die neue Java Consumer unterstützt jetzt heartbeating von einem Hintergrund Faden. Es gibt eine neue Konfiguration max.poll.interval.ms, die steuert die maximale Zeit zwischen Aufrufaufrufen, bevor der Verbraucher die Gruppe proaktiv verlässt (standardmäßig 5 Minuten). Der Wert der Konfiguration request.timeout.ms muss immer größer als max.poll.interval.ms sein, da dies die maximale Zeit ist, die eine JoinGroup Anfrage auf dem Server blockieren kann, während der Verbraucher neu ausgleicht, also wir haben änderte seinen Standardwert auf knapp über 5 Minuten. Schließlich hat der Standardwert von session.timeout.ms wurde auf 10 Sekunden eingestellt nach unten, und der Standardwert von max.poll.records hat 500.

Verwandte Themen