Das folgende ist mein Code, die CSV mit 1000 Zeilen Datei liest und schreibt sie in BQ Tabelle:Ziehen hinunter BigQuery Tabellenschema auf jedem Element langsam wird mit Balken/Datenfluß-
public static void main(String[] args) {
PipelineOptionsFactory.register(TemplateOptions.class);
TemplateOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(TemplateOptions.class);
options.setZone("europe-west1-c");
options.setProject("myProject-dev");
options.setRunner(DataflowRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline.apply("READ", TextIO.read().from("gs://myBucket/LOG_EBM_1000.csv"))
.apply("TRANSFORM", ParDo.of(new WikiParDo()))
.apply("WRITE", BigQueryIO.writeTableRows()
.to("myProject:DF_TEST.LOG_EBM_PM")
.withCreateDisposition(CreateDisposition.CREATE_NEVER).withWriteDisposition(WRITE_APPEND)
);
pipeline.run().waitUntilFinish();
}
private static Schema getTableSchema() {
List<TableFieldSchema> fields = new ArrayList<>();
// The name for the new dataset and Table
String datasetId = "DF_TEST";
String tableId = "LOG_EBM_PM";
BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
return bigQuery.getTable(datasetId, tableId).getDefinition().getSchema();
}
public interface TemplateOptions extends DataflowPipelineOptions {
@Description("GCS path of the file to read from")
ValueProvider<String> getInputFile();
void setInputFile(ValueProvider<String> value);
@Description("My custom command line argument.")
@Default.String("D-FAULT")
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
private static class WikiParDo extends DoFn<String, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String[] split = c.element().split(",");
TableRow row = new TableRow();
for (int i = 0; i < split.length; i++) {
Field col = getTableSchema().getFields().get(i);
row.set(col.getName(), split[i]);
}
c.output(row);
}
}
das folgende ist das screanshot der pipline, wenn es die Aufgabe ausführt:
Wie Sie sehen, die Trans Aufgabe hat den wall Zeit von 01.45 minutes.In die Definition der Wand Zeit geschrieben:
Ungefähre Zeit in diesem Schritt beim Initialisieren, Verarbeiten von Daten, Mischen von Daten und Beenden über alle Threads in allen Worker. Bei zusammengesetzten Schritten die Summe der in den Komponentenschritten aufgewendeten Zeit. Diese Schätzung hilft Ihnen, langsame Schritte zu erkennen.
Die Ausführung der gesamten pipline dauert ca. 10 Minuten insgesamt für 1000 Zeilen mit dem Maschinentyp: n1-highcpu-16.
In meinen endgültigen CSV-Dateien werden wir Millionen von Datensätzen haben (Dateigröße etwa 2 GB), daher sollte die Pipline viel schneller funktionieren. Was stimmt nicht mit meiner Pipline, die so langsam ist, auch wenn ich eine hohe CPU-Maschine benutze?
Danke jkff für deine tolle Antwort :) – Majico