2016-12-30 4 views
0

Ich habe einige Arbeiten, die wiederholt durchgeführt werden müssen. Nehmen wir zum Beispiel an, ich möchte 2000 Würfel würfeln und das Ergebnis sammeln. Der Vorbehalt ist der Würfelwurf hängt von PCollection Wie kann dies mit Dataflow getan werden?Wie wird die gleiche Arbeit mehrmals mit Dataflow ausgeführt?

Ich habe versucht, eine PCollectionList verwenden, aber das Ergebnis ist, dass mein Dataflow zu groß ist, um zu starten (> 10 MB). Hier ist ein Beispiel dafür, was Ich mag würde zu tun (mit PCollectionList):

// I'd like to operate on things 2000 times. 
PCollection<Thing> things = ...; 
List<PCollection<ModifiedThing>> modifiedThingsList = new ArrayList<>(); 
for (int i = 0; i < 2000; ++i) { 
    modifiedThingsList.add(things.apply(ParDo.of(thing -> modify(thing))); 
} 
PCollection<ModifiedThing> modifiedThings = PCollectionList.of(modifiedThingsList).apply(Flatten.pCollections()); 

Da die JSON Darstellung der obigen Grafik für Datenfluss zu groß ist, ich brauche eine andere Art und Weise, diese Logik zu repräsentieren. Irgendwelche Ideen?

+0

Können Sie näher auf "der Würfelwurf hängt von PCollection"? Dies ist aus Ihrem Code-Snippet nicht ersichtlich. – jkff

+0

Zum Beispiel, sagen wir, 'Dinge' war wirklich eine 'PCollection '. Ein Beispiel für "Würfeln" wäre, jedem Element in der PCollection eine Zufallszahl hinzuzufügen, die den Vorgang 2000 Mal wiederholt. Ich habe versucht, dies über den Methodenaufruf 'modify (thing)' zu veranschaulichen. – Max

+1

Ich bin immer noch verwirrt: ParDo kann eine beliebig große Anzahl von Ausgaben pro Eingabe zurückgeben. Kannst du einfach dinge.apply (ParDo.of (c -> für (i = 0..2000) c.output (ändern (c.element())))) oder missverstehe ich was du versuchst zu tun? – jkff

Antwort

2

ParDo oder FlatMapElements können beliebig viele Ausgänge pro Eingang zurückgeben. Zum Beispiel:

PCollection<ModifiedThing> modifiedThings = things.apply(
    ParDo.of(new DoFn<Thing, ModifiedThing>() { 
    public void processElement(ProcessContext c) { 
    for (int i = 0; i < 2000; ++i) { 
     c.output(modify(c.element())); 
    } 
    } 
})); 

Caveat: Wenn Sie vorhaben, sofort gelten andere ParDo s modifiedThings, be careful with fusion, seit 2000 ist ein ziemlich hoher Fanout-Faktor. Ein gutes Beispiel Code-Snippet zur Verhinderung der Fusion ist here.

Verwandte Themen