2016-05-15 9 views
1

Ich habe viele große unpartitionierte BigQuery-Tabellen und -Dateien, die ich auf verschiedene Arten partitionieren möchte. Also habe ich beschlossen, einen Dataflow-Job zu schreiben, um dies zu erreichen. Der Job, denke ich, ist einfach genug. Ich habe versucht, mit Generika zu schreiben, so dass ich es leicht sowohl TextIO- als auch BigQueryIO-Quellen anwenden kann. Es funktioniert gut mit kleinen Tabellen, aber ich bekomme java.lang.OutOfMemoryError: Java heap space, wenn ich es auf großen Tabellen ausführen.Google Dataflow Out Heap beim Erstellen mehrerer getaggter Ausgaben

In meiner Hauptklasse lese ich entweder eine Datei mit Zielschlüsseln (die mit einem anderen DF-Job erstellt wurden) oder führe eine Abfrage mit einer BigQuery-Tabelle durch, um eine Liste der Schlüssel zu erhalten. Meine Hauptklasse sieht wie folgt aus:

Pipeline sharder = Pipeline.create(opts); 

// a functional interface that shows the tag map how to get a tuple tag 
KeySelector<String, TableRow> bqSelector = (TableRow row) -> (String) row.get("COLUMN") != null ? (String) row.get("COLUMN") : "null"; 

// a utility class to store a tuple tag list and hash map of String TupleTag 
TupleTagMap<String, TableRow> bqTags = new TupleTagMap<>(new ArrayList<>(inputKeys),bqSelector); 

// custom transorm 
ShardedTransform<String, TableRow> bqShard = new ShardedTransform<String, TableRow>(bqTags, TableRowJsonCoder.of()); 

String source = "PROJECTID:ADATASET.A_BIG_TABLE"; 
String destBase = "projectid:dataset.a_big_table_sharded_"; 

TableSchema schema = bq.tables().get("PROJECTID","ADATASET","A_BIG_TABLE").execute().getSchema(); 


PCollectionList<TableRow> shards = sharder.apply(BigQueryIO.Read.from(source)).apply(bqShard); 
for (PCollection<TableRow> shard : shards.getAll()) { 
    String shardName = StringUtils.isNotEmpty(shard.getName()) ? shard.getName() : "NULL"; 
    shard.apply(BigQueryIO.Write.to(destBase + shardName) 
      .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE) 
      .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) 
      .withSchema(schema)); 
    System.out.println(destBase+shardName); 
} 
sharder.run(); 

ich eine Reihe von TupleTags erzeugen Transformation in einem benutzerdefinierten zu verwenden. Ich habe eine Utility-Klasse, die ein TupleTagList und HashMap so speichert, dass ich die Tupel-Tags durch Schlüssel verweisen kann:

public class TupleTagMap<Key, Type> implements Serializable { 

private static final long serialVersionUID = -8762959703864266959L; 
final private TupleTagList tList; 
final private Map<Key, TupleTag<Type>> map; 
final private KeySelector<Key, Type> selector; 

public TupleTagMap(List<Key> t, KeySelector<Key, Type> selector) { 
    map = new HashMap<>(); 
    for (Key key : t) 
     map.put(key, new TupleTag<Type>()); 
    this.tList = TupleTagList.of(new ArrayList<>(map.values())); 
    this.selector = selector; 

} 

public Map<Key, TupleTag<Type>> getMap() { 
    return map; 
} 

public TupleTagList getTagList() { 
    return tList; 
} 

public TupleTag<Type> getTag(Type t){ 
    return map.get(selector.getKey(t)); 
} 

Dann habe ich die benutzerdefinierte Transformation, die hat im Grunde eine Funktion, die die Tupel Karte zur Ausgabe verwendet PCollectionTuple und dann geht es weiter zu einem PCollectionList zu der Hauptklasse zurückzukehren:

public class ShardedTransform<Key, Type> extends 
    PTransform<PCollection<Type>, PCollectionList<Type>> { 


private static final long serialVersionUID = 3320626732803297323L; 
private final TupleTagMap<Key, Type> tags; 
private final Coder<Type> coder; 


public ShardedTransform(TupleTagMap<Key, Type> tags, Coder<Type> coder) { 
    this.tags = tags; 
    this.coder = coder; 
} 

@Override 
public PCollectionList<Type> apply(PCollection<Type> in) { 

    PCollectionTuple shards = in.apply(ParDo.of(
      new ShardFn<Key, Type>(tags)).withOutputTags(
      new TupleTag<Type>(), tags.getTagList())); 

    List<PCollection<Type>> shardList = new ArrayList<>(tags.getMap().size()); 

    for (Entry<Key, TupleTag<Type>> e : tags.getMap().entrySet()){ 
     PCollection<Type> shard = shards.get(e.getValue()).setName(e.getKey().toString()).setCoder(coder); 
     shardList.add(shard); 
    } 
     return PCollectionList.of(shardList); 
    } 
} 

die eigentliche DoFn ist tot einfach es nur das Lambda in der Hauptklasse vorgesehen verwendet den passende Tupel-Tag in der Hash-Karte für Seiten Ausgang fände:

public class ShardFn<Key, Type> extends DoFn<Type, Type> { 

private static final long serialVersionUID = 961325260858465105L; 

private final TupleTagMap<Key, Type> tags; 

ShardFn(TupleTagMap<Key, Type> tags) { 

    this.tags = tags; 
} 

@Override 
public void processElement(DoFn<Type, Type>.ProcessContext c) 
     throws Exception { 
    Type element = c.element(); 
    TupleTag<Type> tag = tags.getTag(element); 

    if (tag != null) 
     c.sideOutput(tags.getTag(element), element); 
    } 
} 
+0

Was ist Ihr Worker-Maschinentyp, in welchem ​​Stadium ist es OOMing? Wie groß 'bqTags' auch sein können, sollten Sie vielleicht Side-Input verwenden, um' bqTags' zu verteilen (doc: https://cloud.google.com/dataflow/model/par-do#side-inputs) – ravwojdyla

+0

Ich habe es mit n1 versucht -Standard-4. Es scheint, als würden die OOMs in ShadedTransform oder ShardFn passieren. Soweit ich das beurteilen kann, wird nie etwas an die PCollections gesendet, die von dort ausgegeben werden. Es gibt 80 Tags in der Tag-Map für den großen Datensatz. – klaatu

Antwort

1

Das Beam-Modell bietet derzeit keine gute Unterstützung für die dynamische Partitionierung/große Anzahl von Partitionen. Ihr Ansatz wählt die Anzahl der Shards bei der Erstellung des Diagramms aus, und die resultierenden ParDos werden wahrscheinlich alle miteinander verschmolzen, sodass jeder Worker versucht, gleichzeitig in 80 verschiedene BQ-Tabellen zu schreiben. Jeder Schreibvorgang erfordert eine lokale Pufferung, daher ist es wahrscheinlich einfach zu viel.

Es gibt einen alternativen Ansatz, der die Parallelisierung zwischen Tabellen durchführt (aber nicht über Elemente hinweg). Dies würde gut funktionieren, wenn Sie eine große Anzahl relativ kleiner Ausgabetabellen haben. Verwenden Sie ein ParDo, um jedes Element mit der Tabelle zu kennzeichnen, zu der es gehen soll, und führen Sie dann einen GroupByKey aus. Dies gibt Ihnen eine PCollection<KV<Table, Iterable<ElementsForThatTable>>>. Bearbeiten Sie dann jede KV<Table, Iterable<ElementsForThatTable>>, indem Sie die Elemente in die Tabelle schreiben.

Leider müssen Sie jetzt den BQ manuell schreiben, um diese Option zu verwenden. Wir versuchen, die Sink-APIs mit eingebauter Unterstützung dafür zu erweitern. Und da das Dataflow-SDK als Teil von Apache Beam weiterentwickelt wird, verfolgen wir diese Anfrage hier: https://issues.apache.org/jira/browse/BEAM-92

+0

Danke Frances. Ich habe einige SO-Posts gesehen, die eine Parallelisierung zwischen Tabellen vorschlagen. In meinem Fall habe ich eine kleinere Anzahl von großen Ausgabetabellen. TBs von Daten, die in 80 Tabellen aufgeteilt werden müssen. – klaatu

Verwandte Themen