2016-07-27 9 views
0

ich pyspark verwende und ich habe zwei Datenrahmen wie folgt aus:die nächste Zeit zwischen zwei Tabellen in Funken finden

user   time   bus 
A 2016/07/18 12:00:00 1 
B 2016/07/19 12:00:00 2 
C 2016/07/20 12:00:00 3 

bus   time   stop 
1 2016/07/18 11:59:40 sA 
1 2016/07/18 11:59:50 sB 
1 2016/07/18 12:00:05 sC 
2 2016/07/19 11:59:40 sB 
2 2016/07/19 12:00:10 sC 
3 2016/07/20 11:59:55 sD 
3 2016/07/20 12:00:10 sE 

Jetzt möchte ich bei denen wissen, die Anwenderberichte gemäß der Busnummer stoppen und die nächste Zeit in der zweiten Tabelle.

Zum Beispiel in Tabelle 1, berichtet Benutzer A am 2016/07/18 12:00:00 und er ist auf Bus Nr. 1, und nach der zweiten Tabelle gibt es drei Datensätze von Bus Nr. 1 , aber die nächste Zeit ist 2016/07/18 12:00:05 (der dritte Datensatz), so dass der Benutzer jetzt in SC ist.

Die gewünschte Ausgabe wie folgt sein sollte:

user   time   bus stop 
A 2016/07/18 12:00:00 1 sC 
B 2016/07/19 12:00:00 2 sC 
C 2016/07/20 12:00:00 3 sD 

ich die Zeit in Zeitstempel übertragen haben, so dass das einzige Problem ist, das am nächsten Zeitstempel zu finden, wo die Busnummer eqaul ist.

Da ich sql jetzt nicht kenne, habe ich versucht, die map-Funktion zu verwenden, um die nächste Zeit und ihren Stopp zu finden, was bedeutet, dass ich sqlContext.sql in der Map-Funktion verwenden muss, und funke nicht scheinen Zulassen:

Ausnahme: Scheint, dass Sie versuchen, SparkContext von einer Übertragungsvariable, einer Aktion oder einer Transformation zu verweisen. SparkContext kann nur für den Treiber verwendet werden, nicht für Code, der auf Workern ausgeführt wird. Weitere Informationen finden Sie unter SPARK-5063.

Also wie kann ich eine SQL-Abfrage schreiben, um die richtige Ausgabe zu erhalten?

Antwort

3

Dies kann mithilfe von Fensterfunktionen erfolgen.

from pyspark.sql.window import Window 
from pyspark.sql import Row, functions as W 

def tm(str): 
    return datetime.strptime(str, "%Y/%m/%d %H:%M:%S") 

#setup data 
userTime = [ Row(user="A",time=tm("2016/07/18 12:00:00"),bus = 1) ] 
userTime.append(Row(user="B",time=tm("2016/07/19 12:00:00"),bus = 2)) 
userTime.append(Row(user="C",time=tm("2016/07/20 12:00:00"),bus = 3)) 

busTime = [ Row(bus=1,time=tm("2016/07/18 11:59:40"),stop = "sA") ] 
busTime.append(Row(bus=1,time=tm("2016/07/18 11:59:50"),stop = "sB")) 
busTime.append(Row(bus=1,time=tm("2016/07/18 12:00:05"),stop = "sC")) 
busTime.append(Row(bus=2,time=tm("2016/07/19 11:59:40"),stop = "sB")) 
busTime.append(Row(bus=2,time=tm("2016/07/19 12:00:10"),stop = "sC")) 
busTime.append(Row(bus=3,time=tm("2016/07/20 11:59:55"),stop = "sD")) 
busTime.append(Row(bus=3,time=tm("2016/07/20 12:00:10"),stop = "sE")) 

#create RDD 
userDf = sc.parallelize(userTime).toDF().alias("usertime") 
busDf = sc.parallelize(busTime).toDF().alias("bustime") 

joinedDF = userDf.join(busDf,col("usertime.bus") == col("bustime.bus"),"inner").select(
    userDf.user, 
    userDf.time.alias("user_time"), 
    busDf.bus, 
    busDf.time.alias("bus_time"), 
    busDf.stop) 

additional_cols = joinedDF.withColumn("bus_time_diff", abs(unix_timestamp(col("bus_time")) - unix_timestamp(col("user_time")))) 

partDf = additional_cols.select("user","user_time","bus","bus_time","stop","bus_time_diff", W.rowNumber().over(Window.partitionBy("user","bus").orderBy("bus_time_diff")).alias("rank")).filter(col("rank") == 1) 


additional_cols.show(20,False) 
partDf.show(20,False) 

Ausgang:

+----+---------------------+---+---------------------+----+-------------+ 
|user|user_time   |bus|bus_time    |stop|bus_time_diff| 
+----+---------------------+---+---------------------+----+-------------+ 
|A |2016-07-18 12:00:00.0|1 |2016-07-18 11:59:40.0|sA |20   | 
|A |2016-07-18 12:00:00.0|1 |2016-07-18 11:59:50.0|sB |10   | 
|A |2016-07-18 12:00:00.0|1 |2016-07-18 12:00:05.0|sC |5   | 
|B |2016-07-19 12:00:00.0|2 |2016-07-19 11:59:40.0|sB |20   | 
|B |2016-07-19 12:00:00.0|2 |2016-07-19 12:00:10.0|sC |10   | 
|C |2016-07-20 12:00:00.0|3 |2016-07-20 11:59:55.0|sD |5   | 
|C |2016-07-20 12:00:00.0|3 |2016-07-20 12:00:10.0|sE |10   | 
+----+---------------------+---+---------------------+----+-------------+ 
+----+---------------------+---+---------------------+----+-------------+----+ 
|user|user_time   |bus|bus_time    |stop|bus_time_diff|rank| 
+----+---------------------+---+---------------------+----+-------------+----+ 
|A |2016-07-18 12:00:00.0|1 |2016-07-18 12:00:05.0|sC |5   |1 | 
|B |2016-07-19 12:00:00.0|2 |2016-07-19 12:00:10.0|sC |10   |1 | 
|C |2016-07-20 12:00:00.0|3 |2016-07-20 11:59:55.0|sD |5   |1 | 
+----+---------------------+---+---------------------+----+-------------+----+ 
+0

Es war sehr nett von Ihnen, mein Problem zu lösen, vielen Dank! – Finn

+0

Gern geschehen !! –

Verwandte Themen