Ich habe einen Spark-Job, der liest Daten aus einer Cassandra-Tabelle und speichert das Ergebnis zurück in zwei Tabellen mit geringfügigen Änderungen. Mein Problem ist, dass der Job viel länger dauert als erwartet.Spark-Cassandra schreiben dauert länger als erwartet
Der Code lautet wie folgt:
val range = sc.parallelize(0 to 100)
val rdd1 = range.map(x => (some_value, x)).joinWithCassandraTable[Event](keyspace_name, table2).select("col1", "col2", "col3", "col4", "col5", "col6", "col7").map(x => x._2)
val rdd2: RDD[((Int, String, String, String), Iterable[Event])] = rdd1.keyBy(r => (r.col1, r.col2, r.col3, r.col4)).groupByKey
val rdd3 = rdd2.mapValues(iter => someFunction(iter.toList.sorted))
//STORE 1
rdd3.map(r => (r._1._1, r._1._2, r._1._3, r._1._4, r._2.split('|')(1).toDouble)).saveToCassandra(keyspace_name, table1, SomeColumns("col1","col2", "col3","col4", "col5"))
//STORE 2
rdd3.map(r => (to, r._1%100, to, "MANUAL_"+r._1+"_"+r._2+"_"+r._3+"_"+r._4+"_"+java.util.UUID.randomUUID(), "M", to, r._4, r._3, r._1, r._5, r._2)).saveToCassandra(keyspace_name, table2, SomeColumns("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11"))
Seit etwa einer Million Datensätzen, Speicher 1 bis 40 Sekunden dauert und Schließen Speicher 2 (leichte Modifikation RDD3) dauert länger als eine Minute. Bin mir nicht sicher, wo ich falsch liege oder warum es so viel Zeit kostet. Meine Funken Umwelt ist wie folgt:
DSE 4.8.9 mit 6 Knoten 70 GB RAM 12 Kerne jeweils
Jede Hilfe sehr geschätzt wird.
haben Sie RDD3 versucht, indem einen Checkpoint und sehen es schneller geht? –
Nein. Es geht nicht schneller –