2015-12-29 6 views
6

Update: Es scheint, dass meine Fehler wahrscheinlich daran liegen, wie ich Spark und/oder Hive installiert habe. Das Arbeiten mit Fensterfunktionen scheint in einem Databricks (gehosteten) Notebook ziemlich einfach zu sein. Ich muss herausfinden, wie ich das lokal einrichten kann.Wie bekomme ich einen PySpark DataFrame, der mit HiveContext in Spark 1.5.2 erstellt wurde?

Ich habe einen Spark DataFrame, den ich eine Fensterfunktion verwenden muss. * Ich habe versucht, die Anweisungen über here zu folgen, aber ich stieß auf einige Probleme.

Einstellung meiner Umgebung auf:

import os 
import sys 
import datetime as dt 

os.environ["SPARK_HOME"] = '/usr/bin/spark-1.5.2' 
os.environ["PYTHONPATH"] = '/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip' 
sys.path.append('/usr/bin/spark-1.5.2/python') 
sys.path.append('/usr/bin/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip') 

import pyspark 
sc = pyspark.SparkContext() 
hiveContext = pyspark.sql.HiveContext(sc) 
sqlContext = pyspark.sql.SQLContext(sc) 
from pyspark.sql import Row 
from pyspark.sql.functions import struct 
from pyspark.sql import DataFrame 
from collections import OrderedDict 

meine Daten einrichten:

test_ts = {'adminDistrict': None, 
'city': None, 
'country': {'code': 'NA', 'name': 'UNKNOWN'}, 
'data': [{'timestamp': '2005-08-25T00:00:00Z', 'value': 369.89}, 
    {'timestamp': '2005-08-26T00:00:00Z', 'value': 362.44}, 
    {'timestamp': '2005-08-29T00:00:00Z', 'value': 368.3}, 
    {'timestamp': '2005-08-30T00:00:00Z', 'value': 382.6}, 
    {'timestamp': '2005-08-31T00:00:00Z', 'value': 377.84}, 
    {'timestamp': '2005-09-01T00:00:00Z', 'value': 380.74}, 
    {'timestamp': '2005-09-02T00:00:00Z', 'value': 370.33}, 
    {'timestamp': '2005-09-05T00:00:00Z', 'value': 370.33}, 
    {'timestamp': '2005-09-06T00:00:00Z', 'value': 361.5}, 
    {'timestamp': '2005-09-07T00:00:00Z', 'value': 352.79}, 
    {'timestamp': '2005-09-08T00:00:00Z', 'value': 354.3}, 
    {'timestamp': '2005-09-09T00:00:00Z', 'value': 353.0}, 
    {'timestamp': '2005-09-12T00:00:00Z', 'value': 349.35}, 
    {'timestamp': '2005-09-13T00:00:00Z', 'value': 348.82}, 
    {'timestamp': '2005-09-14T00:00:00Z', 'value': 360.24}, 
    {'timestamp': '2005-09-15T00:00:00Z', 'value': 357.61}, 
    {'timestamp': '2005-09-16T00:00:00Z', 'value': 347.14}, 
    {'timestamp': '2005-09-19T00:00:00Z', 'value': 370.0}, 
    {'timestamp': '2005-09-20T00:00:00Z', 'value': 362.82}, 
    {'timestamp': '2005-09-21T00:00:00Z', 'value': 366.11}, 
    {'timestamp': '2005-09-22T00:00:00Z', 'value': 364.46}, 
    {'timestamp': '2005-09-23T00:00:00Z', 'value': 351.8}, 
    {'timestamp': '2005-09-26T00:00:00Z', 'value': 360.74}, 
    {'timestamp': '2005-09-27T00:00:00Z', 'value': 356.63}, 
    {'timestamp': '2005-09-28T00:00:00Z', 'value': 363.64}, 
    {'timestamp': '2005-09-29T00:00:00Z', 'value': 366.05}], 
'maxDate': '2015-12-28T00:00:00Z', 
'minDate': '2005-08-25T00:00:00Z', 
'name': 'S&P GSCI Crude Oil Spot', 
'offset': 0, 
'resolution': 'DAY', 
'sources': ['trf'], 
'subtype': 'Index', 
'type': 'Commodities', 
'uid': 'TRF_INDEX_Z39824_PI'} 

eine Funktion, die json in einen Datenrahmen zu drehen:

def ts_to_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append((dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), line['value'])) 
    return sc.parallelize(data).toDF(['Date', ts['name'].replace('&', '').replace(' ', '_')]) 

einen Datenrahmen und zu nehmen Ein Blick auf das, was drin ist:

test_df = ts_to_df(test_ts) 
test_df.show() 

Das hat mich zeigt dies:

+----------+----------------------+ 
|  Date|SP_GSCI_Crude_Oil_Spot| 
+----------+----------------------+ 
|2005-08-25|    369.89| 
|2005-08-26|    362.44| 
|2005-08-29|     368.3| 
|2005-08-30|     382.6| 
|2005-08-31|    377.84| 
|2005-09-01|    380.74| 
|2005-09-02|    370.33| 
|2005-09-05|    370.33| 
|2005-09-06|     361.5| 
|2005-09-07|    352.79| 
|2005-09-08|     354.3| 
|2005-09-09|     353.0| 
|2005-09-12|    349.35| 
|2005-09-13|    348.82| 
|2005-09-14|    360.24| 
|2005-09-15|    357.61| 
|2005-09-16|    347.14| 
|2005-09-19|     370.0| 
|2005-09-20|    362.82| 
|2005-09-21|    366.11| 
+----------+----------------------+ 

Und hier ist, wo ich keine Ahnung habe, was ich tue und alles schiefgeht:

from pyspark.sql.functions import lag, col, lead 
from pyspark.sql.window import Window 

w = Window().partitionBy().orderBy(col('Date')) 
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show() 

dass mir diesen Fehler gibt:

Py4JJavaError: An error occurred while calling o59.select. : org.apache.spark.sql.AnalysisException: Could not resolve window function 'lead'. Note that, using window functions currently requires a HiveContext;

So sieht es aus wie ich brauche einen HiveContext, oder? Muss ich meinen Dataframe mit einem HiveContext erstellen? Dann lassen Sie mich versuchen, einen Datenrahmen explizit mit HiveContext zu erstellen:

def ts_to_hive_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), 
       ts['name'].replace('&', '').replace(' ', '_'):line['value']}) 
    temp_rdd = sc.parallelize(data).map(lambda x: Row(**x)) 
    return hiveContext.createDataFrame(temp_rdd) 

test_df = ts_to_hive_df(test_ts) 
test_df.show() 

Aber das gibt mir diese Fehlermeldung:

TypeError: 'JavaPackage' object is not callable

Wie verwende ich Fensterfunktionen? Muss ich die DataFrames mit einem HiveContext erstellen? Wenn ja, wie mache ich das? Kann mir jemand sagen, was ich falsch mache?

* Ich muss wissen, ob es Lücken in meinen Daten gibt. Ich habe die Spalte "Datum" und für jede Zeile, geordnet nach Datum, möchte ich wissen, was in der nächsten Zeile ist, und wenn ich fehlende Tage oder schlechte Daten habe, dann möchte ich die Daten des letzten Tages in dieser Zeile verwenden. Wenn Sie einen besseren Weg kennen, lassen Sie es mich wissen. Aber ich würde immer noch gerne wissen, wie diese Fensterfunktionen funktionieren.

+0

Es tut uns leid. Speziellen Code hinzugefügt. Ich hoffe, das führt uns irgendwo hin. Danke für einen Blick. – Nathaniel

+1

Alles klar, es sieht so aus, als ob etwas mit der lokalen Installation von Spark (oder Hive?) Zusammenhängen könnte, weil ich das in einem DataBricks-Notizbuch verwenden kann. DataBricks möchte nicht, dass wir eigene HiveContexte oder SQLContexte erstellen. Damit es dort funktioniert, habe ich die Erstellung meiner eigenen Kontexte weggelassen und die obige ts_to_hive_df-Funktion verwendet, wobei ich meinen hiveContext durch seinen sqlContext ersetzt habe. Ich muss das in meiner eigenen Installation schließlich arbeiten lassen. Ich werde zurückkommen und eine Lösung schreiben, wenn ich es herausfinde. – Nathaniel

+1

Es sieht so aus, als ob Sie Spark-Binärdateien ohne Hive-Unterstützung erstellt haben. – zero323

Antwort

0

Dies ist eine ältere Frage und daher problematisch, da Sie wahrscheinlich auf neue Versionen von Spark umgezogen sind. Ich führe Spark 2.0 selbst, also könnte das Betrug sein.

Aber fwiw: 2 mögliche Probleme. Im ersten Beispiel denke ich, dass die .toDF() möglicherweise SQLContext defragmentieren, da Sie beide angerufen hatten. In der zweiten, könnte es sein, dass Sie den hivecontext innerhalb der Funktion aufgerufen haben?

Wenn ich Ihre zweite ts_to_df Funktion so umgestalten, dass der HiveConnext außerhalb der Funktion aufgerufen wird, ist alles in Ordnung.

def ts_to_df(ts): 
    data = [] 
    for line in ts['data']: 
     data.append({'Date':dt.datetime.strptime(line['timestamp'][:10], '%Y-%m-%d').date(), 
       ts['name'].replace('&', '').replace(' ', '_'):line['value']}) 
    return data 

data = ts_to_df(test_ts) 
test_rdd = sc.parallelize(data).map(lambda x: Row(**x)) 
test_df = hiveContext.createDataFrame(test_rdd) 

from pyspark.sql.functions import lag, col, lead 
from pyspark.sql.window import Window 

w = Window().partitionBy().orderBy(col('Date')) 
test_df.select(lead(test_df.Date, count=1, default=None).over(w).alias("Next_Date")).show() 

Ich bekomme die Ausgabe

+----------+ 
| Next_Date| 
+----------+ 
|2005-08-26| 
|2005-08-29| 
|2005-08-30| 
|2005-08-31| 
|2005-09-01| 
|2005-09-02| 
..... 
Verwandte Themen