2017-03-05 4 views
0

Ich muss Pandas .apply (Funktion, Achse = 1) (um Zeilen weise Funktion) in pyspark anwenden. Da ich ein Neuling bin, bin ich mir nicht sicher, ob es entweder durch Kartenfunktion oder mithilfe von UDFs implementiert werden kann. Ich kann nirgendwo eine ähnliche Implementierung finden.Zeile weise Operationen oder UDF nach Zeile auf einem Datenrahmen in pyspark

Grundsätzlich möchte ich nur eine Zeile an eine Funktion übergeben, einige Operationen ausführen, um neue Spalten zu erstellen, die von den Werten der aktuellen und vorherigen Zeilen abhängen und dann modifizierte Zeilen zurückgeben, um einen neuen Datenrahmen zu erstellen. Einer der Funktion mit Pandas verwendet wird unten angegeben:

previous = 1 
def row_operation(row): 
    global previous 
    if pd.isnull(row["PREV_COL_A"])==True or (row["COL_A"]) != (row["PREV_COL_A"]): 
     current = 1 
    elif row["COL_C"] > cutoff: 
     current = previous +1 
    elif row["COL_C"]<=cutoff: 
     current = previous 
    else: 
     current = Nan 
    previous = current 
    return current 

Hier PREV_COL_A ist nichts anderes als col_a um 1 Zeile zurückgeblieben.

Bitte beachten Sie, dass diese Funktion die einfachste ist und Zeilen nicht zurückgibt, andere jedoch. Wenn mir jemand helfen könnte, Zeilenoperationen in pyspark zu implementieren, wäre das eine große Hilfe. TIA

Antwort

0

Sie könnten rdd.mapPartition verwenden. Sie erhalten einen Iterator über die Zeilen und Sie geben die Ergebniszeilen aus, die Sie zurückgeben möchten. Die Iterable, die Sie erhalten, wird Ihnen nicht erlauben, vorwärts oder rückwärts zu gehen, sondern einfach die nächste Zeile zurückgeben. Sie können jedoch Zeilen während der Verarbeitung speichern, um alles zu tun, was Sie tun müssen. Zum Beispiel

def my_cool_function(rows): 
    prev_rows = [] 

    for row in rows: 
     # Do some processing with all the rows, and return a result 
     yield my_new_row 

     if len(prev_rows) >= 2: 
      prev_rows = prev_rows[1:] 

     prev_rows.append(row) 

updated_rdd = rdd.mapPartitions(my_cool_function) 

Hinweis, habe ich eine Liste die Partitionen aus Gründen des Beispiels zu verfolgen, aber Python-Listen sind wirklich Arrays, die nicht effizient Kopf Push/Pop-Methoden, so dass Sie werden wahrscheinlich wollen verwenden eine tatsächliche Warteschlange.

Verwandte Themen