2016-10-11 3 views
1

Auf Garn-Cluster verwende ich kafka directream als Eingabe (ex.batch Zeit ist 15s), und möchte die Eingabe msg in separaten userIds aggregieren. Also verwende ich Stateful Streaming API wie updateStateByKey oder mapWithState. Aber aus der API-Quelle, sehe ich, dass die Standard-Checkpoint-Dauer mapWithState ist BatchDarstellung * 10 (in meinem Fall 150 s), und in Kafka DirectStream der Partition Offset ist jeder Batch (15 s). Tatsächlich kann jeder dstream eine unterschiedliche Checkpoint-Dauer setzen. Also, meine Frage ist:Wie kann ich die Prüfpunktaufzeichnung verstehen, wenn Kafka Direct InputDstream und Stateful Stream Transformation verwendet wird?

Wenn Streaming-App abgestürzt ist, starte ich es neu, der Kafka-Offset und State Stream Rdd sind asynchron in Checkpoint, in diesem Fall, wie kann ich keine Daten verlieren verlieren? Oder ich verstehe den Checkpoint-Mechanismus falsch?

Antwort

1

Wie kann ich keine Daten verlieren?

Stateful Ströme wie mapWithState oder updateStateByKeyerfordern Ihnen ein Checkpoint-Verzeichnis zur Verfügung zu stellen, denn das Teil ist, wie sie arbeiten, speichern sie den Zustand jeden Zwischen in der Lage sein, den Zustand bei einem Absturz zu erholen.

Darüber hinaus ist jede DStream in der Kette frei, Checkpointing als auch zu fragen, Frage ist "müssen Sie wirklich Checkpoint andere Streams"?

Wenn eine Anwendung abstürzt, nimmt Spark alle Status-RDDs, die im Checkpoint gespeichert sind, und bringt sie zurück in den Speicher, so dass Ihre Daten dort so gut sind wie beim letzten Funken Checkpoint. Eine Sache, die Sie in Erinnerung behalten sollten, ist, wenn Sie Ihren Anwendungscode ändern, können Sie den Status von Checkpoint nicht wiederherstellen, müssen Sie es löschen. Das heißt, wenn Sie zum Beispiel ein Versions-Upgrade durchführen müssen, sind alle Daten, die zuvor in diesem Zustand gespeichert wurden, verschwunden, es sei denn, Sie speichern sie manuell in einer Weise, die die Versionierung ermöglicht.

+0

danke für die Antwort :) In meinem Fall habe ich den Fall behandelt, wenn App-Version update.Aber was ich wirklich wissen möchte ist, dass zum Beispiel Startzeit 0s, Batch-Zeit ist 15s, mapWithState Checkpoint-Intervall ist 150s .So im Fall, dass Streaming-App zum Zeitpunkt 200s abgestürzt, beim Neustart ohne neu kompiliert, die Stateful-Stream-Wiederherstellung auf Daten bei 150s, und Kafka-Offset Datensatzwiederherstellung auf Zeit bei 195s. Habe ich die Kafka-Eingabedaten bei 165s verloren, 180s? True, wie kann ich das vermeiden? Danke ~ –

+0

@HengSu Wenn der Checkpoint bei 150s war, dann, wenn Sie wiederherstellen, wird es wieder von 150 vorwärts gehen. Dies bedeutet, dass Sie wahrscheinlich Daten verarbeiten, die bereits verarbeitet wurden, aber so funktioniert Checkpointing. Kafka-Offsets werden auch in den Checkpoint-Daten gespeichert. –

+0

@ Yuval Itzchakov Sorry, ich war ein wenig verwirrt.Ist es richtig, dass der Kafka-Offset wird jede Batch (15s) überprüft? Wenn MapWithStateDStream zu den Daten bei 150s wiederherstellen, meinen Sie, dass die Kafka Offset auch auf 150s anders als wiederherstellen 1955? Also habe ich keine Daten verloren, nur einige Chargen von ihnen neu aufbereiten? –

Verwandte Themen