2015-08-19 15 views
6

Ich versuche, die Reihenfolge der Elemente in einer RDD randomisieren. Mein aktueller Ansatz besteht darin, die Elemente mit einer RDD aus gemischten Zahlen zu komprimieren, um dann später mit diesen ganzen Zahlen zu verbinden.Pyspark: shuffle RDD

Allerdings fällt pyspark mit nur 100000000 ganzen Zahlen über. Ich verwende den folgenden Code.

Meine Frage ist: Gibt es eine bessere Möglichkeit, entweder mit dem Zufallsindex oder anders Shuffle?

Ich habe versucht, nach einem zufälligen Schlüssel zu sortieren, der funktioniert, aber ist langsam.

def random_indices(n): 
    """ 
    return an iterable of random indices in range(0,n) 
    """ 
    indices = range(n) 
    random.shuffle(indices) 
    return indices 

passiert Folgendes in pyspark:

Using Python version 2.7.3 (default, Jun 22 2015 19:33:41) 
SparkContext available as sc. 
>>> import clean 
>>> clean.sc = sc 
>>> clean.random_indices(100000000) 
Killed 

Antwort

5

Ein möglicher Ansatz zufällige Schlüssel hinzuzufügen, ist mit mapParitions

import os 
import numpy as np 

swap = lambda x: (x[1], x[0]) 

def add_random_key(it): 
    # make sure we get a proper random seed 
    seed = int(os.urandom(4).encode('hex'), 16) 
    # create separate generator 
    rs = np.random.RandomState(seed) 
    # Could be randint if you prefer integers 
    return ((rs.rand(), swap(x)) for x in it) 

rdd_with_keys = (rdd 
    # It will be used as final key. If you don't accept gaps 
    # use zipWithIndex but this should be cheaper 
    .zipWithUniqueId() 
    .mapPartitions(add_random_key, preservesPartitioning=True)) 

Weiter Sie partitionieren können, sortieren jeder Partition und Extraktwerte:

n = rdd.getNumPartitions() 
(rdd_with_keys 
    # partition by random key to put data on random partition 
    .partitionBy(n) 
    # Sort partition by random value to ensure random order on partition 
    .mapPartitions(sorted, preservesPartitioning=True) 
    # Extract (unique_id, value) pairs 
    .values()) 

Wenn die Sortierung pro Partition noch zu langsam ist, könnte sie durch Fisher-Yates Shuffle ersetzt werden.

Wenn Sie einfach ein zufälligen Daten benötigen, dann können Sie mllib.RandomRDDs

from pyspark.mllib.random import RandomRDDs 

RandomRDDs.uniformRDD(sc, n) 

Theoretisch verwenden Sie es mit Eingang rdd gezippt werden könnte, aber es erfordern würde, die Anzahl der Elemente pro Partition entspricht.

+0

Danke, das ist nützlich. Ich brauche eigentlich die Schlüssel, um einzigartig zu sein. – Marcin

+0

Haben Sie andere Anforderungen hier? Wenn nicht, kannst du einfach 'zipWithIndex'' zipWithUniqueId' hinterher ziehen. Es fügt eine weitere Transformation hinzu, ist aber nicht extrem teuer. – zero323

+0

Ich brauche die Schlüssel, um sowohl zufällig als auch einzigartig zu sein. Ich kann nach einem zufälligen Schlüssel sortieren, aber das ist ziemlich langsam. – Marcin

Verwandte Themen