2016-06-06 16 views
1

Ich habe ein paar Datenrahmen wie folgt aus:Spalten aus verschiedenen Datenrahmen Zieldatenrahmen in PySpark

rdd_1 = sc.parallelize([(0,10,"A",2), (1,20,"B",1), (2,30,"A",2)]) 
rdd_2 = sc.parallelize([(0,10,223,"201601"), (0,10,83,"2016032"),(1,20,3213,"201602"),(1,20,3003,"201601"), (1,20,9872,"201603"), (2,40, 2321,"201601"), (2,30, 10,"201602"),(2,61, 2321,"201601")]) 
df_tg = sqlContext.createDataFrame(rdd_1, ["id", "type", "route_a", "route_b"]) 
df_data = sqlContext.createDataFrame(rdd_2, ["id", "type", "cost", "date"]) 

df_tg.show() 


+---+----+-------+-------+ 
| id|type|route_a|route_b| 
+---+----+-------+-------+ 
| 0|10 |  A|  2| 
| 1|20 |  B|  1| 
| 2|30 |  A|  2| 
+---+----+-------+-------+ 

df_data.show() 

+---+----+----+------+ 
| id|type|cost| date| 
+---+----+----+------+ 
| 0|10 | 223|201603| 
| 0|10 | 83 |201602| 
| 1|20 |3003|201601| 
| 1|20 |3213|201602| 
| 1|20 |9872|201603| 
| 2|30 | 10|201602| 
| 2|30 | 62|201601| 
| 2|40 |2321|201601| 
+---+----+----+------+ 

Also muss ich die Spalten wie folgt hinzu:

+---+----+-------+-------+-----------+-----------+-----------+ 
| id|type|route_a|route_b|cost_201603|cost_201602|cost_201601| 
+---+----+-------+-------+-----------+-----------+-----------+ 
| 0|10 |  A|  2|  223 | 83  |  None| 
| 1|20 |  B|  1|  9872 |  3213 |  3003| 
| 2|30 |  A|  2|  None | 10  |   62| 
+---+----+-------+-------+-----------+-----------+-----------+ 

Dafür hätte ich zu tun, schließt sich ein paar:

df_tg = df_tg.join(df_data[df_data.date == "201603"], ["id", "type"]) 

und damit hätte ich, um die Spalten zu benennen, um sie nicht zu überschreiben:

df_tg = df_tg.join(df_data[df_data.date == "201603"], ["id", "type"]).withColumnRenamed("cost","cost_201603") 

Ich kann eine Funktion schreiben, dies zu tun, aber ich würde sowohl eine Schleife durch die verfügbaren Termine haben und die Spalten, Erzeugen Tonne mit voller Tabellen-Scans verbindet:

def feature_add(df_target, df_feat, feat_cols, period): 
    for ref_month in period: 
     df_target = df_target.join(df_feat, ["id", "type"]).select(
       *[df_target[column] for column in df_target.columns] + [df_feat[feat_col]] 
       ).withColumnRenamed(feat_col, feat_col + '_' + ref_month) 
    return df_target 

df_tg = feature_add(df_tg, df_data, ["cost"], ["201602", "201603", "201601"]) 

Dies funktioniert, aber es ist schrecklich. Wie kann ich diese Spalten hinzufügen, auch wenn ich dieselbe Funktion für andere Datenframes aufruft? Beachten Sie, dass die Spalten nicht perfekt ausgerichtet sind und ich einen inneren Join ausführen muss.

Antwort

3

Ich würde vorschlagen, Pivot-Funktionen wie folgt zu verwenden:

from pyspark.sql.functions import * 

rdd_1 = sc.parallelize([(0,10,"A",2), (1,20,"B",1), (2,30,"A",2)]) 
rdd_2 = sc.parallelize([(0,10,223,"201601"), (0,10,83,"2016032"),(1,20,3213,"201602"),(1,20,3003,"201601"), (1,20,9872,"201603"), (2,40, 2321,"201601"), (2,30, 10,"201602"),(2,61, 2321,"201601")]) 
df_tg = sqlContext.createDataFrame(rdd_1, ["id", "type", "route_a", "route_b"]) 
df_data = sqlContext.createDataFrame(rdd_2, ["id", "type", "cost", "date"]) 

pivot_df_data = df_data.groupBy("id","type").pivot("date").agg({"cost" : "sum"}) 

pivot_df_data.join(df_tg, ['id','type'], 'inner').select('id','type','route_a','route_b','201601','201602','201603','2016032').show() 

# +---+----+-------+-------+------+------+------+-------+ 
# | id|type|route_a|route_b|201601|201602|201603|2016032| 
# +---+----+-------+-------+------+------+------+-------+ 
# | 0| 10|  A|  2| 223| null| null|  83| 
# | 1| 20|  B|  1| 3003| 3213| 9872| null| 
# | 2| 30|  A|  2| null| 10| null| null| 
# +---+----+-------+-------+------+------+------+-------+ 
Verwandte Themen