2016-07-16 6 views
4

Ich versuche UDF mit Eingabetyp Array von Struct zu verwenden. Ich habe die folgende Struktur von Daten, das ist nur relevant, Teil einer größeren StrukturSpark Sql UDF mit komplexen Eingabeparameter

|--investments: array (nullable = true) 
    | |-- element: struct (containsNull = true) 
    | | |-- funding_round: struct (nullable = true) 
    | | | |-- company: struct (nullable = true) 
    | | | | |-- name: string (nullable = true) 
    | | | | |-- permalink: string (nullable = true) 
    | | | |-- funded_day: long (nullable = true) 
    | | | |-- funded_month: long (nullable = true) 
    | | | |-- funded_year: long (nullable = true) 
    | | | |-- raised_amount: long (nullable = true) 
    | | | |-- raised_currency_code: string (nullable = true) 
    | | | |-- round_code: string (nullable = true) 
    | | | |-- source_description: string (nullable = true) 
    | | | |-- source_url: string (nullable = true) 

I Fall Klassen deklariert:

case class Company(name: String, permalink: String) 
case class FundingRound(company: Company, funded_day: Long, funded_month: Long, funded_year: Long, raised_amount: Long, raised_currency_code: String, round_code: String, source_description: String, source_url: String) 
case class Investments(funding_round: FundingRound) 

UDF Erklärung:

sqlContext.udf.register("total_funding", (investments:Seq[Investments]) => { 
    val totals = investments.map(r => r.funding_round.raised_amount) 
    totals.sum 
}) 

Wenn ich Beim Ausführen der folgenden Transformation ist das Ergebnis wie erwartet

scala> sqlContext.sql("""select total_funding(investments) from companies""") 
res11: org.apache.spark.sql.DataFrame = [_c0: bigint] 

Aber wenn eine Aktion wie collect ausgeführt habe ich einen Fehler:

Executor: Exception in task 0.0 in stage 4.0 (TID 10) 
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to $line33.$read$$iwC$$iwC$Investments 

Vielen Dank für jede Hilfe.

Antwort

10

Der Fehler, den Sie sehen, sollte ziemlich selbsterklärend sein. Es gibt eine strikte Zuordnung zwischen Catalyst/SQL-Typen und Scala-Typen, die in the relevant section von the Spark SQL, DataFrames and Datasets Guide gefunden werden können.

Insbesondere struct Typen werden in o.a.s.sql.Row konvertiert (in Ihrem speziellen Fall Daten werden als Seq[Row] ausgesetzt werden).

Es gibt verschiedene Methoden, die verwendet werden können, Daten als spezifische Arten zu belichten:

mit nur der erste Ansatz könnte in diesem speziellen Szenario anwendbar sein.

Wenn Sie investments.funding_round.raised_amount zugreifen UDF verwenden Sie so etwas wie dieses benötigen:

val getRaisedAmount = udf((investments: Seq[Row]) => scala.util.Try(
    investments.map(_.getAs[Row]("funding_round").getAs[Long]("raised_amount")) 
).toOption) 

aber einfach select sollte viel sicherer und sauberer sein:

df.select($"investments.funding_round.raised_amount")