Ich habe ein benutzerdefiniertes Dateiformat in Google Cloud Storage und möchte es von Google DataFlow lesen.So implementieren Sie einen benutzerdefinierten Dateiparser in Google DataFlow für eine Google Cloud Storage-Datei
Ich habe eine Quelle und einen Reader durch Subklassifizierung von FileBasedReader implementiert, aber dann erkannte ich, dass das Lesen von Google Cloud Storage nicht unterstützt wurde (während FileBasedSink das tatsächlich tut ...), also bin ich mir nicht sicher, was die beste Idee ist um das hier zu lösen ...
Ich habe versucht, TextIO zu untergliedern, aber ich konnte damit kein Ende erreichen, da es anscheinend nicht für Unterklassen entwickelt wurde.
Eine gute Idee, wie man damit umgeht?
Danke.
Update zu den Kommentaren zu reflektieren
Dateimuster verwendet: gs://mybucket/my.json
der Source-Klasse von FileBasedSource Implementiert:
MessageSource<T> extends FileBasedSource<T>
der Reader-Klasse implementiert (was ich kümmern uns um hier wirklich) aus FileBasedReader:
MessageReader<T> extends FileBasedReader<T>
Verfahren zum Lesen ist:
MySource source = // instantiate source
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from(options.getSource()).named("ReadFileData"))
.apply(ParDo.of(new DoFn<String, String>() {
Und die getSource() stammt aus dieser Befehlszeilenparameter (verifiziert korrekt):
--source=gs://${BUCKET_NAME}/my.json \
Bin ich etwas fehlt?
2. UPDATE
Während source.getEstimatedSizeBytes(options)
läuft es sagt mir kein Handler gefunden?
java.io.IOException: Unable to find handler for gs://mybucket/my.json
at com.google.cloud.dataflow.sdk.util.IOChannelUtils.getFactory(IOChannelUtils.java:186)
at com.google.cloud.dataflow.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:182)
at com.etc.TrackingDataPipeline.main(TrackingDataPipeline.java:66)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Ich dachte, die FileBasedSource sollte GCS behandeln?
Hmm, FileBasedReader ist absolut dazu gedacht, mit Google Cloud Storage verwendet zu werden. Können Sie Ihre Frage bearbeiten, um zu klären, welche Probleme bei der Verwendung aufgetreten sind? – jkff
Hmm ... Ich könnte es verpasst haben, aber ich konnte keinen Verweis auf GCS in den Quellen sehen? Unabhängig davon, das Problem, das ich habe, ist die Datei wird nie geladen, ohne dass ein Fehler ausgelöst wird. Ich bin mir nicht sicher, wie das zu debuggen ist.Ich habe auch kein Beispiel mit FileBasedReader gefunden, um das Gleiche zu tun. Ich werde versuchen, die Frage zu aktualisieren, um dies im Lichte Ihres Kommentars zu reflektieren @ jkff – nembleton
Yup, bitte aktualisieren Sie die Frage mit mehr Details - nicht viel kann ich ohne das tun. Tatsächlich sind fast alle Quellen, einschließlich TextIO, unter Verwendung von FileBasedSource/Reader unter der Haube implementiert. – jkff