Ich entwickle einen Algorithmus mit Kafka und Spark-Streaming. Dies ist ein Teil meines Empfänger:Spark-Streaming mit Kafka: leere Sammlung Ausnahme
val Array(brokers, topics) = args
val sparkConf = new SparkConf().setAppName("Traccia2014")
val ssc = new StreamingContext(sparkConf, Seconds(10))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val slice=30
val lines = messages.map(_._2)
val dStreamDst=lines.transform(rdd => {
val y= rdd.map(x => x.split(",")(0)).reduce((a, b) => if (a < b) a else b)
rdd.map(x => (((x.split(",")(0).toInt - y.toInt).toLong/slice).round*slice+" "+(x.split(",")(2)),1)).reduceByKey(_ + _)
})
dStreamDst.print()
, auf dem ich die folgende Fehlermeldung:
ERROR JobScheduler: Error generating jobs for time 1484927230000 ms
java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$42.apply(RDD.scala:1034)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$42.apply(RDD.scala:1034)
Was bedeutet dies? Wie könnte ich es lösen? Jede Art von Hilfe ist wirklich appreciated..thanks im Voraus
Update: gelöst. Verwenden Sie nicht die Methode transform oder print(). Verwenden Sie foreachRDD, ist die beste Lösung.
Ich verwandle bin mit(), weil, wenn ich es nicht verwenden, ich nicht in der Lage bin, diese Operation zu tun: rdd.map (x => (((x.split ("") (0) .toInt - y.toInt) .toLong/Scheibe) .round * Scheibe + "" + (x.split (",") (2)), 1)). ReduceByKey (_ + _) –
Schön genug , aktualisierte meine Antwort. – jeff