0

Ich habe ein Problem der Erfassung von Daten in MySQL mit Debezium ändern Datenerfassung und konsumieren es auf eine andere MySQL mit Kafka Connect JDBC Senke.Wie man Daten in mysql mit Debezium Datenerfassung erfassen und konsumieren mit JDBC Senke in Kafka verbinden?

Da das Schema und die Nutzlast, die Debezium zu Kafka-Thema produziert, nicht mit dem Schema kompatibel ist, das Kafka Connect JDBC Senke erwartet.

Ich bekomme eine Ausnahme, wenn jdbc Senke Daten konsumieren und Datensätze in einem anderen MySQL erstellen möchte.

Wie soll ich dieses Problem lösen?

Antwort

5

Die Nachrichtenstruktur, die von Debezium erzeugt wird, ist tatsächlich anders als die von der JDBC-Senke erwartet. Die JDBC-Senke erwartet, dass jedes Feld in der Nachricht einem Feld in der Zeile entspricht, und daher entspricht die Nachricht dem Zustand "nach" der Zeile. OTOH, die Debezium MySQL connector führt Change Data Capture durch, was bedeutet, dass es mehr als nur den letzten Stand der Zeile enthält. Insbesondere gibt die Verbinder Nachrichten mit einer Taste, um die Reihe der primären oder einzigartige Schlüsselspalten enthält, und ein Nachrichtenwert mit einem Umschlag Struktur:

  • der Operation, wie etwa, ob es sich um eine Einlage, zu aktualisieren oder löschen
  • der Zustand der Zeile vor die Änderung (null auf Einsätze)
  • den Zustand der Zeile nach aufgetreten die Änderung (null auf Löschungen)
  • quellenspezifischen Informationen, einschließlich Server-Metadaten aufgetreten ist, das Transaktions ID, da TAbase und Tabellennamen, Server-Zeitstempel, wenn das Ereignis aufgetreten ist, und Details über, wo das Ereignis gefunden wurde, usw.
  • Zeitstempel, an dem der Anschluss der Veranstaltung

Die einfachste Art und Weise erzeugte diese Diskrepanz zu lösen, ist die Verwendung Kafka 0.10.2.x (derzeit die neueste Version ist 0.10.2.1) und Kafka Connect neue Single Message Transforms (SMTs). Jeder Kafka Connect-Konnektor kann mit Ketten von null oder mehr SMTs konfiguriert werden, die die Ausgabe von Quellkonnektoren vor dem Schreiben der Nachrichten in Kafka transformieren oder die von Kafka gelesenen Nachrichten transformieren können, bevor sie als Eingabe an die Sink-Konnektoren übergeben werden. SMTs sind absichtlich sehr einfach, befassen sich mit einer einzelnen Nachricht und sollten definitiv nicht auf externe Ressourcen zugreifen oder irgendeinen Zustand beibehalten und daher keinen Ersatz für Kafka Streams oder andere Streamverarbeitungssysteme, die weitaus leistungsfähiger sind, können mehrere Eingabeströme verbinden und können Führen Sie sehr komplexe Operationen aus und verwalten Sie den Status über mehrere Nachrichten hinweg.

Wenn Sie Kafka-Streams für die Verarbeitung verwenden, sollten Sie die Nachrichtenstruktur in Ihrer Kafka Streams-Anwendung ändern. Wenn nicht, dann sind SMTs eine gute Möglichkeit, Ihr Problem zu lösen. Tatsächlich gibt es zwei Möglichkeiten, SMTs zum Anpassen der Nachrichtenstruktur zu verwenden.

Die erste Option besteht darin, ein SMT mit dem Debezium-Konnektor zu verwenden, um den "nach" -Zustand der Zeile zu extrahieren/beizubehalten und alle anderen Informationen zu verwerfen, bevor sie an Kafka geschrieben werden. Natürlich würden Sie weniger Informationen in den Kafka-Themen speichern und einige der CDC-Informationen wegwerfen, die in der Zukunft wertvoll sein könnten.Die zweite und IMO bevorzugte Option besteht darin, den Quell-Connector wie er ist zu belassen und alle CDC-Nachrichten in den Kafka-Topics zu behalten, aber dann ein SMT mit dem Sink-Connector zu verwenden, um das "Nachher" zu extrahieren/zu behalten. Status der Zeile und verwerfen alle anderen Informationen, bevor die Nachricht an den JDBC-Senke-Connector übergeben wird. Möglicherweise können Sie eine der in Kafka Connect vorhandenen SMTs verwenden, aber Sie können erwägen, Ihre eigene SMT zu schreiben, um genau das zu tun, was Sie wollen.

+0

Vielen Dank Randall für eine gute Antwort. –