2017-12-19 10 views
0

Ich muss ein Äquivalent der aktuellen Ansicht in pyspark erstellen, habe ich eine Verlaufsdatei und eine Deltadatei (mit ID und Datum) .Ich muss endgültige Datenrahmen erstellen, die habe den einzelnen Datensatz für jede ID und dieser Datensatz sollte das späteste Datum haben.Pyspark - Erstellen von Äquivalenten der aktuellen Ansicht in pyspark

df1=sql_context.createDataFrame([("3000", "2017-04-19"), ("5000", "2017-04-19"), ("9012", "2017-04-19")], ["id", "date"]) 
df2=sql_context.createDataFrame([("3000", "2017-04-18"), ("5120", "2017-04-18"), ("1012", "2017-04-18")], ["id", "date"]) 

df3 = df2.union (DF1) .distinct()

+----+----------+ 
| id|  date| 
+----+----------+ 
|3000|2017-04-19| 
|3000|2017-04-18| 
|5120|2017-04-18| 
|5000|2017-04-19| 
|1012|2017-04-18| 
|9012|2017-04-19| 

+ ---- + ---------- +

Ich habe versucht zu tun eine union und mache eine distinct, es gibt mir ID = 3000 für beide die Daten, wo ich nur für ID = 300 für Datum = 2017-04-19

aufnehmen muss Selbst subtrahieren funktioniert nicht, da es alle Zeilen zurückgibt einer der dfs.

gewünschte Ausgabe: -

+----+----------+ 
| id|  date| 
+----+----------+ 
|3000|2017-04-19| 
| 
|5120|2017-04-18| 
|5000|2017-04-19| 
|1012|2017-04-18| 
|9012|2017-04-19| 
+----+----------+ 

Antwort

0

hoffe, das hilft!

from pyspark.sql.functions import unix_timestamp, col, to_date, max 

#sample data 
df1=sqlContext.createDataFrame([("3000", "2017-04-19"), 
           ("5000", "2017-04-19"), 
           ("9012", "2017-04-19")], 
           ["id", "date"]) 
df2=sqlContext.createDataFrame([("3000", "2017-04-18"), 
           ("5120", "2017-04-18"), 
           ("1012", "2017-04-18")], 
           ["id", "date"]) 
df=df2.union(df1) 
df.show() 

#convert 'date' column to date type so that latest date can be fetched for an ID 
df = df.\ 
    withColumn('date_inDateFormat',to_date(unix_timestamp(col('date'),"yyyy-MM-dd").cast("timestamp"))).\ 
    drop('date') 

#get latest date for an ID 
df = df.groupBy('id').agg(max('date_inDateFormat').alias('date')) 
df.show() 

Ausgang ist:

+----+----------+ 
| id|  date| 
+----+----------+ 
|5000|2017-04-19| 
|1012|2017-04-18| 
|5120|2017-04-18| 
|9012|2017-04-19| 
|3000|2017-04-19| 
+----+----------+ 

Hinweis: Bitte nicht zu let SO know vergessen, wenn die Antwort hilft Ihnen, Ihr Problem zu lösen.

+0

Dank Prem, funktioniert es definitiv und pyspark Drop doppelte Funktion funktioniert auch. – Shrikant

+0

Froh, dass es geholfen hat :) – Prem