2017-12-14 4 views
0

Ich verwende derzeit Apache Beam mit Google Dataflow für die Verarbeitung von Echtzeitdaten. Die Daten stammen von Google PubSub, das unbegrenzt ist, daher verwende ich derzeit Streaming-Pipeline. Es stellt sich jedoch heraus, dass eine 24/7 Streaming-Pipeline sehr teuer ist. Um Kosten zu reduzieren, denke ich daran, zu einer Batch-Pipeline zu wechseln, die in einem festen Zeitintervall (z. B. alle 30 Minuten) läuft, da es für die Verarbeitung nicht wirklich wichtig ist, Echtzeit für den Benutzer zu sein.Apache Beam: Batch-Pipeline mit unbegrenzter Quelle

Ich frage mich, ob es möglich ist, PubSub-Abonnement als eine begrenzte Quelle zu verwenden? Meine Idee ist, dass jedes Mal, wenn der Job ausgeführt wird, die Daten eine Minute lang gesammelt werden, bevor sie ausgelöst werden. Bisher scheint es nicht möglich zu sein, aber ich bin auf eine Klasse namens BoundedReadFromUnboundedSource gestoßen (von der ich keine Ahnung habe, wie ich sie benutzen soll), also gibt es vielleicht einen Weg?

Im Folgenden finden Sie ungefähr, wie die Quelle wie folgt aussieht:

PCollection<MyData> data = pipeline 
      .apply("ReadData", PubsubIO 
        .readMessagesWithAttributes() 
        .fromSubscription(options.getInput())) 
      .apply("ParseData", ParDo.of(new ParseMyDataFn())) 
      .apply("Window", Window 
        .<MyData>into(new GlobalWindows()) 
        .triggering(Repeatedly 
          .forever(AfterProcessingTime 
            .pastFirstElementInPane() 
            .plusDelayOf(Duration.standardSeconds(5)) 
          ) 
        ) 
        .withAllowedLateness(Duration.ZERO).discardingFiredPanes() 
      ); 

Ich habe versucht, die folgenden zu tun, aber der Job läuft noch im Streaming-Modus:

PCollection<MyData> data = pipeline 
      .apply("ReadData", PubsubIO 
        .readMessagesWithAttributes() 
        .fromSubscription(options.getInput())) 
      .apply("ParseData", ParDo.of(new ParseMyDataFn())) 

      // Is there a way to make the window trigger once and turning it into a bounded source? 
      .apply("Window", Window 
        .<MyData>into(new GlobalWindows()) 
        .triggering(AfterProcessingTime 
         .pastFirstElementInPane() 
         .plusDelayOf(Duration.standardMinutes(1)) 
        ) 
        .withAllowedLateness(Duration.ZERO).discardingFiredPanes() 
      ); 
+0

Fügen Sie relevante Teile Ihres aktuellen Codes hinzu, um bessere Antworten zu erhalten –

+0

@TahirAkhtar Ich habe Code hinzugefügt, um meine Frage besser zu illustrieren. –

Antwort

0

Dies wird nicht explizit in PubsubIO unterstützt Derzeit könnten Sie jedoch versuchen, in regelmäßigen Abständen einen Streaming-Job zu starten und einige Minuten später drain programmgesteuert Drain zu starten.