Ich bekomme Millionen von Nachrichten aus Kafka Stream in Spark-Streaming. Es gibt 15 verschiedene Nachrichtentypen. Nachrichten kommen von einem einzigen Thema. Ich kann die Nachricht nur durch ihren Inhalt unterscheiden. also verwende ich rdd.contains-Methode, um den unterschiedlichen Typ von rdd zu erhalten.ist rdd.contains Funktion in Spark-Scala teuer
Abtastnachricht
{ "a": "foo", "B": "bar", "Typ", "first"} .......
{ "a":“ foo1 "," b ":" bar1 "," type ":" zweite ".......}
{" a ":" foo2 "," b ":" bar2 "," type ":" 3. "......."
{"a": "foo", "b": "bar", "type": "zuerst" .......}
.... ..........
...............
.........
so auf
Code
DStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val rdd_first = rdd.filter {
ele => ele.contains("First")
}
if (!rdd_first.isEmpty()) {
insertIntoTableFirst(hivecontext.read.json(rdd_first))
}
val rdd_second = rdd.filter {
ele => ele.contains("Second")
}
if (!rdd_second.isEmpty()) {
insertIntoTableSecond(hivecontext.read.json(rdd_second))
}
.............
......
same way for 15 different rdd
ist es eine Möglichkeit, verschiedene rdd von kafka Thema Nachricht zu bekommen?
Ich muss die Daten in der Struktur speichern. Es gibt 15 verschiedene Tische im Bienenstock. Aktualisierte Frage. Tatsächlich gibt es in einem einzigen JSON-Typ mehr als 50 Spalten. Also muss ich 15 Fallklassen erstellen. Gibt es noch andere, anstatt Fallklassen zu erstellen? –
@KishoreKumarSuthar, nachdem die Daten mit der initialen 'case class' (nach Spark lingo) 'strukturiert' wurden, könnten Sie Projektionen auf die Daten machen, um sie an die jeweilige Tabelle anzupassen' (val tableProjection1 = records select ($ "column", $ "Spalte", ...) wo ($ "type" === ...) ' – maasg