2017-01-29 2 views
1

Ich entwickle einen Algorithmus mit Kafka und Spark-Streaming. Dies ist ein Teil meines Empfänger:Spark-Streaming mit Kafka: leere Sammlung Ausnahme

val Array(brokers, topics) = args 
val sparkConf = new SparkConf().setAppName("Traccia2014") 
val ssc = new StreamingContext(sparkConf, Seconds(10)) 

// Create direct kafka stream with brokers and topics 
val topicsSet = topics.split(",").toSet 
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 
val slice=30 
val lines = messages.map(_._2) 
val dStreamDst=lines.transform(rdd => { 
    val y= rdd.map(x => x.split(",")(0)).reduce((a, b) => if (a < b) a else b) 
    rdd.map(x => (((x.split(",")(0).toInt - y.toInt).toLong/slice).round*slice+" "+(x.split(",")(2)),1)).reduceByKey(_ + _) 
}) 
dStreamDst.print() 

, auf dem ich die folgende Fehlermeldung:

ERROR JobScheduler: Error generating jobs for time 1484927230000 ms 
java.lang.UnsupportedOperationException: empty collection 
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$42.apply(RDD.scala:1034) 
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$42.apply(RDD.scala:1034) 

Was bedeutet dies? Wie könnte ich es lösen? Jede Art von Hilfe ist wirklich appreciated..thanks im Voraus

Update: gelöst. Verwenden Sie nicht die Methode transform oder print(). Verwenden Sie foreachRDD, ist die beste Lösung.

Antwort

1

Sie begegnen diesem b/c Sie interagieren mit dem DStream mit der transform() API. Wenn Sie diese Methode verwenden, erhalten Sie die RDD, die diesen Snapshot der Daten in der Zeit darstellt, in Ihrem Fall das 10-Sekunden-Fenster. Ihr Code schlägt fehl, weil in einem bestimmten Zeitfenster keine Daten vorhanden waren und die RDD, auf der Sie arbeiten, leer ist. Dadurch erhalten Sie beim Aufruf von reduce() den Fehler "leere Sammlung".

Verwenden Sie die rdd.isEmpty(), um sicherzustellen, dass die RDD nicht leer ist, bevor Sie Ihre Operation aufrufen.

lines.transform(rdd => { 
    if (rdd.isEmpty) 
    rdd 
    else { 
    // rest of transformation 
    } 
}) 
+0

Ich verwandle bin mit(), weil, wenn ich es nicht verwenden, ich nicht in der Lage bin, diese Operation zu tun: rdd.map (x => (((x.split ("") (0) .toInt - y.toInt) .toLong/Scheibe) .round * Scheibe + "" + (x.split (",") (2)), 1)). ReduceByKey (_ + _) –

+0

Schön genug , aktualisierte meine Antwort. – jeff

Verwandte Themen