Dies sind meine JSON-Daten; ich sende sie an ein Kafka-Topic, lese sie mit einer Spark-RDD und speichere sie in Cassandra. Wie speichert man das gesamte JSON aus dem Kafka-Topic mit Spark Streaming in einer Cassandra-Tabelle?
[{
"sensor": "swapSensor",
"sendtime": "2016-09-15T11:05:01.000Z",
"data": [{
"@context": "Context"
}]
}]
Das ist meine Cassandra-Tabelle: CREATE TABLE IF NOT EXISTS event(sensor text,sendTime text,count bigint,entireJson text, PRIMARY KEY ((sensor)));
Ich möchte die gesamten (rohen) JSON-Daten in die Spalte entireJson der Tabelle schreiben.
Das ist mein Code:
object StreamingData {
var count = 1
/** Entry point: builds the Kafka -> Spark Streaming -> Cassandra pipeline
  * and keeps it running.
  *
  * Fixes over the original:
  *  - `sendtime` on the save path was an unbound identifier (compile error),
  *    and `eventdf1` (a whole DataFrame) was passed as the value of the text
  *    column `entireJson`; rows are now built per event from the parsed JSON.
  *  - the raw Kafka message is kept alongside the parsed fields so the full
  *    JSON payload actually lands in `entireJson` (the asker's goal).
  *  - the StreamingContext is started and awaited; the original built it,
  *    returned it, and discarded it, so nothing ever ran.
  *  - empty micro-batches are skipped instead of being parsed.
  */
def main(args: Array[String]) {
  // NOTE(review): stdlib JSON parser (deprecated after Scala 2.11 but present);
  // avoids round-tripping through SQLContext just to re-serialise the payload.
  import scala.util.parsing.json.JSON

  val Array(brokers, topics, cassandraHost) = Array("1.11.22.50:9092", "c", "localhost")

  def createSparkContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("c Events Processing")
      .setMaster("local[2]")
      .set("spark.cassandra.connection.host", cassandraHost)
      // prevent the Cassandra connection from being closed after every write
      .set("spark.cassandra.connection.keep_alive_ms", "60000")
    val sc = new SparkContext(conf)
    // 8-second micro-batches, as in the original
    val ssc = new StreamingContext(sc, Seconds(8))

    // Direct Kafka stream; only the message value (the raw JSON text) is needed.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicsSet = topics.split(",").toSet
    val cEvents = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
      .map(_._2)

    cEvents.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Driver-side batch counter; copied to a local so the executor closure
        // captures a stable value instead of the mutable object field.
        count += 1
        val batch = count
        val rows = rdd.flatMap { raw =>
          // Each Kafka message is a JSON *array* of event objects (see sample).
          JSON.parseFull(raw) match {
            case Some(events: List[_]) =>
              events.collect { case event: Map[_, _] =>
                val e = event.asInstanceOf[Map[String, Any]]
                // The original appended the batch counter to the sensor key to
                // avoid overwriting rows (PRIMARY KEY is sensor only) — kept.
                (e.getOrElse("sensor", "").toString + batch,
                 e.getOrElse("sendtime", "").toString,
                 batch.toLong,
                 raw) // entire raw JSON message goes into entireJson
              }
            case _ =>
              Nil // unparseable message: skip it rather than fail the batch
          }
        }
        rows.saveToCassandra("c", "event",
          SomeColumns("sensor", "sendtime", "count", "entireJson"))
      }
    }
    ssc
  }

  // The original never started the context, so no data was ever processed.
  val ssc = createSparkContext()
  ssc.start()
  ssc.awaitTermination()
}
Was ist Ihre Frage? – tesnik03