2016-08-04 2 views
1

Ich habe ein Spark-Datenframe (mit der Scala-Schnittstelle), die Spalten von Timestamp, Asset (eine Zeichenfolge), Tag (eine Zeichenfolge) und Wert (ein Doppel). Hier ist ein Auszug davon:Filter Spark DataFrame wenn zwischen jeder Menge von Punkten in der Liste

+--------------------+-----+--------+-------------------+ 
|   timestamp|asset|  tag|    value| 
+--------------------+-----+--------+-------------------+ 
|2013-01-03 23:36:...| G4| BTGJ2_2|  116.985626221| 
|2013-01-15 00:36:...| G4| TTXD1_6|  66.887382507| 
|2013-01-05 13:03:...| G4|TTXD1_22|  40.913497925| 
|2013-01-12 04:43:...| G4|TTXD1_23|  60.834510803| 
|2013-01-08 17:54:...| G4| LTB1D|  106.534744263| 
|2013-01-02 04:15:...| G4| WEXH|  255.981292725| 
|2013-01-07 10:54:...| G4| BTTA1_7|  100.743843079| 
|2013-01-05 11:29:...| G4| CDFH_10|  388.560668945| 
|2013-01-10 09:10:...| G4| LTB1D|  112.226242065| 
|2013-01-13 15:09:...| G4|TTXD1_15|  63.970848083| 
|2013-01-15 01:23:...| G4| TTIB|  67.993904114| 

Ich habe auch ein Array[List[Timestamp]], wobei jede List der Größe ist zwei und hält Start- und Endzeiten für Intervalle von Interesse endet. Zum Beispiel:

event_times: Array[List[java.sql.Timestamp]] = Array(List(2013-01-02 00:00:00.0, 2013-01-02 12:00:00.0), List(2013-01-10 00:00:00.0, 2013-01-12 06:00:00.0)) 

hält zwei Intervalle von Interesse: eine von Mitternacht bis 12:00 Uhr am 2013.01.02 und eine weitere von Mitternacht 2013.01.10 bis 6:00 Uhr am 2013.01.12

Hier ist meine Frage: Wie kann ich den Datenrahmen filtern, um Werte zurückzugeben, so dass der Zeitstempel in beliebig der Intervalle ist? Für jeden ein Intervall, kann ich

df.filter(df("timestamp").between(start, end)) 

tun Da ich weiß nicht, wie viele Elemente in den Array (wie viele Intervalle ich habe), kann ich nicht einfach nur eine lange Reihe von Filtern.

Für das obige Beispiel würde ich Reihen behalten möchten 4, 6 und 9.

Was ich habe, ist jetzt eine Schleife über die Array, und bin für jeden die passende Teilmenge bekommen. Aber das ist wahrscheinlich langsamer, als alles in einem großen Filter zu haben, oder?

+1

Entschuldigung, ich habe nicht bekommen, was Sie wollen. Können Sie ein klares Beispiel geben? –

+0

@ThiagoBaldim Ich habe gerade mit einem Beispiel aktualisiert. – kgully

Antwort

3

Sie können Ihre Zeitstempelliste in einen DataFrame konvertieren und ihn mit Ihrem ursprünglichen Dataframe an entsprechenden Zeitstempeln verbinden. Ich habe ein einfaches Beispiel zur Veranschaulichung dieses Prozesses erstellt:

//Dummy data 
val data = List(
    ("2013-01-02 00:30:00.0", "116.985626221"), 
    ("2013-01-03 00:30:00.0", "66.887382507"), 
    ("2013-01-11 00:30:00.0", "12.3456") 
) 

//Convert data to DataFrame 
val dfData = sc.parallelize(data).toDF("timestamp", "value") 

//Timestamp intervals list 
val filterList = Array(
    List("2013-01-02 00:00:00.0", "2013-01-02 12:00:00.0"), 
    List("2013-01-10 00:00:00.0", "2013-01-12 06:00:00.0") 
) 

//Convert the intervals list to a DataFrame 
val dfIntervals = sc.parallelize(
    filterList.map(l => (l(0),l(1))) 
).toDF("start_ts","end_ts") 

//Join both dataframes (inner join, since you only want matching rows) 
val joined = dfData.as("data").join(
    dfIntervals.as("inter"), 
    $"data.timestamp".between($"inter.start_ts", $"inter.end_ts") 
) 
Verwandte Themen