2017-06-30 4 views
2

Ich habe eine Sammlung von homogenen Dicts, wie schreibe ich sie in BigQuery, ohne das Schema zu kennen?Programmgesteuertes Erstellen von BigQuery-Schema in Beam-Pipeline

Der BigQuerySink erfordert, dass ich das Schema angeben, wenn ich es konstruiere. Aber ich kenne das Schema nicht: es ist durch die Schlüssel der Diktate definiert, die ich zu schreiben versuche.

Gibt es eine Möglichkeit, meine Pipeline auf das Schema zu schließen und es dann (als Side-Input?) An die Senke zurückzugeben?

Zum Beispiel:

# Create a PCollection of dicts, something like 
# {'field1': 'myval', 'field2': 10} 
data = (p | 'generate_data' >> beam.ParDo(CreateData()) 

# Infer the schema from the data 
# Generates a string for each element (ok to assume all dict keys equal) 
# "field1:STRING, field2:INTEGER" 
schema = (data 
    | 'infer_schema' >> beam.ParDo(InferSchema()) 
    | 'sample_one' >> beam.combiners.Sample.FixedSizeGlobally(1)) 

Aber dann, wie füttere ich das Schema als Parameter an die BigQuerySink, und verwenden, die in einem beam.io.Write?

Ich weiß, dass dies nicht richtig ist, aber was ich will zu tun ist:

sink = BigQuerySink(tablename, dataset, project, schema=Materialize(schema)) 
p | 'write_bigquery' >> beam.io.Write(sink) 

tl; dr Gibt es eine Möglichkeit, eine BigQuery Tabelle von Apache Strahl programmatisch das Schema von Folgern zu erstellen und schreiben die Daten?

+1

Die BigQuery-API verfügt über eine automatische Erkennungsschema-Funktion. Wenn dies in Beam nicht unterstützt wird, sollten Sie ein Problem mit dem Beam SDK einreichen. –

+0

Docs für automatische Schemaerkennung sind unter https://cloud.google.com/bigquery/docs/schema-detect –

+0

Danke Tim. Unf, derzeit lehnt die API eine Senke ohne Schema ab, wenn die Tabelle noch nicht existiert. Ich werde sehen, ob ich herausfinden kann, wo ich eine Feature-Anfrage an Beam senden soll. – Greg

Antwort

0

Angenommen, Ihr Schema kann sich häufig ändern, kann es für Sie besser sein, die Daten in einer allgemeineren Form zu halten.

Zum Beispiel kann Ihre Zeile aus einem einzelnen wiederholten Datensatz bestehen (Ihre Wörterbucheinträge). Das Datensatzschema sieht wie folgt aus: Schlüssel (STRING) | optional string_val (STRING) | optional int_val (INTEGER) optional double_val (DOUBLE) | optional boolean_val (BOOLEAN) | ...

Dann können Sie Abfragen schreiben, die Ihre Datensätze nach Typ scannen. Dies ist etwas weniger effizient (weil Sie Zeilen scannen müssen, die Sie andernfalls überspringen könnten, wenn sie sich in verschiedenen Spalten befinden würden), aber Sie vermeiden vollständig, Ihr Schema im Voraus anzugeben.

0

Die beste Lösung, die ich mir vorgenommen habe, ist das explizite Hardcoding einer Zuordnung von dict Schlüsseln zum BigQuery-Schema. Zwei Vorteile - es funktioniert um das Muss-Spezifiziertes-Schema-Problem herum und erlaubt mir, Elemente aus dem Diktat herauszufiltern, die ich nicht in BigQuery haben möchte.

SCHEMA = { 
    'field1': 'INTEGER', 
    'field2': 'STRING', 
    ... 
} 
schema_str = ','.join(['%s:%s' % (k, v) for k,v in SCHEMA.iteritems()]) 

sink = BigQuerySink(tablename, 
     dataset=dataset, 
     project=project, 
     schema=schema_str, 
     write_disposition=BigQueryDisposition.WRITE_TRUNCATE) 

(pipeline 
    # filters just the keys of each dict to the keys of SCHEMA. 
    | 'filter_fields' >> beam.ParDo(FilterFieldKeysDoFn(SCHEMA)) 
    | 'to_bigquery' >> beam.io.Write(sink)) 
Verwandte Themen