2

Ich verwende den Datastax Cassandra Java-Treiber, um von Funkenarbeitern an Cassandra zu schreiben. Code-SnippetLesen und Schreiben an Cassandra von Spark worker wirft Fehler

rdd.foreachPartition(record => { 
     val cluster = SimpleApp.connect_cluster(Spark.cassandraip) 
     val session = cluster.connect() 
     record.foreach { case (bin_key: (Int, Int), kpi_map_seq: Iterable[Map[String, String]]) => { 
     kpi_map_seq.foreach { kpi_map: Map[String, String] => { 
      update_tables(session, bin_key, kpi_map) 
     } 
     } 
     } 
     } //record.foreach 
     session.close() 
     cluster.close() 
    } 

Beim Lesen mir die Funken cassandra Anschluss verwenden (das intern die gleichen Treiber verwendet Ich gehe davon aus)

val bin_table = javaFunctions(Spark.sc).cassandraTable("keyspace", "bin_1") 
     .select("bin").where("cell = ?", cellname) // assuming this will run on worker nodes 
    println(s"get_bins_for_cell Count of Bins for Cell $cellname is ", cell_bin_table.count()) 
    return bin_table 

diesen jeweils zu einem Zeitpunkt tun verursacht kein Problem. Wenn Sie es zusammen tun, werfen Sie diese Stack-Spur.

Mein Hauptziel ist nicht das Schreiben oder Lesen direkt aus dem Spark-Treiberprogramm zu tun. Dennoch scheint es, dass es etwas mit dem Kontext zu tun hat; zwei Kontext wird verwendet?

16/07/06 06:21:29 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 22, euca-10-254-179-202.eucalyptus.internal): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_5_piece0 of broadcast_5 
     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1222) 
     at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165) 
     at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) 
     at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) 
     at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88) 
     at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) 
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Antwort

0

Die Spark Context wurde nach der Verwendung der Sitzung mit Cassandra wie unten

Beispiel

def update_table_using_cassandra_driver() ={ 
CassandraConnector(SparkWriter.conf).withSessionDo { session => 
val statement_4: Statement = QueryBuilder.insertInto("keyspace", "table") 
      .value("bin", my_tuple_value) 
      .value("cell", my_val("CName")) 
    session.executeAsync(statement_4) 
    ... 
} 

Also das nächste Mal, wenn ich nenne das

in der Schleife immer geschlossen war ich Ausnahme bekommen. Sieht aus wie ein Fehler im Cassandra-Treiber, muss dies überprüfen. Einstweilen tat, um dieses die folgenden arbeiten

for(a <- 1 to 1000) { 
    val sc = new SparkContext(SparkWriter.conf) 
    update_table_using_cassandra_driver() 
    sc.stop() 
    ...sleep(xxx) 
} 
+1

könnten Sie mir bitte mit diesem 'http://stackoverflow.com/questions/39363586/issue-while-storing-data-from-spark-streaming helfen -zu-Cassanadra' – Naresh

Verwandte Themen