2017-06-08 3 views
0

Ich bin auf der Suche nach einer Möglichkeit, große Google BigTable mit Filter dynamisch basierend auf den Ereignissen zu scannen und machen Bulk Update/Löschen auf eine große Anzahl von Zeilen.BigTable Bulk-Update auf der Grundlage von dynamischen Filter

Im Moment versuche ich, BigTable mit Java-basierten Dataflow (für intensive Serverless Computing Power) zu kombinieren. Ich habe den Punkt erreicht, an dem ich ein "Scan" -Objekt mit einem dynamischen Filter basierend auf den Ereignissen erstellen kann, aber ich kann immer noch keine Möglichkeit finden, Ergebnisse von CloudBigtableIO.read() zu einer nachfolgenden Datenfluss-Pipeline zu streamen.

Alle Hinweise zu schätzen wissen.

Antwort

1

Erweitern Sie Ihr DoFn von AbstractCloudBigtableTableDoFn. Dadurch erhalten Sie Zugriff auf eine getConnection() -Methode. Sie werden so etwas tun:

try(Connection c = getConnection(); 
    Table t = c.getTable(YOUR_TABLE_NAME); 
    ResultScanner resultScanner = t.getScanner(YOUR_SCAN)) { 
    for(Result r : resultScanner) { 
    Mutation m = ... // construct a Put or Delete 
    context.output(m) 
    } 
} 

Ich gehe davon aus, dass Ihre Pipeline mit CloudBigtableIO.read() beginnt, die AbstractCloudBigtableTableDoFn hat als nächstes, und hat dann eine CloudBigtableIO.write().

Verwandte Themen