Ich habe einen Pyspark DataFrame, den ich mit einer Funktion, die zeilenweise Operationen ausführt, aggregieren möchte.Zeilenweise Aggregation eines PySpark DataFrame
Ich habe 4 Spalten und für jeden eindeutigen Wert in Spalte AI habe die Zeile-für-Zeile-Aggregation in den Spalten B, C, D
Ich verwende diese Methode zu tun:
erhalten eindeutige Werte in A mit
A_uniques = df.select('A').distinct()
def func(x): y = df.filter(df.A==x) y = np.array(y.toPandas()) for i in y.shape[0]: y[i,1] = y[i-1,0] y[i,0] = (y[i,0]+y[i,2])/y[i,3] agg = sum(y[:,1]) return agg
A_uniques.rdd.map(lambda x: (x['A'], func(x['A'])))
ich diese Störung erhalte:
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o64.getnewargs. Trace: py4j.Py4JException: Method getnewargs([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:272) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748)
Gibt es eine Lösung numpy Arrays in RDDs zu retten? Oder kann ich diese gesamte Operation auf andere Weise machen?
können Sie Beispiel Eingabe und Ausgabe, so dass wir einen anderen Ansatz versuchen können. –
Ich denke, Sie suchen nach 'groupby ('col'). Agg (sum (col2))' –
Das Problem, das Sie haben, ist, dass Sie referenzieren und RDD innerhalb einer RDD-Transformation. Wenn Ihre Aggregation eingebaute Pyspark-Funktionen verwendet, können Sie DataFrame 'groupby (...). Agg (...)' verwenden. Wenn nicht, dann müssen Sie möglicherweise rdd 'groupby' und eine maßgeschneiderte Aggregation verwenden. – ags29