Ich benutze Python (2.7) und arbeite in der DataFlow-Umgebung von Google, unnötig zu sagen, Google hat noch nicht alles ausgelöscht, und die Dokumentation reicht noch nicht ganz aus. Der Abschnitt für das Schreiben von Dataflow nach BigQuery ist hier jedoch dokumentiert BigQuery Sink.Warum werden in meiner Python BigQuery Dataflow-Senke keine Datensätze in die Datenbank eingefügt?
Gemäß der Dokumentation, um das Schema zu spezifizieren, müssen Sie Eingabe in einen String:
schema = 'field_1:STRING, field_2:STRING, field_3:STRING, created_at:TIMESTAMP, updated_at:TIMESTAMP, field_4:STRING, field_5:STRING'
Der Tabellenname, Projekt-ID und Daten-Set ID ist wie folgt: 'example_project_id: example_dataset_id.example_table_name'
Jetzt funktioniert das alles. Siehe den Code unten, aber von dem, was ich sehen kann, ist es erfolgreich, die Tabelle und die Felder zu erstellen. Hinweis: Die Projekt-ID wird als Teil der Argumente für die Funktion festgelegt.
bq_data | beam.io.Write(
"Write to BQ", beam.io.BigQuerySink(
'example_dataset_id.{}'.format(bq_table_name),
schema=schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
Jetzt sieht es aus wie ich durch die Verwendung Dinge Einsetzen dieses nicht erhalten:
bq_data = pipeline | beam.Create(
[{
'field_1': 'ExampleIdentifier',
'field_2': 'ExampleValue',
'field_3': 'ExampleFieldValue',
'created_at': '2016-12-26T05:50:39Z',
'updated_at': '2016-12-26T05:50:39Z',
'field_4': 'ExampleDataIdentifier',
'field_5: 'ExampleData'
}]
)
Aber aus irgendeinem Grund, wenn die Werte in eine PCollection Verpackung, heißt es, dass es in BigQuery einfügt, aber wenn Ich frage die Tabelle ab, es zeigt nichts.
Warum fügt es nicht ein? Ich sehe keine Fehler, aber nichts fügt BigQuery hinzu.
Dies ist, was die Daten sehen, dass in der PCollection enthalten ist, habe ich die Nähe von 1.100 Zeilen einzufügen:
{'field_1': 'ExampleIdentifier', 'field_2': 'ExampleValue', 'field_3': 'ExampleFieldValue', 'created_at': '2016-12-29 12:10:32', 'updated_at': '2016-12-29 12:10:32', 'field_4': 'ExampleDataIdentifier', 'field_5': 'ExampleData'}
Hinweis: I in das Datumsformat überprüft und die Datumsformatierung oben ist erlaubt für die BigQuery-Einfügung.
Wir untersuchen dies. Verwenden Sie DirectPipelineRunner (der Standard)? Können Sie auch weitere Details zu Ihrer Pipeline angeben und feststellen, dass Sie verifiziert haben, dass Daten in BigQuery nicht verfügbar sind? Stellen Sie sicher, dass Sie pipeline.run() am Ende aufrufen, um Ihre Pipeline auszuführen. – chamikara
Hier ist eine Kopie des Befehls, der zum Ausführen des Datenflusses mit Beispielinformationen verwendet wird: python -m dataflow_sample - runner DirectPipelineRunner --setup_file ./setup.py --jobname beispieldatenfluss-run-1 --server dev - -worker_machine_type g1-small --num_workers 10 --start_date '2016-12-01' --end_date '2016-12-30' --devices device_id_1 device_id_2 device_id_3 – Jravict
Soweit ich die Daten verifiziert habe, ist das nicht in BigQuery, Ich führe eine Abfrage gegen die Tabelle durch, in der die Informationen enthalten sein sollen. Und ja, ich führe pipeline.run() am Ende der Funktion dataflow run() aus. – Jravict