1

Ich bin einen DSTREAM mit einem Schlüssel-Wert-Paar VideoID-Benutzer-ID mit, was eine gute Praxis ist eine deutliche UserID Gruppe von VideoID zählen?Spark-Streaming - Count eigenständiges Element im Zustand

// VideoID,UserID 
foo,1 
foo,2 
bar,1 
bar,2 
foo,1 
bar,2 

Wie oben, ich will durch Entfernen redundanter foo,1 und bar,2 jederzeit VideoID-CountUserID bekommen, so sollte das Ergebnis sein:

foo: 2 
bar: 2 

Mit anderen Worten, ich möchte eine große halten Zustandsdatensatz im Speicher. Wenn ein neuer Batch von dstream ankommt, vergleicht er ihn mit dem Dataset, um die einzelnen Nutzer jedes Videos zu zählen.

Wie geht das?

Ich arbeite an Spark-1.6, aber eine Antwort der Weiten Version akzeptiert wird. Python-Code, wenn möglich.

Antwort

0

Um eine eindeutige Anzahl von Benutzer-IDs zu erhalten, die nach Video-IDs gruppiert sind, sollten Sie aggregateByKey verwenden. Tut mir leid, das ist Scala, also musst du übersetzen.

val rdd = sc.textFile("your_file.txt") 

val initialSet = Set.empty[Int] 
val addToSet = (s: Set[Int], v:Int) => s + v 
val mergeSets = (s1: Set[Int], s2: Set[Int]) => s1 ++ s2 

val distinctValSets = rdd.aggregateByKey(initialSet)(addToSet, mergeSets) 
val distinctValCountd = rdd.map({case(k,s) => (k,s.size)}) 

Anfangssatz ist der Anfangswert des Aggregations Objekt, addToSet und mergeSets festlegen, wie Werte zu Ihrem Satz hinzuzufügen und verschiedene Sets verschmelzen auf Schlüssel basiert. Dies sollte Ihnen die eindeutige Menge von Benutzern geben, die mit jedem Video verbunden sind, und ist billiger (Platzweise) als reduceByKey und groupByKey.

+0

Dank für das Verbringen der Zeit! Jetzt denke ich, dass es keine gute Möglichkeit ist, einen großen Datensatz im Speicher zu halten. Daher verwende ich die Fensterfunktion, um in einer Periode anders zu zählen. Vielen Dank! –

0
val rdd1 = sc.parallelize(Seq(("foo", 1),("foo", 2),("foo", 1))) 
    rdd1.groupByKey.mapValues(x=>x.toSet.toSeq).flatMapValues(x=>x).collect