Ich lese jede Datei eines Verzeichnisses mit wholeTextFiles
. Danach rufe ich eine Funktion auf jedem Element der RDD mit map
. Das ganze Programm verwendet nur 50 Zeilen jeder Datei. Der Code ist wie folgt:Apache Funke: Lesen Sie große Dateien aus einem Verzeichnis
def processFiles(fileNameContentsPair):
fileName= fileNameContentsPair[0]
result = "\n\n"+fileName
resultEr = "\n\n"+fileName
input = StringIO.StringIO(fileNameContentsPair[1])
reader = csv.reader(input,strict=True)
try:
i=0
for row in reader:
if i==50:
break
// do some processing and get result string
i=i+1
except csv.Error as e:
resultEr = resultEr +"error occured\n\n"
return resultEr
return result
if __name__ == "__main__":
inputFile = sys.argv[1]
outputFile = sys.argv[2]
sc = SparkContext(appName = "SomeApp")
resultRDD = sc.wholeTextFiles(inputFile).map(processFiles)
resultRDD.saveAsTextFile(outputFile)
Die Größe der einzelnen Dateien des Verzeichnisses kann in meinem Fall sehr groß sein und aus diesem Grund die Verwendung von wholeTextFiles
api wird in diesem Fall ineffizient sein. Gibt es dafür einen effizienten Weg? Ich kann daran denken, jede Datei des Verzeichnisses nacheinander zu durchlaufen, aber das scheint auch ineffizient zu sein. Ich bin neu zu funken. Bitte lassen Sie mich wissen, ob es einen effizienten Weg gibt, dies zu tun.
Wie groß ist die Größe jeder Datei? Können Sie die Dateien nicht in noch kleinere Dateien aufteilen? –
@DatTran Die Größe jeder Datei kann in wenigen Gbs liegen und die Anzahl der Dateien im Verzeichnis kann mehr als 100 betragen. Eine Möglichkeit, Dateien aufzuteilen, besteht darin, jede Datei einzeln aufzuteilen und die erste von jeder Datei zu trennen in einem temporären Verzeichnis. Danach können wir 'ganzeTextFiles' auf dieses temporäre Verzeichnis anwenden. Ist das die Art, wie Sie vorhaben, die Dateien zu teilen? Wenn nicht, lass es mich wissen, wie würdest du vorschlagen, die Dateien zu teilen? – mcurious