Entschuldigung, wenn Titel zu vage ist, aber ich hatte Schwierigkeiten, es richtig zu formulieren.Verbinden von Streaming-Daten in Apache Spark
Also im Grunde versuche ich herauszufinden, ob Apache Spark zusammen mit Apache Kafka Daten aus meiner relationalen Datenbank zu Elasticsearch synchronisieren kann.
Mein Plan ist, einen der Kafka-Konnektoren zu verwenden, um Daten aus RDBMS zu lesen und in Kafka-Themen zu schieben. Das wäre die ERD des Modells und DDL. Ganz einfach, Report
und Product
Tabellen, die viele-zu-viele-Beziehung haben, die in ReportProduct
Tabelle vorhanden ist:
CREATE TABLE dbo.Report (
ReportID INT NOT NULL PRIMARY KEY,
Title NVARCHAR(500) NOT NULL,
PublishedOn DATETIME2 NOT NULL);
CREATE TABLE dbo.Product (
ProductID INT NOT NULL PRIMARY KEY,
ProductName NVARCHAR(100) NOT NULL);
CREATE TABLE dbo.ReportProduct (
ReportID INT NOT NULL,
ProductID INT NOT NULL,
PRIMARY KEY (ReportID, ProductID),
FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID),
FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID));
INSERT INTO dbo.Report (ReportID, Title, PublishedOn)
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28');
INSERT INTO dbo.Product (ProductID, ProductName)
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product');
INSERT INTO dbo.ReportProduct (ReportID, ProductID)
VALUES (1, 1), (1, 2), (1, 3), (1, 4);
SELECT *
FROM dbo.Report AS R
INNER JOIN dbo.ReportProduct AS RP
ON RP.ReportID = R.ReportID
INNER JOIN dbo.Product AS P
ON P.ProductID = RP.ProductID;
Mein Ziel ist es, mit der folgenden Struktur in dem Dokument zu transformieren:
{
"ReportID":1,
"Title":"Yet Another Apache Spark StackOverflow question",
"PublishedOn":"2017-09-12T19:15:28+00:00",
"Product":[
{
"ProductID":1,
"ProductName":"Apache"
},
{
"ProductID":2,
"ProductName":"Spark"
},
{
"ProductID":3,
"ProductName":"StackOverflow"
},
{
"ProductID":4,
"ProductName":"Random product"
}
]
}
Ich war in der Lage, eine solche Struktur mit statischen Daten zu bilden, die ich lokal ausgeheckt habe:
report.join(
report_product.join(product, "product_id")
.groupBy("report_id")
.agg(
collect_list(struct("product_id", "product_name")).alias("product")
), "report_id").show
Aber ich merke, dass dies zu einfach ist und Ströme werden viel komplizierter.
Daten ändern sich unregelmäßig, Berichte und ihre Produkte werden ständig geändert, Produkte werden hin und wieder (meist wöchentlich) geändert.
Ich möchte alle Arten von Änderungen in Elasticsearch replizieren, die in einer dieser Tabellen aufgetreten sind.
Das ist wirklich gut klingt. Aus den Recherchen, die ich zuvor gemacht habe, lässt Kafka dich nicht auf Nicht-Partitions-Schlüssel kommen, was für mich der Fall sein könnte. Erkennt KSQL das? –
Sie können einfach mit KSQL neu partitionieren, was dieses Problem umgehen könnte. Ich habe es nicht versucht. –