2017-01-14 2 views
3

Ich verwende Apache Spark 1.6.2Kann nicht konvertieren RDD zu Datenrahmen (RDD Millionen von Zeilen hat)

ich eine CSV-Daten haben, es enthält etwa 8 Millionen Zeilen und ich möchte Datenrahmen konvertieren es

aber ich habe es zu RDD konvertieren ersten Abbildung zu tun, um die Daten zu erhalten (Spalte), die ich

Mapping the RDD funktioniert will, aber wenn es darum geht RDD zu Datenrahmen zu konvertieren, Funke wirft einen Fehler

Traceback (most recent call last): 
    File "C:/Users/Dzaky/Project/TJ-source/source/201512/final1.py", line 38, in <module> 
    result_iso = input_iso.map(extract_iso).toDF() 
    File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 64, in toDF 
    File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 423, in createDataFrame 
    File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 310, in _createFromRDD 
    File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 254, in _inferSchema 
    File "c:\spark\python\lib\pyspark.zip\pyspark\rdd.py", line 1315, in first 
    File "c:\spark\python\lib\pyspark.zip\pyspark\rdd.py", line 1297, in take 
    File "c:\spark\python\lib\pyspark.zip\pyspark\context.py", line 939, in runJob 
    File "c:\spark\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py", line 813, in __call__ 
    File "c:\spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 45, in deco 
    File "c:\spark\python\lib\py4j-0.9-src.zip\py4j\protocol.py", line 308, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error 

Diese meinen Code sind:

def extract_iso(line): 
    fields = line.split(',') 
    return [fields[-2], fields[1]] 

input_iso = sc.textFile("data.csv") 
result_iso = input_iso.map(extract_iso).toDF() 

data.csv mehr als 8 Millionen Zeilen hat, aber wenn ich die Zeilen subtrahieren, bis es nur < 500 Zeilen hat, arbeitet das Programm fein

Ich weiß nicht, ob Spark hat Zeilenbegrenzung oder etwas, gibt es irgendwelche Möglichkeiten, so kann ich meine RDD konvertieren?

Oder gibt es noch andere Möglichkeiten, wie wir den DataFrame so abbilden können, wie wir die RDD abbilden?

Weitere Informationen:

die Daten chaotisch ist, in jeder Zeile Gesamt Spalten ist oftenly unterscheidet sich von einem zum anderen, das ist, warum ich es erste Karte müssen. Aber die Daten, die ich möchte, ist immer genau die gleichen Index [1] und [-2] (die zweite Spalte und die vorletzte Spalte), die gesamte Spalte zwischen diesen Spalten unterscheiden sich von Zeile zu Zeile

Vielen Dank für die Antwort :)

+1

Haben Sie versucht, die Funken-csv-Paket? https://github.com/databricks/spark-csv –

+0

@RajatMishra Ja habe ich versucht, aber das Problem ist, die Daten sind chaotisch, insgesamt Spalten in jeder Zeile unterscheidet sich oft von einem anderen, deshalb muss ich Ordnen Sie es zuerst zu, bevor ich den DataFrame der Daten erstelle –

+1

Versuchen Sie es mit 'sc.wholeTextFile' und führen Sie die Mapping-Transformation durch. http://stackoverflow.com/questions/41195924/error-while-reading-yy-large-files-with-spark-csv-package –

Antwort

4

Die wahrscheinlichste Ursache ist, dass Spark versucht, Schema des neu erstellten Datenrahmens zu identifizieren. Versuchen zweites Verfahren zum Abbilden von RDD zu DF - angeben Schema, und gehen Sie durch createDataFrame, zum Beispiel:

>>> from pyspark.sql.types import * 
>>> schema = StructType([StructField('a', StringType()),StructField('b', StringType())]) 
>>> df = sqlContext.createDataFrame(input_iso.map(extract_iso), schema) 
+0

oh mein Gott IT WORKS !!!! Vielen Dank, ja du hast Recht, das Problem ist, dass ich das Schema nicht identifiziert habe, noch einmal, vielen Dank sehr, sehr :) –

Verwandte Themen