Ich benutze Spark 1.6.2, Scala 2.10.5 und Java 1.7.Spark dichte_Rank Fensterfunktion - ohne eine PartitionBy-Klausel
Unser Anwendungsfall erfordert, dass wir bei einem Datensatz von mehr als 200 Millionen Zeilen den Befehl dese_rank() ausführen, ohne die Klausel partitionBy zu verwenden. Nur die Klausel orderBy wird verwendet. Dies läuft derzeit in MSSQL und dauert etwa 30 Minuten.
Ich habe die entsprechende Logik in Funken umgesetzt, wie unten gezeigt:
val df1 = hqlContext.read.format("jdbc").options(
Map("url" -> url, "driver" -> driver,
"dbtable" -> "(select * from OwnershipStandardization_PositionSequence_tbl) as ps")).load()
df1.cache()
val df1_drnk = df1.withColumn("standardizationId",denseRank().over(Window.orderBy("ownerObjectId","securityId","periodId")))
ich den Job in Yarn-Cluster-Modus vorlege, wie unten gezeigt. Ich habe einen 2-Knoten Hadoop 2.6 Cluster mit jeweils 4 vCores und 32 GB Speicher.
spark-submit --class com.spgmi.csd.OshpStdCarryOver --master yarn --deploy-mode cluster --conf spark.yarn.executor.memoryOverhead=3072 --num-executors 2 --executor-cores 3 --driver-memory 7g --executor-memory 16g --jars $SPARK_HOME/lib/datanucleus-api-jdo-3.2.6.jar,$SPARK_HOME/lib/datanucleus-core-3.2.10.jar,$SPARK_HOME/lib/datanucleus-rdbms-3.2.9.jar,/usr/share/java/sqljdbc_4.1/enu/sqljdbc41.jar --files $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/lib/spark-poc2-14.0.0.jar
In den Protokollen, kann ich sehen, dass in 15 Minuten in Spark-& zwischengespeichert wird die Tabelle von etwa 200 mil Reihen von MSSQL importiert zu werden. Ich sehe, dass bis zu diesem Zeitpunkt ungefähr 5 GB Speicher verwendet werden und ungefähr 6,2 GB Speicher noch immer auf einem der Executoren frei sind und 11 GB Speicher auf dem anderen Executor frei sind.
Aber der Schritt bei DENSE_RANK() wird immer mit Versagen „GC Overhead Grenze überschritten“ Fehler nach wenigen Minuten. Ich habe sogar den Treiberspeicher so hoch wie 7g gesetzt, wie Sie oben im Befehl Funkenübergabe feststellen können. Aber vergebens! Natürlich verstehe ich, dass das Fehlen der PartitionBy-Klausel tatsächlich Probleme in Spark verursacht. Aber das ist leider der Anwendungsfall, mit dem wir uns befassen müssen.
Können Sie bitte etwas Licht hier werfen? Fehle ich etwas? Gibt es eine Alternative zur Verwendung der dose_rank-Fensterfunktion in Spark? Wie zum Beispiel mit der Funktion "zipWithIndex", die von einem der Experten in diesem Forum vorgeschlagen wurde? Wird es zu den gleichen Ergebnissen wie bei dichtem_Rank führen, wenn ich weiß, dass die "zipWithIndex" -Methode die Funktion row_number() im Gegensatz zu dicht_Rank repliziert?
Jeder hilfreiche Rat wird geschätzt! Vielen Dank!
Vielen Dank für Ihre Anregungen werden! Ich konnte die Datenimportzeit von MSSQL mit "partitionColumn" -Optionen in der JDBC-Datenquelle reduzieren. Aber die Empfehlungen für ein dichtes Ranking ohne partitionBy werden mehr Zeit für mich benötigen, um zu verdauen, da ich Scala ziemlich neu bin. Aber danke, dass du mich geführt hast! – Prash