2017-05-08 4 views
0

Ich lese jede Datei eines Verzeichnisses mit wholeTextFiles. Danach rufe ich eine Funktion auf jedem Element der RDD mit map. Das ganze Programm verwendet nur 50 Zeilen jeder Datei. Der Code ist wie folgt:Apache Funke: Lesen Sie große Dateien aus einem Verzeichnis

def processFiles(fileNameContentsPair): 
    fileName= fileNameContentsPair[0] 
    result = "\n\n"+fileName 
    resultEr = "\n\n"+fileName 
    input = StringIO.StringIO(fileNameContentsPair[1]) 
    reader = csv.reader(input,strict=True) 

    try: 
     i=0 
     for row in reader: 
     if i==50: 
      break 
     // do some processing and get result string 
     i=i+1 
    except csv.Error as e: 
    resultEr = resultEr +"error occured\n\n" 
    return resultEr 
    return result 



if __name__ == "__main__": 
    inputFile = sys.argv[1] 
    outputFile = sys.argv[2] 
    sc = SparkContext(appName = "SomeApp") 
    resultRDD = sc.wholeTextFiles(inputFile).map(processFiles) 
    resultRDD.saveAsTextFile(outputFile) 

Die Größe der einzelnen Dateien des Verzeichnisses kann in meinem Fall sehr groß sein und aus diesem Grund die Verwendung von wholeTextFiles api wird in diesem Fall ineffizient sein. Gibt es dafür einen effizienten Weg? Ich kann daran denken, jede Datei des Verzeichnisses nacheinander zu durchlaufen, aber das scheint auch ineffizient zu sein. Ich bin neu zu funken. Bitte lassen Sie mich wissen, ob es einen effizienten Weg gibt, dies zu tun.

+1

Wie groß ist die Größe jeder Datei? Können Sie die Dateien nicht in noch kleinere Dateien aufteilen? –

+0

@DatTran Die Größe jeder Datei kann in wenigen Gbs liegen und die Anzahl der Dateien im Verzeichnis kann mehr als 100 betragen. Eine Möglichkeit, Dateien aufzuteilen, besteht darin, jede Datei einzeln aufzuteilen und die erste von jeder Datei zu trennen in einem temporären Verzeichnis. Danach können wir 'ganzeTextFiles' auf dieses temporäre Verzeichnis anwenden. Ist das die Art, wie Sie vorhaben, die Dateien zu teilen? Wenn nicht, lass es mich wissen, wie würdest du vorschlagen, die Dateien zu teilen? – mcurious

Antwort

1

Okay, was ich vorschlagen würde ist, Ihre Dateien zuerst in kleinere Stücke zu teilen, ein paar Gbs ist zu groß, um zu lesen, was der Hauptgrund für Ihre Verzögerung ist. Wenn sich Ihre Daten in HDFS befinden, könnten Sie für jede Datei etwa 64 MB haben. Andernfalls sollten Sie mit der Dateigröße experimentieren, da dies von der Anzahl der Executoren abhängt, die Sie haben. Wenn Sie also mehr kleinere Stücke haben, können Sie dies erhöhen, um mehr Parallelität zu erreichen. Sie können auch Ihre Partition vergrößern, um sie zu optimieren, da Ihre -Funktion nicht CPU-intensiv zu sein scheint. Das einzige Problem mit vielen Executoren ist, dass I/O zunimmt, aber wenn die Dateigröße klein ist, sollte das nicht viel von dem Problem sein.

Übrigens, es ist kein temporäres Verzeichnis erforderlich, wholeTextFiles unterstützt Platzhalter wie *. Beachten Sie auch, wenn Sie S3 als Dateisystem verwenden, könnte es einen Engpass geben, wenn Sie zu viele kleine Dateien haben, da das Lesen eine Weile statt einer großen Datei dauern kann. Das ist also nicht trivial.

Hoffe, das hilft!

Verwandte Themen