Wir sind derzeit ein Leistungsproblem in sparksql in Scala Sprache geschrieben. Der Anwendungsablauf wird nachstehend erwähnt.SparkSQL-Leistungsproblem mit Collect-Methode
- Spark-Anwendung liest eine Textdatei von Eingabe HDFS Verzeichnis
einen Datenrahmen auf der Datei erstellt programmatisch Angabe Schema verwenden. Dieser Datenrahmen wird eine exakte Replikation der im Speicher gehaltenen Eingabedatei sein.
var eqpDF = sqlContext.createDataFrame(eqpRowRdd, eqpSchema)
Erstellt einen gefilterte Datenrahmen aus dem ersten Datenrahmen in Schritt aufgebaut ist, werde um 18 Spalten in dem Datenrahmen hat 2. Dieser Datenrahmen wird eindeutige Kontonummern mit Hilfe verschiedenen Schlüsselwort enthält.
var distAccNrsDF = eqpDF.select("accountnumber").distinct().collect()
Mit den beiden in Schritt konstruierten Datenrahmen 2 & 3, werden wir alle Datensätze erhalten, die zu einer Kontonummer gehören und einige Json Parsing-Logik auf der gefilterten Daten tun.
var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'").collect()
Schließlich analysiert die json werden Daten in Hbase Tabelle stehen wir vor Performance-Probleme
Hier setzen werden, während die collect Verfahren auf der Datenrahmen aufrufen. Weil collect alle Daten in einen einzigen Knoten holt und dann die Verarbeitung durchführt, wodurch der Vorteil der parallelen Verarbeitung verloren geht. Auch im realen Szenario wird es 10 Milliarden Datensätze von Daten geben, die wir erwarten können. Daher kann das Sammeln aller dieser Datensätze in den Treiberknoten das Programm selbst aufgrund von Speicher- oder Speicherplatzbeschränkungen zum Absturz bringen.
Ich glaube nicht, dass die Take-Methode in unserem Fall verwendet werden kann, die eine begrenzte Anzahl von Datensätzen gleichzeitig abrufen wird. Wir müssen alle eindeutigen Kontonummern aus den gesamten Daten erhalten und daher bin ich mir nicht sicher, ob die Methode, die begrenzte Datensätze zu einem Zeitpunkt nimmt,
Appreciate jede Hilfe, um zu vermeiden Calling Collect-Methoden und haben einige andere Best Practices zu folgen.Code-Schnipsel/Anregungen/git Links wird sehr hilfreich sein, wenn jemand faced ähnliche Probleme
Code-Snippet
val eqpSchemaString = "acoountnumber ....."
val eqpSchema = StructType(eqpSchemaString.split(" ").map(fieldName =>
StructField(fieldName, StringType, true)));
val eqpRdd = sc.textFile(inputPath)
val eqpRowRdd = eqpRdd.map(_.split(",")).map(eqpRow => Row(eqpRow(0).trim, eqpRow(1).trim, ....)
var eqpDF = sqlContext.createDataFrame(eqpRowRdd, eqpSchema);
var distAccNrsDF = eqpDF.select("accountnumber").distinct().collect()
distAccNrsDF.foreach { data =>
var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'").collect()
var result = new JSONObject()
result.put("jsonSchemaVersion", "1.0")
val firstRowAcc = filtrEqpDF(0)
//Json parsing logic
{
.....
.....
}
}
Was möchten Sie eigentlich tun? schreibe einfach in Hbase Tabelle ?. Wenn das der Fall ist, warum willst du etwas sammeln oder machen? collect und take werden nur verwendet, um Beispieldaten anzuzeigen. Ansonsten muss nicht gesammelt oder genommen werden. – Phoenix
Grundsätzlich müssen wir alle Daten gruppieren, die zu derselben Kontonummer gehören (in der Quelldatei) und die gruppierten Daten an hbase übergeben müssen. Sammeln wir verwenden, weil, um die verschiedenen Kontonummern global zu ermitteln. Wenn wir das Sammeln nicht aufrufen, wie können wir global die eindeutigen Kontonummern herausfinden, die möglicherweise in mehreren Knoten verteilt sind. – afzal