2015-05-04 16 views
7

ich eine cassandra Tabelle mit einem Feld vom Typ Text namens Snapshot enthält JSON-Objekte habe:Spark-JSON Textfeld RDD

[identifier, timestamp, snapshot] 

verstand ich, dass in der Lage sein, Transformationen mit Funken auf diesem Gebiet zu tun, Ich muss dieses Feld dieser RDD in eine andere RDD konvertieren, um Transformationen im JSON-Schema vorzunehmen.

Ist das korrekt? Wie soll ich dazu vorgehen?

Edit: Vorerst gelang es mir, eine RDD von einem einzelnen Textfeld zu erstellen:

val conf = new SparkConf().setAppName("signal-aggregation") 
val sc = new SparkContext(conf) 
val sqlContext = new SQLContext(sc) 
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots") 
val first = snapshots.first() 
val firstJson = sqlContext.jsonRDD(sc.parallelize(Seq(first._3))) 
firstJson.printSchema() 

die mir das JSON Schema zeigt. Gut!

Wie gehe ich fort, um Spark mitzuteilen, dass dieses Schema auf alle Zeilen der Tabelle Snapshots angewendet werden soll, um eine RDD für dieses Snapshot-Feld aus jeder Zeile zu erhalten?

+0

Wenn ich mich richtig passieren verstehen, Sie haben mehrere JSON-Objekte in jedem Feld in der Cassandra-Tabelle, und Sie müssen jedes Objekt unabhängig berechnen. –

+0

Ja, Sie haben recht, aber ich habe irgendwo gelesen, dass Spark dieses Textfeld als JSON verstehen kann und dass ich Transformationen an einigen Werten dieser JSons durchführen kann, ist das korrekt? – galex

Antwort

12

Fast da, man muss nur wollen Ihr eine RDD [Zeichenfolge] mit Ihrem json in die jsonRDD Methode

val conf = new SparkConf().setAppName("signal-aggregation") 
val sc = new SparkContext(conf) 
val sqlContext = new SQLContext(sc) 
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots") 
val jsons = snapshots.map(_._3) // Get Third Row Element Json(RDD[String]) 
val jsonSchemaRDD = sqlContext.jsonRDD(jsons) // Pass in RDD directly 
jsonSchemaRDD.registerTempTable("testjson") 
sqlContext.sql("SELECT * FROM testjson where .... ").collect 

Ein kurzes Beispiel

val stringRDD = sc.parallelize(Seq(""" 
    { "isActive": false, 
    "balance": "$1,431.73", 
    "picture": "http://placehold.it/32x32", 
    "age": 35, 
    "eyeColor": "blue" 
    }""", 
    """{ 
    "isActive": true, 
    "balance": "$2,515.60", 
    "picture": "http://placehold.it/32x32", 
    "age": 34, 
    "eyeColor": "blue" 
    }""", 
    """{ 
    "isActive": false, 
    "balance": "$3,765.29", 
    "picture": "http://placehold.it/32x32", 
    "age": 26, 
    "eyeColor": "blue" 
    }""") 
) 
sqlContext.jsonRDD(stringRDD).registerTempTable("testjson") 
csc.sql("SELECT age from testjson").collect 
//res24: Array[org.apache.spark.sql.Row] = Array([35], [34], [26]) 
+0

Perfekt, danke! – galex