2016-08-18 2 views
1

Ich habe ein Datenframe in Pyspark. Hier ist, wie es aussieht,Einfügen von Datensätzen in einem Spark-Datenframe

+---------+---------+ 
|timestamp| price | 
+---------+---------+ 
|670098928| 50  | 
|670098930| 53  | 
|670098934| 55  | 
+---------+---------+ 

ich in den Lücken in Zeitstempeln mit dem vorherigen Zustand füllen will, so dass ich ein perfektes Set bekommen kann zeitgewichtete Mittelwert zu berechnen. Hier ist, was die Ausgabe sein sollte -

+---------+---------+ 
|timestamp| price | 
+---------+---------+ 
|670098928| 50  | 
|670098929| 50  | 
|670098930| 53  | 
|670098931| 53  | 
|670098932| 53  | 
|670098933| 53  | 
|670098934| 55  | 
+---------+---------+ 

Schließlich ich diesen neuen Datenrahmen auf der Festplatte bestehen bleiben soll, und meine Analyse zu visualisieren.

Wie mache ich das in pyspark? (Der Einfachheit halber habe ich gehalten, nur 2 Spalten. Meine eigentliche Datenrahmen hat 89 Spalten mit ~ 670 Millionen Datensätze vor, die Lücken zu füllen.)

+0

Du könntest mit scipy interpolieren. Ich bin mir nicht sicher, dass PySpark tun kann, was Sie wollen –

+0

@ cricket_007 Funken kann das nicht tun. Veenit, ich bin nicht sicher, warum willst du das überhaupt machen? – eliasah

+0

@eliasah Ich versuche, einen Datenrahmen mit einem Datensatz für jeden Zeitstempel (niedrigste Granularität Ebene) zu erstellen, so dass, wenn ich zeitgewichtete Durchschnittswerte tun möchte, ist es sehr praktisch. – Veenit

Antwort

1

Sie Zeitstempel Bereiche erzeugen kann, glätten sie und wählen Sie Reihen

import pyspark.sql.functions as func 

from pyspark.sql.types import IntegerType, ArrayType 


a=sc.parallelize([[670098928, 50],[670098930, 53], [670098934, 55]])\ 
.toDF(['timestamp','price']) 

f=func.udf(lambda x:range(x,x+5),ArrayType(IntegerType())) 

a.withColumn('timestamp',f(a.timestamp))\ 
.withColumn('timestamp',func.explode(func.col('timestamp')))\ 
.groupBy('timestamp')\ 
.agg(func.max(func.col('price')))\ 
.show() 

+---------+----------+ 
|timestamp|max(price)| 
+---------+----------+ 
|670098928|  50| 
|670098929|  50| 
|670098930|  53| 
|670098931|  53| 
|670098932|  53| 
|670098933|  53| 
|670098934|  55| 
|670098935|  55| 
|670098936|  55| 
|670098937|  55| 
|670098938|  55| 
+---------+----------+ 
+0

Ich bekomme 'AttributeError: 'JavaMember' Objekt hat kein Attribut 'parseDataType'', wenn ich' f = func.udf (Lambda x: Bereich (x, x + 5), ArrayType (IntegerType())) ' – Veenit

+0

den Code ausführen funktioniert, überprüfen Sie Ihre pyspark.sql Importe – marmouset

+0

Nein. Es tut es nicht. In welcher Version von Spark bist du? Ich bin auf 2.0.0 – Veenit

Verwandte Themen