Ich versuche, von pub/sub mit dem folgenden CodeQuittieren Google Pub/Sub-Nachricht auf Apache Strahl
Read<String> pubsub = PubsubIO.<String>read().topic("projects/<projectId>/topics/<topic>").subscription("projects/<projectId>/subscriptions/<subscription>").withCoder(StringUtf8Coder.of()).withAttributes(new SimpleFunction<PubsubMessage,String>() {
@Override
public String apply(PubsubMessage input) {
LOG.info("hola " + input.getAttributeMap());
return new String(input.getMessage());
}
});
PCollection<String> pps = p.apply(pubsub)
.apply(
Window.<String>into(
FixedWindows.of(Duration.standardSeconds(15))));
pps.apply("printdata",ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
LOG.info("hola amigo "+c.element());
c.output(c.element());
}
}));
Im Vergleich zu lesen, was ich auf NodeJS erhalten, bekomme ich die Meldung, das in enthalten wäre das Feld data
. Wie bekomme ich das Feld ackId
(mit dem ich später die Nachricht bestätigen kann)? Die Attributkarte, die ich drucke, lautet null
. Gibt es eine andere Möglichkeit, alle Nachrichten zu bestätigen, ohne die ackId herauszufinden?
Ich benutze v0.6.0 – njLT
Welchen Läufer benutzen Sie? Ich glaube, dass die 'PubsubIO.read()' Nachrichten für Sie nach erfolgreicher Verarbeitung bestätigen sollte - sind Sie sicher, dass es notwendig ist, dass Sie sie selbst bestätigen? –
Ich benutze flink-runner. Es schien nicht so, als würden die Nachrichten bestätigt, aber ich werde es noch einmal überprüfen. – njLT