13

Ich versuche, die Spark-API Dataset API zu verwenden, aber ich habe einige Probleme, die eine einfache Join.Spark Dataset API - Join

Sagen wir, ich habe zwei Datensatz mit Feldern: date | value, dann im Falle von DataFrame meine aussehen würde beitreten:

val dfA : DataFrame 
val dfB : DataFrame 

dfA.join(dfB, dfB("date") === dfA("date")) 

Allerdings gibt für Dataset ist die .joinWith Methode, aber der gleiche Ansatz funktioniert nicht :

val dfA : Dataset 
val dfB : Dataset 

dfA.joinWith(dfB, ?) 

Was das Argument von .joinWith erforderlich ist?

Antwort

19

Um joinWith zu verwenden, müssen Sie zuerst eine DataSet erstellen, und höchstwahrscheinlich zwei davon. Um eine DataSet zu erstellen, müssen Sie eine Fallklasse erstellen, die Ihrem Schema entspricht, und DataFrame.as[T] aufrufen, wobei T Ihre Fallklasse ist. Also:

case class KeyValue(key: Int, value: String) 
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value") 
val ds = df.as[KeyValue] 
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string] 

Sie könnten auch den Fall, Klasse überspringen und ein Tupel verwenden:

val tupDs = df.as[(Int,String)] 
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string] 

Dann, wenn Sie einen anderen Fall Klasse/DF hatte, wie dies sagen:

case class Nums(key: Int, num1: Double, num2: Long) 
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2") 
val ds2 = df2.as[Nums] 
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint] 

Dann , während die Syntax von join und joinWith ähnlich sind, sind die Ergebnisse unterschiedlich:

df.join(df2, df.col("key") === df2.col("key")).show 
// +---+-----+---+----+----+ 
// |key|value|key|num1|num2| 
// +---+-----+---+----+----+ 
// | 1| asdf| 1| 7.7| 101| 
// | 2|34234| 2| 1.2| 10| 
// +---+-----+---+----+----+ 

ds.joinWith(ds2, df.col("key") === df2.col("key")).show 
// +---------+-----------+ 
// |  _1|   _2| 
// +---------+-----------+ 
// | [1,asdf]|[1,7.7,101]| 
// |[2,34234]| [2,1.2,10]| 
// +---------+-----------+ 

Wie Sie sehen können, lassen joinWith die Objekte intakt als Teile eines Tupels, während join die Spalten in einem einzigen Namespace abflacht. (. Welche Probleme im obigen Fall verursacht, weil der Spaltenname „Schlüssel“ wiederholt werden)

Merkwürdiger, ich habe df.col("key") und df2.col("key") verwenden, um die Bedingungen zu schaffen, für den Beitritt ds und ds2 - wenn Sie nur col("key") verwenden auf beiden Seiten funktioniert es nicht, und ds.col(...) existiert nicht. Die Verwendung des Originals df.col("key") macht jedoch den Trick.

+3

detaillierte Erklärung anzuwenden. Nur eine Verwirrung. Gibt es eine bessere Möglichkeit, typisierte Join-Bedingung zu schreiben. für z.B. df.col ("key") können wir etwas Typsicherer haben, das die Korrektheit des "Schlüssels" zur Kompilierzeit auflösen kann. –

+5

Ich stimme völlig zu, basierend auf dieser Syntax ist es nicht sinnvoll, den Datensatz zu erstellen, also wo ist der Vorteil? Ich kann nicht über die Tatsache hinwegkommen, dass es keine getippte Alternative gibt. So ein Jammer! – Sparky

2

In obigem Beispiel tun könnten Sie unter Option versuchen können -

  • Definieren Sie eine Fallklasse für Ihre Ausgabe

    case class JoinOutput(key:Int, value:String, num1:Double, num2:Long)

  • Verbinden Sie zwei Datensätze mit "Seq (" key ")", dies wird Ihnen helfen, zwei doppelte Schlüsselspalten in der Ausgabe zu vermeiden.Welche wird helfen, die Fallklasse oder holen die Daten im nächsten Schritt

    ds.join(ds2, Seq("key")).as[JoinOutput] res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]

    scala> ds.join(ds2, Seq("key")).as[JoinOutput].show +---+-----+----+----+ |key|value|num1|num2| +---+-----+----+----+ | 1| asdf| 7.7| 101| | 2|34234| 1.2| 10| +---+-----+----+----+

+0

Sie beantworten nicht speziell die Frage, aber der Seq ("Schlüssel") Tipp hat mir geholfen – ImDarrenG

Verwandte Themen