2015-10-12 4 views
20

Ich habe ein Problem mit dem Ausgleich von Apache Spark-Jobs Ressourcen auf YARN Fair Scheduled Warteschlangen.YARN Preempting Ressourcen basierend auf fairen Aktien beim Ausführen eines Spark-Jobs

Für die Tests habe ich konfiguriert Hadoop 2.6 (versuchte 2.7 auch) im pseudo-verteilten Modus mit lokalen HDFS auf MacOS laufen. Für die Jobeinreichung wurde "Pre-build Spark 1.4 für Hadoop 2.6 und höher" (auch 1.5) verwendet. Verteilung von .

Beim Testen mit Hadoop MapReduce-Grundeinstellungen funktioniert Fair Scheduler wie erwartet: Wenn Ressourcen des Clusters ein Maximum überschreiten, werden gerechte Anteile berechnet und Ressourcen für Jobs in verschiedenen Warteschlangen werden basierend auf diesen Berechnungen ausgeschlossen und ausgeglichen.

Derselbe Test wird mit Spark-Jobs ausgeführt. In diesem Fall führt YARN korrekte Berechnungen der fairen Freigaben für jeden Job durch, aber Ressourcen für Spark-Container werden nicht neu ausgeglichen.

Hier sind meine conf Dateien:

$ HADOOP_HOME/etc/hadoop/Garn-site.xml

<?xml version="1.0" encoding="UTF-8"?> 
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> 
<configuration> 
    <property> 
     <name>yarn.nodemanager.aux-services</name> 
     <value>mapreduce_shuffle</value> 
    </property> 
    <property> 
     <name>yarn.nodemanager.aux-services.spark_shuffle.class</name> 
     <value>org.apache.spark.network.yarn.YarnShuffleService</value> 
    </property> 
    <property> 
     <name>yarn.resourcemanager.scheduler.class</name> 
     <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value> 
    </property> 
    <property> 
     <name>yarn.scheduler.fair.preemption</name> 
     <value>true</value> 
    </property> 
</configuration> 

$ HADOOP_HOME/etc/hadoop/fair-scheduler.xml

<?xml version="1.0" encoding="UTF-8"?> 
<allocations> 
    <defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy> 
    <queue name="prod"> 
     <weight>40</weight> 
     <schedulingPolicy>fifo</schedulingPolicy> 
    </queue> 
    <queue name="dev"> 
     <weight>60</weight> 
     <queue name="eng" /> 
     <queue name="science" /> 
    </queue> 
    <queuePlacementPolicy> 
     <rule name="specified" create="false" /> 
     <rule name="primaryGroup" create="false" /> 
     <rule name="default" queue="dev.eng" /> 
    </queuePlacementPolicy> 
</allocations> 

$ HADOOP_HOME/etc/hadoop/Kern-site.xml

<?xml version="1.0" encoding="UTF-8"?> 
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> 
<configuration> 
    <property> 
     <name>fs.defaultFS</name> 
     <value>hdfs://localhost:9000</value> 
    </property> 
</configuration> 

$ HADOOP_HOME/etc/hadoop/Kern-site.xml

<?xml version="1.0" encoding="UTF-8"?> 
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> 
<configuration> 
    <property> 
     <name>dfs.replication</name> 
     <value>1</value> 
    </property> 
</configuration> 

Und der Testfall ist:

Führen Sie einen Job auf der "prod" Warteschlange mit Gewicht 40 (müssen 40% aller zuteilen Ressourcen), benötigt der Job wie erwartet alle benötigten freien Ressourcen (62,5% der Cluster-Ressourcen).

./bin/spark-submit --class org.apache.spark.examples.SparkPi \ 
--master yarn-cluster \ 
--driver-memory 512M \ 
--executor-memory 768M \ 
--executor-cores 1 \ 
--num-executors 2 \ 
--queue prod \ 
lib/spark-examples*.jar 100000 

Danach die gleiche Arbeit auf der „dev.eng“ Warteschlange laufen mit dem Gewicht 60, dass der Job bedeutet 60% aller Ressourcen zuteilen müssen und verringern die Ressourcen des ersten Job zu ~ 40%.

./bin/spark-submit --class org.apache.spark.examples.SparkPi \ 
--master yarn-cluster \ 
--driver-memory 512M \ 
--executor-memory 768M \ 
--executor-cores 1 \ 
--num-executors 2 \ 
--queue dev.eng \ 
lib/spark-examples*.jar 100000 

Leider ändern sich Clusterressourcen nicht - 62,5% für den ersten Job und 37,5% für den zweiten Job.

Antwort

0

Fair Scheduler tötet keine Container für den ersten Job, es wartet nur, bis einige Ressourcen frei sind, und reserviert sie für den zweiten Job. Wenn vom ersten Job keine Ressourcen frei sind, kann der Scheduler diese Ressourcen dem zweiten Job nicht zuweisen. In MapReduce-Jobs muss jeder Map- oder Reduzierungs-Task einen neuen Container instanziieren, und der Scheduler kann den Job blockieren, um neue Container zu instanziieren, wenn er sein Angebot überschritten hat (basierend auf der Warteschlangenkapazität).

In Spark sind die Dinge anders, die Executoren werden am Anfang des Jobs initiiert und die verschiedenen Aufgaben (Phasen) werden an sie gesendet. Dann sind die Ressourcen nicht frei und sie können nicht neu zugewiesen werden.

Kann dynamische Zuordnung sein helfen könnte: http://spark.apache.org/docs/1.6.1/configuration.html#dynamic-allocation

+0

Eigentlich tötet es Container auf den ersten Job. Alles hängt davon ab, wie Sie Vorkaufsrechte einrichten. – tk421

4

Sie müssen in Ihrer Zuordnung xml einer der preemption Timeouts setzen. Eine für den Mindestanteil und eine für den fairen Anteil, beides in Sekunden. Standardmäßig sind die Timeouts nicht festgelegt.

Von Hadoop: The Definitive Guide 4th Edition

Wenn eine Warteschlange als Mindestanteil Vorkaufsrecht Timeout so lange wartet, ohne dass seine Mindestanteil garantiert empfängt, dann kann der Scheduler andere Behälter präjudizieren. Das Standardzeitlimit wird für alle Warteschlangen über das oberste Element defaultMinSharePreemptionTimeout in der Zuweisungsdatei und pro Warteschlange festgelegt, indem das Element minSharePreemptionTimeout für eine Warteschlange festgelegt wird.

Wenn eine Warteschlange so lange unter der Hälfte ihres fairen Anteils bleibt wie als Fair-Share-Preemption-Timeout, dann kann der Scheduler andere Container vorbesetzen . Das Standardzeitlimit wird für alle Warteschlangen über das oberste Element defaultFairSharePreemptionTimeout in der Zuweisungsdatei und pro Warteschlange festgelegt, indem fairSharePreemptionTimeout in einer Warteschlange festgelegt wird. Der Schwellenwert kann auch von seinem Standardwert 0,5 durch Festlegen von defaultFairSharePreemptionThreshold und fairSharePreemptionThreshold (pro Warteschlange) geändert werden.

+0

Was wären gültige Timeout-Werte? –

Verwandte Themen