2015-10-29 6 views
13

In Apache Flink habe ich einen Strom von Tupeln. Nehmen wir an, ein wirklich einfaches Tuple1<String>. Das Tupel kann einen beliebigen Wert in seinem Wertfeld haben (z. B. "P1", "P2" usw.). Die Menge der möglichen Werte ist endlich, aber ich kenne die ganze Menge nicht vorher (es könnte also ein 'P362' sein). Ich möchte dieses Tupel an einen bestimmten Ausgabeort schreiben, abhängig vom Wert innerhalb des Tupels. So z.B. Ich möchte folgende Dateistruktur haben:Flink Streaming: Wie kann ein Datenstrom abhängig von den Daten an verschiedene Ausgänge ausgegeben werden?

  • /output/P1
  • /output/P2

In der Dokumentation I Möglichkeiten nur an Orte schreiben gefunden, die ich vorher wissen (zB stream.writeCsv("/output/somewhere")), aber keine Möglichkeit, den Inhalt der Daten entscheiden zu lassen, wo die Daten tatsächlich landen.

Ich habe in der Dokumentation über die Ausgabeaufteilung gelesen, aber dies scheint keine Möglichkeit zu bieten, die Ausgabe an andere Ziele umzuleiten, so wie ich es gerne hätte (oder ich verstehe einfach nicht, wie das funktionieren würde) .

Kann dies mit der Flink API gemacht werden, wenn ja, wie? Wenn nicht, gibt es vielleicht eine Bibliothek von Dritten, die das tun kann oder müsste ich so etwas selbst bauen?

aktualisiert

Nach Matthias' Vorschlag, den ich mit einer Siebung Senkenfunktion kam, die den Ausgangspfad bestimmt und dann nach der Serialisierung es das Tupel auf die jeweilige Datei schreibt. Ich lege es hier als Referenz, vielleicht ist es nützlich für jemand anderen:

public class SiftingSinkFunction<IT> extends RichSinkFunction<IT> { 

    private final OutputSelector<IT> outputSelector; 
    private final MapFunction<IT, String> serializationFunction; 
    private final String basePath; 
    Map<String, TextOutputFormat<String>> formats = new HashMap<>(); 

    /** 
    * @param outputSelector  the selector which determines into which output(s) a record is written. 
    * @param serializationFunction a function which serializes the record to a string. 
    * @param basePath    the base path for writing the records. It will be appended with the output selector. 
    */ 
    public SiftingSinkFunction(OutputSelector<IT> outputSelector, MapFunction<IT, String> serializationFunction, String basePath) { 
     this.outputSelector = outputSelector; 
     this.serializationFunction = serializationFunction; 
     this.basePath = basePath; 
    } 


    @Override 
    public void invoke(IT value) throws Exception { 
     // find out where to write. 
     Iterable<String> selection = outputSelector.select(value); 
     for (String s : selection) { 
      // ensure we have a format for this. 
      TextOutputFormat<String> destination = ensureDestinationExists(s); 
      // then serialize and write. 
      destination.writeRecord(serializationFunction.map(value)); 
     } 
    } 

    private TextOutputFormat<String> ensureDestinationExists(String selection) throws IOException { 
     // if we know the destination, we just return the format. 
     if (formats.containsKey(selection)) { 
      return formats.get(selection); 
     } 

     // create a new output format and initialize it from the context. 
     TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection)); 
     StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); 
     format.configure(context.getTaskStubParameters()); 
     format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks()); 

     // put it into our map. 
     formats.put(selection, format); 
     return format; 
    } 

    @Override 
    public void close() throws IOException { 
     Exception lastException = null; 
     try { 
      for (TextOutputFormat<String> format : formats.values()) { 
       try { 
        format.close(); 
       } catch (Exception e) { 
        lastException = e; 
        format.tryCleanupOnError(); 
       } 
      } 
     } finally { 
      formats.clear(); 
     } 

     if (lastException != null) { 
      throw new IOException("Close failed.", lastException); 
     } 
    } 
} 

Antwort

6

Sie können eine benutzerdefinierte Senke implementieren. Vererben von einem der beiden:

  • org.apache.flink.streaming.api.functions.sink.SinkFunction
  • org.apache.flink.streaming.api.functions.sink.RichSinkFunction

in Ihrem Programm verwenden:

stream.addSink(SinkFunction<T> sinkFunction); 

statt stream.writeCsv("/output/somewhere").

+4

Vielen Dank! Ich überprüfte die Implementierung von 'FileSinkFunction' und entwickelte selbst etwas ähnliches. Ich habe die Implementierung meiner Frage als Referenz hinzugefügt. –

Verwandte Themen