2017-03-20 8 views
0

Ich arbeite mit Hive-Daten, die in ein Python-Jupyter-Notizbuch mit einem in Python geschriebenen Hive-Wrapper gezogen wurden. Ich habe Terabytes von Daten wie folgt aus:Sammeln nach Gruppe gruppiert?

Table 1: time=t1 
uid colA 
1  A 
1  B 
1  C 
2  A 
2  B 
3  C 
3  D 

Ich mag einen neuen Datenrahmen (PySpark/Pandas) aus den obigen Daten erstellen, die wie folgt aussehen:

Table 2: time=t1 
uid colA 
1  [A, B, C] 
2  [A, B] 
3  [C, D] 

wo colA wäre eine Liste von Saiten. Wie würde ich das tun? Ich habe gelesen über collect_set(), aber bin nicht vertraut mit seiner Verwendung oder Eignung.

Nach Table 2 zu schaffen, nehme ich für time=t2 einen anderen Tisch hatte:

Table 3: time=t2 
uid colA 
1  [A, B] 
2  [B] 
3  [C, D, E] 

Nun, ich möchte die eingestellte Differenz zwischen table 2 und table 3 berechnen. Es sollte 3 zurückgeben, da dies die Anzahl von Hinzufügungen/Löschungen ist, die benötigt werden, um von Tabelle 3 zu Tabelle 2 zu gelangen.

+0

Also, was haben yo Du hast es versucht? –

Antwort

2

Hier ist eine zusammengefasste Lösung für das Problem. Hoffe, das wird für dich funktionieren mit pyspark.

Globale Importe: -

import pyspark.sql.functions as F 
import pyspark.sql.types as T 

Tabelle 2 Creation Code: -

df1 = sc.parallelize([ 
     [1,'A'], [1,'B'], [1,'C'], [2,'A'], [2,'B'], [3, 'C'], [3,'D'] 
     ]).toDF(['uid', 'colA']).groupBy("uid").agg(F.collect_set("colA").alias("colA")) 

df1.show() 
+---+---------+ 
|uid|  colA| 
+---+---------+ 
| 1|[A, B, C]| 
| 2| [A, B]| 
| 3| [C, D]| 
+---+---------+ 

Tabelle 3 Creation Code: -

df2 = sc.parallelize([[1, ['A', 'B']],[2, ['B']],[3, ['C', 'D', 'E']]]).toDF(['uid', 'colA']) 
def diffUdfFunc(x,y): 
    return list(set(y).difference(set(x))) 

diffUdf = F.udf(diffUdfFunc,T.ArrayType(T.StringType())) 
finaldf = df1.withColumnRenamed("colA", "colA1").join(df2, "uid").withColumnRenamed("colA", "colA2").withColumn("diffCol", diffUdf(F.col("colA1"), F.col("colA2"))) 
finaldf.select("uid", F.col("diffCol").alias("colA")).where(F.size("colA") > 0).show() 
+---+----+ 
|uid|colA| 
+---+----+ 
| 3| [E]| 
+---+----+ 
Verwandte Themen