2016-11-22 3 views
3

https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html schön erklären, wie ein Pivot für Funken funktioniert.funken pivot ohne Aggregation

In meinem Python-Code verwende ich Pandas ohne Aggregation, aber setzen Sie den Index und kommen:

pd.pivot_table(data=dfCountries, index=['A'], columns=['B']) 
countryToMerge.index.name = 'ISO' 
df.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner') 

Wie funktioniert das in Funken?

I Gruppe versucht und wie manuell join:

val grouped = countryKPI.groupBy("A").pivot("B") 
df.join(grouped, df.col("ISO") === grouped.col("ISO")).show 

aber das funktioniert nicht. Wie würde der reset_index in den Funken passen/Wie würde er auf native Weise umgesetzt werden?

bearbeiten

ein minimales Beispiel für den Python-Code:

import pandas as pd 
from datetime import datetime, timedelta 
import numpy as np 
dates = pd.DataFrame([(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(10)], columns=["dates"]) 
isos = pd.DataFrame(["ABC", "POL", "ABC", "POL","ABC", "POL","ABC", "POL","ABC", "POL"], columns=['ISO']) 
dates['ISO'] = isos.ISO 
dates['ISO'] = dates['ISO'].astype("category") 
countryKPI = pd.DataFrame({'country_id3':['ABC','POL','ABC','POL'], 
         'indicator_id':['a','a','b','b'], 
         'value':[7,8,9,7]}) 
countryToMerge = pd.pivot_table(data=countryKPI, index=['country_id3'], columns=['indicator_id']) 
countryToMerge.index.name = 'ISO' 
print(dates.merge(countryToMerge['value'].reset_index(), on='ISO', how='inner')) 

    dates ISO a b 
0 2016-01-01 ABC 7 9 
1 2016-01-03 ABC 7 9 
2 2016-01-05 ABC 7 9 
3 2016-01-07 ABC 7 9 
4 2016-01-09 ABC 7 9 
5 2016-01-02 POL 8 7 
6 2016-01-04 POL 8 7 
7 2016-01-06 POL 8 7 
8 2016-01-08 POL 8 7 
9 2016-01-10 POL 8 7 

folgen zusammen in scala/Funken

val dates = Seq(("2016-01-01", "ABC"), 
    ("2016-01-02", "ABC"), 
    ("2016-01-03", "POL"), 
    ("2016-01-04", "ABC"), 
    ("2016-01-05", "POL"), 
    ("2016-01-06", "ABC"), 
    ("2016-01-07", "POL"), 
    ("2016-01-08", "ABC"), 
    ("2016-01-09", "POL"), 
    ("2016-01-10", "ABC") 
).toDF("dates", "ISO") 
    .withColumn("dates", 'dates.cast("Date")) 

    dates.show 
    dates.printSchema 

    val countryKPI = Seq(("ABC", "a", 7), 
    ("ABC", "b", 8), 
    ("POL", "a", 9), 
    ("POL", "b", 7) 
).toDF("country_id3", "indicator_id", "value") 

    countryKPI.show 
    countryKPI.printSchema 

val grouped = countryKPI.groupBy("country_id3").pivot("indicator_id") 

Antwort

0

Der folgende Ausschnitt scheint zu funktionieren - aber ich bin nicht sicher, wenn eine Aggregation durch avg korrekt ist - obwohl "passende Zahlen" die Ausgabe sind.

countryKPI.groupBy("country_id3").pivot("indicator_id").avg("value").show 

Ich bin mir nicht sicher, ob dies für eine größere Datenmenge „ineffizient“ ist (avg) im Vergleich zu nur die Werte Wiederverwendung (wie ich will nicht aggregieren).

+0

sogar ich suche nach Pivot-Funktion ohne Aggregation. @Georg Heiler, hast du inzwischen etwas gefunden? – user3560220

+0

Nur was hier gepostet wird. Unglücklicherweise. –

Verwandte Themen