2015-11-16 10 views
19

In meinem Schwein Code, den ich dies tun:Funken Vereinigung mehrerer RDDs

all_combined = Union relation1, relation2, 
    relation3, relation4, relation5, relation 6. 

ich mit Funken, das gleiche tun mag. Doch leider sehe ich, dass ich es paarweise zu halten, haben zu tun:

first = rdd1.union(rdd2) 
second = first.union(rdd3) 
third = second.union(rdd4) 
# .... and so on 

Gibt es eine Vereinigung Operator, der mich auf mehreren RDDs zu einer Zeit arbeiten lassen:

z.B. union(rdd1, rdd2,rdd3, rdd4, rdd5, rdd6)

Es ist eine Frage der Bequemlichkeit.

Antwort

40

Wenn dies RDDs Sie SparkContext.union Methode verwenden:

rdd1 = sc.parallelize([1, 2, 3]) 
rdd2 = sc.parallelize([4, 5, 6]) 
rdd3 = sc.parallelize([7, 8, 9]) 

rdd = sc.union([rdd1, rdd2, rdd3]) 
rdd.collect() 

## [1, 2, 3, 4, 5, 6, 7, 8, 9] 

Es gibt keine DataFrame gleichwertig, aber es ist nur eine Frage eines einfachen Einstrichs:

from functools import reduce # For Python 3.x 
from pyspark.sql import DataFrame 

def unionAll(*dfs): 
    return reduce(DataFrame.unionAll, dfs) 

df1 = sqlContext.createDataFrame([(1, "foo1"), (2, "bar1")], ("k", "v")) 
df2 = sqlContext.createDataFrame([(3, "foo2"), (4, "bar2")], ("k", "v")) 
df3 = sqlContext.createDataFrame([(5, "foo3"), (6, "bar3")], ("k", "v")) 

unionAll(df1, df2, df3).show() 

## +---+----+ 
## | k| v| 
## +---+----+ 
## | 1|foo1| 
## | 2|bar1| 
## | 3|foo2| 
## | 4|bar2| 
## | 5|foo3| 
## | 6|bar3| 
## +---+----+ 

Wenn die Anzahl der DataFrames groß ist SparkContext.union auf RDDs mit und DataFrame neu zu erstellen kann eine bessere Wahl issues related to the cost of preparing an execution plan zu vermeiden sein:

def unionAll(*dfs): 
    first, *rest = dfs # Python 3.x, for 2.x you'll have to unpack manually 
    return first.sql_ctx.createDataFrame(
     first.sql_ctx._sc.union([df.rdd for df in dfs]), 
     first.schema 
    ) 
+0

Was ist der Zweck * Rest ist hier? Es wird nirgends verwendet. –

1

Leider ist es der einzige Weg zu UNION Tabellen in Spark. Doch statt

first = rdd1.union(rdd2) 
second = first.union(rdd3) 
third = second.union(rdd4) 
... 

können Sie es in einem wenig saubere Art und Weise, wie diese ausgeführt werden:

result = rdd1.union(rdd2).union(rdd3).union(rdd4) 
0
from pyspark.sql import DataFrame 
reduce(DataFrame.unionAll, [df1,df2,df3]) 
+0

Bitte versuchen Sie, Ihre Lösung in ein oder zwei Sätzen zu erklären. Das wird anderen Benutzern helfen, es besser zu verstehen – Alireza