Ich versuche, das Äquivalent von Sparks unpersist
in Dask zu finden. Mein Bedürfnis für einen expliziten unpersist entsteht in einer Situation, in:Wie erreicht man inkrementelles Caching ohne Datenduplizierung in Dask?
- Der Aufruf-Kontext bereits ein großes
df
, zum Beispiel bestanden hat, weil es braucht viele Aggregate zur Vorverarbeitung Zweck zu berechnen. - Der aufrufende Kontext ruft eine Funktion auf, die auch persistieren muss, z. B. , weil sie einen iterativen Algorithmus ausführt.
Ein einfaches Beispiel würde wie folgt aussehen:
def iterative_algorithm(df, num_iterations):
for iteration in range(num_iterations):
# Transformation logic requiring e.g. map_partitions
def mapper(df):
# ...
return df
df = df.map_partitions(mapper)
df = df.persist()
# Now I would like to explicitly unpersist the old snapshot
return df
In Spark, könnte das Problem explizit durch Lösen der alten Schnappschüsse gelöst werden. Offenbar hat Dask keine explizite unpersist
, sondern behandelt das Problem durch Referenzzählung der zugrunde liegenden Futures. Dies bedeutet, dass das obige Beispiel die Daten duplizieren würde, da der aufrufende Kontext Referenzen auf die alten Futures enthält, während die Unterfunktion Verweise auf die geänderten persist hält. In meinem tatsächlichen Anwendungsfall gibt es mehrere verschachtelte Ebenen solcher Transformationsaufrufe, die dazu führen, dass die Daten sogar mehrfach dupliziert werden.
Gibt es eine Möglichkeit, iteratives Caching ohne zusätzliche Kopien zu lösen?
Sie möchten 'df.divisions' ebenfalls in Ihre' modify_inplace' Funktion einfügen – MRocklin