2016-04-06 10 views
7

Ich habe eine Methode geschrieben, die eine Zufallszahl berücksichtigen muss, um eine Bernoulli-Verteilung zu simulieren. Ich benutze random.nextDouble, um eine Zahl zwischen 0 und 1 zu generieren und dann meine Entscheidung basierend auf diesem Wert unter Berücksichtigung meines Wahrscheinlichkeitsparameters zu treffen.Spark - Zufallszahlengenerierung

Mein Problem ist, dass Spark die gleichen Zufallszahlen innerhalb jeder Iteration meiner For-Schleife Mapping-Funktion generiert. Ich benutze die DataFrame API. Mein Code folgt diesem Format:

val myClass = new MyClass() 
val M = 3 
val myAppSeed = 91234 
val rand = new scala.util.Random(myAppSeed) 

for (m <- 1 to M) { 
    val newDF = sqlContext.createDataFrame(myDF 
    .map{row => RowFactory 
     .create(row.getString(0), 
     myClass.myMethod(row.getString(2), rand.nextDouble()) 
    }, myDF.schema) 
} 

Hier ist die Klasse:

class myClass extends Serializable { 
    val q = qProb 

    def myMethod(s: String, rand: Double) = { 
    if (rand <= q) // do something 
    else // do something else 
    } 
} 

Ich brauche eine neue Zufallszahl jedes Mal myMethod genannt wird. Ich habe auch versucht mit java.util.Random die Zahl in meinem Verfahren zu erzeugen, wie unten (scala.util.Random v10 nicht Serializable erstreckt), aber ich bin immer noch die gleichen Zahlen immer innerhalb jeder für Schleife

val r = new java.util.Random(s.hashCode.toLong) 
val rand = r.nextDouble() 

ich einige der Forschung getan haben, und Es scheint, dass dies mit Funken deterministischer Natur zu tun hat.

Antwort

2

Der Grund, warum die gleiche Sequenz wiederholt wird, ist, dass der Zufallsgenerator erzeugt und mit einem Samen initialisiert, bevor die Daten partitioniert ist. Jede Partition beginnt dann mit demselben Zufallssamen. Vielleicht nicht der effizienteste Weg, es zu tun, aber die folgende soll funktionieren:

val myClass = new MyClass() 
val M = 3 

for (m <- 1 to M) { 
    val newDF = sqlContext.createDataFrame(myDF 
    .map{ 
     val rand = scala.util.Random 
     row => RowFactory 
     .create(row.getString(0), 
     myClass.myMethod(row.getString(2), rand.nextDouble()) 
    }, myDF.schema) 
} 
+0

ich modifiziert diese leicht mein Problem zu lösen . Ich habe das Random val in meine Methode eingegeben und dort Zufallszahlen erzeugt. Dies löste mein Problem, aber ich musste 'java.util.Random' aus Gründen der Serialierbarkeit verwenden. –

4

Verwenden Sie einfach die SQL-Funktion rand:

import org.apache.spark.sql.functions._ 

//df: org.apache.spark.sql.DataFrame = [key: int] 

df.select($"key", rand() as "rand").show 
+---+-------------------+ 
|key|    rand| 
+---+-------------------+ 
| 1| 0.8635073400704648| 
| 2| 0.6870153659986652| 
| 3|0.18998048357873532| 
+---+-------------------+ 


df.select($"key", rand() as "rand").show 
+---+------------------+ 
|key|    rand| 
+---+------------------+ 
| 1|0.3422484248879837| 
| 2|0.2301384925817671| 
| 3|0.6959421970071372| 
+---+------------------+ 
+0

das ist nicht ganz mein Problem lösen, aber es ist eine elegante Lösung, die ich wahrscheinlich in der Zukunft verwenden werden, so +1 –

2

Nach this post, die beste Lösung ist nicht die new scala.util.Random in der Karte zu setzen, noch ganz außen (dh in dem Treibercode.), aber in einem Zwischen mapPartitionsWithIndex:

import scala.util.Random 
val myAppSeed = 91234 
val newRDD = myRDD.mapPartitionsWithIndex { (indx, iter) => 
    val rand = new scala.util.Random(indx+myAppSeed) 
    iter.map(x => (x, Array.fill(10)(rand.nextDouble))) 
} 
Verwandte Themen