Ich habe viele große unpartitionierte BigQuery-Tabellen und -Dateien, die ich auf verschiedene Arten partitionieren möchte. Also habe ich beschlossen, einen Dataflow-Job zu schreiben, um dies zu erreichen. Der Job, denke ich, ist einfach genug. Ich habe versucht, mit Generika zu schreiben, so dass ich es leicht sowohl TextIO- als auch BigQueryIO-Quellen anwenden kann. Es funktioniert gut mit kleinen Tabellen, aber ich bekomme java.lang.OutOfMemoryError: Java heap space
, wenn ich es auf großen Tabellen ausführen.Google Dataflow Out Heap beim Erstellen mehrerer getaggter Ausgaben
In meiner Hauptklasse lese ich entweder eine Datei mit Zielschlüsseln (die mit einem anderen DF-Job erstellt wurden) oder führe eine Abfrage mit einer BigQuery-Tabelle durch, um eine Liste der Schlüssel zu erhalten. Meine Hauptklasse sieht wie folgt aus:
Pipeline sharder = Pipeline.create(opts);
// a functional interface that shows the tag map how to get a tuple tag
KeySelector<String, TableRow> bqSelector = (TableRow row) -> (String) row.get("COLUMN") != null ? (String) row.get("COLUMN") : "null";
// a utility class to store a tuple tag list and hash map of String TupleTag
TupleTagMap<String, TableRow> bqTags = new TupleTagMap<>(new ArrayList<>(inputKeys),bqSelector);
// custom transorm
ShardedTransform<String, TableRow> bqShard = new ShardedTransform<String, TableRow>(bqTags, TableRowJsonCoder.of());
String source = "PROJECTID:ADATASET.A_BIG_TABLE";
String destBase = "projectid:dataset.a_big_table_sharded_";
TableSchema schema = bq.tables().get("PROJECTID","ADATASET","A_BIG_TABLE").execute().getSchema();
PCollectionList<TableRow> shards = sharder.apply(BigQueryIO.Read.from(source)).apply(bqShard);
for (PCollection<TableRow> shard : shards.getAll()) {
String shardName = StringUtils.isNotEmpty(shard.getName()) ? shard.getName() : "NULL";
shard.apply(BigQueryIO.Write.to(destBase + shardName)
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withSchema(schema));
System.out.println(destBase+shardName);
}
sharder.run();
ich eine Reihe von TupleTags
erzeugen Transformation in einem benutzerdefinierten zu verwenden. Ich habe eine Utility-Klasse, die ein TupleTagList
und HashMap
so speichert, dass ich die Tupel-Tags durch Schlüssel verweisen kann:
public class TupleTagMap<Key, Type> implements Serializable {
private static final long serialVersionUID = -8762959703864266959L;
final private TupleTagList tList;
final private Map<Key, TupleTag<Type>> map;
final private KeySelector<Key, Type> selector;
public TupleTagMap(List<Key> t, KeySelector<Key, Type> selector) {
map = new HashMap<>();
for (Key key : t)
map.put(key, new TupleTag<Type>());
this.tList = TupleTagList.of(new ArrayList<>(map.values()));
this.selector = selector;
}
public Map<Key, TupleTag<Type>> getMap() {
return map;
}
public TupleTagList getTagList() {
return tList;
}
public TupleTag<Type> getTag(Type t){
return map.get(selector.getKey(t));
}
Dann habe ich die benutzerdefinierte Transformation, die hat im Grunde eine Funktion, die die Tupel Karte zur Ausgabe verwendet PCollectionTuple
und dann geht es weiter zu einem PCollectionList
zu der Hauptklasse zurückzukehren:
public class ShardedTransform<Key, Type> extends
PTransform<PCollection<Type>, PCollectionList<Type>> {
private static final long serialVersionUID = 3320626732803297323L;
private final TupleTagMap<Key, Type> tags;
private final Coder<Type> coder;
public ShardedTransform(TupleTagMap<Key, Type> tags, Coder<Type> coder) {
this.tags = tags;
this.coder = coder;
}
@Override
public PCollectionList<Type> apply(PCollection<Type> in) {
PCollectionTuple shards = in.apply(ParDo.of(
new ShardFn<Key, Type>(tags)).withOutputTags(
new TupleTag<Type>(), tags.getTagList()));
List<PCollection<Type>> shardList = new ArrayList<>(tags.getMap().size());
for (Entry<Key, TupleTag<Type>> e : tags.getMap().entrySet()){
PCollection<Type> shard = shards.get(e.getValue()).setName(e.getKey().toString()).setCoder(coder);
shardList.add(shard);
}
return PCollectionList.of(shardList);
}
}
die eigentliche DoFn ist tot einfach es nur das Lambda in der Hauptklasse vorgesehen verwendet den passende Tupel-Tag in der Hash-Karte für Seiten Ausgang fände:
public class ShardFn<Key, Type> extends DoFn<Type, Type> {
private static final long serialVersionUID = 961325260858465105L;
private final TupleTagMap<Key, Type> tags;
ShardFn(TupleTagMap<Key, Type> tags) {
this.tags = tags;
}
@Override
public void processElement(DoFn<Type, Type>.ProcessContext c)
throws Exception {
Type element = c.element();
TupleTag<Type> tag = tags.getTag(element);
if (tag != null)
c.sideOutput(tags.getTag(element), element);
}
}
Was ist Ihr Worker-Maschinentyp, in welchem Stadium ist es OOMing? Wie groß 'bqTags' auch sein können, sollten Sie vielleicht Side-Input verwenden, um' bqTags' zu verteilen (doc: https://cloud.google.com/dataflow/model/par-do#side-inputs) – ravwojdyla
Ich habe es mit n1 versucht -Standard-4. Es scheint, als würden die OOMs in ShadedTransform oder ShardFn passieren. Soweit ich das beurteilen kann, wird nie etwas an die PCollections gesendet, die von dort ausgegeben werden. Es gibt 80 Tags in der Tag-Map für den großen Datensatz. – klaatu