2017-01-02 1 views
2

Ich benutze Spark 1.6.2, Scala 2.10.5 und Java 1.7.Spark dichte_Rank Fensterfunktion - ohne eine PartitionBy-Klausel

Unser Anwendungsfall erfordert, dass wir bei einem Datensatz von mehr als 200 Millionen Zeilen den Befehl dese_rank() ausführen, ohne die Klausel partitionBy zu verwenden. Nur die Klausel orderBy wird verwendet. Dies läuft derzeit in MSSQL und dauert etwa 30 Minuten.

Ich habe die entsprechende Logik in Funken umgesetzt, wie unten gezeigt:

val df1 = hqlContext.read.format("jdbc").options(
    Map("url" -> url, "driver" -> driver, 
    "dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load() 

df1.cache() 

val df1_drnk = df1.withColumn("standardizationId",denseRank().over(Window.orderBy("ownerObjectId","securityId","periodId"))) 

ich den Job in Yarn-Cluster-Modus vorlege, wie unten gezeigt. Ich habe einen 2-Knoten Hadoop 2.6 Cluster mit jeweils 4 vCores und 32 GB Speicher.

spark-submit --class com.spgmi.csd.OshpStdCarryOver --master yarn --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=3072 --num-executors 2 --executor-cores 3 --driver-memory 7g --executor-memory 16g --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar --files $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/lib/spark-poc2-14.0.0.jar 

In den Protokollen, kann ich sehen, dass in 15 Minuten in Spark-& zwischengespeichert wird die Tabelle von etwa 200 mil Reihen von MSSQL importiert zu werden. Ich sehe, dass bis zu diesem Zeitpunkt ungefähr 5 GB Speicher verwendet werden und ungefähr 6,2 GB Speicher noch immer auf einem der Executoren frei sind und 11 GB Speicher auf dem anderen Executor frei sind.

Aber der Schritt bei DENSE_RANK() wird immer mit Versagen „GC Overhead Grenze überschritten“ Fehler nach wenigen Minuten. Ich habe sogar den Treiberspeicher so hoch wie 7g gesetzt, wie Sie oben im Befehl Funkenübergabe feststellen können. Aber vergebens! Natürlich verstehe ich, dass das Fehlen der PartitionBy-Klausel tatsächlich Probleme in Spark verursacht. Aber das ist leider der Anwendungsfall, mit dem wir uns befassen müssen.

Können Sie bitte etwas Licht hier werfen? Fehle ich etwas? Gibt es eine Alternative zur Verwendung der dose_rank-Fensterfunktion in Spark? Wie zum Beispiel mit der Funktion "zipWithIndex", die von einem der Experten in diesem Forum vorgeschlagen wurde? Wird es zu den gleichen Ergebnissen wie bei dichtem_Rank führen, wenn ich weiß, dass die "zipWithIndex" -Methode die Funktion row_number() im Gegensatz zu dicht_Rank repliziert?

Jeder hilfreiche Rat wird geschätzt! Vielen Dank!

Antwort

2

Es gibt zwei verschiedene Probleme hier:

  • Sie Ladedaten über JDBC-Verbindung ohne Partitionierungsspalte oder Partition Prädikate bereitstellt. Dies lädt alle Daten mit einem einzelnen Executor-Thread.

    Dieses Problem ist in der Regel recht einfach zu lösen, entweder durch eine der vorhandenen Spalten, oder durch künstlichen Schlüssel bereitstellt.

  • Sie verwenden Fensterfunktionen ohne partitionBy. Als Ergebnis werden alle Daten auf eine einzelne Partition neu gemischt, lokal sortiert und mit einem einzigen Thread verarbeitet.

    • Erstellen von künstlichen Partitionen reflektieren erforderliches Aufzeichnungsbestell

      :

      Im Allgemeinen gibt es keine universelle Lösung, die mit nur Dataset API ansprechen kann, aber es gibt einige Tricks, die Sie verwenden können. Ich beschrieb diese Methode in meiner Antwort auf Avoid performance impact of a single partition mode in Spark window functions

      Eine ähnliche Methode könnte in Ihrem Fall verwendet werden, aber es würde mehrstufiger Prozess erfordern, der dem unten beschriebenen entspricht.

    • Mit assoziativen Methoden Sie zwei separate Scans über sortiert RDD verwenden können (es sollte möglich sein, ähnlich, was zu tun, ohne von Dataset als auch die Umwandlung) und zusätzliche Maßnahmen:

      • Compute Teilergebnisse für jede Partition (In Ihrem Fall Rang für eine bestimmte Partition).
      • Sammeln Sie die erforderlichen Zusammenfassungen (in Ihrem Fall Partitionsgrenzen und akkumulierten Rangwert für jede Partition).
      • Führen Sie einen zweiten Scan durch, um Partialaggregate von vorherigen Partitionen zu korrigieren.

    Ein Beispiel für diesen Ansatz, der leicht Ihren Fall passen angepasst werden kann, kann in How to compute cumulative sum using Spark

+0

Vielen Dank für Ihre Anregungen werden! Ich konnte die Datenimportzeit von MSSQL mit "partitionColumn" -Optionen in der JDBC-Datenquelle reduzieren. Aber die Empfehlungen für ein dichtes Ranking ohne partitionBy werden mehr Zeit für mich benötigen, um zu verdauen, da ich Scala ziemlich neu bin. Aber danke, dass du mich geführt hast! – Prash

Verwandte Themen