2017-05-16 5 views
0

Ich versuche, von pub/sub mit dem folgenden CodeQuittieren Google Pub/Sub-Nachricht auf Apache Strahl

Read<String> pubsub = PubsubIO.<String>read().topic("projects/<projectId>/topics/<topic>").subscription("projects/<projectId>/subscriptions/<subscription>").withCoder(StringUtf8Coder.of()).withAttributes(new SimpleFunction<PubsubMessage,String>() { 
    @Override 
    public String apply(PubsubMessage input) { 
     LOG.info("hola " + input.getAttributeMap()); 
     return new String(input.getMessage()); 
    } 
}); 
PCollection<String> pps = p.apply(pubsub) 
     .apply(
       Window.<String>into(
        FixedWindows.of(Duration.standardSeconds(15)))); 
pps.apply("printdata",ParDo.of(new DoFn<String, String>() { 
    @ProcessElement 
    public void processElement(ProcessContext c) { 
     LOG.info("hola amigo "+c.element()); 
     c.output(c.element()); 
    } 
    })); 

Im Vergleich zu lesen, was ich auf NodeJS erhalten, bekomme ich die Meldung, das in enthalten wäre das Feld data. Wie bekomme ich das Feld ackId (mit dem ich später die Nachricht bestätigen kann)? Die Attributkarte, die ich drucke, lautet null. Gibt es eine andere Möglichkeit, alle Nachrichten zu bestätigen, ohne die ackId herauszufinden?

+0

Ich benutze v0.6.0 – njLT

+1

Welchen Läufer benutzen Sie? Ich glaube, dass die 'PubsubIO.read()' Nachrichten für Sie nach erfolgreicher Verarbeitung bestätigen sollte - sind Sie sicher, dass es notwendig ist, dass Sie sie selbst bestätigen? –

+0

Ich benutze flink-runner. Es schien nicht so, als würden die Nachrichten bestätigt, aber ich werde es noch einmal überprüfen. – njLT

Antwort

0

Der Leser PubsubIO ist für das Quittieren von Nachrichten zuständig. Es ist an das Checkpoint-Verhalten des Läufers gebunden. Insbesondere quittiert die Quelle Nachrichten nur, wenn die resultierenden Elemente überprüft wurden.

In diesem Fall sollten Sie sich ansehen, wann der Flink-Runner den Status dieser Quelle überprüft. Ich glaube, das hängt mit der Flink-Konfiguration für die Checkpoint-Frequenz zusammen.

Verwandte Themen