2016-04-16 6 views
2

ich zwei RDDs mit gleichen Spalten haben:
RDD1: -Pyspark: Berechnung Summe von zwei correspoding Säulen, basierend auf Bedingungen von zwei Spalten in zwei RDDs

 
+-----------------+ 
|mid|uid|frequency| 
+-----------------+ 
| m1| u1|  1| 
| m1| u2|  1| 
| m2| u1|  2| 
+-----------------+ 

RDD2: -

 
+-----------------+ 
|mid|uid|frequency| 
+-----------------+ 
| m1| u1|  10| 
| m2| u1|  98| 
| m3| u2|  21| 
+-----------------+ 

Ich möchte die Summe von frequencies basierend auf mid und uid berechnen. Ergebnis sollte etwas wie sein:

 
+-----------------+ 
|mid|uid|frequency| 
+-----------------+ 
| m1| u1|  11| 
| m2| u1|  100| 
| m3| u2|  21| 
+-----------------+ 

Vielen Dank im Voraus.

EDIT: ich die Lösung auf diese Weise auch erreicht werden (Verwendung von map-reduce):

from pyspark.sql.functions import col 

data1 = [("m1","u1",1),("m1","u2",1),("m2","u1",2)] 
data2 = [("m1","u1",10),("m2","u1",98),("m3","u2",21)] 
df1 = sqlContext.createDataFrame(data1,['mid','uid','frequency']) 
df2 = sqlContext.createDataFrame(data2,['mid','uid','frequency']) 

df3 = df1.unionAll(df2) 
df4 = df3.map(lambda bbb: ((bbb['mid'], bbb['uid']), int(bbb['frequency'])))\ 
      .reduceByKey(lambda a, b: a+b) 

p = df4.map(lambda p: (p[0][0], p[0][1], p[1])).toDF() 

p = p.select(col("_1").alias("mid"), \ 
      col("_2").alias("uid"), \ 
      col("_3").alias("frequency")) 

p.show() 

Ausgang:

 
+---+---+---------+ 
|mid|uid|frequency| 
+---+---+---------+ 
| m2| u1|  100| 
| m1| u1|  11| 
| m1| u2|  1| 
| m3| u2|  21| 
+---+---+---------+ 
+0

Sie können einige Python-Code schreiben, dieses Problem zu lösen. Wenn Sie das bereits versucht haben, sollten Sie die Frage bearbeiten und Ihren Code hinzufügen. –

+0

Sie haben eine Gruppe in Ihrer erwarteten Ausgabe verpasst – eliasah

+0

@ HåkenLid Wir können es normalerweise tun Python mit Pandas esp. Aber ich wollte eine pysparkspezifische Hilfe. – rootcss

Antwort

0

Ich erreichte die Lösung auch auf diese Weise (Us ing Karten reduzieren):

from pyspark.sql.functions import col 

data1 = [("m1","u1",1),("m1","u2",1),("m2","u1",2)] 
data2 = [("m1","u1",10),("m2","u1",98),("m3","u2",21)] 
df1 = sqlContext.createDataFrame(data1,['mid','uid','frequency']) 
df2 = sqlContext.createDataFrame(data2,['mid','uid','frequency']) 

df3 = df1.unionAll(df2) 
df4 = df3.map(lambda bbb: ((bbb['mid'], bbb['uid']), int(bbb['frequency'])))\ 
      .reduceByKey(lambda a, b: a+b) 

p = df4.map(lambda p: (p[0][0], p[0][1], p[1])).toDF() 

p = p.select(col("_1").alias("mid"), \ 
      col("_2").alias("uid"), \ 
      col("_3").alias("frequency")) 

p.show() 

Ausgang:

 
+---+---+---------+ 
|mid|uid|frequency| 
+---+---+---------+ 
| m2| u1|  100| 
| m1| u1|  11| 
| m1| u2|  1| 
| m3| u2|  21| 
+---+---+---------+ 
+0

Das einzige Problem mit dieser Lösung ist, dass Sie die gesamte Optimierung durch das Wolfram-Projekt über 'DataFrame' verlieren. http://stackoverflow.com/questions/31780677/efficient-pairrdd-operations-on-dataframe-with-spark-sql-group-by – eliasah

1

Sie müssen nur eine Gruppe von mittleren und uid durchführen und führe eine Summenoperation aus:

data1 = [("m1","u1",1),("m1","u2",1),("m2","u1",2)] 
data2 = [("m1","u1",10),("m2","u1",98),("m3","u2",21)] 
df1 = sqlContext.createDataFrame(data1,['mid','uid','frequency']) 
df2 = sqlContext.createDataFrame(data2,['mid','uid','frequency']) 

df3 = df1.unionAll(df2) 

df4 = df3.groupBy(df3.mid,df3.uid).sum() \ 
     .withColumnRenamed("sum(frequency)","frequency") 

df4.show() 

# +---+---+---------+ 
# |mid|uid|frequency| 
# +---+---+---------+ 
# | m1| u1|  11| 
# | m1| u2|  1| 
# | m2| u1|  100| 
# | m3| u2|  21| 
# +---+---+---------+ 
+0

Vielen Dank. :) – rootcss

Verwandte Themen