Angenommen, ich habe zwei PySpark DataFrames df1
und df2
.Pyspark Dataframe Funktion auf zwei Spalten anwenden
df1= 'a'
1
2
5
df2= 'b'
3
6
Und ich möchte den nächsten df2['b']
Wert für jeden df1['a']
, zu finden und die nächsten Werte als neue Spalte in df1
hinzuzufügen.
Mit anderen Worten, für jeden Wert x
in df1['a']
, möchte ich ein y
zu finden, die min(abx(x-y))
für alle y in df2['b']
erreicht (Anmerkung: kann davon ausgehen, dass es nur eine y
ist, die den Mindestabstand erreichen kann), und das Ergebnis wäre sein
'a' 'b'
1 3
2 3
5 6
ich habe versucht, den folgenden Code ein Distanzmatrix zuerst (bevor Sie die Werte der Suche nach Erreichen der Mindestabstand) zu erstellen:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
def dict(x,y):
return abs(x-y)
udf_dict = udf(dict, IntegerType())
sql_sc = SQLContext(sc)
udf_dict(df1.a, df2.b)
die
Column<PythonUDF#dist(a,b)>
Dann gibt habe ich versucht,
sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b))
, die für immer läuft ohne Fehler/Ausgabe zu geben.
Meine Fragen sind:
- Da ich neu bin Spark, ist meine Art und Weise effizient die Ausgabe Datenrahmen zu konstruieren? (Mein Weg wäre das Erstellen einer Abstandsmatrix für alle
a
undb
Werte zuerst, und dann finden Sie diemin
eins) - Was ist falsch mit der letzten Zeile meines Codes und wie man es beheben?