2016-04-20 2 views
0

Ich versuche this CSV-Datei mit vielen Spalten zu laden und berechnen Sie die Korrelation zwischen den Spalten mit Spark.Python Worker stürzt ab, wenn CSV-Datei mit vielen Spalten geladen wird

from pyspark import SparkContext, SparkConf 
from pyspark.mllib.stat import Statistics 

conf = SparkConf()\ 
    .setAppName("Movie recommender")\ 
    .setMaster("local[*]")\ 
    .set("spark.driver.memory", "10g")\ 
    .set("spark.driver.maxResultSize", "4g") 

sc = SparkContext(conf=conf) 


pivot = sc.textFile(r"pivot.csv") 
header = pivot.first() 
pivot = pivot.filter(lambda x:x != header) 
pivot = pivot.map(lambda x:x.split()).cache() 
corrs = Statistics.corr(pivot) 

ich diesen Fehler:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error 
    at java.net.SocketOutputStream.socketWrite0(Native Method) 
    at java.net.SocketOutputStream.socketWrite(Unknown Source) 
    at java.net.SocketOutputStream.write(Unknown Source) 

Antwort

0

ich dies mit einem erhöhten Partitionen laufen verwaltet. Aber die Leistung ist auf meinem lokalen Rechner sehr langsam, so dass es nicht funktioniert. Wenn die Anzahl der Spalten hoch ist, wird Leistungsproblem erwartet.

def extract_sparse(str_lst, N): 
    if len(str_lst) == 0: 
     return (0, {}) 
    else: 
     keyvalue = {} 
     length = len(str_lst) 
     if length > N: 
      length = N 
     for i in range(length): 
      if str_lst[i] != '': # not missing 
       keyvalue[i] = float(str_lst[i]) 

     return (length, keyvalue) 

pivot = sc.textFile(r"pivot.csv", 24) 
header = pivot.first() 
pivot = pivot.filter(lambda x:x != header) 
pivot = pivot.map(lambda x:x.split(',')) 
pivot = pivot.map(lambda x: extract_sparse(x, 50000)) 
pivot = pivot.map(lambda x: Vectors.sparse(x[0], x[1])) 
pivot = pivot.map(lambda x: x.toArray()).collect() 
corrs = Statistics.corr(pivot) 
Verwandte Themen