2016-08-24 5 views
2

Kann jemand bitte erklären, warum , Seq[Row] nach der Explosion eines dataframe Feld verwendet werden, die Sammlung von Elementen hat. Und können Sie mir bitte den Grund erklären, warum asInstanceOf benötigt wird, um die Werte aus dem explodierten Feld zu erhalten? HierSpark Datenfeld explodieren Funktion

ist die Syntax:

val explodedDepartmentWithEmployeesDF = departmentWithEmployeesDF.explode(departmentWithEmployeesDF("employees")) {  
          case Row(employee: Seq[Row]) => 
          employee.map(employee => 
          Employee(employee(0).asInstanceOf[String], 
          employee(1).asInstanceOf[String], employee(2).asInstanceOf[String])) } 

Antwort

0

Ich glaube, Sie mehr über das Dokument lesen kann, und einen Test zuerst.

explodieren eines Dataframes immer noch ein Dataframe.And es akzeptieren eine Lambda-Funktion f: (Zeile) ⇒ TraversableOnce [A] als Parameter.

In der Lambda-Funktion passen Sie die Eingabe von Groß-/Kleinschreibung an. Sie haben bereits gewusst, dass Ihre Eingabe Row of employee ist, was immer noch eine Seq of Row ist. Wenn die Eingabe Row ist (Mitarbeiter: Seq [Row]), können Sie lernen, wenn Sie diesen Teil nicht verstehen mehr über die unapplic funciton in scala.

Und dann, Mitarbeiter (ich glaube, Sie sollten hier Mitarbeiter verwenden), als Seq of Row, wird die Map-Funktion anwenden, um jede Zeile zu einem Mitarbeiter zuordnen. Und Sie werden die scala apply-Funktion verwenden, um den i-ten Wert in dieser Zeile zu erhalten. Da der Rückgabewert ein Objekt ist, müssen Sie asInstanceOf verwenden, um es in den erwarteten Typ zu konvertieren.

2

Zuerst werde ich feststellen, dass ich nicht erklären kann, warum Ihre explode() in Row(employee: Seq[Row]) wird, da ich das Schema Ihres DataFrame nicht kenne. Ich muss annehmen, dass es mit der Struktur Ihrer Daten zu tun hat.

Nicht Sie Originaldaten zu wissen, ich habe einen kleinen Datensatz erstellt von

scala> val df = sc.parallelize(Array((1, "dsfds dsf dasf dsf dsf d"), (2, "2344 2353 24 23432 234"))).toDF("id", "text") 
df: org.apache.spark.sql.DataFrame = [id: int, text: string] 

arbeiten, wenn ich jetzt darüber zuordnen, können Sie sich, dass es gibt Zeilen Daten vom Typ folgendes enthalten.

scala> df.map {case row: Row => (row(0), row(1)) } 
res21: org.apache.spark.rdd.RDD[(Any, Any)] = MapPartitionsRDD[17] at map at <console>:33 

Sie haben im Grunde Typinformationen verloren, weshalb Sie explizit müssen, ist die Art angeben, wenn Sie die Daten in der Zeile

scala> df.map {case row: Row => (row(0).asInstanceOf[Int], row(1).asInstanceOf[String]) } 
res22: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[18] at map at <console>:33 

Also, um es zur Explosion verwenden möchten, ich haben folgendes zu tun

scala> :paste 
// Entering paste mode (ctrl-D to finish) 

import org.apache.spark.sql.Row 
df.explode(col("id"), col("text")) {case row: Row => 
    val id = row(0).asInstanceOf[Int] 
    val words = row(1).asInstanceOf[String].split(" ") 
    words.map(word => (id, word)) 
} 

// Exiting paste mode, now interpreting. 

import org.apache.spark.sql.Row 
res30: org.apache.spark.sql.DataFrame = [id: int, text: string, _1: int, _2: string] 

scala> res30 show 
+---+--------------------+---+-----+ 
| id|    text| _1| _2| 
+---+--------------------+---+-----+ 
| 1|dsfds dsf dasf ds...| 1|dsfds| 
| 1|dsfds dsf dasf ds...| 1| dsf| 
| 1|dsfds dsf dasf ds...| 1| dasf| 
| 1|dsfds dsf dasf ds...| 1| dsf| 
| 1|dsfds dsf dasf ds...| 1| dsf| 
| 1|dsfds dsf dasf ds...| 1| d| 
| 2|2344 2353 24 2343...| 2| 2344| 
| 2|2344 2353 24 2343...| 2| 2353| 
| 2|2344 2353 24 2343...| 2| 24| 
| 2|2344 2353 24 2343...| 2|23432| 
| 2|2344 2353 24 2343...| 2| 234| 
+---+--------------------+---+-----+ 

Wenn Sie benannte Spalten möchten, können Sie einen Fall Klasse definieren halten Sie explodierte Daten

scala> :paste 
// Entering paste mode (ctrl-D to finish) 

import org.apache.spark.sql.Row 
case class ExplodedData(word: String) 
df.explode(col("id"), col("text")) {case row: Row => 
    val words = row(1).asInstanceOf[String].split(" ") 
    words.map(word => ExplodedData(word)) 
} 

// Exiting paste mode, now interpreting. 

import org.apache.spark.sql.Row 
defined class ExplodedData 
res35: org.apache.spark.sql.DataFrame = [id: int, text: string, word: string] 

scala> res35.select("id","word").show 
+---+-----+ 
| id| word| 
+---+-----+ 
| 1|dsfds| 
| 1| dsf| 
| 1| dasf| 
| 1| dsf| 
| 1| dsf| 
| 1| d| 
| 2| 2344| 
| 2| 2353| 
| 2| 24| 
| 2|23432| 
| 2| 234| 
+---+-----+ 

Hoffe das bringt etwas Klarheit.