2017-10-27 1 views
1

Bei Verwendung von Sitzung Windowing und Schreiben in eine Datei über TextIO.write in Apache Beam 2.0.0 wird die folgende Ausnahme durch Aufrufen von TextIO.write() erstellt:Schreiben über TextIO.write mit Sessions windowing löst GroupByKey-Verbrauchsausnahme

java.lang.IllegalStateException: GroupByKey must have a valid Window merge function. Invalid because: WindowFn has already been consumed by previous GroupByKey

die Ausnahme tritt auf, auch wenn es keine dazwischen GroupByKey s potenziell das Fenster verbrauchen. Ich habe Code hinzugefügt - die Hauptfunktion veranschaulicht das Problem und enthält eine Helper Policy Writer-Klasse für 2.0.0.

import org.apache.beam.sdk.Pipeline; 
import org.apache.beam.sdk.io.FileBasedSink; 
import org.apache.beam.sdk.io.TextIO; 
import org.apache.beam.sdk.io.fs.ResolveOptions; 
import org.apache.beam.sdk.io.fs.ResourceId; 
import org.apache.beam.sdk.transforms.*; 
import org.apache.beam.sdk.transforms.windowing.*; 
import org.apache.beam.sdk.values.PCollection; 
import org.apache.beam.sdk.values.TimestampedValue; 
import org.joda.time.Duration; 
import org.joda.time.Instant; 
import org.joda.time.format.DateTimeFormatter; 
import org.joda.time.format.ISODateTimeFormat; 


public class TestSessionWindowToFile { 
    /** 
    * Support class: a filename policy for getting one file per window. 
    * See https://github.com/apache/beam/blob/release-2.0.0/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java 
    */ 
    public static class LocalPerWindowFiles extends FileBasedSink.FilenamePolicy { 
     private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute(); 
     private final String prefix; 

     public LocalPerWindowFiles(String prefix) { 
      this.prefix = prefix; 
     } 

     public String filenamePrefixForWindow(IntervalWindow window) { 
      return String.format("%s-%s-%s", 
        prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end())); 
     } 

     @Override 
     public ResourceId windowedFilename(
       ResourceId outputDirectory, WindowedContext context, String extension) { 
      IntervalWindow window = (IntervalWindow) context.getWindow(); 
      String filename = String.format(
        "%s-%s-of-%s%s", 
        filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(), 
        extension); 
      return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE); 
     } 

     @Override 
     public ResourceId unwindowedFilename(
       ResourceId outputDirectory, Context context, String extension) { 
      throw new UnsupportedOperationException("Unsupported."); 
     } 
    } 


    /** 
    * Creating a session windows and then asking TextIO to write the results leads to 
    * "java.lang.IllegalStateException: GroupByKey must have a valid Window merge function. 
    * Invalid because: WindowFn has already been consumed by previous GroupByKey" 
    */ 
    public static void main(String[] args) { 
     Pipeline p = Pipeline.create(); 

     PCollection<String> input = p.apply(
       Create.timestamped(
         TimestampedValue.of("this", new Instant(1)), 
         TimestampedValue.of("is", new Instant(2)), 
         TimestampedValue.of("a", new Instant(3)), 
         TimestampedValue.of("test", new Instant(4)), 
         TimestampedValue.of("test", new Instant(5)), 
         TimestampedValue.of("test", new Instant(50)), 
         TimestampedValue.of("test", new Instant(51)), 
         TimestampedValue.of("test", new Instant(52)) 
       ) 
     ); 

     PCollection<String> windowedInputs = input 
       // session windowing fails: 
       .apply(Window.into(Sessions.withGapDuration(new org.joda.time.Duration(10)))); 
       // sliding windowing succeeds: 
       //.apply(Window.into(SlidingWindows.of(new Duration(30)).every(new Duration(10)))); 

     // Invoke counting of elements so that sessioning is more clear 
     PCollection<KV<String, Long>> counts = 
       windowedInputs.apply(Count.perElement()); 
     PCollection<String> writeableStrings = counts.apply("Convert to text", 
      ParDo.of(new DoFn<KV<String, Long>, String>() { 
      @ProcessElement 
      public void processElement(ProcessContext c) { 
       String word = c.element().getKey(); 
       Long count = c.element().getValue(); 
       c.output(String.format("%s,%d", word, count)); 
      } 
     })); 

     writeableStrings 
       .apply(TextIO.write() 
         .to("i_am_ignored_given_filename_policy") 
         .withFilenamePolicy(new LocalPerWindowFiles("results/testSessionWindow")) 
         .withWindowedWrites() 
         .withNumShards(1) 
     ); 
     p.run(); 
    } 
} 

Ich habe keine Wirkung aus gesehen Absichten um Wasserzeichen/Auslösung Klärung, Zeitstempel kombiniert, Window.remerge() ing, oder mit Strahl 2.1.0 (und Strahl 2.1.0 enthält eine Dateinamen Standardrichtlinie, dass weiß, wie man gefensterte Dateien sowie nicht entfernte Dateien schreibt).

Die Protokollierung zeigt, dass die Sitzungen korrekt aufgebaut sind und ein SlidingWindow erfolgreich die Ausgabedateien erzeugt (mit Hilfe von Varianten wie .apply(Window.into(SlidingWindows.of(new Duration(30)).every(new Duration(10)))); anstelle von Sessions). Dies deutet auf eine falsch konfigurierte oder sich schlecht benehmende Interaktion der Sessions windowing + TextIO.write hin.

Wie kann dieser Code geändert werden, um eine Textdatei für jede Schlüssel + Start + Endfenstergruppierung zu schreiben?

Antwort

0

Dies ist ein Fehler in der WriteFiles-Transformation. Ich habe https://issues.apache.org/jira/browse/BEAM-3122 abgelegt. Leider kann ich mir keine Problemumgehung vorstellen, kurz nachdem ich den Fehler behoben habe.

+0

Ich schätze die Bestätigung, dass es sich nicht lohnt, damit weiter zu kämpfen - danke. Ich konnte keine klaren Käfer finden, als ich aussah (obwohl ich ein Babe im Wald bin, der mit dieser Codebasis anfängt), also wer auch immer das macht, bekommt alle meine besten Wünsche. – ppptomlin

+0

Können Sie zur Klärung dieses Problems klären, was Sie erreichen möchten? Windows ist immer per Schlüssel, nur einige Fensterstrategien machen dasselbe unabhängig für alle Schlüssel (z. B. verschiebbare und feste Fenster), aber für Sitzungsfenster ist der Schlüssel wichtig, z. Man kann sich an Benutzer-ID-Sitzungen denken. Versuchen Sie, eine Sitzungsfensterung durchzuführen, indem Sie das gesamte Dataset so behandeln, dass es zu einem einzelnen "Schlüssel" innerhalb von Sitzungen gehört? Oder haben Sie eine Art Gruppierungsschlüssel? – jkff

+0

Ich bearbeite das Beispiel so, dass es tatsächlich die Gruppierung des Wortes und die Berechnung der Anzahl der Male, die es erscheint, beinhaltet - ich war zu aggressiv, um das Beispiel zu reduzieren. Ich versuche tatsächlich, auf einen Schlüssel zu gruppieren und zählt. Zum Beispiel könnten wir bei t = 1, t = 2 und t = 10 Daten mit dem Schlüssel "earlySession" empfangen, und bei t = 5, t = 9 und t = 60 Daten mit dem Schlüssel "lateSession". Ich erwarte, dass die Sitzung mit einer Lücke von 25 t-Einheiten zu 3 Dateien führt: t_10_earlySession (Information von allen 3), t_9_lateSession (Information von den ersten 2) und t_60_lateSession (Information von der letzten 1). (Entschuldigung, ich habe das verpasst. Ich werde jetzt E-Mails erhalten.) – ppptomlin