2015-12-02 4 views
6

Ich bin mit Funken 1,3PySpark und Broadcast Beispiel beitreten

# Read from text file, parse it and then do some basic filtering to get data1 
data1.registerTempTable('data1') 

# Read from text file, parse it and then do some basic filtering to get data1 
data2.registerTempTable('data2') 

# Perform join 
data_joined = data1.join(data2, data1.id == data2.id); 

Meine Daten sind ziemlich schief und Daten2 (wenige KB) < < data1 (10s von GB) und die Leistung ist sehr schlecht. Ich habe über Broadcast-Join gelesen, bin mir aber nicht sicher, wie ich dasselbe mit der Python-API machen kann.

Antwort

13

Spark 1.3 unterstützt keine Broadcast-Joins mit DataFrame. In Spark> = 1.5.0 Sie broadcast Funktion schließt sich bewerben können Broadcast:

from pyspark.sql.functions import broadcast 

data1.join(broadcast(data2), data1.id == data2.id) 

Bei älteren Versionen ist die einzige Option auf RDD und Anwendung der gleichen Logik wie in anderen Sprachen zu konvertieren. so etwas wie dies etwa:

from pyspark.sql import Row 
from pyspark.sql.types import StructType 

# Create a dictionary where keys are join keys 
# and values are lists of rows 
data2_bd = sc.broadcast(
    data2.map(lambda r: (r.id, r)).groupByKey().collectAsMap()) 


# Define a new row with fields from both DFs 
output_row = Row(*data1.columns + data2.columns) 

# And an output schema 
output_schema = StructType(data1.schema.fields + data2.schema.fields) 

# Given row x, extract a list of corresponding rows from broadcast 
# and output a list of merged rows 
def gen_rows(x): 
    return [output_row(*x + y) for y in data2_bd.value.get(x.id, [])] 

# flatMap and create a new data frame 
joined = data1.rdd.flatMap(lambda row: gen_rows(row)).toDF(output_schema) 
+0

'pyspark.sql.functions.broadcast' erschien zuerst in 1,6, nach Arbeits zu [die Dokumente] (https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#module-pypark.sql.functions) –

+1

@NicholasWhite In PySpark Wrapper wurde hinzugefügt 1.6 aber Scala Methode ist seit 1.5 verfügbar, so dass Sie es auch in 1.5 arbeiten können. – zero323

-1

Dieser Code wird in funken 2.0.2-bin-hadoop2.7 Version

from pyspark.sql import SparkSession 

from pyspark.sql.functions import broadcast 

spark = SparkSession.builder.appName("Python Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate() 

df2 = spark.read.csv("D:\\trans_mar.txt",sep="^"); 

df1=spark.read.csv("D:\\trans_feb.txt",sep="^"); 

print(df1.join(broadcast(df2),df2._c77==df1._c77).take(10))