0

Dies ist unser Setup, wir haben ein Thema und dieses Thema hat zwei Abonnements v1 und v2 mit exakt identischen Einstellungen, beide sind Pull-Abonnement mit 10 sek ack Frist.
Beide Subskriptionen v1 und v2 trennen dedizierte Datenflüsse, wobei der Datenfluss von v2 optimiert ist, aber ziemlich genau dasselbe tut.gcp PubSub bekommen "Verarbeitungsverzögerung war hoch"

Problem ist, dass wir hin und wieder sehen wir unten Warnmeldungen und Backlog beginnt nur in v2 Abonnement aufzubauen und v1 zeigt wenig bis keinen Rückstand.

08:53:56.000 ${MESSAGE_ID} Pubsub processing delay was high at 72 sec. 

Dataflow-Protokoll in v2 zeigt nichts offensichtliches außer den obigen Nachrichten. In der Tat ist v2 Datenfluss CPU-Nutzung niedriger als v1, so kann ich keinen Sinn daraus machen.

Fragen:

  1. Was Verzögerungsverarbeitung verursacht, und wie kann ich es beheben?
  2. Warum erhält das Abonnement der Version 1 nicht dieselben Warnungen?

bei 2017/01/17

via @ Ben es scheint, dass wir Wie vorgeschlagen aktualisiert, dass Pardo Filteroperation direkt nach PubSub Lese tun unerwartet hohe Latenz zu treffen. Aber unter Berücksichtigung getClassroomIds ist eine einfache Java-Liste Ich bin mir nicht sicher, wie ich dieses Problem angehen kann. Eine Frage ist, dass wir Coder auf Pubsub faul angewendet haben? Entpacken und Deserialisieren haben wir im Coder definiert, wenn ProcessContext#element() aufgerufen wird?

def processElement(c: DoFn[Entity, Entity]#ProcessContext) = { 
    val startTime = System.currentTimeMillis() 
    val entity = c.element() 
    if (!entity.getClassroomIds.isEmpty) { 
    c.output(entity) 
    } 

    val latencyMs = System.currentTimeMillis() - startTime 
    if (latencyMs > 1000) { 
    // We see this warning messages during the load spike 
    log.warn(s"latency breached 1 second threshold: $latencyMs ms") 
    } 
} 
+0

Job-IDs sind nützlich, um zu sehen, was mit diesen Läufen passiert ist. In der Regel bedeutet diese Meldung, dass die ParDo-Schritte, die unmittelbar auf die PubSub-Quelle folgen, langsam sind. Das Betrachten der "Shuffler" -Protokolle kann auch anzeigen, ob die Flusssteuerung Warteschlangenverzögerungen einführt. –

+0

@BenChambers ParDo direkt nach sollte sehr einfache Filterlogik, so dass ich weiß nicht, ob das der Fall ist. Allerdings ist stream eine verschlüsselte zip, die deserialisiert werden muss, damit sie dort langsamer wird. Aber dieser Teil sollte zwischen v1 und v2 gleich sein.Und wenn Sie meinen, dass Datenfluss-Protokolle durch das "Shuffler" -Protokoll funktionieren, dann habe ich sie und andere als gelegentliche GC-Protokolle und das Protokoll "Prozessverzögerung" oben betrachtet. Ich sehe nichts um die Ereigniszeitlinie herum. – codingtwinky

+0

v1 stream ist dataflow v1.6.1 und v2 ist dataflow v1.9.0, aber ich sehe keine offensichtliche Sache, die dazu führen könnte, dass v1.6.1 besser funktioniert – codingtwinky

Antwort

1

Das Timing Sie erwähnen Buchhaltung nicht genau für die Zeit in diesem Schritt ausgegeben. Insbesondere aufgrund der fusion optimization spiegelt es alle ParDo s nach dem Filtervorgang.

Wenn Sie eine Pipeline, die wie folgt aussieht:

ReadFromPubSub -> Pardo (Filter) -> Pardo (Teure) -> Pardo (Write)

Beide Expensive und Write ausführen auf jedes Element aus kommend der Filter vor dem Anruf an c.output zurückgibt. Dies ist ein weiteres Problem, da es in die Elemente verschmolzen wird, die aus PubSub kommen.

Die einfachste Lösung ist wahrscheinlich eine Reshuffle auszuführen:

pipeline 
    .apply(PubSubIO.read(...)) 
    .apply(keyEachElement by their value and Void) 
    .apply(new Reshuffle<E, Void>()) 
    .apply(move the key back to the element) 
    // rest of the pipeline 

Beachten Sie, dass die Reshuffle anstelle eines normalen GroupByKey mit netten Eigenschaften hat, da es löst schneller als jede normalerweise Feuer auslösen.

+0

Es gibt so viele Fragen jetzt ... Zwei Probleme haben wir, "Verarbeitung Verzögerung Warnung" und Rückstand Aufbau. "Verarbeitungsverzögerung" ist wegen pubsub Nachricht ack passiert am Ende expansive Datenfluss Schritte? Baut der Rückstand durch "Verarbeitungsverzögerung" auf? Mischen Sie auch effektiv, indem Sie die Nachricht quittieren, bevor die Schritte ausgeführt werden? – codingtwinky

+0

und warum ist nicht v1 mit v1.6.1 Datenfluss hat dieses Problem nicht? – codingtwinky

+0

Die PubSub-Nachricht wird quittiert, sobald die Daten an anderer Stelle "dauerhaft festgeschrieben" sind. In diesem Fall wird das Element innerhalb des Shuffle dauerhaft festgeschrieben, wodurch es dann bestätigt wird. Wenn die Pipeline nur eine Sequenz von ParDos ist, gibt es kein Commit. –