Update: Es scheint, dass meine Fehler wahrscheinlich daran liegen, wie ich Spark und/oder Hive installiert habe. Das Arbeiten mit Fensterfunktionen scheint in einem Databricks (gehosteten) Notebook ziemlich einfach zu sein. Ich muss herausfinden, wie ich das lokal einrichten kann.Wie bekomme ich einen PySpark DataFrame, der mit HiveContext in Spark 1.5.2 erstellt wurde?
Ich habe einen Spark DataFrame, den ich eine Fensterfunktion verwenden muss. * Ich habe versucht, die Anweisungen über here zu folgen, aber ich stieß auf einige Probleme.
Einstellung meiner Umgebung auf:
import os
import sys
import datetime as dt
os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2'
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip'
sys.path.append('/usr/bin/spark-1.5.2/python')
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip')
import pyspark
sc = pyspark.SparkContext()
hiveContext = pyspark.sql.HiveContext(sc)
sqlContext = pyspark.sql.SQLContext(sc)
from pyspark.sql import Row
from pyspark.sql.functions import struct
from pyspark.sql import DataFrame
from collections import OrderedDict
meine Daten einrichten:
test_ts = {'adminDistrict': None,
'city': None,
'country': {'code': 'NA', 'name': 'UNKNOWN'},
'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89},
{'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44},
{'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3},
{'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6},
{'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84},
{'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74},
{'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33},
{'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33},
{'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5},
{'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79},
{'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3},
{'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0},
{'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35},
{'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82},
{'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24},
{'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61},
{'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14},
{'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0},
{'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82},
{'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11},
{'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46},
{'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8},
{'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74},
{'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63},
{'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64},
{'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}],
'maxDate': '2015-12-28T00:00:00Z',
'minDate': '2005-08-25T00:00:00Z',
'name': 'S&P GSCI Crude Oil Spot',
'offset': 0,
'resolution': 'DAY',
'sources': ['trf'],
'subtype': 'Index',
'type': 'Commodities',
'uid': 'TRF_INDEX_Z39824_PI'}
eine Funktion, die json in einen Datenrahmen zu drehen:
def ts_to_df(ts):
data = []
for line in ts['data']:
data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), line['value']))
return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')])
einen Datenrahmen und zu nehmen Ein Blick auf das, was drin ist:
test_df = ts_to_df(test_ts)
test_df.show()
Das hat mich zeigt dies:
+----------+----------------------+
| Date|SP_GSCI_Crude_Oil_Spot|
+----------+----------------------+
|2005-08-25| 369.89|
|2005-08-26| 362.44|
|2005-08-29| 368.3|
|2005-08-30| 382.6|
|2005-08-31| 377.84|
|2005-09-01| 380.74|
|2005-09-02| 370.33|
|2005-09-05| 370.33|
|2005-09-06| 361.5|
|2005-09-07| 352.79|
|2005-09-08| 354.3|
|2005-09-09| 353.0|
|2005-09-12| 349.35|
|2005-09-13| 348.82|
|2005-09-14| 360.24|
|2005-09-15| 357.61|
|2005-09-16| 347.14|
|2005-09-19| 370.0|
|2005-09-20| 362.82|
|2005-09-21| 366.11|
+----------+----------------------+
Und hier ist, wo ich keine Ahnung habe, was ich tue und alles schiefgeht:
from pyspark.sql.functions import lag, col, lead
from pyspark.sql.window import Window
w = Window().partitionBy().orderBy(col('Date'))
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show()
dass mir diesen Fehler gibt:
Py4JJavaError: An error occurred while calling o59.select. : org.apache.spark.sql.AnalysisException: Could not resolve window function 'lead'. Note that, using window functions currently requires a HiveContext;
So sieht es aus wie ich brauche einen HiveContext, oder? Muss ich meinen Dataframe mit einem HiveContext erstellen? Dann lassen Sie mich versuchen, einen Datenrahmen explizit mit HiveContext zu erstellen:
def ts_to_hive_df(ts):
data = []
for line in ts['data']:
data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(),
ts['name'].replace('&', '').replace(' ', '_'):line['value']})
temp_rdd = sc.parallelize(data).map(lambda x: Row(**x))
return hiveContext.createDataFrame(temp_rdd)
test_df = ts_to_hive_df(test_ts)
test_df.show()
Aber das gibt mir diese Fehlermeldung:
TypeError: 'JavaPackage' object is not callable
Wie verwende ich Fensterfunktionen? Muss ich die DataFrames mit einem HiveContext erstellen? Wenn ja, wie mache ich das? Kann mir jemand sagen, was ich falsch mache?
* Ich muss wissen, ob es Lücken in meinen Daten gibt. Ich habe die Spalte "Datum" und für jede Zeile, geordnet nach Datum, möchte ich wissen, was in der nächsten Zeile ist, und wenn ich fehlende Tage oder schlechte Daten habe, dann möchte ich die Daten des letzten Tages in dieser Zeile verwenden. Wenn Sie einen besseren Weg kennen, lassen Sie es mich wissen. Aber ich würde immer noch gerne wissen, wie diese Fensterfunktionen funktionieren.
Es tut uns leid. Speziellen Code hinzugefügt. Ich hoffe, das führt uns irgendwo hin. Danke für einen Blick. – Nathaniel
Alles klar, es sieht so aus, als ob etwas mit der lokalen Installation von Spark (oder Hive?) Zusammenhängen könnte, weil ich das in einem DataBricks-Notizbuch verwenden kann. DataBricks möchte nicht, dass wir eigene HiveContexte oder SQLContexte erstellen. Damit es dort funktioniert, habe ich die Erstellung meiner eigenen Kontexte weggelassen und die obige ts_to_hive_df-Funktion verwendet, wobei ich meinen hiveContext durch seinen sqlContext ersetzt habe. Ich muss das in meiner eigenen Installation schließlich arbeiten lassen. Ich werde zurückkommen und eine Lösung schreiben, wenn ich es herausfinde. – Nathaniel
Es sieht so aus, als ob Sie Spark-Binärdateien ohne Hive-Unterstützung erstellt haben. – zero323