2017-09-27 1 views
0

Ich bekomme Millionen von Nachrichten aus Kafka Stream in Spark-Streaming. Es gibt 15 verschiedene Nachrichtentypen. Nachrichten kommen von einem einzigen Thema. Ich kann die Nachricht nur durch ihren Inhalt unterscheiden. also verwende ich rdd.contains-Methode, um den unterschiedlichen Typ von rdd zu erhalten.ist rdd.contains Funktion in Spark-Scala teuer

Abtastnachricht

{ "a": "foo", "B": "bar", "Typ", "first"} .......
{ "a":“ foo1 "," b ":" bar1 "," type ":" zweite ".......}
{" a ":" foo2 "," b ":" bar2 "," type ":" 3. "......."
{"a": "foo", "b": "bar", "type": "zuerst" .......}
.... ..........
...............
.........
so auf

Code

DStream.foreachRDD { rdd => 
    if (!rdd.isEmpty()) { 
    val rdd_first = rdd.filter { 
     ele => ele.contains("First") 
    } 
    if (!rdd_first.isEmpty()) { 
     insertIntoTableFirst(hivecontext.read.json(rdd_first)) 
    } 
    val rdd_second = rdd.filter { 
     ele => ele.contains("Second") 
    } 
    if (!rdd_second.isEmpty()) { 
    insertIntoTableSecond(hivecontext.read.json(rdd_second)) 
    } 
     ............. 
     ...... 
    same way for 15 different rdd 

ist es eine Möglichkeit, verschiedene rdd von kafka Thema Nachricht zu bekommen?

Antwort

1

Es gibt keine rdd.contains. Die hier verwendete Funktion contains wird auf die String s in der angewendet.

wie hier:

val rdd_first = rdd.filter { 
    element => element.contains("First") // each `element` is a String 
} 

Diese Methode ist nicht robust, da andere Inhalte im String könnte den Vergleich zu treffen, zu Fehlern führt.

z.B.

{"a":"foo", "b":"bar","type":"second", "c": "first", .......} 

Eine Möglichkeit, dies zu umgehen wäre, zuerst die JSON-Daten in die richtige Datensätze umwandeln und dann anwenden auf die Datensätze Gruppierung oder Filterlogik. Dazu benötigen wir zunächst eine Schemadefinition der Daten. Mit dem Schema können wir die Datensätze in json analysieren und anwenden jede Verarbeitung obendrein:

case class Record(a:String, b:String, `type`:String) 

import org.apache.spark.sql.types._ 
val schema = StructType(
       Array(
       StructField("a", StringType, true), 
       StructField("b", StringType, true), 
       StructField("type", String, true) 
       ) 
      ) 

val processPerType: Map[String, Dataset[Record] => Unit ] = Map(...) 

stream.foreachRDD { rdd => 
    val records = rdd.toDF("value").select(from_json($"value", schema)).as[Record] 
    processPerType.foreach{case (tpe, process) => 
     val target = records.filter(entry => entry.`type` == tpe) 
     process(target) 
    } 
} 

Die Frage ist nicht festgelegt, welche Art von Logik auf jede Art von Datensatz angelegt werden muss. Was hier dargestellt wird, ist eine generische Art, das Problem anzugehen, bei dem jede benutzerdefinierte Logik als Funktion Dataset[Record] => Unit ausgedrückt werden kann.

Wenn die Logik als Aggregation ausgedrückt werden könnte, sind wahrscheinlich die Aggregationsfunktionen Dataset geeigneter.

+0

Ich muss die Daten in der Struktur speichern. Es gibt 15 verschiedene Tische im Bienenstock. Aktualisierte Frage. Tatsächlich gibt es in einem einzigen JSON-Typ mehr als 50 Spalten. Also muss ich 15 Fallklassen erstellen. Gibt es noch andere, anstatt Fallklassen zu erstellen? –

+0

@KishoreKumarSuthar, nachdem die Daten mit der initialen 'case class' (nach Spark lingo) 'strukturiert' wurden, könnten Sie Projektionen auf die Daten machen, um sie an die jeweilige Tabelle anzupassen' (val tableProjection1 = records select ($ "column", $ "Spalte", ...) wo ($ "type" === ...) ' – maasg