2017-10-10 1 views
0

Das folgende ist mein Code, die CSV mit 1000 Zeilen Datei liest und schreibt sie in BQ Tabelle:Ziehen hinunter BigQuery Tabellenschema auf jedem Element langsam wird mit Balken/Datenfluß-

public static void main(String[] args) { 
     PipelineOptionsFactory.register(TemplateOptions.class); 
     TemplateOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(TemplateOptions.class); 
     options.setZone("europe-west1-c"); 
     options.setProject("myProject-dev"); 
     options.setRunner(DataflowRunner.class); 
     Pipeline pipeline = Pipeline.create(options); 
     pipeline.apply("READ", TextIO.read().from("gs://myBucket/LOG_EBM_1000.csv")) 
       .apply("TRANSFORM", ParDo.of(new WikiParDo())) 
       .apply("WRITE", BigQueryIO.writeTableRows() 
           .to("myProject:DF_TEST.LOG_EBM_PM") 
           .withCreateDisposition(CreateDisposition.CREATE_NEVER).withWriteDisposition(WRITE_APPEND) 
           ); 
     pipeline.run().waitUntilFinish(); 
    } 

    private static Schema getTableSchema() { 
     List<TableFieldSchema> fields = new ArrayList<>(); 
     // The name for the new dataset and Table 
     String datasetId = "DF_TEST"; 
     String tableId = "LOG_EBM_PM"; 
     BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService(); 
     return bigQuery.getTable(datasetId, tableId).getDefinition().getSchema(); 
    } 

    public interface TemplateOptions extends DataflowPipelineOptions { 
     @Description("GCS path of the file to read from") 
     ValueProvider<String> getInputFile(); 
     void setInputFile(ValueProvider<String> value); 

     @Description("My custom command line argument.") 
     @Default.String("D-FAULT") 
     String getMyCustomOption(); 
     void setMyCustomOption(String myCustomOption); 
    } 

    private static class WikiParDo extends DoFn<String, TableRow> { 
     @ProcessElement 
     public void processElement(ProcessContext c) throws Exception { 
      String[] split = c.element().split(","); 
      TableRow row = new TableRow(); 
      for (int i = 0; i < split.length; i++) { 
       Field col = getTableSchema().getFields().get(i); 
       row.set(col.getName(), split[i]); 
      } 
      c.output(row); 
     } 
    } 

das folgende ist das screanshot der pipline, wenn es die Aufgabe ausführt: enter image description here

Wie Sie sehen, die Trans Aufgabe hat den wall Zeit von 01.45 minutes.In die Definition der Wand Zeit geschrieben:

Ungefähre Zeit in diesem Schritt beim Initialisieren, Verarbeiten von Daten, Mischen von Daten und Beenden über alle Threads in allen Worker. Bei zusammengesetzten Schritten die Summe der in den Komponentenschritten aufgewendeten Zeit. Diese Schätzung hilft Ihnen, langsame Schritte zu erkennen.

Die Ausführung der gesamten pipline dauert ca. 10 Minuten insgesamt für 1000 Zeilen mit dem Maschinentyp: n1-highcpu-16.

In meinen endgültigen CSV-Dateien werden wir Millionen von Datensätzen haben (Dateigröße etwa 2 GB), daher sollte die Pipline viel schneller funktionieren. Was stimmt nicht mit meiner Pipline, die so langsam ist, auch wenn ich eine hohe CPU-Maschine benutze?

Antwort

1

Ihr Code ruft für jedes Feld jeder Zeile getTableSchema() (die einen API-Aufruf an den BigQuery-Dienst absetzt) ​​auf - viele Tausende identische (und damit redundante) BigQuery-API-Aufrufe. Es sieht aus, als ob Sie auch eine Verbindung zu BigQuery (getService()) viele Tausende Male herstellen. Deshalb ist es langsam.

Sie können es schrittweise beschleunigen auf verschiedene Weise auf:

  • nur einmal den Anruf Sie pro processElement(): diese es beschleunigt wird um einen Faktor (Anzahl der Felder) auf.
  • Führen Sie den Aufruf nur einmal pro DOFn-Instanz durch, indem Sie die Methode @Setup des DoFn ausführen und das Ergebnis in eine Instanzvariable zwischenspeichern: Dadurch wird es um ein weiteres Hundert beschleunigt.
  • Sie nur einmal den Anruf per Pipeline, indem sie es in Ihrem Hauptprogramm und das Bestehen der Liste der Felder als Konstruktor Argument zu Ihrem DoFn tun:

Etwas wie folgt aus:

class WikiParDo ... { 
    private final List<String> fields; 
    WikiParDo(List<String> fields) { this.fields = fields; } 
    // in processElement method, use "fields" instead of calling 
    // "getTableSchema" 
} 

... main program: ... 
TableSchema schema = getTableSchema(); 
List<String> fields = new ArrayList<>(); 
... populate fields from schema ... 

p.apply(...) 
.apply(..., new WikiParDo(fields)) 
+0

Danke jkff für deine tolle Antwort :) – Majico

Verwandte Themen