2016-07-21 4 views
0

Unten Code funktioniert gut, aber es dauert einige Zeit, um nach Cassandra zu schreiben, wenn wir einen riesigen Zustrom von Transaktionen haben.Spark Streaming - mit foreachPartition und saveToCassandra für bessere Parallelisierung

Unten Code ist schriftlich in Cassandra in sequenziellen und führt in einem Executor.

parsedStream.saveToCassandra("test", "ct_table", SomeColumns("emp_id","emp_name","emp_sal","emp_dept")) 

Aber ich wollte den Schreibvorgang an Cassandra parallelisieren, indem ich foreachPartition mache. Aber ich sehe nicht saveToCassandra Option bei ForeachPartition.

parsedStream.foreachRDD{rdd => 
    rdd.foreachPartition { partition => 
     partition.saveToCassandra("test", "ct_table", SomeColumns("emp_id","emp_name","emp_sal","emp_dept")) 
    } 
} 

Wie können wir das erreichen?

+0

'saveToCassandra' ist auf der Ebene' RDD'/'DStream' definiert, während' partition' eine einfache Skala 'Iterator' ist, daher sieht man es nicht definiert. –

+0

OK. Irgendeine Idee, wie wir erreichen könnten, Cassandra in parallelen Exekutionen von allen meinen Executoren zu schreiben? – JKPEAK

+0

könnten Sie tun parseStream.repartition (num) .saveToCassandra' – Knight71

Antwort

0

Da Sie bereits Direct Stream verwenden, gibt es zwei Möglichkeiten, die Parallelität zu erhöhen.

  1. Erhöhen Sie die Anzahl der Kafka-Partitionen. Bei der Verwendung von Direct Stream erstellt Spark automatisch so viele Partitionen wie Kafka. Je nach Setup ist dies jedoch möglicherweise nicht möglich.

  2. Mit Spark repartition. In den meisten Fällen ist es besser, repartition am Eingang als an der Ausgabe.

    val num: Int = ? // Number of partitition 
    val parsedStream = stream.repartition(num).map(_._2).map(EmpParser.parse(_)) 
    parsedStream.saveToCassandra(...) 
    

Wenn Sie nur parsedStream einmal zu verwenden, ist es nicht notwendig, es zu cachen.

+0

Ich habe eine Anforderung, die gleichen Daten zu elastischen Suche auch zu schreiben. parsedStream.saveToCassandra ("test", "ct_table", SomeColumns ("emp_id", "emp_name", "emp_sal", "emp_dept")) momentan speichere ich wie unten beschrieben elastisch. Bitte schlagen Sie einen parellierten Ansatz zum Speichern in ElasticSearch vor. parsedStream.foreachRDD (rdd => {rdd.saveToEs ("test/emp")}) – JKPEAK

+0

Wenn mein Code in Cassandra und Elasticsearch schreibt, erhöht sich die Verarbeitungszeit auf 3 Sekunden (das hängt von der Datengröße ab). Aber wenn ich versuche nur auf Cassandra zu schreiben, dann beträgt die Verarbeitungszeit 0,3 bis 0,7 Sekunden. – JKPEAK