2014-04-30 13 views
22

Ich muss mehrere Dateien über verschiedene Verzeichnisse verstreut verarbeiten. Ich würde gerne alle diese in einer einzigen RDD laden und dann map/reduce darauf durchführen. Ich sehe, dass SparkContext mehrere Dateien aus einem einzigen Verzeichnis mithilfe von Platzhaltern laden kann. Ich bin mir nicht sicher, wie ich Dateien aus mehreren Ordnern laden soll.Spark Kontext Textdatei: Laden Sie mehrere Dateien

Der folgende Codeausschnitt fehlschlägt:

for fileEntry in files: 
    fileName = basePath + "/" + fileEntry 
    lines = sc.textFile(fileName) 
    if retval == None: 
     retval = lines 
    else: 
     retval = sc.union(retval, lines) 

Dies scheitert auf der dritten Schleife mit der folgenden Fehlermeldung:

retval = sc.union(retval, lines) 
TypeError: union() takes exactly 2 arguments (3 given) 

Welche bizarr I nur zwei Argumente bin bereitstellt. Alle Hinweise geschätzt.

+2

..aber das erste Argument ist 'self'. Von der [docs] (http://spark.apache.org/docs/latest/api/pyspark/pyspark.context.SparkContext-class.html#union), müssen Sie 'sc.union ([Retval, Linien]) ' –

+0

Lass mich das versuchen. Ich bin überrascht, warum dies würde für zwei Schleifen arbeiten und nicht an dritte ... – Raj

+0

, die den Trick tat. Danke Jonathan! – Raj

Antwort

39

Wie wäre es stattdessen mit dieser Formulierung?

sc.union([sc.textFile(basepath + "/" + f) for f in files]) 

In Scala SparkContext.union() hat zwei Varianten, eine, die Vararg Argumente, und man nimmt, die eine Liste nimmt. Nur die zweite existiert in Python (da Python keinen Polymorphismus hat).

UPDATE

Sie einen einzelnen textFile Aufruf verwenden können mehrere Dateien zu lesen.

sc.textFile(','.join(files)) 
+0

Danke Daniel. Mein Problem könnte Python-zentrisch sein. Ihr Schnipsel scheint Scala, – Raj

+0

Ah, warum habe ich das nicht bemerkt ?! Es gibt keinen Funktionspolymorphismus in Python, daher kann nur eine Form von SparkContext.union() verfügbar gemacht werden. Sie entschieden sich dafür, den zu entlarven, der eine Liste nimmt, nicht den, der einen Vararg nimmt. (Wie Jonathan sagt.) –

+0

Ich habe die Antwort auf Python anstelle von Scala korrigiert. –

1

können Sie verwenden diese

Zuerst Sie können einen Buffer/Liste der S3-Pfade erhalten:

import scala.collection.JavaConverters._ 
import java.util.ArrayList 
import com.amazonaws.services.s3.AmazonS3Client 
import com.amazonaws.services.s3.model.ObjectListing 
import com.amazonaws.services.s3.model.S3ObjectSummary 
import com.amazonaws.services.s3.model.ListObjectsRequest 

def listFiles(s3_bucket:String, base_prefix : String) = { 
    var files = new ArrayList[String] 

    //S3 Client and List Object Request 
    var s3Client = new AmazonS3Client(); 
    var objectListing: ObjectListing = null; 
    var listObjectsRequest = new ListObjectsRequest(); 

    //Your S3 Bucket 
    listObjectsRequest.setBucketName(s3_bucket) 

    //Your Folder path or Prefix 
    listObjectsRequest.setPrefix(base_prefix) 

    //Adding s3:// to the paths and adding to a list 
    do { 
     objectListing = s3Client.listObjects(listObjectsRequest); 
     for (objectSummary <- objectListing.getObjectSummaries().asScala) { 
     files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); 
     } 
     listObjectsRequest.setMarker(objectListing.getNextMarker()); 
    } while (objectListing.isTruncated()); 

    //Removing Base Directory Name 
    files.remove(0) 

    //Creating a Scala List for same 
    files.asScala 
    } 

Jetzt Geben Sie diese Liste Objekt an folgende Stück Code, Anmerkung: sc Aufgabe SqlContext

var df: DataFrame = null; 
    for (file <- files) { 
    val fileDf= sc.textFile(file) 
    if (df!= null) { 
     df= df.unionAll(fileDf) 
    } else { 
     df= fileDf 
    } 
    } 

Jetzt haben Sie einen endgültigen Unified-RDD dh df

Optional, und Sie können es auch partitionieren in einem einzigen BigRDD

val files = sc.textFile(filename, 1).repartition(1) 

Repartitionierung immer funktioniert: D

13

ich ähnliche Probleme lösen, indem Platzhalter verwenden.

z.B. Ich fand einige Züge in den Dateien, die ich in Funken laden möchten,

dir

subdir1/folder1/x.txt

subdir2/folder2/y.txt

können Sie den folgenden Satz

sc.textFile("dir/*/*/*.txt") 
verwenden

um alle relativen Dateien zu laden.

Der Platzhalter ‚*‘ funktioniert nur in einzelnen Level-Verzeichnis, das nicht rekursiv ist.

2

können Sie die folgende Funktion von SparkContext verwenden:

wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

ein Verzeichnis von Textdateien von HDFS Lesen, einem lokalen Dateisystem (verfügbar auf allen Knoten) oder eine beliebige Hadoop unterstützten Dateisystem URI . Jede Datei wird als einzelner Datensatz gelesen und in einem Schlüssel/Wert-Paar zurückgegeben, wobei der Schlüssel der Pfad jeder Datei ist und der Wert der Inhalt jeder Datei ist.

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext

+0

Dies funktioniert in den meisten Fällen, aber in meiner Erfahrung, das funktioniert nicht, wenn die Größe der Dateien groß ist. – KartikKannapur

Verwandte Themen