0

Ich benutze Python (2.7) und arbeite in der DataFlow-Umgebung von Google, unnötig zu sagen, Google hat noch nicht alles ausgelöscht, und die Dokumentation reicht noch nicht ganz aus. Der Abschnitt für das Schreiben von Dataflow nach BigQuery ist hier jedoch dokumentiert BigQuery Sink.Warum werden in meiner Python BigQuery Dataflow-Senke keine Datensätze in die Datenbank eingefügt?

Gemäß der Dokumentation, um das Schema zu spezifizieren, müssen Sie Eingabe in einen String:

schema = 'field_1:STRING, field_2:STRING, field_3:STRING, created_at:TIMESTAMP, updated_at:TIMESTAMP, field_4:STRING, field_5:STRING' 

Der Tabellenname, Projekt-ID und Daten-Set ID ist wie folgt: 'example_project_id: example_dataset_id.example_table_name'

Jetzt funktioniert das alles. Siehe den Code unten, aber von dem, was ich sehen kann, ist es erfolgreich, die Tabelle und die Felder zu erstellen. Hinweis: Die Projekt-ID wird als Teil der Argumente für die Funktion festgelegt.

bq_data | beam.io.Write(
    "Write to BQ", beam.io.BigQuerySink(
     'example_dataset_id.{}'.format(bq_table_name), 
     schema=schema, 
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, 
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED 
    ) 
) 

Jetzt sieht es aus wie ich durch die Verwendung Dinge Einsetzen dieses nicht erhalten:

bq_data = pipeline | beam.Create(
    [{ 
     'field_1': 'ExampleIdentifier', 
     'field_2': 'ExampleValue', 
     'field_3': 'ExampleFieldValue', 
     'created_at': '2016-12-26T05:50:39Z', 
     'updated_at': '2016-12-26T05:50:39Z', 
     'field_4': 'ExampleDataIdentifier', 
     'field_5: 'ExampleData' 
    }] 
) 

Aber aus irgendeinem Grund, wenn die Werte in eine PCollection Verpackung, heißt es, dass es in BigQuery einfügt, aber wenn Ich frage die Tabelle ab, es zeigt nichts.

Warum fügt es nicht ein? Ich sehe keine Fehler, aber nichts fügt BigQuery hinzu.

Dies ist, was die Daten sehen, dass in der PCollection enthalten ist, habe ich die Nähe von 1.100 Zeilen einzufügen:

{'field_1': 'ExampleIdentifier', 'field_2': 'ExampleValue', 'field_3': 'ExampleFieldValue', 'created_at': '2016-12-29 12:10:32', 'updated_at': '2016-12-29 12:10:32', 'field_4': 'ExampleDataIdentifier', 'field_5': 'ExampleData'} 

Hinweis: I in das Datumsformat überprüft und die Datumsformatierung oben ist erlaubt für die BigQuery-Einfügung.

+0

Wir untersuchen dies. Verwenden Sie DirectPipelineRunner (der Standard)? Können Sie auch weitere Details zu Ihrer Pipeline angeben und feststellen, dass Sie verifiziert haben, dass Daten in BigQuery nicht verfügbar sind? Stellen Sie sicher, dass Sie pipeline.run() am Ende aufrufen, um Ihre Pipeline auszuführen. – chamikara

+0

Hier ist eine Kopie des Befehls, der zum Ausführen des Datenflusses mit Beispielinformationen verwendet wird: python -m dataflow_sample - runner DirectPipelineRunner --setup_file ./setup.py --jobname beispieldatenfluss-run-1 --server dev - -worker_machine_type g1-small --num_workers 10 --start_date '2016-12-01' --end_date '2016-12-30' --devices device_id_1 device_id_2 device_id_3 – Jravict

+0

Soweit ich die Daten verifiziert habe, ist das nicht in BigQuery, Ich führe eine Abfrage gegen die Tabelle durch, in der die Informationen enthalten sein sollen. Und ja, ich führe pipeline.run() am Ende der Funktion dataflow run() aus. – Jravict

Antwort

0

Ich habe ein Beispiel mit Ihrem genauen Schema und Eingabe versucht und es hat für mich funktioniert. Ich musste folgende Fixes machen.

(1) Scheint so, als ob Sie in Ihren Argumenten kein Projekt angeben. Möglicherweise geben Sie dies in Ihrer Pipelinedefinition an, da für Sie kein Fehler angezeigt wird. (2) Es gibt einen Tippfehler in dem Code, den Sie oben erwähnt haben. 'field_5: 'ExampleData' sollte 'field_5': 'ExampleData' sein Aber ich nehme an, dies ist nur ein Tippfehler in dieser Frage nicht in Ihrer ursprünglichen Pipeline, da Sie keinen Fehler dafür erhalten.

Verwenden Sie die neueste Version von Dataflow? Sie können versuchen, eine neue virtuelle Umgebung zu erstellen und "pip install google-cloud-dataflow" auszuführen, um die neueste Version zu installieren.

Ist es möglich, Ihre volle Piplein für mich zu testen?

Es ist schwierig, dies aus der Ferne zu debuggen, da Sie "DirectPipelineRunner" verwenden. Ist es möglich, die gleiche Pipeline mit 'DataflowPipelineRunner' auszuführen (beachten Sie, dass Sie ein GCP-Projekt mit aktivierter Abrechnung benötigen)? Ich kann Protokolle anzeigen, wenn Sie dies mit 'DataflowPipelineRunner' ausführen und eine Job-ID angeben können.

+0

Projekt ist in den Argumenten: --server, erforderlich = Falsch - Region --Ausgang, erforderlich = Falsch ** - Projekt **, erforderlich = False --bucket, erforderlich = False --job_name, erforderlich = False --staging_location, erforderlich = False - temp_location, erforderlich = False --runner, required = False --setup_file, required = False - Geräte, nargs = "*", required = False --start_date, required = Wahr --end_date, required = True – Jravict

+0

Ehrlich gesagt nehme ich an, dass etwas passiert ist auf der Seite von Google, weil ich mit den Dingen spielte, und nachdem ich Ihre Antwort gesehen hatte, kehrte ich zurück zu dem, was ich hatte und es funktionierte perfekt. Ich weiß nicht, was das Problem war, aber es sieht so aus, als würde es jetzt einfügen. – Jravict

Verwandte Themen