0

Ich habe einen Spark-Job, der liest Daten aus einer Cassandra-Tabelle und speichert das Ergebnis zurück in zwei Tabellen mit geringfügigen Änderungen. Mein Problem ist, dass der Job viel länger dauert als erwartet.Spark-Cassandra schreiben dauert länger als erwartet

Der Code lautet wie folgt:

val range = sc.parallelize(0 to 100) 

val rdd1 = range.map(x => (some_value, x)).joinWithCassandraTable[Event](keyspace_name, table2).select("col1", "col2", "col3", "col4", "col5", "col6", "col7").map(x => x._2) 

val rdd2: RDD[((Int, String, String, String), Iterable[Event])] = rdd1.keyBy(r => (r.col1, r.col2, r.col3, r.col4)).groupByKey 

val rdd3 = rdd2.mapValues(iter => someFunction(iter.toList.sorted)) 

//STORE 1 

rdd3.map(r => (r._1._1, r._1._2, r._1._3, r._1._4, r._2.split('|')(1).toDouble)).saveToCassandra(keyspace_name, table1, SomeColumns("col1","col2", "col3","col4", "col5")) 

//STORE 2 

rdd3.map(r => (to, r._1%100, to, "MANUAL_"+r._1+"_"+r._2+"_"+r._3+"_"+r._4+"_"+java.util.UUID.randomUUID(), "M", to, r._4, r._3, r._1, r._5, r._2)).saveToCassandra(keyspace_name, table2, SomeColumns("col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9", "col10", "col11")) 

Seit etwa einer Million Datensätzen, Speicher 1 bis 40 Sekunden dauert und Schließen Speicher 2 (leichte Modifikation RDD3) dauert länger als eine Minute. Bin mir nicht sicher, wo ich falsch liege oder warum es so viel Zeit kostet. Meine Funken Umwelt ist wie folgt:

DSE 4.8.9 mit 6 Knoten 70 GB RAM 12 Kerne jeweils

Jede Hilfe sehr geschätzt wird.

+0

haben Sie RDD3 versucht, indem einen Checkpoint und sehen es schneller geht? –

+0

Nein. Es geht nicht schneller –

Antwort

0

Lassen Sie mich meine Vermutung. Protokolle, Perf-Monitoring-Output und C * -Datenmodell werden für eine genauere Antwort benötigt. Aber einige Mathematik: Sie haben

  • joinWithCassandra - zufällig C * lesen
  • saveToCassandra - sec C * Schreib
  • Funken repartition?/Reduzieren

(Ich erwarte saveToCassadndra die Hälfte aller Zeit in Anspruch nimmt) und wenn Sie laufen nicht alle Fragen, bevor Sie zu minus 12-20 sec für die Funken brauchen

Vollstrecker und andere Dinge zu starten

SO 1M Einträge auf 6 Knoten und 40 Sekunden haben Sie: 1000000/6/40 = 4166 Datensatz/Sek/Knoten. Das ist nicht schlecht. 10 K/s pro Knoten mit gemischter Arbeitslast ist ein gutes Ergebnis.

Der zweite Schreibvorgang ist 2 mal größer (11 Spalte im Vergleich zu 5) und läuft nach dem ersten, also erwarte ich, dass Kassandra zu diesem Zeitpunkt damit beginnt, vorherige Daten auf die Festplatte zu übertragen, so dass Sie hier mehr Leistung verlieren können.

verstehe ich richtig, wenn Sie den Aufruf von rdd3.cache() hinzufügen, änderte sich nichts für den zweiten Lauf? Das ist seltsam.

und ja, man kann besser werden Ergebnisse mit Tuning von C * Datenmodell und Funken/C * Parameter

Verwandte Themen