2017-07-01 2 views
0

Ich habe eine CSV-Datei mit einer Struktur, die der folgenden ähnelt.Durchschnittspreis für ein großes Dataset über Zeitintervalle erhalten

INDEX,SYMBOL,DATETIMETS,PRICE,SIZE 
0,A,2002-12-02 9:30:20,19.75,30200 
1,A,2002-12-02 9:30:22,19.75,100 
2,A,2002-12-02 9:30:22,19.75,300 
3,A,2002-12-02 9:30:22,19.75,100 
4,A,2002-12-02 9:30:23,19.75,100 
5,A,2002-12-02 9:30:23,19.75,100 
6,A,2002-12-02 9:30:23,19.75,100 
7,A,2002-12-02 9:30:23,19.75,100 
....... 
....... 

Es gibt über eine Million Zeilen, die sich über mehrere Jahre erstrecken. Ich habe diese CSV-Datei in einen Spark-Datenframe (Pyspark) geladen. Was ist der schnellste Weg für mich, den Durchschnitt des Preises in 5-Minuten-Intervallen zu erhalten?

Was ich gerade mache ist Schleifen durch den gesamten Datensatz und Abfrage der Zeit in 5-Minuten-Intervallen. z.B.

filteredSqlString = ("SELECT PRICE FROM DF WHERE DATETIMETS >= '" + str(sdt) + "'" 
         + " AND DATETIMETS < '" + str(idt) + "'") 
filtered_df = sqlContext.sql(filteredSqlString); 
MEAN_PRICE = filtered_df.select([mean("PRICE")]).first()[0]; 

und dies durch Erhöhen Startdatetime und Ende Datetime in einer Schleife läuft immer nimmt

sdt = idt; 
idt = sdt + timedelta(minutes=5); 

Dieser Ansatz. Gab es einen schnelleren Weg, dies zu erreichen?

Antwort

1

Ich denke, das sollte eine viel bessere Lösung sein.

eine Eingabe Gegeben:

schema = StructType([ 
    StructField("INDEX", IntegerType(), True), 
    StructField("SYMBOL", StringType(), True), 
    StructField("DATETIMETS", StringType(), True), 
    StructField("PRICE", DoubleType(), True), 
    StructField("SIZE", IntegerType(), True), 
]) 

df = spark\ 
    .createDataFrame(
     data=[(0,'A','2002-12-02 9:30:20',19.75,30200), 
      (1,'A','2002-12-02 9:31:20',19.75,30200), 
      (2,'A','2002-12-02 9:35:20',19.75,30200), 
      (3,'A','2002-12-02 9:36:20',1.0,30200), 
      (4,'A','2002-12-02 9:41:20',20.0,30200), 
      (4,'A','2002-12-02 9:42:20',40.0,30200), 
      (5,'A','2003-12-02 11:28:20',19.75,30200), 
      (6,'A','2003-12-02 11:31:20',19.75,30200), 
      (7,'A','2003-12-02 12:35:20',19.75,30200), 
      (8,'A','2004-12-02 10:36:20',1.0,30200), 
      (9,'A','2006-12-02 22:41:20',20.0,30200), 
      (10,'A','2006-12-02 22:42:20',40.0,30200)], 
     schema=schema) 

Lassen Sie uns unser Interesse Intervalle erstellen:

intervals = [] 
for i in range(0,61,5): 
    intervals.append(i) 
print(intervals) 

Welche sind:

[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60] 

Dann einige UDF, die wir für die Gruppierung benötigen:

u_get_year = udf(lambda col : col[:10]) 
u_get_hour = udf(lambda col : col.strip().split(" ")[1].split(':')[0], StringType()) 

def get_interval(col): 
    curr = int(col.strip().split(" ")[1].split(':')[1]) 

    for idx,interval in enumerate(intervals): 
     if intervals[idx] <= curr < intervals[idx+1]: 
      return "{}-{}".format(intervals[idx],intervals[idx+1]) 

    return "" 

u_get_interval = udf(get_interval, StringType()) 

Schließlich lassen Sie uns Operationen durchführen:

df2 = df.withColumn('DATE',u_get_year('DATETIMETS'))\ 
     .withColumn('HOUR', u_get_hour('DATETIMETS'))\ 
     .withColumn('INTERVAL', u_get_interval('DATETIMETS'))\ 
     .drop('DATETIMETS') 

df2.groupBy('DATE', 'HOUR', 'INTERVAL').agg(mean('PRICE'))\ 
     .orderBy('DATE', 'HOUR', 'INTERVAL').show() 

Ausgänge:

+----------+----+--------+----------+ 
|DATE  |HOUR|INTERVAL|avg(PRICE)| 
+----------+----+--------+----------+ 
|2002-12-02|9 |30-35 |19.75  | 
|2002-12-02|9 |35-40 |10.375 | 
|2002-12-02|9 |40-45 |30.0  | 
|2003-12-02|11 |25-30 |19.75  | 
|2003-12-02|11 |30-35 |19.75  | 
|2003-12-02|12 |35-40 |19.75  | 
|2004-12-02|10 |35-40 |1.0  | 
|2006-12-02|22 |40-45 |30.0  | 
+----------+----+--------+----------+ 
+0

Vielen Dank für Ihre Antwort. – Bookamp

Verwandte Themen