2017-11-30 3 views
1

Wir sind derzeit ein Leistungsproblem in sparksql in Scala Sprache geschrieben. Der Anwendungsablauf wird nachstehend erwähnt.SparkSQL-Leistungsproblem mit Collect-Methode

  1. Spark-Anwendung liest eine Textdatei von Eingabe HDFS Verzeichnis
  2. einen Datenrahmen auf der Datei erstellt programmatisch Angabe Schema verwenden. Dieser Datenrahmen wird eine exakte Replikation der im Speicher gehaltenen Eingabedatei sein.

    var eqpDF = sqlContext.createDataFrame(eqpRowRdd, eqpSchema)

  3. Erstellt einen gefilterte Datenrahmen aus dem ersten Datenrahmen in Schritt aufgebaut ist, werde um 18 Spalten in dem Datenrahmen hat 2. Dieser Datenrahmen wird eindeutige Kontonummern mit Hilfe verschiedenen Schlüsselwort enthält.

    var distAccNrsDF = eqpDF.select("accountnumber").distinct().collect()

  4. Mit den beiden in Schritt konstruierten Datenrahmen 2 & 3, werden wir alle Datensätze erhalten, die zu einer Kontonummer gehören und einige Json Parsing-Logik auf der gefilterten Daten tun.

    var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'").collect()

  5. Schließlich analysiert die json werden Daten in Hbase Tabelle stehen wir vor Performance-Probleme

Hier setzen werden, während die collect Verfahren auf der Datenrahmen aufrufen. Weil collect alle Daten in einen einzigen Knoten holt und dann die Verarbeitung durchführt, wodurch der Vorteil der parallelen Verarbeitung verloren geht. Auch im realen Szenario wird es 10 Milliarden Datensätze von Daten geben, die wir erwarten können. Daher kann das Sammeln aller dieser Datensätze in den Treiberknoten das Programm selbst aufgrund von Speicher- oder Speicherplatzbeschränkungen zum Absturz bringen.

Ich glaube nicht, dass die Take-Methode in unserem Fall verwendet werden kann, die eine begrenzte Anzahl von Datensätzen gleichzeitig abrufen wird. Wir müssen alle eindeutigen Kontonummern aus den gesamten Daten erhalten und daher bin ich mir nicht sicher, ob die Methode, die begrenzte Datensätze zu einem Zeitpunkt nimmt,

Appreciate jede Hilfe, um zu vermeiden Calling Collect-Methoden und haben einige andere Best Practices zu folgen.Code-Schnipsel/Anregungen/git Links wird sehr hilfreich sein, wenn jemand faced ähnliche Probleme

Code-Snippet

val eqpSchemaString = "acoountnumber ....." 
    val eqpSchema = StructType(eqpSchemaString.split(" ").map(fieldName => 
StructField(fieldName, StringType, true))); 
    val eqpRdd = sc.textFile(inputPath) 
    val eqpRowRdd = eqpRdd.map(_.split(",")).map(eqpRow => Row(eqpRow(0).trim, eqpRow(1).trim, ....) 

    var eqpDF = sqlContext.createDataFrame(eqpRowRdd, eqpSchema); 


    var distAccNrsDF = eqpDF.select("accountnumber").distinct().collect() 


    distAccNrsDF.foreach { data => 

     var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'").collect() 



     var result = new JSONObject() 

     result.put("jsonSchemaVersion", "1.0") 
     val firstRowAcc = filtrEqpDF(0) 
     //Json parsing logic 
     { 
     ..... 
     ..... 
     } 
    } 
+0

Was möchten Sie eigentlich tun? schreibe einfach in Hbase Tabelle ?. Wenn das der Fall ist, warum willst du etwas sammeln oder machen? collect und take werden nur verwendet, um Beispieldaten anzuzeigen. Ansonsten muss nicht gesammelt oder genommen werden. – Phoenix

+0

Grundsätzlich müssen wir alle Daten gruppieren, die zu derselben Kontonummer gehören (in der Quelldatei) und die gruppierten Daten an hbase übergeben müssen. Sammeln wir verwenden, weil, um die verschiedenen Kontonummern global zu ermitteln. Wenn wir das Sammeln nicht aufrufen, wie können wir global die eindeutigen Kontonummern herausfinden, die möglicherweise in mehreren Knoten verteilt sind. – afzal

Antwort

2

Der Ansatz in der Regel in einer solchen Situation nehmen gehabt haben ist:

  • statt collect, invoke foreachPartition: foreachPartition wendet eine Funktion auf jede Partition (dargestellt durch eine Iterator[Row]) der zugrunde liegenden DataFrame separat an (die Partition ist die atomare Einheit der Parallelität Spark)
  • die Funktion eine Verbindung zu HBase geöffnet (so dass es eines pro Partition) und jeder Testamentsvollstrecker eine Verbindung der

Das bedeutet, alle enthaltenen Werte über diese Verbindung senden öffnet (die nicht serialisierbar ist aber lebt innerhalb der Grenzen der Funktion und muss daher nicht über das Netzwerk gesendet werden) und sendet seinen Inhalt unabhängig an HBase, ohne dass alle Daten des Treibers (oder irgendeines Knotens) gesammelt werden müssen.

Es sieht aus wie Sie eine CSV-Datei lesen, so ist es wahrscheinlich so etwas wie die folgenden den Trick:

spark.read.csv(inputPath).   // Using DataFrameReader but your way works too 
    foreachPartition { rows => 
    val conn = ???     // Create HBase connection 
    for (row <- rows) {   // Loop over the iterator 
     val data = parseJson(row) // Your parsing logic 
     ???       // Use 'conn' to save 'data' 
    } 
    } 
2

Sie können in Ihrem Code ignorieren sammeln, wenn Sie große Menge von Daten haben.

Collect Gibt alle Elemente des Datasets als Array im Treiberprogramm zurück. Dies ist normalerweise nach einem Filter oder einer anderen Operation nützlich, die eine ausreichend kleine Teilmenge der Daten zurückgibt.

Auch dies kann dazu führen, dass der Treiber nicht genügend Arbeitsspeicher zur Verfügung hat, da collect() die gesamte RDD/DF auf einen einzelnen Computer holt.

Ich habe gerade Ihren Code bearbeitet, der für Sie arbeiten sollte.

 var distAccNrsDF = eqpDF.select("accountnumber").distinct() 
      distAccNrsDF.foreach { data => 
       var filtrEqpDF = eqpDF.where("accountnumber='" + data.getString(0) + "'") 
       var result = new JSONObject() 
       result.put("jsonSchemaVersion", "1.0") 
       val firstRowAcc = filtrEqpDF(0) 
       //Json parsing logic 
       { 
       ..... 
       ..... 
       } 
      }