2017-05-27 3 views
1

Ist es möglich, eine RDD in Python zu senden?Wie RDD in PySpark zu senden?

Ich folge dem Buch "Advanced Analytics mit Spark: Muster zum Lernen von Daten im Maßstab" und in Kapitel 3 muss eine RDD gesendet werden. Ich versuche, den Beispielen zu folgen, indem ich Python anstelle von Scala benutze.

Wie dem auch sei, auch mit diesem einfachen Beispiel habe ich einen Fehler:

my_list = ["a", "d", "c", "b"] 
my_list_rdd = sc.parallelize(my_list) 
sc.broadcast(my_list_rdd) 

Der Fehler Wesen:

"It appears that you are attempting to broadcast an RDD or reference an RDD from an " 
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an 
action or transformation. RDD transformations and actions can only be invoked by the driver, n 
ot inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) i 
s invalid because the values transformation and count action cannot be performed inside of the 
rdd1.map transformation. For more information, see SPARK-5063. 

Ich verstehe nicht wirklich, was „-Aktion oder Transformation“ der Fehler bezieht zu.

Ich verwende spark-2.1.1-hadoop2.7.

Wichtig Bearbeiten: das Buch ist korrekt. Ich konnte gerade nicht lesen, dass es keine RDD war, die ausgestrahlt wurde, sondern eine Kartenversion, die mit collectAsMap() erstellt wurde.

Dank!

Antwort

2

Is it possible to broadcast an RDD in Python?

TL; DR Nr

Wenn man bedenkt, was RDD wirklich ist, Sie werden es einfach nicht möglich, finden. Es gibt nichts in einer RDD, das Sie übertragen könnten. Es ist auch zerbrechlich (sozusagen).

RDD ist eine Datenstruktur, beschreibt eine verteilte Berechnung auf einigen Datensätzen. Durch die Funktionen von RDD können Sie beschreiben, was und wie zu berechnen ist. Es ist eine abstrakte Einheit.

Zitiert die scaladoc von RDD:

Represents an immutable, partitioned collection of elements that can be operated on in parallel

Internally, each RDD is characterized by five main properties:

  • A list of partitions

  • A function for computing each split

  • A list of dependencies on other RDDs

  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

Es gibt nicht viel Sie übertragen, wie (unter Angabe SparkContext.broadcast Methode des scaladoc):

broadcast[T](value: T)(implicit arg0: ClassTag[T]): Broadcast[T] Broadcast a read-only variable to the cluster, returning a org.apache.spark.broadcast.Broadcast object for reading it in distributed functions. The variable will be sent to each cluster only once.

Sie können nur einen realen Wert übertragen, sondern ein RDD nur ein Container von Werten, die nur verfügbar sind, wenn Executoren seine Daten verarbeiten.

Von Broadcast Variables:

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.

Und später im selben Dokument:

This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

Sie könnten jedoch collect der Datensatz ein RDD hält und es ausgestrahlt wie folgt:

my_list = ["a", "d", "c", "b"] 
my_list_rdd = sc.parallelize(my_list) 
sc.broadcast(my_list_rdd.collect) // <-- collect the dataset 

Bei " Sammeln Sie das Dataset "Schritt, verlässt das Dataset einen RDD-Raum und wird zu einer lokal verfügbaren Sammlung, einem Python-Wert, der dann gesendet werden kann.

+1

Schöne Antwort, aber es ist immer wichtig, die Warnung über die Verwendung von sammeln mit großen RDDs, speziell für neue Benutzer. – eliasah

1

Sie können keine RDD senden. Sie senden Werte an alle Executor-Knoten, die während der Verarbeitung Ihrer RDD mehrfach verwendet werden. In Ihrem Code sollten Sie Ihre RDD vor dem Senden sammeln. Der collect konvertiert einen in ein lokales Python-Objekt, das ohne Probleme übertragen werden kann.

Wenn Sie einen Wert senden, wird der Wert serialisiert und über das Netzwerk an alle Executor-Knoten gesendet. Ihre my_list_rdd ist nur eine Referenz auf eine RDD, die über mehrere Knoten verteilt ist. Das Serialisieren dieses Verweises und das Übertragen dieses Verweises an alle Arbeiterknoten würde nichts in dem Arbeiterknoten bedeuten. Sie sollten also die Werte Ihrer RDD sammeln und stattdessen den Wert übertragen.

Weitere Informationen über Spark-Live Übertragung kann here

Hinweis zu finden: Wenn Ihr RDD zu groß ist, kann die Anwendung in einen OutOfMemory Fehler ausgeführt. Die collect Methode zieht alle Daten des Treiberspeichers, der normalerweise nicht groß genug ist.

+1

@eliasah getan. hinzugefügt die Warnung –