2017-10-09 5 views
4

Während in einer verteilten Verarbeitungsumgebung ist es üblich, "Teil" Dateinamen wie "part-000" zu verwenden, ist es möglich, eine Erweiterung einer Art zu schreiben, um die einzelnen Ausgabedateinamen (wie ein per Fenster Dateiname) von Apache Beam?Unterstützt Apache Beam benutzerdefinierte Dateinamen für die Ausgabe?

Dazu muss man möglicherweise einen Namen für ein Fenster vergeben oder einen Dateinamen basierend auf dem Inhalt des Fensters ableiten. Ich würde gerne wissen, ob ein solcher Ansatz möglich ist.

Zur Frage, ob die Lösung sollte Streaming oder Batch wird, ein Streaming-Modus Beispiel vorzuziehen ist

Antwort

1

Ja. documentation of TextIO Per:

Wenn Sie über eine bessere Kontrolle wollen, wie Dateinamen generiert werden als die Standardrichtlinie erlaubt, können Sie eine benutzerdefinierte FilenamePolicy auch TextIO.Write.to (FilenamePolicy)

+0

Könnten Sie bitte etwas Beispielcode geben? Als ich diesen Ansatz ausprobierte, bekam ich eine ClassCastException ... –

+0

Bitte fügen Sie Ihren Code und die komplette Stack-Trace des Fehlers, den Sie bekommen haben. – jkff

5

Ja Verwendung eingestellt werden, wie durch JKFF vorgeschlagen Sie können dies mit TextIO.write.to (DateinamePolicy) erreichen.

Beispiele sind unten:

Wenn Sie die Ausgabe auf bestimmte lokale Datei schreiben möchten, die Sie verwenden können.

lines.apply (TextIO.write() bis ("/ path/to/file.txt "));

Unten ist die einfache Möglichkeit, die Ausgabe mit dem Präfix link zu schreiben. Dieses Beispiel ist für den Google-Speicher gedacht, stattdessen können Sie lokale/s3-Pfade verwenden.

public class MinimalWordCountJava8 { 

    public static void main(String[] args) { 
    PipelineOptions options = PipelineOptionsFactory.create(); 
    // In order to run your pipeline, you need to make following runner specific changes: 
    // 
    // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner 
    // or FlinkRunner. 
    // CHANGE 2/3: Specify runner-required options. 
    // For BlockingDataflowRunner, set project and temp location as follows: 
    // DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class); 
    // dataflowOptions.setRunner(BlockingDataflowRunner.class); 
    // dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE"); 
    // dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY"); 
    // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions} 
    // for more details. 
    // options.as(FlinkPipelineOptions.class) 
    //  .setRunner(FlinkRunner.class); 

    Pipeline p = Pipeline.create(options); 

    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*")) 
    .apply(FlatMapElements 
     .into(TypeDescriptors.strings()) 
     .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+")))) 
    .apply(Filter.by((String word) -> !word.isEmpty())) 
    .apply(Count.<String>perElement()) 
    .apply(MapElements 
     .into(TypeDescriptors.strings()) 
     .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())) 
    // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to. 
    .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); 

    p.run().waitUntilFinish(); 
    } 
} 

This example code geben Ihnen mehr Kontrolle über den Ausgang zu schreiben:

/** 
    * A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data 
    * being written. This always includes the shard number and the total number of shards. For 
    * windowed writes, it also includes the window and pane index (a sequence number assigned to each 
    * trigger firing). 
    */ 
    protected static class PerWindowFiles extends FilenamePolicy { 

    private final ResourceId prefix; 

    public PerWindowFiles(ResourceId prefix) { 
     this.prefix = prefix; 
    } 

    public String filenamePrefixForWindow(IntervalWindow window) { 
     String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename(); 
     return String.format(
      "%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end())); 
    } 

    @Override 
    public ResourceId windowedFilename(int shardNumber, 
             int numShards, 
             BoundedWindow window, 
             PaneInfo paneInfo, 
             OutputFileHints outputFileHints) { 
     IntervalWindow intervalWindow = (IntervalWindow) window; 
     String filename = 
      String.format(
       "%s-%s-of-%s%s", 
       filenamePrefixForWindow(intervalWindow), 
       shardNumber, 
       numShards, 
       outputFileHints.getSuggestedFilenameSuffix()); 
     return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE); 
    } 

    @Override 
    public ResourceId unwindowedFilename(
     int shardNumber, int numShards, OutputFileHints outputFileHints) { 
     throw new UnsupportedOperationException("Unsupported."); 
    } 
    } 

    @Override 
    public PDone expand(PCollection<InputT> teamAndScore) { 
    if (windowed) { 
     teamAndScore 
      .apply("ConvertToRow", ParDo.of(new BuildRowFn())) 
      .apply(new WriteToText.WriteOneFilePerWindow(filenamePrefix)); 
    } else { 
     teamAndScore 
      .apply("ConvertToRow", ParDo.of(new BuildRowFn())) 
      .apply(TextIO.write().to(filenamePrefix)); 
    } 
    return PDone.in(teamAndScore.getPipeline()); 
    } 
+0

Danke! Sieht so aus als wäre das nicht in der Version 2.1.0. Und 2.2.0 ist noch nicht raus: https: //issues.apache.org/jira/projects/BEAM/versions/12341044 –

+0

Ich habe dies mit Beam Version 2.1.0 in einer lokalen Umgebung getestet, die perfekt funktioniert. –

+1

Wie in der Dokumentation zu 2.1.0 ist dies verfügbar: https://beam.apache.org/documentation/sdks/javadoc/2.1.0/org/apache/beam/sdk/io/TextIO.html –

0

Das ist vollkommen gültig Beispiel mit Strahl 2.1.0. Sie können Ihre Daten aufrufen (PCollection).

import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy; 
import org.apache.beam.sdk.io.TextIO; 
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; 
import org.apache.beam.sdk.io.fs.ResourceId; 
import org.apache.beam.sdk.transforms.display.DisplayData; 

@SuppressWarnings("serial") 
public class FilePolicyExample { 

    public static void main(String[] args) { 
     FilenamePolicy policy = new WindowedFilenamePolicy("somePrefix"); 

     //data 
     data.apply(TextIO.write().to("your_DIRECTORY") 
      .withFilenamePolicy(policy) 
      .withWindowedWrites() 
      .withNumShards(4)); 

    } 

    private static class WindowedFilenamePolicy extends FilenamePolicy { 

     final String outputFilePrefix; 

     WindowedFilenamePolicy(String outputFilePrefix) { 
      this.outputFilePrefix = outputFilePrefix; 
     } 

     @Override 
     public ResourceId windowedFilename(
       ResourceId outputDirectory, WindowedContext input, String extension) { 
      String filename = String.format(
        "%s-%s-%s-of-%s-pane-%s%s%s", 
        outputFilePrefix, 
        input.getWindow(), 
        input.getShardNumber(), 
        input.getNumShards() - 1, 
        input.getPaneInfo().getIndex(), 
        input.getPaneInfo().isLast() ? "-final" : "", 
        extension); 
      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE); 
     } 

     @Override 
     public ResourceId unwindowedFilename(
       ResourceId outputDirectory, Context input, String extension) { 
      throw new UnsupportedOperationException("Expecting windowed outputs only"); 
     } 

     @Override 
     public void populateDisplayData(DisplayData.Builder builder) { 
      builder.add(DisplayData.item("fileNamePrefix", outputFilePrefix) 
        .withLabel("File Name Prefix")); 
     } 
    } 
} 
Verwandte Themen