2017-12-21 1 views
-2

Ich habe zwei große RDDs wie unten.Erzeugen einer neuen RDD-Variable aus zwei RDDs im großen Maßstab

#First RDD 
key1 value1 labelA 
key2 value2 lableB 
..... 

#Second RDD 
stepA key1 value1 
stepB key2 value2 
... 

Und was ich extrahieren möchte, ist wie folgt.

labelA stepA key1 value1 
labelB stepB key2 value2 

Allerdings ist mein Problem, dass die zwei RDDs Größe sehr groß ist. Also, Join könnte viel Zeit in Anspruch nehmen. Ich möchte eine Join-Methode vermeiden und möchte die Shuffle-Größe so gering wie möglich halten. Auch das Sammeln einer RDD und das Erstellen von Broadcast-Variablen funktionierte wegen ihrer Größe nicht. Beachten Sie, dass die RDD-Größe mehr als 10 Gigabyte beträgt, was bedeutet, dass der Treiber sie nicht auf einmal speichern kann.

Gibt es also eine Möglichkeit, eine neue RDD aus zwei großen RDD zu generieren? Wie ich oben erwähnt habe, möchte ich Join-Methode vermeiden.

Danke.

+0

können Sie weitere Details zu den RDDs hinzufügen. Wie sehen die aktuellen Daten aus? Wie soll die neue RDD aussehen? Wie werden die Daten in der neuen RDD abgeleitet? Bitte [bearbeiten] und stellen Sie [mcve] bereit. – philantrovert

+0

Es gibt mehrere Möglichkeiten, RDDs zu kombinieren: Union, Join ... aber wir wissen nicht, was Sie tun wollen ;-) – Oli

Antwort

1

Unter der Annahme, dass Sie RDDs von 3-Tupel haben, sollte dies Ihnen geben, was Sie wollen.

val left_rdd = rdd1 
    .map{ case (key, value, label) => (key, value) -> label } 
val right_rdd = rdd2 
    .map{ case (step, key, value) => (key, value) -> step } 
left_rdd.join(right_rdd) 
    .map{ case ((key, value), (label, step)) => (label, step, key, value) } 

Sie sollten es versuchen, bevor Sie davon ausgehen, dass es zu lange dauern wird. 10GB ist nicht so groß. Shuffle sollte vermieden werden wenn möglich in Funken. Aber manchmal ist genau das, was Sie brauchen.

Verwandte Themen