2016-05-04 8 views
1

Dies ist meine Json Daten, ich sende dies zu kafka Themen, Lesen mit Funken rdd, und speichern in Cassandra.So speichern Sie den gesamten JSON von Kafka Thema zu Cassandra Tabelle, mit Spark Streaming

[{ 
"sensor": "swapSensor", 
"sendtime": "2016-09-15T11:05:01.000Z", 
"data": [{ 
"@context": "Context" 
     }] 
}] 

das ist mein Cassandras Tabelle CREATE TABLE IF NOT EXISTS event(sensor text,sendTime text,count bigint,entireJson text, PRIMARY KEY ((sensor)));

ich wollte gesamte json (raw) Daten in der Tabelle der Spalte entireJson schieben.

das ist mein Code.

object StreamingData { 

    var count = 1 


    def main(args: Array[String]) { 

    val Array(brokers, topics, cassandraHost) = Array("1.11.22.50:9092", "c", "localhost") 


    def createSparkContext(): StreamingContext = { 

     val conf = new SparkConf() 
     .setAppName("c Events Processing") 
     .setMaster("local[2]") 
     .set("spark.cassandra.connection.host", cassandraHost) 
     .set("spark.cassandra.connection.keep_alive_ms", "60000") // prevent cassandra connection from being closed after every write 

     val sc = new SparkContext(conf) 
     // Create the context 
     val ssc = new StreamingContext(sc, Seconds(8)) 
     val sqlContext = new SQLContext(sc); 

     // Kafka stream 
     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
     val topicsSet = topics.split(",").toSet 
     val cEvents = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2) 

     cEvents.foreachRDD { rdd => 
     count = count + 1 
     sqlContext.read.json(rdd).registerTempTable("eventTable") 

     val eventdf1 = sqlContext.sql("SELECT * FROM eventTable") 


     eventdf1.collect.foreach(println) 

     val eventdf = sqlContext.sql("SELECT sensor, sendtime,data.actor FROM eventTable") 
     eventdf.printSchema() 
     eventdf.map { 
      case (r) => (r.getString(0) + count, sendtime, count,eventdf1) 
     } 
      .saveToCassandra("c", "event", SomeColumns("sensor", "sendtime", "count","entireJson")) 

     } 


     ssc 

    } 


    } 
+1

was ist Ihre Frage? – tesnik03

Antwort

0

Ich habe dies versucht, und es funktioniert, um Rohdaten in meine Cassandras-Tabellenspalte zu speichern.

var rawdata = "" 
     for (item <- rdd.collect().toArray) { 
      System.out.println(item); 
      rawdata = item 
     } 
Verwandte Themen