Zuerst werde ich feststellen, dass ich nicht erklären kann, warum Ihre explode()
in Row(employee: Seq[Row])
wird, da ich das Schema Ihres DataFrame nicht kenne. Ich muss annehmen, dass es mit der Struktur Ihrer Daten zu tun hat.
Nicht Sie Originaldaten zu wissen, ich habe einen kleinen Datensatz erstellt von
scala> val df = sc.parallelize(Array((1, "dsfds dsf dasf dsf dsf d"), (2, "2344 2353 24 23432 234"))).toDF("id", "text")
df: org.apache.spark.sql.DataFrame = [id: int, text: string]
arbeiten, wenn ich jetzt darüber zuordnen, können Sie sich, dass es gibt Zeilen Daten vom Typ folgendes enthalten.
scala> df.map {case row: Row => (row(0), row(1)) }
res21: org.apache.spark.rdd.RDD[(Any, Any)] = MapPartitionsRDD[17] at map at <console>:33
Sie haben im Grunde Typinformationen verloren, weshalb Sie explizit müssen, ist die Art angeben, wenn Sie die Daten in der Zeile
scala> df.map {case row: Row => (row(0).asInstanceOf[Int], row(1).asInstanceOf[String]) }
res22: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[18] at map at <console>:33
Also, um es zur Explosion verwenden möchten, ich haben folgendes zu tun
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.Row
df.explode(col("id"), col("text")) {case row: Row =>
val id = row(0).asInstanceOf[Int]
val words = row(1).asInstanceOf[String].split(" ")
words.map(word => (id, word))
}
// Exiting paste mode, now interpreting.
import org.apache.spark.sql.Row
res30: org.apache.spark.sql.DataFrame = [id: int, text: string, _1: int, _2: string]
scala> res30 show
+---+--------------------+---+-----+
| id| text| _1| _2|
+---+--------------------+---+-----+
| 1|dsfds dsf dasf ds...| 1|dsfds|
| 1|dsfds dsf dasf ds...| 1| dsf|
| 1|dsfds dsf dasf ds...| 1| dasf|
| 1|dsfds dsf dasf ds...| 1| dsf|
| 1|dsfds dsf dasf ds...| 1| dsf|
| 1|dsfds dsf dasf ds...| 1| d|
| 2|2344 2353 24 2343...| 2| 2344|
| 2|2344 2353 24 2343...| 2| 2353|
| 2|2344 2353 24 2343...| 2| 24|
| 2|2344 2353 24 2343...| 2|23432|
| 2|2344 2353 24 2343...| 2| 234|
+---+--------------------+---+-----+
Wenn Sie benannte Spalten möchten, können Sie einen Fall Klasse definieren halten Sie explodierte Daten
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.Row
case class ExplodedData(word: String)
df.explode(col("id"), col("text")) {case row: Row =>
val words = row(1).asInstanceOf[String].split(" ")
words.map(word => ExplodedData(word))
}
// Exiting paste mode, now interpreting.
import org.apache.spark.sql.Row
defined class ExplodedData
res35: org.apache.spark.sql.DataFrame = [id: int, text: string, word: string]
scala> res35.select("id","word").show
+---+-----+
| id| word|
+---+-----+
| 1|dsfds|
| 1| dsf|
| 1| dasf|
| 1| dsf|
| 1| dsf|
| 1| d|
| 2| 2344|
| 2| 2353|
| 2| 24|
| 2|23432|
| 2| 234|
+---+-----+
Hoffe das bringt etwas Klarheit.