2017-05-10 3 views
0

Ich versuche, einen Filter auf einen JSON-Stream von Daten aus einem Kafka Direct Stream anzuwenden. Ich verwende net.liftweb lift-json_2.11, um ein Beispiel JSON {"type": "fast", "k":%d} zu analysieren. Dies ist mein Code:JSON mit Apache Funke filtern (NO SPARK SQL) - Scala

val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)) 

val s1 = stream.map(record => parse(record.value)) 

Das Ergebnis s1.print() ist:

... 
JObject(List(JField(type,JString(fast)), JField(k,JInt(11428)))) 
JObject(List(JField(type,JString(fast)), JField(k,JInt(11429)))) 
JObject(List(JField(type,JString(fast)), JField(k,JInt(11430)))) 
... 

Wie kann ich einen Funken Filter auf dem k Feld anwenden? Zum Beispiel: k%2==0

Ich möchte nicht SparkSQL verwenden, weil ich auch Joins auf Datenströme anwenden muss und SparkSQL erlaubt mir nicht, es zu tun. Dank

+0

Können Sie nicht tragen Sie einfach ein '.filter' zu' stream.map (record => Parse (record.value)) '? – Interfector

+0

Definieren Sie eine Fallklasse, die den JSON-Eintrag darstellt, z. 'case class Entry (Typ: String, k: Int)', dann benutze 'parse (record.value) .extract [Entry]', um einen Stream von 'Entry's zu erhalten. Die Filterung sollte auf "s1.filter" (e => e.k% 2 == 0) 'reduziert werden. –

+0

@HristoIliev Wenn ich versuche, bekomme ich eine "Ausnahme im Thread" main "org.apache.spark.SparkException: Task nicht serialisierbar" Fehler –

Antwort

0

LÖSUNG:

//spark import 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.dstream.DStream 

//kafka import 
import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark.streaming.kafka010._ 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 

//json library import 
import org.json4s._ 
import org.json4s.native.JsonMethods._ 
import org.json4s.native.Serialization 
import org.json4s.native.Serialization.{read, write} 

object App { 

    def main(args : Array[String]) { 

// Create the context with a 1 second batch size 
val sparkConf = new SparkConf().setAppName("SparkScript").setMaster("local[4]") 
val ssc = new StreamingContext(sparkConf, Seconds(5)) 

case class MySens(elem: String, k: Int, pl: String) 

val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092", 
    "key.deserializer" -> classOf[StringDeserializer].getCanonicalName, 
    "value.deserializer" -> classOf[StringDeserializer].getCanonicalName, 
    "group.id" -> "test_luca", 
    "auto.offset.reset" -> "latest", 
    "enable.auto.commit" -> (false: java.lang.Boolean) 
) 

val topics1 = Array("fast-messages") 

val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)) 

val s1 = stream.map(record => { 
    implicit val formats = DefaultFormats 
    parse(record.value).extract[MySens] 
} 
) 

val p1 = s1.filter {e => e.k.%(10)==0} 

p1.print() 

ssc.start() 
ssc.awaitTermination() 
} 

}