ich zur Zeit eine große RDD haben Join genannt chartEvents
Daten enthält, die Form:effizienteste Weg Massives/kleine Datenmengen
case class ChartEvent(patientID: String, itemID: String, chartTime: String, storeTime: String, value: String,
valueNum: String, warning: String, error: String)
Die Daten kommen aus einer 35 GB CSV-Datei, die ich Parsen bin in Verwendung SQL:
CSVUtils.loadCSVAsTable(sqlContext, "data_unzipped/CHARTEVENTS.csv")
val chartEvents = sqlContext.sql(
"""
|SELECT SUBJECT_ID, ITEMID, CHARTTIME, STORETIME, VALUE, VALUENUM, WARNING, ERROR
|FROM CHARTEVENTS
""".stripMargin)
.map(r => ChartEvent(r(0).toString, r(1).toString, r(2).toString, r(3).toString, r(4).toString,
r(5).toString, r(6).toString, r(7).toString))
ich habe eine separate, sehr klein (weniger als 100 Zeilen) RDD genannt featureMapping
der Form RDD[(itemID, label)]
wo diese sind beide Saiten. Was ich versuche zu tun, ist die RD12 chartEvents
auf Zeilen, die nur ElementIDs in featureMapping
enthalten filtern. Meine aktuelle Methode ist eine innere Verknüpfung der beiden RDDs auszuführen, wie folgt:
val result = chartEvents.map{case event => (event.itemID, event)}.join(featureMapping)
Ich bin jedoch zu bemerken, dass diese auf dem richtigen Weg ist, mehrere Stunden in Anspruch nehmen zu laufen, und wird eine große Menge an Raum mit in meinem /user/<user>/appdata/local/temp
Ordner. Gibt es eine effizientere Möglichkeit, diese Filterung durchzuführen? Wäre es schneller, in den sqlContext zu schreiben?
siehe 'Map-Seite join' Broadcast Variablen: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-broadcast.html#Introduction – maasg
@maasg, vielen Dank für das Feedback, scheint die Leistung verbessert zu haben. – mongolol