2015-03-11 10 views
5

Sie fragen sich, warum die StatefulNetworkWordCount.scala Beispiel nennt die berüchtigte updateStateByKey() -Funktion, die eine Funktion nur als Parameter mit stattdessen nehmen soll:Funken Beispiel Streaming ruft updateStateByKey mit zusätzlichen Parametern

val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc, 
    new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD) 

Warum die Notwendigkeit (und wie wird das verarbeitet - das ist nicht in der Signatur von updateStateByKey()?), um einen Partitionierer, einen booleschen und einen RDD zu übergeben?

Dank, Matt

Antwort

4

Es ist, weil:

  1. Sie die verschiedenen Funken Release-Zweig zu sehen: https://github.com/apache/spark/blob/branch-1.3/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala. In Spark 1.2 wurde dieser Code mit nur updateStateByKey erhalten eine einzige Funktion als Parameter, während in 1.3 haben sie es optimiert
  2. Verschiedene Versionen von updateStateByKey existieren in beiden 1.2 und 1.3. Aber in 1.2 gibt es keine Version mit 4 Parametern wurde nur in 1.3 eingeführt: https://github.com/apache/spark/blob/branch-1.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala

Hier ist der Code:

/** 
* Return a new "state" DStream where the state for each key is updated by applying 
* the given function on the previous state of the key and the new values of each key. 
* org.apache.spark.Partitioner is used to control the partitioning of each RDD. 
* @param updateFunc State update function. Note, that this function may generate a different 
* tuple with a different key than the input key. Therefore keys may be removed 
* or added in this way. It is up to the developer to decide whether to 
* remember the partitioner despite the key being changed. 
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new 
* DStream 
* @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. 
* @param initialRDD initial state value of each key. 
* @tparam S State type 
*/ 
def updateStateByKey[S: ClassTag](
    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], 
    partitioner: Partitioner, 
    rememberPartitioner: Boolean, 
    initialRDD: RDD[(K, S)] 
): DStream[(K, S)] = { 
    new StateDStream(self, ssc.sc.clean(updateFunc), partitioner, 
    rememberPartitioner, Some(initialRDD)) 
} 
Verwandte Themen