2017-10-25 4 views

Antwort

3
  • Sie können RDD.randomSplit verwenden, die vorhandene RDD basierend auf den in den Parametern übergebenen Gewichtungen und dem Rückgabe-Array von RDDs teilen.

Die internen Arbeits wie unten sein ...

/** 
* Randomly splits this RDD with the provided weights. 
* 
* @param weights weights for splits, will be normalized if they don't sum to 1 
* @param seed random seed 
* 
* @return split RDDs in an array 
*/ 
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] = { 
    require(weights.forall(_ >= 0), 
    s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}") 
    require(weights.sum > 0, 
    s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}") 

    withScope { 
    val sum = weights.sum 
    val normalizedCumWeights = weights.map(_/sum).scanLeft(0.0d)(_ + _) 
    normalizedCumWeights.sliding(2).map { x => 
    randomSampleWithRange(x(0), x(1), seed) 
    }.toArray 
} 

HINWEIS: Gewichte Gewichte für Splits, werden normalisiert werden, wenn sie summieren nicht 1

Basierend auf dem obigen Verhalten habe ich ein Beispiel-Snippet wie unten erstellt, das funktionierte:

def getDoubleWeights(numparts:Int) : Array[Double] = { 
    Array.fill[Double](numparts)(1.0d) 
} 

Anrufer wie ....

val rddWithNumParts : Array[RDD] = yourRDD.randomSplit(getDoubleWeights(yourRDD.partitions.length)) 

Dies wird gleichmäßig aufzuteilen in der Anzahl der RDD

HINWEIS wäre: Das Gleiche gilt für unter DataFrame.randomSplit auch

anwendbar ist
  • Sie können dies auch in Dataframe konvertieren, indem Sie Schema geben RDD und verwenden wie unter Beispiel ..
    sqlContext.createDataFrame(rddOfRow, Schema)

später können Sie diese Methode aufrufen.

Datenrahmen [] randomSplit (double [] Gewichte) zufälliges spaltet dieses Datenrahmen mit den vorgesehenen Gewichte.

  • anderer Gedanke, den ich basierend auf der Anzahl der Partitionen hatte spaltet ...

d.h RDD.mapPartitionWithIndex(....)

für jede Partition Sie haben ein Iterator (kann in bis RDD umgewandelt werden). Sie können etwas wie Anzahl der Partitionen haben = Anzahl der RDDs

+0

aber es teilen immer Daten in zwei richtig? – AkhilaV

+0

Summe der Gewichte ist größer als 1 dann wird es gleichmäßig teilen sie als Teil der Normalisierung –

+0

Verwenden Sie "rdd.partitions.length" als Gewichte, wenn Summe mehr als 1 dann wird es automatisch normalisiert und in gleich viele Gewichte zu teilen. Ich habe das Gleiche getan.Es sollte funktionieren –