2014-07-22 12 views
6

Das folgende ist einfacher Code, um die Wortzahl über eine Fenstergröße von 30 Sekunden und eine Foliengröße von 10 Sekunden zu erhalten.Spark-Streaming-Fenster-Operation

import org.apache.spark.SparkConf 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.api.java.function._ 
import org.apache.spark.streaming.api._ 
import org.apache.spark.storage.StorageLevel 

val ssc = new StreamingContext(sc, Seconds(5)) 

// read from text file 
val lines0 = ssc.textFileStream("test") 
val words0 = lines0.flatMap(_.split(" ")) 

// read from socket 
val lines1 = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) 
val words1 = lines1.flatMap(_.split(" ")) 

val words = words0.union(words1) 
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) 

wordCounts.print() 
ssc.checkpoint(".") 
ssc.start() 
ssc.awaitTermination() 

Aber ich bin immer Fehler aus dieser Zeile:

val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) 

. Insbesondere von _ + _. Der Fehler ist

51: error: missing parameter type for expanded function ((x$2, x$3) => x$2.$plus(x$3)) 

Könnte mir jemand sagen, was das Problem ist? Vielen Dank!

Antwort

10

Dies ist extrem einfach zu beheben, nur explizit über die Typen.
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow((a:Int,b:Int)=>a+b, Seconds(30), Seconds(10))

Der Grund scala die Art in diesem Fall nicht ableiten kann, ist in this answer

+0

Danke erklärt! Nach der Änderung gab das Programm die erwarteten Ergebnisse, aber es gab in der Zwischenzeit einen weiteren Fehler: java.util.NoSuchElementException: Schlüssel nicht gefunden: 1406051860000 ms \t bei scala.collection.MapLike $ class.default (MapLike.scala : 228) \t bei scala.collection.AbstractMap.default (Map.scala: 58) \t bei scala.collection.mutable.HashMap.apply (HashMap.scala: 64) \t bei org.apache.spark.streaming. dstream.ReceiverInputDStream.getReceivedBlockInfo (ReceiverInputDStream.scala: 77) Ich frage mich, wie ist das passiert? – user2895478

+0

@ user2895478 Ich glaube, dass von diesem [Jira Ticket] (https://issues.apache.org/jira/browse/SPARK-2009) das Problem in 1.0.1 und 1.1.0 gelöst ist – aaronman