2014-05-11 8 views
7

Ich möchte einige Transformationen nur auf einer Teilmenge einer RDD durchführen (um das Experimentieren mit REPL zu beschleunigen).Ausführen von Operationen nur auf einer Teilmenge einer RDD

Ist es möglich?

RDD hat take(num: Int): Array[T] Methode, glaube ich, etwas Ähnliches brauchen würde, aber Rückkehr RDD [T]

+0

ist Ihre Frage noch offen? Wenn Sie eine akzeptable Antwort haben, vergessen Sie nicht, sie als solche zu markieren. – maasg

Antwort

16

Sie können RDD.sample verwenden, um eine RDD heraus zu bekommen, nicht eine Array. Zum Beispiel ~ 1% ersatzlos zu probieren:

val data = ... 
data.count 
... 
res1: Long = 18066983 

val sample = data.sample(false, 0.01, System.currentTimeMillis().toInt) 
sample.count 
... 
res3: Long = 180190 

Der dritte Parameter ist ein Samen, und ist zum Glück optional in der nächsten Spark-Version.

0

Offenbar ist es möglich RDD Teilmenge zu erzeugen, indem man zuerst seine take-Methode und dann makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism) zu SparkContext des zurückgegebenen Array vorbei die gibt neue RDD zurück.

Dieser Ansatz scheint mir jedoch zweifelhaft. Gibt es einen schöneren Weg?

0

Ich verwende immer parallelize Funktion von SparkContext, um von Array [T] zu verteilen, aber es scheint, dass makeRDD dasselbe tut. Es ist richtig, beide.

0

s sind verteilte Sammlungen, die nur für Aktionen materialisiert sind. Es ist nicht möglich, truncate Ihre RDD auf eine feste Größe, und nach wie vor eine RDD zurück (daher RDD.take(n) gibt eine Array[T], wie collect)

ich Ihnen ähnlich RDD s unabhängig von der Eingangsgröße Größe erhalten möchten Sie können Elemente in jeder Ihrer Partitionen abschneiden. Auf diese Weise können Sie die absolute Anzahl der Elemente in der resultierenden besser steuern. Die Größe des resultierenden hängt von der Funkenparallelität ab.

Ein Beispiel aus spark-shell:

import org.apache.spark.rdd.RDD 
val numberOfPartitions = 1000 

val millionRdd: RDD[Int] = sc.parallelize(1 to 1000000, numberOfPartitions) 

val millionRddTruncated: RDD[Int] = rdd.mapPartitions(_.take(10)) 

val billionRddTruncated: RDD[Int] = sc.parallelize(1 to 1000000000, numberOfPartitions).mapPartitions(_.take(10)) 

millionRdd.count   // 1000000 
millionRddTruncated.count // 10000 = 10 item * 1000 partitions 
billionRddTruncated.count // 10000 = 10 item * 1000 partitions 
Verwandte Themen