2016-11-16 1 views
0

Ich habe zwei Datenrahmen (tx_df und login_df). Die erste Spalte enthält die Spalten "player_id", "tx_id" und "tx_time", die zweite "player_id" und "login_time".PySpark verbindet zwei Datenrahmen mit dem nächstliegenden Zeitwert

Was ich tun möchte, ist, diese beiden Datenrahmen mit player_id Spalte zu verbinden, aber zusätzlich, nur die neueste Login-Zeile von login_df beizutreten. Zum Beispiel wird, wenn es wie folgt tx_df:

pid_1, txid_1, '2016-11-16 00:01:00' 
pid_1, txid_2, '2016-11-16 00:01:02' 
pid_1, txid_3, '2016-11-16 00:02:15' 
pid_1, txid_4, '2016-11-16 00:02:16' 
pid_1, txid_5, '2016-11-16 00:02:17' 

und login_df wie folgt aus:

pid_1, '2016-11-16 00:02:10' 
pid_1, '2016-11-16 00:00:55' 
pid_1, '2016-11-13 00:03:00' 
pid_1, '2016-11-10 16:30:00' 

I-Datenrahmen wollen resultierenden wie folgt aussehen:

pid_1, txid_1, '2016-11-16 00:01:00', pid_1, '2016-11-16 00:00:55' 
pid_1, txid_2, '2016-11-16 00:01:02', pid_1, '2016-11-16 00:00:55' 
pid_1, txid_3, '2016-11-16 00:02:15', pid_1, '2016-11-16 00:02:10' 
pid_1, txid_4, '2016-11-16 00:02:16', pid_1, '2016-11-16 00:02:10' 
pid_1, txid_5, '2016-11-16 00:02:17', pid_1, '2016-11-16 00:02:10' 

Ich bin nicht obligatorisch an Datenrahmen gebunden, so wird ein Hinweis, wie man es mit RDDs oder jedem anderen Ansatz schön macht, geschätzt.

Explosion der Daten ist, was ich fürchte, weil tx_df Tausende von Transaktionseinträgen für jede Spieler-ID (und dann Tausende von Spieler-IDs) haben kann, während Login_df auch unbekannte Anzahl von Spieler-Login-Informationen haben kann. Wenn Sie diese beiden Elemente mit player_id verbinden, würde dies zu einem enormen Datenrahmen führen, der auf ein kartesisches Produkt zurückzuführen ist, was nicht akzeptabel ist.

HINWEIS: Ich verwende Python API für Spark.

Antwort

0

Für eine zukünftige Referenz, habe ich es geschafft, dies mit einem etwas anderen Ansatz zu lösen. Ich hatte das Glück, dass der zweite Datenrahmen klein genug war, um ihn zu übertragen. Genauer gesagt habe ich hashmap von Werten ausgestrahlt, aber das ist nur, weil ich fand, dass es gut für den Zweck passt. (Siehe: broadcast variables in Spark)

Dann I iteriert über die Zeilen des ersten Datenrahmens wie diese

tx_df.rdd.map(my_map_function) 

und in my_map_function zugegriffen I hasmap ausgestrahlt, benötigt hat Sortier- und andere Operationen und wählte schließlich die I-Wert möchte an die Zeilen des ersten Datenrahmens anhängen.

Als netten Nebeneffekt, die hashmap von Werten zu übertragen, war ich in der Lage, die Verknüpfung von Datenrahmen zu entfernen und die Dinge zu beschleunigen. Bevor Sie dies tut, Skript hatte

  • Läden von Daten in Datenrahmen
  • Verbindungsdatenrahmen in große

Nach dieser Sendung Lösung nicht erforderlich Reihen von Rahmen großer Daten Ausfiltern Skript hat

  • Laden von Daten in Datenrahmen
  • Sendewerte der zweiten
  • nur über erste Iterieren, direkt Werte des zweiten Zugriff und sich auf die aktuelle Zeile

Filtern sind nicht erforderlich in dem zweiten Ansatz anhängen, weil geeignete Werte bereits sind abgeholt, damit die Skriptausführung schneller ist.

Verwandte Themen