Ich verwende Apache Spark 1.6.2Kann nicht konvertieren RDD zu Datenrahmen (RDD Millionen von Zeilen hat)
ich eine CSV-Daten haben, es enthält etwa 8 Millionen Zeilen und ich möchte Datenrahmen konvertieren es
aber ich habe es zu RDD konvertieren ersten Abbildung zu tun, um die Daten zu erhalten (Spalte), die ich
Mapping the RDD funktioniert will, aber wenn es darum geht RDD zu Datenrahmen zu konvertieren, Funke wirft einen Fehler
Traceback (most recent call last):
File "C:/Users/Dzaky/Project/TJ-source/source/201512/final1.py", line 38, in <module>
result_iso = input_iso.map(extract_iso).toDF()
File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 64, in toDF
File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 423, in createDataFrame
File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 310, in _createFromRDD
File "c:\spark\python\lib\pyspark.zip\pyspark\sql\context.py", line 254, in _inferSchema
File "c:\spark\python\lib\pyspark.zip\pyspark\rdd.py", line 1315, in first
File "c:\spark\python\lib\pyspark.zip\pyspark\rdd.py", line 1297, in take
File "c:\spark\python\lib\pyspark.zip\pyspark\context.py", line 939, in runJob
File "c:\spark\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py", line 813, in __call__
File "c:\spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 45, in deco
File "c:\spark\python\lib\py4j-0.9-src.zip\py4j\protocol.py", line 308, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error
Diese meinen Code sind:
def extract_iso(line):
fields = line.split(',')
return [fields[-2], fields[1]]
input_iso = sc.textFile("data.csv")
result_iso = input_iso.map(extract_iso).toDF()
data.csv
mehr als 8 Millionen Zeilen hat, aber wenn ich die Zeilen subtrahieren, bis es nur < 500 Zeilen hat, arbeitet das Programm fein
Ich weiß nicht, ob Spark hat Zeilenbegrenzung oder etwas, gibt es irgendwelche Möglichkeiten, so kann ich meine RDD konvertieren?
Oder gibt es noch andere Möglichkeiten, wie wir den DataFrame so abbilden können, wie wir die RDD abbilden?
Weitere Informationen:
die Daten chaotisch ist, in jeder Zeile Gesamt Spalten ist oftenly unterscheidet sich von einem zum anderen, das ist, warum ich es erste Karte müssen. Aber die Daten, die ich möchte, ist immer genau die gleichen Index [1] und [-2] (die zweite Spalte und die vorletzte Spalte), die gesamte Spalte zwischen diesen Spalten unterscheiden sich von Zeile zu Zeile
Vielen Dank für die Antwort :)
Haben Sie versucht, die Funken-csv-Paket? https://github.com/databricks/spark-csv –
@RajatMishra Ja habe ich versucht, aber das Problem ist, die Daten sind chaotisch, insgesamt Spalten in jeder Zeile unterscheidet sich oft von einem anderen, deshalb muss ich Ordnen Sie es zuerst zu, bevor ich den DataFrame der Daten erstelle –
Versuchen Sie es mit 'sc.wholeTextFile' und führen Sie die Mapping-Transformation durch. http://stackoverflow.com/questions/41195924/error-while-reading-yy-large-files-with-spark-csv-package –