2016-10-26 2 views
0

Jetzt lerne ich Storms garantierte Nachrichtenverarbeitung und bin durch einige Konzepte in diesem Teil verwirrt.Verwechslung von Storm Acker und garantierte Nachrichtenverarbeitung

Um zu garantieren, dass eine Meldung von einer Tülle vollständig verarbeitet wird, verwendet Storm acker, um dies zu erreichen. Jedes Mal, wenn eine Tülle ein Tupel ausgibt, weist acker "ack val" als 0 initialisiert zu, um den Status des Tupel-Baums zu speichern. Jedes Mal, wenn die Downstream-Bots dieses Tupels ein neues Tupel ausgeben oder ein "altes" Tupel quittieren, wird die Tupel-ID XOR mit "ack val" sein. Der acker muss nur prüfen, ob "ack val" 0 ist oder nicht, dass das Tupel vollständig verarbeitet wurde. Lassen Sie uns den Code unten:

public class WordReader implements IRichSpout { 
    ... ... 
while((str = reader.readLine()) != null){ 
    this.collector.emit(new Values(str), str); 
    ... ... 
} 

Der Code Stück über eine Tülle in Wortzählimpulsprogramm von Tutorial „mit Sturm Getting Started“. In der emit-Methode ist der zweite Parameter "str" ​​die messageId. Ich bin verwirrt durch diesen Parameter: 1) Wie ich verstehe, sollte jedes Mal, wenn ein Tupel (d. H. Eine Nachricht) in Tüllen oder in Bolzen ausgegeben wird, Storm in der Verantwortung sein, dieser Nachricht eine 64-bit messageId zuzuweisen. Ist das korrekt? Oder ist hier "str" ​​nur ein lesbarer Alias ​​für diese Nachricht? 2) Egal, was auf 1) antwortet, hier wäre "str" ​​dasselbe Wort in zwei verschiedenen Nachrichten, weil in einer Textdatei viele doppelte Wörter vorkommen sollten. Wenn das wahr ist, wie unterscheidet Storm verschiedene Nachrichten? Und was bedeutet dieser Parameter? 3) In einigen Code Stück, sehe ich einige Ausläufe den folgenden Code verwenden, um die Nachricht Id in Spout einzustellen emittieren Methode:

public class RandomIntegerSpout extends BaseRichSpout { 
    private long msgId = 0; 
    collector.emit(new Values(..., ++msgId), msgId); 
} 

Das ist viel näher an, was ich denke, es sein sollte: die ID-Nachricht sein sollte völlig unterschiedlich über verschiedene Nachrichten. Aber für diesen Codestück ist eine weitere Verwirrung: Was passiert mit privatem Feld "msgId" über verschiedene Executoren hinweg? Da für jeden Executor eine eigene msgId mit 0 initialisiert wird, werden Nachrichten in verschiedenen Executoren mit den Namen 0, 1, 2 usw. benannt. Wie unterscheidet Storm dann diese Nachrichten?

Ich bin Anfänger in Storm, also sind diese Probleme vielleicht naiv. Hoffe jemand könnte mir helfen, es herauszufinden. Vielen Dank!

Antwort

0

Über Nachricht ID ist allgemein: intern könnte es ein 64-Bit-Wert sein, aber dieser 64-Bit-Wert wird als Hash aus msgID Objekt in emit() innerhalb Spout berechnet. Sie können also jedes Objekt als Nachrichten-ID übergeben (die Wahrscheinlichkeit, dass zwei Objekte, die auf denselben Wert synchronisiert sind, nahe bei Null liegen).

Über die Verwendung str: Ich denke, in diesem Beispiel enthält str eine Zeile (und kein Wort) und es ist sehr unwahrscheinlich, dass Dokument die exakt gleiche Zeile zweimal enthält (wenn es keine leeren Zeilen, die viele sein können).

Über den Zähler als Nachrichten-ID: Sie haben absolut Recht über Ihre Beobachtung - wenn mehrere Ausläufe parallel ausgeführt werden, würde dies zu einem Konflikt bei der Nachrichten-ID führen und die Fehlertoleranz beeinträchtigen.

Wenn Sie den Zähleransatz "reparieren" möchten, sollte jeder Zähler anders initialisiert werden (am besten von 1...#SpoutTasks). Dazu können Sie die TaskID verwenden (die eindeutig ist und über TopologyContext in Spout.open() erreichbar ist). Grundsätzlich erhalten Sie alle TaskIDs für alle parallelen Auslaufaufgaben, sortieren sie und weisen jeder Auslaufaufgabe ihre Bestellnummer zu. Außerdem müssen Sie anstelle von 1 um "Anzahl paralleler Ausläufe" erhöhen.

+0

Danke. Aber ich verwechsle immer noch die ersten 2 Absätze in Ihrer Antwort. – acekiller

+0

Für den ersten Absatz, es ist der 64-Bit-Wert von msgID berechnet, wie wäre es dann mit den Nachrichten in Bolt emittiert? Weil Bolt keine Nachrichten mit einer MsgID zuweisen wird.Wie ich dachte, werden die 64-Bit-Werte von Nachrichten in Spout und Bolt zufällig von Storm generiert. – acekiller

+0

Für den zweiten Absatz ist str in der Tat eine Linie, aber hier ist jede Linie ein einzelnes Wort, also gibt es viele Duplikate in diesen Wörtern über Linien. Wenn also der 64-Bit-Wert von str gehasht wird (wie Sie im ersten Absatz angegeben haben), dann können verschiedene Nachrichten die gleiche msgID und den gleichen 64bit-Wert haben. Habe ich recht? Vielen Dank! – acekiller