2016-04-29 12 views
1

Ich habe ein benutzerdefiniertes Dateiformat in Google Cloud Storage und möchte es von Google DataFlow lesen.So implementieren Sie einen benutzerdefinierten Dateiparser in Google DataFlow für eine Google Cloud Storage-Datei

Ich habe eine Quelle und einen Reader durch Subklassifizierung von FileBasedReader implementiert, aber dann erkannte ich, dass das Lesen von Google Cloud Storage nicht unterstützt wurde (während FileBasedSink das tatsächlich tut ...), also bin ich mir nicht sicher, was die beste Idee ist um das hier zu lösen ...

Ich habe versucht, TextIO zu untergliedern, aber ich konnte damit kein Ende erreichen, da es anscheinend nicht für Unterklassen entwickelt wurde.

Eine gute Idee, wie man damit umgeht?

Danke.

Update zu den Kommentaren zu reflektieren

Dateimuster verwendet: gs://mybucket/my.json

der Source-Klasse von FileBasedSource Implementiert:

MessageSource<T> extends FileBasedSource<T> 

der Reader-Klasse implementiert (was ich kümmern uns um hier wirklich) aus FileBasedReader:

MessageReader<T> extends FileBasedReader<T> 

Verfahren zum Lesen ist:

MySource source = // instantiate source 
Pipeline p = Pipeline.create(options); 
p.apply(TextIO.Read.from(options.getSource()).named("ReadFileData")) 
    .apply(ParDo.of(new DoFn<String, String>() { 

Und die getSource() stammt aus dieser Befehlszeilenparameter (verifiziert korrekt):

--source=gs://${BUCKET_NAME}/my.json \ 

Bin ich etwas fehlt?

2. UPDATE

Während source.getEstimatedSizeBytes(options) läuft es sagt mir kein Handler gefunden?

java.io.IOException: Unable to find handler for gs://mybucket/my.json 
at com.google.cloud.dataflow.sdk.util.IOChannelUtils.getFactory(IOChannelUtils.java:186) 
at com.google.cloud.dataflow.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:182) 
at com.etc.TrackingDataPipeline.main(TrackingDataPipeline.java:66) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:497) 
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) 
at java.lang.Thread.run(Thread.java:745) 

Ich dachte, die FileBasedSource sollte GCS behandeln?

+1

Hmm, FileBasedReader ist absolut dazu gedacht, mit Google Cloud Storage verwendet zu werden. Können Sie Ihre Frage bearbeiten, um zu klären, welche Probleme bei der Verwendung aufgetreten sind? – jkff

+0

Hmm ... Ich könnte es verpasst haben, aber ich konnte keinen Verweis auf GCS in den Quellen sehen? Unabhängig davon, das Problem, das ich habe, ist die Datei wird nie geladen, ohne dass ein Fehler ausgelöst wird. Ich bin mir nicht sicher, wie das zu debuggen ist.Ich habe auch kein Beispiel mit FileBasedReader gefunden, um das Gleiche zu tun. Ich werde versuchen, die Frage zu aktualisieren, um dies im Lichte Ihres Kommentars zu reflektieren @ jkff – nembleton

+0

Yup, bitte aktualisieren Sie die Frage mit mehr Details - nicht viel kann ich ohne das tun. Tatsächlich sind fast alle Quellen, einschließlich TextIO, unter Verwendung von FileBasedSource/Reader unter der Haube implementiert. – jkff

Antwort

2

Von der Stapelverfolgung, die Sie in "2nd Update" anzeigen, sieht es so aus, als hätten Sie getEstimatedSizeBytes direkt von Ihrer main() Methode aufgerufen. Dies wird voraussichtlich zu dem Fehler führen, den Sie sehen.

Die standardmäßigen URL-Schema-Handler werden registriert, wenn ein Pipeline-Runner erstellt wird. In Ihrem Beispielcode würde das passieren, wenn Sie Pipeline.create(options) aufrufen (dies ruft PipelineRunner.fromOptions(options) auf, wo die Standardhandler registriert sind). Wenn Sie die Standard-URL-Schemas in einem anderen Kontext als dem Ausführen einer Pipeline registrieren möchten, können Sie explizit IOChannelUtils.registerStandardIOFactories() aufrufen. Ich sollte beachten, dass dies keine unterstützte API ist, sondern ein bisschen "unter die Haube" gelangt. Als solche kann es sich jederzeit ändern.

+0

Ja, Sie haben Recht. Danke für die Details. Ich habe das später erkannt und es wurde vom Google-Support bestätigt. Es war leider nicht mit dem Grundproblem verbunden. – nembleton

Verwandte Themen