2016-10-02 5 views
1

Ich habe eine DataFrame mit folgenden einfachen schema:Verschwenkung mit fehlenden Werten

root 
|-- amount: double (nullable = true) 
|-- Date: timestamp (nullable = true) 

ich versuche, die sum Beträge pro Tag zu sehen und pro Stunde, einige wie:

+---+--------+--------+ ... +--------+ 
|day|  0|  1|  |  23| 
+---+--------+--------+ ... +--------+ 
|148| 306.0| 106.0|  |  0.0| 
|243| 1906.0| 50.0|  |  1.0| 
| 31| 866.0| 100.0|  |  0.0| 
+---+--------+--------+ ... +--------+ 

Well Zuerst fügte ich eine Spalte hour hinzu und dann gruppierte ich nach Tag und schwang um Stunde. Allerdings habe ich eine Ausnahme, die vielleicht mit fehlenden Verkäufen für einige Stunden zusammenhängt. Das versuche ich zu beheben, aber ich habe nicht realisiert, wie.

(df.withColumn("hour", hour("date")) 
    .groupBy(dayofyear("date").alias("day")) 
    .pivot("hour") 
    .sum("amount").show()) 

Ein Auszug der Ausnahme.

AnalysisException: u'resolved Attribut (e) Datum # 3972 fehlen Tag # 5367, # 5354 Stunden, Summe (Betrag) # 5437 in Operator Aggregate [dayofyear (cast (Datum # 3972 als Datum!))], [Tag des Jahres (Besetzung (Datum # 3972 als Datum)) AS Tag # 5367, Pivotfirst (Stunde # 5354, Summe (Betrag) # 5437, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 0, 0) AS __pivot_sum (Menge) AS Summe (Menge) # 5487]; '

Antwort

2

Das Problem ist ungelöst day Spalte. Sie können es schaffen außerhalb groupBy Klausel, dass zur Adresse:

df = (sc 
     .parallelize([ 
      (1.0, "2016-03-30 01:00:00"), (30.2, "2015-01-02 03:00:02")]) 
     .toDF(["amount", "Date"]) 
     .withColumn("Date", col("Date").cast("timestamp")) 
     .withColumn("hour", hour("date"))) 

with_day = df.withColumn("day", dayofyear("Date")) 
with_day.groupBy("day").pivot("hour", range(0, 24)).sum("amount") 

values Argument für pivot optional ist aber ratsam.