2015-08-27 12 views
16

Ich habe Datensatz, der Benutzer und Kaufdaten enthält. Hier ist ein Beispiel, in dem das erste Element userId, das zweite Produkt productId und das dritte Element boolean ist.Geschichtete Stichprobe in Spark

(2147481832,23355149,1) 
(2147481832,973010692,1) 
(2147481832,2134870842,1) 
(2147481832,541023347,1) 
(2147481832,1682206630,1) 
(2147481832,1138211459,1) 
(2147481832,852202566,1) 
(2147481832,201375938,1) 
(2147481832,486538879,1) 
(2147481832,919187908,1) 
... 

Ich möchte sicherstellen, dass ich nur 80% der einzelnen Nutzer Daten übernehmen und eine RDD bauen, während die restlichen 20% übernehmen und eine andere RDD bauen. Lets Anruf Zug und Test. Ich möchte nicht mit groupBy anfangen, da es Speicherprobleme verursachen kann, da der Datensatz groß ist. Was ist der beste Weg, dies zu tun?

Ich könnte folgendes tun, aber das wird nicht geben 80% von jedem Benutzer.

Antwort

18

Eine Möglichkeit ist in Holden Antwort, und dies ist ein anderes:

Sie die Transformation sampleByKeyExact verwenden können, von der PairRDDFunctions Klasse.

sampleByKeyExact (boolean withReplacement, scala.collection.Map Fraktionen, lange Samen) eine Teilmenge Rückkehr dieses RDD durch Schlüssel abgetastet (via geschichtetem sampling) enthalten genau Math.ceil (numItems * Samplingrate) für jeden Stratum (Gruppe von Paaren mit dem gleichen Schlüssel).

Und das ist, wie ich tun würde:

die folgende Liste Betrachtet:

val list = List((2147481832,23355149,1),(2147481832,973010692,1),(2147481832,2134870842,1),(2147481832,541023347,1),(2147481832,1682206630,1),(2147481832,1138211459,1),(2147481832,852202566,1),(2147481832,201375938,1),(2147481832,486538879,1),(2147481832,919187908,1),(214748183,919187908,1),(214748183,91187908,1)) 

Ich würde ein RDD Paar erstellen, abbildet alle Benutzer als Schlüssel:

val data = sc.parallelize(list.toSeq).map(x => (x._1,(x._2,x._3))) 

Dann werde ich fractions für jeden Schlüssel wie folgt einrichten, seit Sie bemerkt haben, dass sa mpleByKeyExact nimmt eine Karte der Fraktion für jeden Schlüssel:

val fractions = data.map(_._1).distinct.map(x => (x,0.8)).collectAsMap 

Was ich hier ist getan haben, tatsächlich, Mapping auf den Tasten verschieden zu finden und dann jede Taste auf einen Bruchteil zuzuordnen ist gleich 0,8, dann sammle ich das ganze als eine Karte.

jetzt zu probieren, alles, was ich tun müssen, ist:

import org.apache.spark.rdd.PairRDDFunctions 
val sampleData = data.sampleByKeyExact(false, fractions, 2L) 

oder

val sampleData = data.sampleByKeyExact(withReplacement = false, fractions = fractions,seed = 2L) 

Sie die Zählung auf Ihre Schlüssel oder Daten oder Datenprobe überprüfen:

scala > data.count 
// [...] 
// res10: Long = 12 

scala > sampleData.count 
// [...] 
// res11: Long = 10 

EDIT: Ich habe beschlossen, ein Teil hinzufügen, um geschichtete Stichproben o durchzuführen n DataFrame s.

Also werden wir die gleichen Daten (list) aus dem obigen Beispiel betrachten.

val df = list.toDF("keyColumn","value1","value2") 
df.show 
// +----------+----------+------+ 
// | keyColumn| value1|value2| 
// +----------+----------+------+ 
// |2147481832| 23355149|  1| 
// |2147481832| 973010692|  1| 
// |2147481832|2134870842|  1| 
// |2147481832| 541023347|  1| 
// |2147481832|1682206630|  1| 
// |2147481832|1138211459|  1| 
// |2147481832| 852202566|  1| 
// |2147481832| 201375938|  1| 
// |2147481832| 486538879|  1| 
// |2147481832| 919187908|  1| 
// | 214748183| 919187908|  1| 
// | 214748183| 91187908|  1| 
// +----------+----------+------+ 

Wir werden die zugrunde liegenden RDD brauchen, an dem zu tun, wir die erste Spalte sein Tupeln der Elemente in diesem RDD durch die Definition unserer Schlüssel erstellt:

val data: RDD[(Int, Row)] = df.rdd.keyBy(_.getInt(0)) 
val fractions: Map[Int, Double] = data.map(_._1) 
             .distinct 
             .map(x => (x, 0.8)) 
             .collectAsMap 

val sampleData: RDD[Row] = data.sampleByKeyExact(withReplacement = false, fractions, 2L) 
           .values 

val sampleDataDF: DataFrame = spark.createDataFrame(sampleData, df.schema) // you can use sqlContext.createDataFrame(...) instead for spark 1.6) 

Sie können nun die Zählung überprüfen auf Ihre Schlüssel oder df oder Datenprobe:

scala > df.count 
// [...] 
// res9: Long = 12 

scala > sampleDataDF.count 
// [...] 
// res10: Long = 10 

EDIT 2: Da Spark-1.5.0 können Sie DataFrameStatFunc verwenden tions.sampleBy Methode:

df.stat.sampleBy("keyColumn", fractions, seed) 
+0

Das ist großartig, kann ich die Subtraktion, um die 20% zu bekommen? oder empfehlen Sie eine bessere Lösung? –

+1

gut, sobald Sie die neue RDD haben, können Sie alles damit machen. – eliasah

+0

Ich sehe die Beispielmethode auf der RDD, aber nicht die sampleByKeyExact –

2

So etwas ist vielleicht gut für etwas wie "Blink DB" geeignet, aber schauen wir uns die Frage an. Es gibt zwei Möglichkeiten zu interpretieren, was Sie gefragt haben:

1) Sie möchten 80% Ihrer Benutzer, und Sie möchten alle Daten für sie. 2) Sie wollen 80% der einzelnen Benutzer Daten

Für 1 # Sie eine Karte tun könnte die Benutzer-IDs, rufen verschieden zu bekommen, und dann bei kFold in MLUtils sehen Probe 80% von ihnen (Sie möchten oder BernoulliCellSampler). Sie können Ihre Eingabedaten dann auf die gewünschten IDs beschränken.

Für # 2 können Sie BernoulliCellSampler betrachten und einfach direkt anwenden.

Verwandte Themen