2017-08-24 2 views
0

So habe ich versucht, einen PubSub Kafka-Connector für etwa einen Monat jetzt mit verschiedenen Problemen zu laufen. Ich habe hier viele Fragen über Kafka Connect und den Pubsub Connector gelesen, die mir geholfen haben, weit zu kommen, aber ich stecke wieder fest. Wenn ich diesen Befehl ausführen:Laufende Pubsub Kafka Connector Standalone-Modus Probleme

.\bin\windows\connect-standalone.bat 
.\etc\kafka\WorkerConfig.properties .\etc\kafka\configSink.properties .\etc\kafka\configSource.properties 

ich eine lange Liste von Fehlern here verbunden:

Direkt nachdem der Rest-Server zu starten versucht, ist, wenn die Fehler „konnte nicht Scan-Datei [Dateiname]. .." Anfang. Ich bin nicht sicher, ob ich die rest.host.name und rest.port einstellen müssen, weil zur Zeit, für die standaloneConfig Werte, liest es

rest.host.name = null 

Edit: Nach einer Weile die Protokolldatei überprüft, fand ich die folgenden Meldungen :

Kafka consumer created 
Created connector CPSConnector 
Initializing task CPSConnector-0 with config {connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector, task.class=com.google.pubsub.kafka.sink.CloudPubSubSinkTask, tasks.max=1, topics=, cps.project=kohls-sis-sandbox, name=CPSConnector, cps.topic=test-pubsub} 
Task CPSConnector-0 threw an uncaught and unrecoverable exception 
org.apache.kafka.connect.errors.ConnectException: Sink tasks require a list of topics. 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:202) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139) 
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) 
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Edit: Also durch Zugabe topics=test in meinem configSink ich das obige Problem behoben. Die aktuelle Fehlermeldung ist unten. Bedeutet dies, dass Sie nur einen Sink-Connector oder Source-Connector ausführen können?

Failed to create job for .\etc\kafka\configSource.properties 
Stopping after connector error 
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.AlreadyExistsException: Connector CPSConnector already exists 
    at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:80) 
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:67) 
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:97) 
Caused by: org.apache.kafka.connect.errors.AlreadyExistsException: Connector CPSConnector already exists 
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:145) 
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94) 

In meinem WorkerConfig.properites habe ich bootstrap.servers=localhost:2181. Meine Property-Dateien sind here.

Ich bin nicht sicher, wie zu beheben, da ich meine Eigenschaften Dateien festgelegt habe, stellen Sie sicher, dass die cps-kakfa-connector.jar im Klassenpfad ist. Ich setze auch plugin.path=\share\java\kafka\kafka-connect-pubsub.

Wenn mir jemand in die richtige Richtung zeigen kann, um dieses Problem zu beheben, wäre das großartig. Ich folgte den Anweisungen hier: https://github.com/GoogleCloudPlatform/pubsub/tree/master/kafka-connector

+0

A ist screen eine schlechte Art und Weise die Log-Datei :) zu teilen Bitte können Sie es schreiben, entweder die wesentlichen Nachrichten inline hier, oder als gist.github.com (oder ähnlich), zu dem Sie den Link bereitstellen. Können Sie auch die Connector-Konfigurationsdatei freigeben? –

+0

Vielen Dank für das Feedback! Ich werde jetzt die Änderungen vornehmen. Es fällt mir schwer zu bestimmen, welche Botschaften wichtig sind, aber ich werde einige, die ich denke, wichtig sein. –

+0

Ignorieren Sie die Protokolldateien "Datei konnte nicht gescannt werden". –

Antwort

2

Jede Connector-Instanz, ob Quelle oder Senke, muss einen eindeutigen Namen haben, wenn Sie ihre Konfigurationseigenschaften an einen Kafka Connect-Cluster oder einen eigenständigen Mitarbeiter senden.

In dem obigen Beispiel benennen Sie Ihre Quelle anders als Ihre Sink. Zum Beispiel:

$ head -n 1 configSource.properties 
name=CPSSourceConnector 
$ head -n 1 configSink.properties 
name=CPSSinkConnector 

oder, könnte genauso gut:

$ head -n 1 configSource.properties 
name=Tom 
$ head -n 1 configSink.properties 
name=Jerry 
+0

Vielen Dank! Das ist so einfach, dass ich es überschaue. Ich bin jetzt in der Lage, die Anschlüsse auszuführen. –

Verwandte Themen