2017-02-24 2 views
1

ich Funken Streaming bin mit und bin dieses Datenrahmen aus der kafka Nachricht erstellen:Erstellen von Datenrahmen aus zwei verschiedenen Arrays entfachen

|customer|initialLoadComplete|initialLoadRunning|  messageContent|  tableName| 
+--------+-------------------+------------------+--------------------+-----------------+ 
| A|    false|    true|TEFault_IdReason...|Timed_Event_Fault| 
| A|    false|    true|TEFault_IdReason...|Timed_Event_Fault| 
+--------+-------------------+------------------+--------------------+-----------------+ 

Jetzt möchte ich messageContent extrahieren, ist messageContent im Grunde wie eine CSV, dass die enthält Rohdaten und die erste Zeile sind die Spalten. Ich kann die Header auf folgende Weise aus dem Feld messageContent extrahieren.

val Array1 = ssc.sparkContext.parallelize(rowD.getString(2).split("\u0002")(0)) 

So sieht Array1 wie folgt aus:

Array1: col1^Acol2^Acol3 

Array2 ist im Grunde die Rohdaten, die jeweils durch^A und Aufzeichnung von^B getrennt getrennt Spaltenwert.

^A ist ein Spaltentrenner.^B ist Rekord seperator

Also das ist, was array2 aussehen könnte:

Array2 = value1^Avalue2^Avalue3^Bvalue4^Avalue5^Avalue6^Bvalue7^Avalue8^Avalue9 

Grundsätzlich ich einen Datenrahmen erstellt werden soll dies aus, so sieht es wie folgt aus:

col1 | col2 | col3 
------------------------- 
value1 | value2 | value3 
value4 | value5 | value6 
value7 | value8 | value9 

^B ist der Datensatzbegrenzer.

Wenn wir von einer hdfs Datei gelesen wurden, haben wir einen Datenrahmen über diesen Befehl:

val df = csc.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", "\u0001").load(hdfsFile) 

Aber dieses Mal, dass ich einen Datenrahmen von zwei Arrays aus dem Speicher erschaffe. Array1 ist der Header für die Werte in Array2 und Array2 ist ein Datensatz, der durch^B getrennt ist.

Was könnte dem Erstellen eines Datenrahmens in diesem Ansatz entsprechen, wie ich für das Erstellen eines Datenrahmens aus einer Datei getan habe.

+0

Also Array1 und Array2 hat nur ein Element des Typs String mit Trennzeichen von^A und^B? –

+0

es ist eine parallelisierte Sammlung, aktualisiert meine Frage – Ahmed

+0

Ich muss nur wissen, was jedes Element in Array1 und Array2 aussieht .. so dass wir Lösungen ohne Annahmen bieten können .. –

Antwort

1

Ich folge aus Ihrer Frage das Folgende.

Array1 ist ein rdd von nur einem Eintrag col1^Acol2^Acol3

Array2 ein rdd mit jedem Eintrag etwas suchen, wie das ist. value1^Avalue2^Avalue3^Bvalue4^Avalue5^Avalue6^Bvalue7^Avalue8^Avalue9

mit diesen Annahmen sollte das folgende funktionieren.

val array1 = sc.parallelize(Seq("col1\u0002col2\u0002col3")) 
val array2 = sc.parallelize(Seq("value1\u0001value2\u0001value3\u0002value4\u0001value5\u0001value6\u0002value7\u0001value8\u0001value9")) 
val data = array2.flatMap(x => x.split("\u0002")).map(x => x.split('\u0001')).collect() 

val result = array2 
       .flatMap(x => x.split("\u0002")) 
       .map(x => x.split('\u0001')) 
       .map({ case Array(x,y,z) => (x,y,z)}) 
       .toDF(array1.flatMap(x => x.split('\u0002')).collect(): _*) 

result.show() 
+------+------+------+ 
| col1| col2| col3| 
+------+------+------+ 
|value1|value2|value3| 
|value4|value5|value6| 
|value7|value8|value9| 
+------+------+------+ 
+1

Hallo, Ja, ich konnte eine ähnliche Lösung selbst implementieren und funktionierte. Danke – Ahmed

+0

Ihr Willkommen !!. –

Verwandte Themen