0

Entschuldigung, wenn Titel zu vage ist, aber ich hatte Schwierigkeiten, es richtig zu formulieren.Verbinden von Streaming-Daten in Apache Spark

Also im Grunde versuche ich herauszufinden, ob Apache Spark zusammen mit Apache Kafka Daten aus meiner relationalen Datenbank zu Elasticsearch synchronisieren kann.

Mein Plan ist, einen der Kafka-Konnektoren zu verwenden, um Daten aus RDBMS zu lesen und in Kafka-Themen zu schieben. Das wäre die ERD des Modells und DDL. Ganz einfach, Report und Product Tabellen, die viele-zu-viele-Beziehung haben, die in ReportProduct Tabelle vorhanden ist: ERD

CREATE TABLE dbo.Report (
    ReportID INT NOT NULL PRIMARY KEY, 
    Title NVARCHAR(500) NOT NULL, 
    PublishedOn DATETIME2 NOT NULL); 

CREATE TABLE dbo.Product (
    ProductID INT NOT NULL PRIMARY KEY, 
    ProductName NVARCHAR(100) NOT NULL); 

CREATE TABLE dbo.ReportProduct (
    ReportID INT NOT NULL, 
    ProductID INT NOT NULL, 
    PRIMARY KEY (ReportID, ProductID), 
    FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID), 
    FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID)); 

INSERT INTO dbo.Report (ReportID, Title, PublishedOn) 
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28'); 

INSERT INTO dbo.Product (ProductID, ProductName) 
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product'); 

INSERT INTO dbo.ReportProduct (ReportID, ProductID) 
VALUES (1, 1), (1, 2), (1, 3), (1, 4); 

SELECT * 
FROM dbo.Report AS R 
INNER JOIN dbo.ReportProduct AS RP 
    ON RP.ReportID = R.ReportID 
INNER JOIN dbo.Product AS P 
    ON P.ProductID = RP.ProductID; 

Mein Ziel ist es, mit der folgenden Struktur in dem Dokument zu transformieren:

{ 
    "ReportID":1, 
    "Title":"Yet Another Apache Spark StackOverflow question", 
    "PublishedOn":"2017-09-12T19:15:28+00:00", 
    "Product":[ 
    { 
     "ProductID":1, 
     "ProductName":"Apache" 
    }, 
    { 
     "ProductID":2, 
     "ProductName":"Spark" 
    }, 
    { 
     "ProductID":3, 
     "ProductName":"StackOverflow" 
    }, 
    { 
     "ProductID":4, 
     "ProductName":"Random product" 
    } 
    ] 
} 

Ich war in der Lage, eine solche Struktur mit statischen Daten zu bilden, die ich lokal ausgeheckt habe:

report.join(
    report_product.join(product, "product_id") 
    .groupBy("report_id") 
    .agg(
     collect_list(struct("product_id", "product_name")).alias("product") 
    ), "report_id").show 

Aber ich merke, dass dies zu einfach ist und Ströme werden viel komplizierter.

Daten ändern sich unregelmäßig, Berichte und ihre Produkte werden ständig geändert, Produkte werden hin und wieder (meist wöchentlich) geändert.

Ich möchte alle Arten von Änderungen in Elasticsearch replizieren, die in einer dieser Tabellen aufgetreten sind.

Antwort

1
  1. Kafka Connect die Daten von der Quelle DB zu ziehen - Sie JDBC Source verwenden können, die im Rahmen von Confluent Platform verfügbar ist (oder separately) und können auch kafka-connect-cdc-mssql

  2. Sobald Sie‘untersuchen Haben Sie die Daten in Kafka, verwenden Sie entweder die Kafka Streams API, um die Daten wie gewünscht zu manipulieren, oder sehen Sie sich die neu veröffentlichte KSQL an. Was Sie wählen, hängt von Dingen ab wie Ihrer Präferenz für das Kodieren in Java (mit Kafka Streams) oder dem Manipulieren von Daten in einer SQL-ähnlichen Umgebung (mit KSQL). Ungeachtet dessen wird die Ausgabe von beiden ein anderes Kafka-Thema sein.

  3. Schließlich strömen den Kafka Thema von oben in Elasticsearch den Elasticsearch Kafka Connect Plugin (erhältlich here, oder als Teil der Confluent Platform)

+0

Das ist wirklich gut klingt. Aus den Recherchen, die ich zuvor gemacht habe, lässt Kafka dich nicht auf Nicht-Partitions-Schlüssel kommen, was für mich der Fall sein könnte. Erkennt KSQL das? –

+0

Sie können einfach mit KSQL neu partitionieren, was dieses Problem umgehen könnte. Ich habe es nicht versucht. –