2016-11-20 2 views
0

Ich bin Begine zu Spark; Ich arbeite an Spark-Streaming-Anwendungsfall, wo ich eine JSON-Nachrichten erhalten jede JSON-Nachricht hat ein Attribut 'Wert', die nach dem Analysieren JSON Double ist Ich bekomme ein Array [Double] .Ich möchte herausfinden, max (Wert) und min (Wert) für die letzten 15 Sekunden mit Schiebefenster von 2 Sek. Hier ist mein Code.Spark Streaming Schiebefenster max und min

val record = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER_2) 
val lines=record.map(_._2) 

val valueDtsream:DStream[Array[Double]]=lines.map { jsonRecord => parseJson(jsonRecord) } 
          .window(Seconds(15),Seconds(2)) 

     valueDtsream.foreachRDD 
     { 
     rdd => 
      if (!rdd.partitions.isEmpty) 
      { 
       //code to find min and max 
      } 
     } 

ssc.start() 
ssc.awaitTermination() 

Antwort

1

Versuchen:

valueDtsream.transform(rdd => { 
    val stats = rdd.flatMap(x => x).stats 
    rdd.sparkContext.parallelize(Seq((stats.min, stats.max))) 
}) 
+0

Vielen Dank für die Antwort, sorry für diese dumme Frage, warum ich tun müssen, verwandeln und wieder parallelisieren. Ist der untere Code nicht genug, um max, min Werte auf der Konsole alle 15 Sekunden zu drucken. valueDtsream.foreachRDD { rdd => if (! rdd.partitions.isEmpty) { val = stats rdd.flatMap (x => x) println (stats.min, stats.max) } } – nilesh1212

+0

Wenn Sie nur drucken möchten, können Sie 'foreachRDD' verwenden und den Rest löschen. 'parallelize' weil' transform' '' RDD => RDD' ist. –

+0

Danke, dass ich diese Antwort für meine Frage angenommen habe. Ich habe noch 1 Problem. Ich habe den Code ausprobiert (ohne transformieren und parallelisieren), aber mein Spark-Streaming-Job druckt die Werte min, max alle 2 Sek. Idealerweise sollte min, max nach 15 Sek. des Fensters nur gedruckt werden, aber dies geschieht nicht – nilesh1212