2015-08-28 8 views
6

Ich habe eine Streaming-Pipeline angeschlossen an Pub/Sub, die Dateinamen von GCS-Dateien veröffentlicht. Von dort möchte ich jede Datei lesen und die Ereignisse in jeder Zeile analysieren (die Ereignisse sind, was ich letztendlich verarbeiten möchte).Dateien aus einer PCollection von GCS-Dateinamen in Pipeline lesen?

Kann ich TextIO verwenden? Können Sie es in einer Streaming-Pipeline verwenden, wenn der Dateiname während der Ausführung definiert wird (im Gegensatz zur Verwendung von TextIO als Quelle und der Datei (en) sind bei der Konstruktion bekannt)? Wenn nicht ich denke an folgend etwas wie das tun:

das Thema aus pub/sub Pardo Erhalten Sie jede Datei zu lesen und die Linien Prozess der Zeilen der Datei bekommen ...

Kann ich Verwenden Sie den FileBasedReader oder etwas ähnliches in diesem Fall, um die Dateien zu lesen? Die Dateien sind nicht zu groß, so dass ich nicht das Lesen einer einzelnen Datei parallelisieren müsste, aber ich müsste viele Dateien lesen.

+0

Wir sind kurz davor, ausreichende API-Unterstützung zu haben, um eine effiziente Implementierung dieser zu erstellen. Bitte folgen Sie https://issues.apache.org/jira/browse/BEAM-2511. TextIO sollte das Lesen einer PCollection von Dateinamen unterstützen. – jkff

+0

Ich habe meine Antwort bearbeitet, um die neue API wiederzugeben. – jkff

Antwort

3

Sie können die TextIO.readAll() Transformation verwenden, die kürzlich zu Beam in #3443 hinzugefügt wurde. Beispiel:

PCollection<String> filenames = p.apply(PubsubIO.readStrings()...); 
PCollection<String> lines = filenames.apply(TextIO.readAll()); 

Dies wird alle Zeilen in jeder Datei lesen, die über Pubsub ankommen.

Verwandte Themen