Ich versuche, einen Filter auf einen JSON-Stream von Daten aus einem Kafka Direct Stream anzuwenden. Ich verwende net.liftweb lift-json_2.11
, um ein Beispiel JSON {"type": "fast", "k":%d}
zu analysieren. Dies ist mein Code:JSON mit Apache Funke filtern (NO SPARK SQL) - Scala
val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
val s1 = stream.map(record => parse(record.value))
Das Ergebnis s1.print()
ist:
...
JObject(List(JField(type,JString(fast)), JField(k,JInt(11428))))
JObject(List(JField(type,JString(fast)), JField(k,JInt(11429))))
JObject(List(JField(type,JString(fast)), JField(k,JInt(11430))))
...
Wie kann ich einen Funken Filter auf dem k
Feld anwenden? Zum Beispiel: k%2==0
Ich möchte nicht SparkSQL verwenden, weil ich auch Joins auf Datenströme anwenden muss und SparkSQL erlaubt mir nicht, es zu tun. Dank
Können Sie nicht tragen Sie einfach ein '.filter' zu' stream.map (record => Parse (record.value)) '? – Interfector
Definieren Sie eine Fallklasse, die den JSON-Eintrag darstellt, z. 'case class Entry (Typ: String, k: Int)', dann benutze 'parse (record.value) .extract [Entry]', um einen Stream von 'Entry's zu erhalten. Die Filterung sollte auf "s1.filter" (e => e.k% 2 == 0) 'reduziert werden. –
@HristoIliev Wenn ich versuche, bekomme ich eine "Ausnahme im Thread" main "org.apache.spark.SparkException: Task nicht serialisierbar" Fehler –