2014-07-21 9 views
11

Wir planen, Apache Pig-Code auf die neue Spark-Plattform zu verschieben.Wie implementiert man "Cross Join" in Spark?

Pig hat ein "Bag/Tuple/Field" -Konzept und verhält sich ähnlich wie eine relationale Datenbank. Pig unterstützt CROSS/INNER/OUTER Joins.

Für CROSS JOIN wir alias = CROSS alias, alias [, alias …] [PARTITION BY partitioner] [PARALLEL n];

verwenden können, aber als wir in die Spark-Plattform bewegen konnte ich kein Gegenstück in der Spark-API. Hast du irgendeine Idee?

+0

Es ist noch nicht fertig, aber Spork (Schwein auf Funken) gebaut wird derzeit, so müssen Sie möglicherweise keinen Code ändern – aaronman

Antwort

18

Es ist oneRDD.cartesian(anotherRDD). Hier

+0

Danke, kartesischen Join ist der Spitzname von Cross Join –

2

ist die empfohlene Version für Spark 2.x Datensammlungen und Datenrahmen:

scala> val ds1 = spark.range(10) 
ds1: org.apache.spark.sql.Dataset[Long] = [id: bigint] 

scala> ds1.cache.count 
res1: Long = 10 

scala> val ds2 = spark.range(10) 
ds2: org.apache.spark.sql.Dataset[Long] = [id: bigint] 

scala> ds2.cache.count 
res2: Long = 10 

scala> val crossDS1DS2 = ds1.crossJoin(ds2) 
crossDS1DS2: org.apache.spark.sql.DataFrame = [id: bigint, id: bigint] 

scala> crossDS1DS2.count 
res3: Long = 100 

Alternativ ist es möglich, die traditionelle JOIN-Syntax zu verwenden, ohne Bedingung beizutreten. Verwenden Sie diese Konfigurationsoption, um den folgenden Fehler zu vermeiden.

spark.conf.set("spark.sql.crossJoin.enabled", true) 

Fehler bei der Konfiguration weggelassen wird (mit der "Join" Syntax speziell):

scala> val crossDS1DS2 = ds1.join(ds2) 
crossDS1DS2: org.apache.spark.sql.DataFrame = [id: bigint, id: bigint] 

scala> crossDS1DS2.count 
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans 
... 
Join condition is missing or trivial. 
Use the CROSS JOIN syntax to allow cartesian products between these relations.; 

Verwandte: spark.sql.crossJoin.enabled for Spark 2.x