2017-06-28 2 views
2

Ich versuche, das Äquivalent von Sparks unpersist in Dask zu finden. Mein Bedürfnis für einen expliziten unpersist entsteht in einer Situation, in:Wie erreicht man inkrementelles Caching ohne Datenduplizierung in Dask?

  • Der Aufruf-Kontext bereits ein großes df, zum Beispiel bestanden hat, weil es braucht viele Aggregate zur Vorverarbeitung Zweck zu berechnen.
  • Der aufrufende Kontext ruft eine Funktion auf, die auch persistieren muss, z. B. , weil sie einen iterativen Algorithmus ausführt.

Ein einfaches Beispiel würde wie folgt aussehen:

def iterative_algorithm(df, num_iterations): 

    for iteration in range(num_iterations): 

     # Transformation logic requiring e.g. map_partitions 
     def mapper(df): 
      # ... 
      return df 

     df = df.map_partitions(mapper) 
     df = df.persist() 
     # Now I would like to explicitly unpersist the old snapshot 

    return df 

In Spark, könnte das Problem explizit durch Lösen der alten Schnappschüsse gelöst werden. Offenbar hat Dask keine explizite unpersist, sondern behandelt das Problem durch Referenzzählung der zugrunde liegenden Futures. Dies bedeutet, dass das obige Beispiel die Daten duplizieren würde, da der aufrufende Kontext Referenzen auf die alten Futures enthält, während die Unterfunktion Verweise auf die geänderten persist hält. In meinem tatsächlichen Anwendungsfall gibt es mehrere verschachtelte Ebenen solcher Transformationsaufrufe, die dazu führen, dass die Daten sogar mehrfach dupliziert werden.

Gibt es eine Möglichkeit, iteratives Caching ohne zusätzliche Kopien zu lösen?

Antwort

2

Ich werde einige Ideen, wie dies zu lösen, aber ich bin immer noch auf der Suche nach besseren Alternativen.

Aufgrund der Referenzzählung ist es schwierig, die Kopien zu vermeiden, aber dort sind Möglichkeiten. Das Problem entsteht, wenn der Anrufer eine Referenz an die ursprüngliche df hält und die Unterfunktion neue Instanzen über Aufrufe erstellt. Um das Problem zu lösen, müssten wir den Verweis auf df selbst veränderbar machen. Leider erlaubt es Python im Allgemeinen nicht, die Referenz von Funktionsargumenten zu mutieren.

Lösung 1: Naive änderbare Referenz

Die einfachste Art und Weise diese Einschränkung zu um Arbeit ist die df in eine Liste oder dict zu wickeln. In diesem Fall kann die Unterfunktion die externe Referenz modifizieren, z. von:

df_list[0] = df_list[0].map_partitions(mapper) 
df_list[0] = df_list[0].persist() 

Dies ist jedoch syntaktisch umständlich und man muss sehr vorsichtig sein, weil die Syntax vereinfacht über df = df_list[0] wieder neue Hinweise auf die zugrunde liegende Futures erzeugt, die Duplizierung von Daten führen kann.

Lösung 2: Wrapper-basierte änderbare Referenz

auf der Verbesserung könnte man eine kleine Wrapper-Klasse schreiben, die zu einem Datenrahmen einen Verweis hält.Wenn diese Unterfunktionen diesen Wrapper durchlaufen, können sie die Referenz mutieren. Um das Syntaxproblem zu verbessern, könnte man überlegen, ob der Wrapper die Funktionalität automatisch an den Datenrahmen delegieren oder davon erben soll. Insgesamt fühlt sich diese Lösung auch nicht richtig an.

Lösung 3: Explizite Mutation

Um die Syntax Probleme der anderen Lösungen vermeide ich zur Zeit am liebsten folgende Variante, die über eine Inplace Modifikation des ursprünglichen df simuliert effektiv wandelbar Versionen von map_partitions und persist Beispiel.

def modify_inplace(old_df, new_df): 
    # Currently requires accessing private fields of a DataFrame, but 
    # maybe this could be officially supported by Dask. 
    old_df.dask = new_df.dask 
    old_df._meta = new_df._meta 
    old_df._name = new_df._name 
    old_df.divisions = new_df.divisions 


def iterative_algorithm(df, num_iterations): 

    for iteration in range(num_iterations): 

     def mapper(df): 
      # Actual transform logic... 
      return df 

     # Simulate mutable/in-place map_partitions 
     new_df = df.map_partitions(mapper) 
     modify_inplace(df, new_df) 

     # Simulate mutable/in-place persist 
     new_df = df.persist() 
     modify_inplace(df, new_df) 

    # Technically no need to return, because all operations were in-place 
    return df 

Dies funktioniert für mich recht gut, erfordert aber sorgfältig diese Regeln zu befolgen:

  • alle unveränderlichen Anrufe wie df = df.<method> Ersetzen oben durch das Muster.
  • Achten Sie darauf, Referenzen auf df zu erstellen. Zum Beispiel erfordert die Verwendung einer Variablen wie some_col = df["some_sol"] für syntaktische Bequemlichkeit del some_col vor dem Anruf bestehen bleiben. Andernfalls wird die Referenz, die mit some_col gespeichert wird, erneut eine Datenduplizierung verursachen.
+1

Sie möchten 'df.divisions' ebenfalls in Ihre' modify_inplace' Funktion einfügen – MRocklin

2

Sie könnten eine Freigabefunktion wie folgt schreiben:

from distributed.client import futures_of 

def release(collection): 
    for future in futures_of(collection): 
     future.release() 

Dies wird die aktuelle Instanz nur freigeben. Wenn Sie mehrere Instanzen dieser Futures herumliegen könnten Sie es ein paar Mal anrufen oder eine Schleife wie folgt hinzu:

while future.client.refcount[future.key] > 0: 

Aber in der Regel diese mehrmals aufrufen, falls unklug scheint haben Sie andere Kopien im Umlauf mit Grund.