5

Ich versuche, einige Daten von Google PubSub in BigQuery mit einem Python-Datenfluss zu streamen. So indemStreaming von Pub/Sub zu BigQuery

options.view_as(StandardOptions).streaming = True 

angepasst haben, dann änderte ich die record_ids Pipeline von Pub zu lesen/Sub

# ADDED THIS 
lines = p | 'Read PubSub' >> beam.io.ReadStringsFromPubSub(INPUT_TOPIC) | beam.WindowInto(window.FixedWindows(15)) 
# CHANGED THIS # record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5']) 
record_ids = lines | 'Split' >> (beam.FlatMap(split_fn).with_output_types(unicode)) 
records = record_ids | 'CreateRecords' >> beam.Map(create_random_record) 
records | 'Write' >> beam.io.Write(
    beam.io.BigQuerySink(
     OUTPUT, 
     schema=table_schema, 
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) 

Hinweis Zu Testzwecken ich den folgenden Code https://github.com/GoogleCloudPlatform/DataflowSDK-examples/blob/master/python/dataflow_examples/cookbook/bigquery_schema.py in eine Streaming-Pipeline: Ich habe die weiße Liste gesetzt worden von google den Code (in alpha)

nun laufen, wenn ich es versuchen, ich habe einen Fehler

Workfl Es ist fehlgeschlagen. Ursachen: (f215df7c8fcdbb00): Unbekannt Streaming Spüle: BigQuery

Sie können den vollständigen Code finden Sie hier: https://github.com/marcorigodanzo/gcp_streaming_test/blob/master/my_bigquery_schema.py

Ich denke, dass dies mit der Pipeline jetzt vom Typ Streaming ist, kann mir zu tun hat jemand bitte sagen, wie eine bigQuery schreiben in einer Streaming-Pipeline?

Antwort

2

Beam Python unterstützt das Schreiben von Streaming-Pipelines in BigQuery nicht. Fürs Erste müssen Sie Beam Java verwenden - Sie können PubsubIO.readStrings() und BigQueryIO.writeTableRows() verwenden.

+0

OK, danke Eugene. Ich hatte gehofft, Python zu benutzen. Weißt du, ob sich das in Zukunft ändern wird? Kannst du mir auch ein Beispiel für das Lesen von Code aus Pub/Sub und das Schreiben in BigQuery in Java nennen? –

+0

Ich glaube, dieses Beispiel verwendet beide https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java. Ja, Python wird schließlich Java einholen (möglicherweise über Beam's derzeit in Entwicklung befindliches Portabilitäts-Framework, das Python-Pipelines die Verwendung von Java-Transformationen ermöglicht), aber ich kann nicht vorhersagen, was die Timeline sein wird. – jkff

Verwandte Themen