0

Ich muss eine komplexe UDF schreiben, in der ich eine Verbindung mit einer anderen Tabelle machen muss, und die Anzahl der Übereinstimmungen zurückgeben. Der eigentliche Anwendungsfall ist viel komplexer, aber ich habe hier den Fall auf minimalen reproduzierbaren Code vereinfacht. Hier ist der UDF-Code.Spark: Join innerhalb UDF oder Kartenfunktion

def predict_id(date,zip): 
    filtered_ids = contest_savm.where((F.col('postal_code')==zip) & (F.col('start_date')>=date)) 
    return filtered_ids.count() 

Wenn ich die UDF mit dem folgenden Code definieren, erhalte ich eine lange Liste von Konsole Fehler:

predict_id_udf = F.udf(predict_id,types.IntegerType()) 

Die letzte Zeile des Fehlers ist:

py4j.Py4JException: Method __getnewargs__([]) does not exist 

Ich möchte zu wissen, was der beste Weg ist. Ich habe auch versucht map wie folgt aus:

result_rdd = df.select("party_id").rdd\ 
    .map(lambda x: predict_id(x[0],x[1]))\ 
    .distinct() 

Es ist auch in einer ähnlichen Abschlussfehler. Ich möchte wissen, wenn es überhaupt gibt, kann ich eine Verbindung innerhalb der UDF- oder Kartenfunktion für jede Reihe des ursprünglichen Datenrahmens tun.

Antwort

0

Ich muss eine komplexe UDF schreiben, in der ich eine Verbindung mit einer anderen Tabelle machen muss, und die Anzahl der Übereinstimmungen zurückgeben.

Dies ist nicht möglich. Wenn Sie einen solchen Effekt erzielen möchten, müssen Sie High-Level-DF/RDD-Operatoren verwenden:

df.join(ontest_savm, 
    (F.col('postal_code')==df["zip"]) & (F.col('start_date') >= df["date"]) 
).groupBy(*df.columns).count()