2016-06-27 11 views
3

Ich möchte Array[org.apache.spark.sql.Row] in ein DataFrame konvertieren. Kann mir jemand einen besseren Weg vorschlagen?Array [Zeile] in Datenrahmen in Spark/Scala konvertieren

Ich habe versucht, es zuerst in RDD umzuwandeln und dann versuchte, es in Dataframe umzuwandeln, aber wenn ich irgendeine Operation auf der DataFrame führe, werden Ausnahmen gezeigt.

val arrayOfRows = myDataFrame.collect().map(t => myfun(t)) 
val distDataRDD = sc.parallelize(arrayOfRows) 
val newDataframe = sqlContext.createDataFrame(distDataRDD,myschema) 

Hier myfun() ist eine Funktion, die Row (org.apache.spark.sql.Row) zurückgibt. Der Inhalt im Array ist korrekt und ich kann es ohne Probleme ausdrucken.

Aber als ich versuchte, die Datensätze in der zu zählen, gab es mir die Zählung sowie eine Warnung, dass eine der Bühne eine Aufgabe von sehr großer Größe enthält. Ich denke, ich mache etwas falsch. Bitte helfen Sie.

Antwort

1

Sie haben einen Fehler in der ersten Zeile. collect gibt ein Array zurück, während map eine Methode ist, die mit DataFrames/RDDs arbeitet.

Versuchen Sie stattdessen val arrayOfRows = myDataFrame.map(t => myfun(t)).collect().

+0

Ich erhalte diesen Fehler, wenn ich die Reihenfolge ** org.apache.spark ändere .SparkException: Task nicht serialisierbar ** – rvp

+0

ArrayofRows ist eigentlich vom Typ DataFrame, so dass keine Zeilen 2 und 3 benötigt werden (sc.parallelize akzeptiert RDDs und nicht DataFrames, was der Grund für die neue Ausnahme ist) –

+0

Ich bekomme den Fehler als sobald ich die erste Zeile 'val arrayOfRows = myD eingeben ataFrame.collect(). map (t => myfun (t)) ' – rvp

0

Fallklasse PgRnk (userId: Lang, Pagerank: Doppel) // Ein Fallklasse

sc.parallelize (pg10.map (r 1 => PgRnk (r1.getLong (0), r1.getDouble (1)))). ToDS() // In ein Dataset konvertieren, sc.parallelize konvertiert das Array in ein RDD und dann in DS

Verwandte Themen