2017-10-13 2 views
0

BigQuery erlauben Partitionierung, nur nach Datum, zu diesem Zeitpunkt.Partitionieren einer Tabelle

Lässt anzeigen Ich habe eine 1 Milliarde Tabellenzeilen mit inserted_timestamp Feld. Dieses Feld hat Daten von vor 1 Jahr.

Was ist der richtige Weg, um vorhandene Daten in eine neue partitionierte Tabelle zu verschieben?

Edited

Ich sah es eine elegante Lösung auf Java mit Version 2.0 < Sharding BigQuery output tables auch bei BigQuery partitioning with Beam streams erarbeitet wurde, die Tabellennamen (oder Partition Suffix) Windowing-Daten parametrisieren ist.

Aber ich vermisse BigQueryIO.Write auf 2.x Strahl Projekt auch gibt es keine Beispiele über Fensterzeit von Python serialisierbare Funktion erhalten.

Ich habe versucht, Partitionen auf Rohr zu machen, aber wenn mit einer großen Anzahl von Partitionen fehlschlägt (läuft mit 100 aber mit 1000 fehlschlägt).

Dies ist mein Code so weit wie ich konnte:

   ( p 
       | 'lectura' >> beam.io.ReadFromText(input_table) 
       | 'noheaders' >> beam.Filter(lambda s: s[0].isdigit()) 
       | 'addtimestamp' >> beam.ParDo(AddTimestampDoFn()) 
       | 'window' >> beam.WindowInto(beam.window.FixedWindows(60)) 
       | 'table2row' >> beam.Map(to_table_row) 
       | 'write2table' >> beam.io.Write(beam.io.BigQuerySink(
         output_table, #<-- unable to parametrize by window 
         dataset=my_dataset, 
         project=project, 
         schema='dia:DATE, classe:STRING, cp:STRING, import:FLOAT', 
         create_disposition=CREATE_IF_NEEDED, 
         write_disposition=WRITE_TRUNCATE, 
            ) 
           ) 
       ) 

p.run() 
+2

https://stackoverflow.com/questions/38993877/migrating-from-non-partitioned-to-partitioned-tables sollte ein paar Ansätze, die relevant sind. Ich denke auch, dass Sie in der Lage sein sollten, JSON oder AVRO anstelle von CSV zu verwenden, um die Arbeit mit flachen Dateien zu vermeiden. –

+0

@NhanNguyen, gerade meine Frage bearbeitet, um genauer zu sein. Existiert eine elegante Lösung auf <2.0 und ich vermisse es> 2.x. Danke über deinen Link, ich folgte ihm und war sehr verbundenes Thema. Danke noch einmal. – danihp

Antwort

2

der Funktionalität sind alle notwendigen dies in Breite zu tun gibt, obwohl es zur Zeit auf das Java SDK beschränkt werden kann.

Sie würden BigQueryIO verwenden. Insbesondere können Sie DynamicDestinations verwenden, um eine Zieltabelle für jede Zeile zu bestimmen.

Aus dem Beispiel von DynamicDestinations:

events.apply(BigQueryIO.<UserEvent>write() 
    .to(new DynamicDestinations<UserEvent, String>() { 
     public String getDestination(ValueInSingleWindow<String> element) { 
      return element.getValue().getUserId(); 
     } 
     public TableDestination getTable(String user) { 
      return new TableDestination(tableForUser(user), 
      "Table for user " + user); 
     } 
     public TableSchema getSchema(String user) { 
      return tableSchemaForUser(user); 
     } 
     }) 
    .withFormatFunction(new SerializableFunction<UserEvent, TableRow>() { 
    public TableRow apply(UserEvent event) { 
     return convertUserEventToTableRow(event); 
    } 
    })); 
+0

Warum sind sie keine Python-Wrapper, um es zu tun? Ich sollte mir Dataflow-Projekte mit Java statt Python leisten können? Wissen Sie, ob Google Ressourcen für Java bereitstellt? Ich meine, wenn ich in Python arbeite, werde ich mehr Funktionen als diese verpassen? Vielen Dank! – danihp

+0

Wie gezeigt wird, gibt es unterschiedliche Funktionen zwischen den Java und Python SDKs. Die Beseitigung dieser Lücken ist Teil der laufenden Bemühungen von Apache Beam. Dieses spezielle Problem wird als [BEAM-2801] (https://issues.apache.org/jira/browse/BEAM-2801) verfolgt. –

Verwandte Themen