2017-03-06 3 views
1

Ich habe versucht, eine dynamische Version von org.apache.spark.sql.explode arbeiten ohne Glück: Ich habe ein Dataset mit einer Datumsspalte namens event_date und eine andere Spalte namens no_of_days_gap. Ich möchte no_of_days_gap verwenden, um Klone der Zeile unter Verwendung der explode-Funktion zu erstellen. Einer meiner ersten Versuche war diese zu verwenden:Wie explodiert man mit einem Spaltenwert?

myDataset.withColumn("clone", explode(array((0 until col("no_of_days_gap")).map(lit): _*))) 

jedoch col("no_of_days_gap") ist vom Typ Column und ein Int erwartet. Ich habe auch verschiedene andere Ansätze ausprobiert. Wie kann ich das funktionieren?

P. S .: Ich schaffte es, eine alternative Lösung mit einer map Funktion durch den Aufruf flatMap gefolgt arbeiten zu bekommen, bin ich jedoch zu verstehen, wie wirklich interessiert arbeitet den withColumn Ansatz zu bekommen.

Antwort

0

Was ist mit den folgenden?

scala> val diddy = Seq(
    | ("2017/03/07", 4), 
    | ("2016/12/09", 2)).toDF("event_date", "no_of_days_gap") 
diddy: org.apache.spark.sql.DataFrame = [event_date: string, no_of_days_gap: int] 

scala> diddy.flatMap(r => Seq.fill(r.getInt(1))(r.getString(0))).show 
+----------+ 
|  value| 
+----------+ 
|2017/03/07| 
|2017/03/07| 
|2017/03/07| 
|2017/03/07| 
|2016/12/09| 
|2016/12/09| 
+----------+ 

// use explode instead 

scala> diddy.explode("no_of_days_gap", "events") { n: Int => 0 until n }.show 
warning: there was one deprecation warning; re-run with -deprecation for details 
+----------+--------------+------+ 
|event_date|no_of_days_gap|events| 
+----------+--------------+------+ 
|2017/03/07|    4|  0| 
|2017/03/07|    4|  1| 
|2017/03/07|    4|  2| 
|2017/03/07|    4|  3| 
|2016/12/09|    2|  0| 
|2016/12/09|    2|  1| 
+----------+--------------+------+ 

Wenn Sie jedoch auf withColumn bestehen, so ... sein ... it! Schnallen Sie sich an!

diddy 
    .withColumn("concat", concat($"event_date", lit(","))) 
    .withColumn("repeat", expr("repeat(concat, no_of_days_gap)")) 
    .withColumn("split", split($"repeat", ",")) 
    .withColumn("explode", explode($"split")) 
0

Sie haben UDF zu verwenden:

val range = udf((i: Integer) => (0 until i).toSeq) 

df 
    .withColumn("clone", range($"no_of_days_gap")) // Add range 
    .withColumn("clone", explode($"clone")) // Explode 
+0

In Funken Shell dies einen Fehler zurückgibt: '' 'scala> val range = UDF ((i: Integer) => (0 bis i) .toSeq) scala.MatchError: scala. collection.immutable.Range (der Klasse scala.reflect.internal.Types $ ClassNoArgsTypeRef) bei org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor (ScalaReflection.scala: 692) bei org.apache.spark.sql .catalyst.ScalaReflection $ .schemaFor (ScalaReflection.scala: 671) bei org.apache.spark.sql.functions $ .udf (functions.scala: 3072) ... 48 elided''' – Diddy

Verwandte Themen