2016-12-13 16 views

Angenommen, ich habe folgenden Datenrahmen:PySpark Datenrahmen Manipulation Effizienz

|  123| 989| 0.9|  0| 
|  123| 990| 0.8|  1| 
|  123| 999| 0.7|  0| 
|  234| 789| 0.9|  0| 
|  234| 777| 0.7|  0| 
|  234| 769| 0.6|  1| 
|  234| 798| 0.5|  0| 

ich die folgenden Operationen führen Sie dann eine endgültige Datensatz zu erhalten (unter dem Code angezeigt):

# Add a new column with the clicked ad_id if clicked == 1, 0 otherwise 
df_adClicked = df.withColumn("ad_id_clicked", when(df.clicked==1, df.ad_id).otherwise(0)) 

# DF -> RDD with tuple : (display_id, (ad_id, prob), clicked) 
df_blah = df_adClicked.rdd.map(lambda x : (x[0],(x[1],x[2]),x[4])).toDF(["display_id", "ad_id","clicked_ad_id"]) 

# Group by display_id and create column with clicked ad_id and list of tuples : (ad_id, prob) 
df_blah2 = df_blah.groupby('display_id').agg(F.collect_list('ad_id'), F.max('clicked_ad_id')) 

# Define function to sort list of tuples by prob and create list of only ad_ids 
def sortByRank(ad_id_list): 
    sortedVersion = sorted(ad_id_list, key=itemgetter(1), reverse=True) 
    sortedIds = [i[0] for i in sortedVersion] 

# Sort the (ad_id, prob) tuples by using udf/function and create new column ad_id_sorted 
sort_ad_id = udf(lambda x : sortByRank(x), ArrayType(IntegerType())) 
df_blah3 = df_blah2.withColumn('ad_id_sorted', sort_ad_id('collect_list(ad_id)')) 

# Function to change clickedAdId into an array of size 1 
def createClickedSet(clickedAdId): 
    setOfDocs = [clickedAdId] 
    return setOfDocs 

clicked_set = udf(lambda y : createClickedSet(y), ArrayType(IntegerType())) 
df_blah4 = df_blah3.withColumn('ad_id_set', clicked_set('max(clicked_ad_id)')) 

# Select the necessary columns 
finalDF = df_blah4.select('display_id', 'ad_id_sorted','ad_id_set') 

|display_id|ad_id_sorted  |ad_id_set| 
|234  |[789, 777, 769, 798]|[769] | 
|123  |[989, 990, 999]  |[990] | 

Gibt es eine effizientere Möglichkeit, dies zu tun? Diese Reihe von Transformationen in der Art zu tun, wie ich bin, scheint der Flaschenhals in meinem Code zu sein. Ich würde jedes Feedback sehr schätzen.



Ich habe keine Timing-Vergleiche gemacht, aber ich würde denken, dass Spark nicht in der Lage sein sollte, sich optimal zu optimieren, indem er keine UDFs verwendet.

#scala: val dfad = sc.parallelize(Seq((123,989,0.9,0),(123,990,0.8,1),(123,999,0.7,0),(234,789,0.9,0),(234,777,0.7,0),(234,769,0.6,1),(234,798,0.5,0))).toDF("display_id","ad_id","prob","clicked") 
#^^^that's^^^ the only difference (besides putting val in front of variables) between this python response and a Scala one 

dfad = sc.parallelize(((123,989,0.9,0),(123,990,0.8,1),(123,999,0.7,0),(234,789,0.9,0),(234,777,0.7,0),(234,769,0.6,1),(234,798,0.5,0))).toDF(["display_id","ad_id","prob","clicked"]) 

df1 = sqlContext.sql("SELECT display_id,collect_list(ad_id) ad_id_sorted FROM (SELECT * FROM df_ad SORT BY display_id,prob DESC) x GROUP BY display_id") 
|display_id|  ad_id_sorted| 
|  234|[789, 777, 769, 798]| 
|  123|  [989, 990, 999]| 

df2 = sqlContext.sql("SELECT display_id, max(ad_id) as ad_id_set from df_ad where clicked=1 group by display_id") 
|  234|  769| 
|  123|  990| 

final_df = df1.join(df2,"display_id") 
|display_id|  ad_id_sorted|ad_id_set| 
|  234|[789, 777, 769, 798]|  769| 
|  123|  [989, 990, 999]|  990| 

Ich habe nicht das ad_id_set in ein Array setzen, weil Sie die max wurden die Berechnung und max sollte nur 1 Wert zurück. Ich bin mir sicher, wenn du es wirklich in einem Array brauchst, kannst du das machen.

Ich habe den subtilen Scala-Unterschied aufgenommen, wenn ein zukünftiger Benutzer mit Scala ein ähnliches Problem hat.


Vielen Dank für die Bereitstellung dieser Lösung. Ich habe beide Lösungen zeitlich abgestimmt. Ihre Lösung wurde in 1,38 ms ausgeführt und die ursprüngliche Lösung in 2,01 ms ausgeführt. :) – user2253546