2017-03-16 3 views
1

Ich habe einen Datenrahmen wie untenPyspark Dataframe-Gruppe durch Filterung

cust_id req req_met 
------- --- ------- 
1   r1  1 
1   r2  0 
1   r2  1 
2   r1  1 
3   r1  1 
3   r2  1 
4   r1  0 
5   r1  1 
5   r2  0 
5   r1  1 

Ich habe bei den Kunden zu sehen, wie viele Anforderungen sie haben, und sehen, ob sie mindestens einmal getroffen hat. Es kann mehrere Datensätze mit demselben Kunden und derselben Anforderung geben, eine mit erfüllt und nicht erfüllt. Im obigen Fall sollte mein Ausgang

sein
cust_id 
------- 
    1 
    2 
    3 

Was ich getan habe, ist

say initial dataframe is df 
df1 = df.groupby('cust_id').countdistinct('req').alias('num_of_req').sum('req_met').alias('sum_req_met') 

df2 = df1.filter(df1.num_of_req == df1.sum_req_met) 

Aber in einigen Fällen ist es nicht korrekte Ergebnisse

bekommen Wie dies geschehen kann?

Antwort

4

Zuerst werde ich nur über Spielzeug-Datensatz aus gegebenen vorbereiten,

from pyspark.sql.functions import col 
import pyspark.sql.functions as fn 

df = spark.createDataFrame([[1, 'r1', 1], 
[1, 'r2', 0], 
[1, 'r2', 1], 
[2, 'r1', 1], 
[3, 'r1', 1], 
[3, 'r2', 1], 
[4, 'r1', 0], 
[5, 'r1', 1], 
[5, 'r2', 0], 
[5, 'r1', 1]], schema=['cust_id', 'req', 'req_met']) 
df = df.withColumn('req_met', col("req_met").cast(IntegerType())) 
df = df.withColumn('cust_id', col("cust_id").cast(IntegerType())) 

ich das gleiche tun, indem Gruppe von cust_id und req dann die req_met zählen. jene Anforderung Danach erstellen I Funktion zum Boden nur 0, 1

def floor_req(r): 
    if r >= 1: 
     return 1 
    else: 
     return 0 
udf_floor_req = udf(floor_req, IntegerType()) 
gr = df.groupby(['cust_id', 'req']) 
df_grouped = gr.agg(fn.sum(col('req_met')).alias('sum_req_met')) 
df_grouped_floor = df_grouped.withColumn('sum_req_met', udf_floor_req('sum_req_met')) 

Jetzt können wir überprüfen, ob jeder Kunde erfüllt erfüllt alle Anforderung durch Zählen unterschiedliche Anzahl von Anforderung und Gesamtzahl der Anforderung.

df_req = df_grouped_floor.groupby('cust_id').agg(fn.sum('sum_req_met').alias('sum_req'), 
               fn.count('req').alias('n_req')) 

Schließlich müssen Sie nur überprüfen, ob zwei Spalten gleich sind:

df_req.filter(df_req['sum_req'] == df_req['n_req'])[['cust_id']].orderBy('cust_id').show() 
0
select cust_id from 
(select cust_id , MIN(sum_value) as m from 
(select cust_id,req ,sum(req_met) as sum_value from <data_frame> group by cust_id,req) 
temp group by cust_id)temp1 
where m>0 ; 

Dies gibt gewünschtes Ergebnis

+0

Dank für die Lösung. Ich sah eher wie ein Datenrahmen aus –

Verwandte Themen