Ich versuche, 2 kafka Themen mit JDBC-Senke-Connector zu lesen und in 2 Oracle-Tabellen upsert, die ich manuell erstellt. Jede Tabelle hat einen Primärschlüssel, den ich im Upsert-Modus verwenden möchte. Connector funktioniert gut, wenn ich nur für 1 Thema und nur 1 Feld in pk.fields
, aber wenn ich mehrere Spalten in pk.fields
eine aus jeder Tabelle eingeben, erkennt es das Schema nicht. Vermisse ich etwas, bitte schlage vor.JDBC Sink Connector -upsert in mehrere Tabellen von mehreren Themen mit kafka-connect
name=oracle_sink_prod
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=KAFKA1011,JAFKA1011
connection.url=URL
connection.user=UID
connection.password=PASSWD
auto.create=false
table.name.format=KAFKA1011,JAFKA1011
pk.mode=record_value
pk.fields= ID,COMPANY
auto.evolve=true
insert.mode=upsert
//ID is pk of kafka1011 table and COMPANY is of other
Vielen Dank Robin aber in Zukunft wird die Anzahl der Themen weiter steigen bis 100 und 100 Verbindungen verwalten, ist es eine gute Option? Auch wir planen, unsere Datenpipeline in Echtzeit mit kafka connect zu machen, da unsere Quellen mehrere Oracle-Datenbanken und Sinks sein werden, ein Oracle-Dataware-Haus, wird es schwierig sein, so viele Connectors zu verwalten? –
Ich habe meine Antwort aktualisiert - Sie können auch die Option "record_key" verwenden und sicherstellen, dass Ihre Kafka-Nachrichten entsprechend codiert sind. Welche Option verwenden Sie, um die Daten aus Oracle herauszuziehen? Tools wie GoldenGate können Sie den Schlüssel angeben, können Sie auch verwenden [SMT] (https://www.confluent.io/blog/simplet-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/) In Connect selbst, um die Schlüssel zu definieren –