2016-11-02 13 views
5

Angenommen, ich habe zwei PySpark DataFrames df1 und df2.Pyspark Dataframe Funktion auf zwei Spalten anwenden

df1= 'a' 
     1  
     2  
     5  

df2= 'b' 
     3 
     6 

Und ich möchte den nächsten df2['b'] Wert für jeden df1['a'], zu finden und die nächsten Werte als neue Spalte in df1 hinzuzufügen.

Mit anderen Worten, für jeden Wert x in df1['a'], möchte ich ein y zu finden, die min(abx(x-y)) für alle y in df2['b'] erreicht (Anmerkung: kann davon ausgehen, dass es nur eine y ist, die den Mindestabstand erreichen kann), und das Ergebnis wäre sein

'a' 'b' 
1  3 
2  3 
5  6 

ich habe versucht, den folgenden Code ein Distanzmatrix zuerst (bevor Sie die Werte der Suche nach Erreichen der Mindestabstand) zu erstellen:

from pyspark.sql.types import IntegerType 
from pyspark.sql.functions import udf 

def dict(x,y): 
    return abs(x-y) 
udf_dict = udf(dict, IntegerType()) 

sql_sc = SQLContext(sc) 
udf_dict(df1.a, df2.b) 

die

Column<PythonUDF#dist(a,b)> 

Dann gibt habe ich versucht,

sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b)) 

, die für immer läuft ohne Fehler/Ausgabe zu geben.

Meine Fragen sind:

  1. Da ich neu bin Spark, ist meine Art und Weise effizient die Ausgabe Datenrahmen zu konstruieren? (Mein Weg wäre das Erstellen einer Abstandsmatrix für alle a und b Werte zuerst, und dann finden Sie die min eins)
  2. Was ist falsch mit der letzten Zeile meines Codes und wie man es beheben?

Antwort

5

mit Ihrer zweiten Frage starten - Sie UDF nur auf bestehenden Datenrahmen anwenden können, ich denke du so etwas dacht:

>>> df1.join(df2).withColumn('distance', udf_dict(df1.a, df2.b)).show() 
+---+---+--------+ 
| a| b|distance| 
+---+---+--------+ 
| 1| 3|  2| 
| 1| 6|  5| 
| 2| 3|  1| 
| 2| 6|  4| 
| 5| 3|  2| 
| 5| 6|  1| 
+---+---+--------+ 

Aber es ist ein effizienter Weg, um diese Distanz gelten, durch interne abs:

>>> from pyspark.sql.functions import abs 
>>> df1.join(df2).withColumn('distance', abs(df1.a -df2.b)) 

Dann können Sie matching numbers finden, indem die Berechnung:

>>> distances = df1.join(df2).withColumn('distance', abs(df1.a -df2.b)) 
>>> min_distances = distances.groupBy('a').agg(min('distance').alias('distance')) 
>>> distances.join(min_distances, ['a', 'distance']).select('a', 'b').show() 
+---+---+                  
| a| b| 
+---+---+ 
| 5| 6| 
| 1| 3| 
| 2| 3| 
+---+---+ 
Verwandte Themen