2017-08-21 1 views
0

Ich habe einen Datenrahmen, die wie unten einige Spalten:Wie eine Zelle/s-Wert ändern, basierend auf einem Zustand, in Pyspark Datenrahmen

 category| category_id| bucket| prop_count| event_count | accum_prop_count | accum_event_count 
----------------------------------------------------------------------------------------------------- 
nation | nation  | 1  | 222  |  444  | 555    | 6677

Diese Datenrahmen von 0 Zeilen beginnen und jede Funktion von meinem Skript fügt eine Zeile dazu.

Es gibt eine Funktion, die basierend auf der Bedingung 1 oder 2 Zellenwerte ändern muss. Wie macht man das?

Code:

schema = StructType([StructField("category", StringType()), StructField("category_id", StringType()), StructField("bucket", StringType()), StructField("prop_count", StringType()), StructField("event_count", StringType()), StructField("accum_prop_count",StringType())]) 
a_df = sqlContext.createDataFrame([],schema) 

a_temp = sqlContext.createDataFrame([("nation","nation",1,222,444,555)],schema) 
a_df = a_df.unionAll(a_temp) 

Zeilen aus einer anderen Funktion hinzugefügt:

a_temp3 = sqlContext.createDataFrame([("nation","state",2,222,444,555)],schema) 
a_df = a_df.unionAll(a_temp3) 

nun zu ändern, Ich versuche, ein mit einer Join-Bedingung.

a_temp4 = sqlContext.createDataFrame([("state","state",2,444,555,666)],schema) 
a_df = a_df.join(a_temp4, [(a_df.category_id == a_temp4.category_id) & (some other cond here)], how = "inner") 

Aber dieser Code funktioniert nicht. Ich erhalte einen Fehler:

 
+--------+-----------+------+----------+-----------+----------------+--------+-----------+------+----------+-----------+----------------+ 
|category|category_id|bucket|prop_count|event_count|accum_prop_count|category|category_id|bucket|prop_count|event_count|accum_prop_count| 
+--------+-----------+------+----------+-----------+----------------+--------+-----------+------+----------+-----------+----------------+ 
| nation|  state|  2|  222|  444|    555| state|  state|  2|  444|  555|    666| 
+--------+-----------+------+----------+-----------+----------------+--------+-----------+------+----------+-----------+----------------+ 

Wie behebt man das? Die korrekte Ausgabe sollte 2 Zeilen haben und die zweite Zeile sollte einen aktualisierten Wert haben

Antwort

1

1). Ein innerer Join löscht Zeilen aus Ihrem ursprünglichen Datenrahmen, wenn Sie die gleiche Anzahl von Zeilen wie a_df (auf der linken Seite) haben möchten, benötigen Sie einen linken Join.

2). Eine == Bedingung wird Spalten duplizieren, wenn Ihre Spalten die gleichen Namen haben, können Sie stattdessen eine Liste verwenden.

3). Ich stelle mir vor "eine andere Bedingung" bezieht sich auf bucket

4). Sie wollen den Wert von a_temp4 zu halten, wenn es vorhanden ist (die ihre Werte bei null beitreten werden gesetzt, wenn dies nicht der Fall), psf.coalesce ermöglicht es Ihnen, diese

import pyspark.sql.functions as psf 
a_df = a_df.join(a_temp4, ["category_id", "bucket"], how="leftouter").select(
    psf.coalesce(a_temp4.category, a_df.category).alias("category"), 
    "category_id", 
    "bucket", 
    psf.coalesce(a_temp4.prop_count, a_df.prop_count).alias("prop_count"), 
    psf.coalesce(a_temp4.event_count, a_df.event_count).alias("event_count"), 
    psf.coalesce(a_temp4.accum_prop_count, a_df.accum_prop_count).alias("accum_prop_count") 
    ) 

+--------+-----------+------+----------+-----------+----------------+ 
|category|category_id|bucket|prop_count|event_count|accum_prop_count| 
+--------+-----------+------+----------+-----------+----------------+ 
| state|  state|  2|  444|  555|    666| 
| nation|  nation|  1|  222|  444|    555| 
+--------+-----------+------+----------+-----------+----------------+ 

zu tun, wenn Sie nur mit einer Zeile arbeiten Datenrahmen, die Sie in Betracht ziehen sollten, das Update direkt zu codieren, anstatt Join zu verwenden:

def update_col(category_id, bucket, col_name, col_val): 
    return psf.when((a_df.category_id == category_id) & (a_df.bucket == bucket), col_val).otherwise(a_df[col_name]).alias(col_name) 

a_df.select(
    update_col("state", 2, "category", "nation"), 
    "category_id", 
    "bucket", 
    update_col("state", 2, "prop_count", 444), 
    update_col("state", 2, "event_count", 555), 
    update_col("state", 2, "accum_prop_count", 666) 
).show() 
Verwandte Themen