2017-11-14 5 views
0

Ich möchte unseren alten Cassandra-Cluster zu einem neuen migrieren.Migrate große Cassandra-Tabelle zu einem anderen Cluster mit Funken

Anforderungen: -

Ich habe einen cassandra Cluster von 10 Knoten und die Tabelle i migrieren möchten ~ 100 GB. Ich verwende Spark, um die Daten zu migrieren. Mein Funkencluster hat 10 Knoten und jeder Knoten hat ungefähr 16 GB Speicher. In der Tabelle haben wir einige Junk-Daten, die ich nicht in die neue Tabelle migrieren möchte. zB: - Sagen wir, ich möchte die Zeilen, die die cid = 1234 haben, nicht übertragen. Was ist also der beste Weg, um dies mithilfe eines Spark-Jobs zu migrieren? Ich kann nicht eine Stelle, die auf die CassandraRdd direkt filtern, da die CID nicht die einzige Spalte in partitionierten Schlüssel enthalten ist.

Cassandra Tabelle: -

test_table (
    cid text, 
    uid text, 
    key text, 
    value map<text, timestamp>, 
    PRIMARY KEY ((cid, uid), key) 
) 

Beispieldaten: -

cid | uid    | key  | value 
------+--------------------+-----------+------------------------------------------------------------------------- 
1234 | 899800070709709707 | testkey1 | {'8888': '2017-10-22 03:26:09+0000'} 
6543 | 097079707970709770 | testkey2 | {'9999': '2017-10-20 11:08:45+0000', '1111': '2017-10-20 15:31:46+0000'} 

ich von so etwas wie unten denke. Aber ich denke, das ist nicht der beste effiziente Ansatz.

val filteredRdd = rdd.filter { row => row.getString("cid") != "1234" } 
filteredRdd.saveToCassandra(KEYSPACE_NAME,NEW_TABLE_NAME) 

Was ist der bestmögliche Ansatz hier?

+0

Warum denken u, der Ansatz erwähnt Sie ist nicht leistungsfähig? Sie führen Filter auf RDD aus. Die Filteroperation ist eine verteilte Operation durch Funken. Sie können mit dem Spark Data Frame- oder Dataset-Filtering-Ansatz fortfahren. Aber entweder Sie führen Filter auf RDD oder DF/DS durch, es wird verteilt und leistungsfähig –

Antwort

0

Diese Methode ist ziemlich gut. Sie können es in DataFrames schreiben, um die Zeilencodierung zu nutzen, aber dies kann nur einen kleinen Vorteil haben. Der größte Engpass bei dieser Operation wird das Schreiben und Lesen von Cassandra sein.

DF Beispiel
spark 
    .read 
    .format("org.apache.spark.sql.cassandra") 
    .option("keyspace", ks) 
    .option("table", table) 
    .load 
    .filter('cid !== "1234") 
    .write 
    .format("org.apache.spark.sql.cassandra") 
    .option("keyspace", ks2) 
    .option("table", table2) 
    .save 
Verwandte Themen