2016-07-19 4 views
0

Ich habe Probleme mit einem einfachen Spark Job von mir, der nach der Vereinfachung so aussieht.Kann eine spekulative Aufgabe kurz ausgeführt werden, nachdem ein Spark-Job zurückgegeben wurde?

JavaRDD<ObjectNode> rdd = pullAndProcessData(); 
ManifestFilesystem fs = getOutputFS(); 
List<WriteObjectResult> writeObjectResults = rdd.mapPartitions(fs::write).collect(); 
fs.writeManifest(Manifest.makeManifest(writeObjectResults)); 

mit diesem Code Meine Erwartung ist, dass alles, was geschieht, writeManifest genannt werden wird, wenn und nur wenn alle Aufgaben fertig sind und ihre Partition S3 erfolgreich geschrieben haben. Das Problem ist, dass anscheinend einige Aufgaben nach dem Manifest in S3 schreiben, was niemals passieren sollte.

In ManifestFilesystem.write, lösche ich die vorhandene Manifest (wenn es einen gibt) es ungültig zu machen, weil der normale Arbeitsablauf sein sollte:

  • schreiben alle Partitionen bis S3
  • das Manifest zu S3 schreiben

ich zu ahnen es wegen spekuliert Aufgaben passieren könnte, in dem folgende Szenario:

  • einige Aufgaben sind speculatable markiert und erneut senden, um andere Sklaven
  • alle spekulierten Aufgaben Rendite mindestens einen Slave sie gesendet wurden, aber einige von ihnen halten auf langsamen Sklaven
  • Funken laufen nicht über die Aufgaben unterbrechen oder gibt das Ergebnis von collect dem Fahrer, bevor die Aufgaben
  • die spekulierten Aufgaben unterbrochen werden, die immer noch das Manifest schließlich ausführen ManifestTimeslice.write und löschen ausgeführt wurden, bevor sie ihre Partition zu schreiben

Ist das etwas, das passieren kann? Hat jemand eine andere Hypothese für ein solches Verhalten?

Hinweis: mit integrierten Daten Publishing Methoden ist keine Option

Anmerkung 2: Ich fand this die meine Intuition zu bestätigen neigt, aber es wäre immer noch groß sein, um eine Bestätigung zu haben, weil Ich verwende keine Standard-HDFS- oder S3-Lese-/Schreibmethoden aus Gründen, die außerhalb des Bereichs dieser Frage liegen.

+0

Hallo, können Sie bitte posten Sie Ihren vollständigen Code und Protokolle einen tiefen Tauchgang zu nehmen? –

+0

Hallo Praveen, danke für den Vorschlag, aber ich werde in dieser Frage nicht so weit gehen. Ich habe meinen eigenen tiefen Tauchgang in den Logs gemacht und so kam ich auf diese Hypothese. Ich konnte einfach nicht beweisen, dass es die Hauptursache war, und ich wollte überprüfen, ob es eine vernünftige Erklärung war. Es ist das erste Mal, dass ich auf diese Art von Interaktionen zwischen Nebenaufgaben und spekulativen Aufgaben stoße. – Dici

Antwort

0

Ich werde meine eigene Frage beantworten, nachdem ich festgestellt habe, dass es aus der Sicht von Spark nicht anders ging: Wie würdest du sicherstellen, dass du alle spekulativen Aufgaben beendest, bevor sie die Zeit haben, sie abzuschließen? Es ist sogar besser, sie vollständig laufen zu lassen, sonst könnten sie beim Schreiben in eine Datei, die dann abgeschnitten würde, getötet werden.

Es gibt verschiedene mögliche Ansätze:

  • ein paar Nachrichten in this thread legen nahe, dass eine gängige Praxis ist, in einen temporären Versuch Datei zu schreiben, bevor ein Atomumbenennungs (billig auf den meisten Dateisystemen durchführen, weil es eine ist bloßer Zeigerschalter).Wenn eine spekulative Task versucht, ihre temporäre Datei in einen vorhandenen Namen umzubenennen, der nicht gleichzeitig ausgeführt wird, wenn die Operation atomisch ist, wird die Umbenennungsanforderung ignoriert und die temporäre Datei gelöscht.

  • meines Wissens, S3 bietet keine atomare Umbenennung. Obwohl der oben beschriebene Prozess relativ einfach zu implementieren ist, versuchen wir derzeit, Homebrew-Lösungen auf das Maximum zu beschränken und das System einfach zu halten. Daher wird meine endgültige Lösung darin bestehen, einen jobId (zum Beispiel den Zeitstempel, zu dem der Job gestartet wurde) zu verwenden und ihn an die Slaves weiterzugeben sowie ihn in das Manifest zu schreiben. Wenn Sie eine Datei mit der FS schreibt, wird die folgende Logik angewandt:

    public WriteObjectResult write(File localTempFile, long jobId) { 
        // cheap operation to check if the manifest is already there 
        if (manifestsExists()) { 
         long manifestJobId = Integer.parseInt(getManifestMetadata().get("jobId")); 
         if (manifestJobId == jobId) { 
          log.warn("Job " + jobId + " has already completed successfully and published a manifest. Ignoring write request." 
          return null; 
         } 
         log.info("A manifest has already been published by job " + jobId + " for this dataset. Invalidating manifest."); 
         deleteExistingManifest(); 
        }  
        return publish(localTempFile); 
    } 
    
1

Spark spekulative Aufgaben nicht proaktiv zu erledigen. Es wartet nur bis die Aufgabe beendet ist und ignoriert das Ergebnis. Ich denke, es ist durchaus möglich, dass Ihre spekulativen Aufgaben nach dem Aufruf collect weiterschreiben.

+0

Ja, genau wie ich dachte = (Ich denke, es kann nicht geholfen werden, es gibt keine Möglichkeit, solche Randfälle zu vermeiden. Ich muss dafür eine benutzerdefinierte Logik hinzufügen. Danke für deine Antwort – Dici

Verwandte Themen