2016-10-05 3 views
1

Wenn ich eine Zeile Datenrahmen wie folgt zusammensetzen bringt meine Methode erfolgreich den erwarteten Datenrahmen zurück.Apache Spark 2.0.0 PySpark manuelle Dataframe Erstellung Kopf scratcher

def build_job_finish_data_frame(sql_context, job_load_id, is_success): 
    job_complete_record_schema = StructType(
     [ 
      StructField("job_load_id", IntegerType(), False), 
      StructField("terminate_datetime", TimestampType(), False), 
      StructField("was_success", BooleanType(), False) 
     ] 
    ) 
    data = [ 
     Row(
      job_load_id=job_load_id, 
      terminate_datetime=datetime.now(), 
      was_success=is_success 
     ) 
    ] 

    return sql_context.createDataFrame(data, job_complete_record_schema) 

Wenn ich die „terminate_datetime“ auf „end_datetime“ oder „finish_datetime“ ändern, wie unten gezeigt einen Fehler wirft.

def build_job_finish_data_frame(sql_context, job_load_id, is_success): 
    job_complete_record_schema = StructType(
     [ 
      StructField("job_load_id", IntegerType(), False), 
      StructField("end_datetime", TimestampType(), False), 
      StructField("was_success", BooleanType(), False) 
     ] 
    ) 
    data = [ 
     Row(
      job_load_id=job_load_id, 
      end_datetime=datetime.now(), 
      was_success=is_success 
     ) 
    ] 

    return sql_context.createDataFrame(data, job_complete_record_schema) 

Der Fehler, den ich erhalten ist

TypeError: IntegerType can not accept object datetime.datetime(2016, 10, 5, 11, 19, 31, 915745) in type <class 'datetime.datetime'> 

Ich kann "terminate_datetime" auf "start_datetime" und haben mit anderen Worten experimentiert.

Ich sehe keinen Grund für Feldnamenänderungen diesen Code zu brechen, da es nichts mehr tut, als einen manuellen Datenrahmen aufzubauen.

Das ist besorgniserregend, da ich Datenframes verwende, um ein Data Warehouse zu laden, wo ich keine Kontrolle über die Feldnamen habe.

I PySpark auf Python 3.3.2 auf Fedora 20.

Antwort

1

Warum der Name Dinge ändert ausführe? Das Problem ist, dass ein tuplesortiert von __fields__ ist. So ist der erste Fall schafft

from pyspark.sql import Row 
from datetime import datetime 

x = Row(job_load_id=1, terminate_datetime=datetime.now(), was_success=True) 
x.__fields__ 
## ['job_load_id', 'terminate_datetime', 'was_success'] 

während der zweite schafft:

y = Row(job_load_id=1, end_datetime=datetime.now(), was_success=True) 
y.__fields__ 
## ['end_datetime', 'job_load_id', 'was_success'] 

Dies passt nicht mehr das Schema Sie die (IntegerType, TimestampType, Boolean) erwartet definiert.

Da nützlich ist vor allem für Schema-Inferenz und Sie bieten Schema direkt können Sie die Adresse von Standard tuple mit:

def build_job_finish_data_frame(sql_context, job_load_id, is_success): 
    job_complete_record_schema = StructType(
     [ 
      StructField("job_load_id", IntegerType(), False), 
      StructField("end_datetime", TimestampType(), False), 
      StructField("was_success", BooleanType(), False) 
     ] 
    ) 
    data = [tuple(job_load_id, datetime.now(), is_success)] 

    return sql_context.createDataFrame(data, job_complete_record_schema) 

obwohl ein einzelnes Element zu schaffen DataFrame sieht seltsam aus, wenn nicht sinnlos.

Verwandte Themen