2016-10-21 2 views

Antwort

1

Der Ansatz, den ich nahm folgt:

  • Stellen Sie das Fenster für den eingehenden Datensatz
  • Konvertieren das Fenster in den Tabellennamen

    p.apply(PubsubIO.Read 
          .subscription(subscription) 
          .withCoder(TableRowJsonCoder.of()) 
         ) 
         .apply(Window.into(new TablePartitionWindowFn())) 
         .apply(BigQueryIO.Write 
             .to(new DayPartitionFunc(dataset, table)) 
             .withSchema(schema) 
             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
         ); 
    

das Fenster Einstellung basierend auf Bei den eingehenden Daten kann das Ende Instant ignoriert werden, da der Startwert zum Festlegen der Partition verwendet wird:

public class TablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> { 

private IntervalWindow assignWindow(AssignContext context) { 
    TableRow source = (TableRow) context.element(); 
    String dttm_str = (String) source.get("DTTM"); 

    DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC(); 

    Instant start_point = Instant.parse(dttm_str,formatter); 
    Instant end_point = start_point.withDurationAdded(1000, 1); 

    return new IntervalWindow(start_point, end_point); 
}; 

Einstellen der Tabellenpartition dynamisch:

public class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> { 

String destination = ""; 

public DayPartitionFunc(String dataset, String table) { 
    this.destination = dataset + "." + table+ "$"; 
} 

@Override 
public String apply(BoundedWindow boundedWindow) { 
    // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows. 
    String dayString = DateTimeFormat.forPattern("yyyyMMdd") 
            .withZone(DateTimeZone.UTC) 
            .print(((IntervalWindow) boundedWindow).start()); 
    return destination + dayString; 
}} 

Lassen Sie mich wissen, ob es einen besseren Weg, um das gleiche Ergebnis zu erzielen.

Verwandte Themen