Ich brauche dieWie konvertiert man ein DataFrame wieder in normale RDD in Pyspark?
(rdd.)partitionBy(npartitions, custom_partitioner)
Methode zu verwenden, die auf dem Datenrahmen nicht zur Verfügung steht. Alle DataFrame-Methoden beziehen sich nur auf DataFrame-Ergebnisse. Wie kann man dann eine RDD aus den Daten von DataFrame erstellen?
Hinweis: Dies ist eine Änderung (in 1.3.0) von 1.2.0.
Update aus der Antwort von @dpangmao: Die Methode ist .rdd. Ich war interessiert zu verstehen, ob (a) es öffentlich war und (b) was die Auswirkungen auf die Leistung sind.
Well (a) ist ja und (b) - gut können Sie hier sehen, dass es erhebliche perf Auswirkungen: eine neue RDD muss durch Aufrufen mapPartitions erstellt werden:
In dataframe.py (beachten Sie den Dateinamen als auch (war sql.py) geändert:
@property
def rdd(self):
"""
Return the content of the :class:`DataFrame` as an :class:`RDD`
of :class:`Row` s.
"""
if not hasattr(self, '_lazy_rdd'):
jrdd = self._jdf.javaToPython()
rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
schema = self.schema
def applySchema(it):
cls = _create_cls(schema)
return itertools.imap(cls, it)
self._lazy_rdd = rdd.mapPartitions(applySchema)
return self._lazy_rdd
ja Sie richtig sind. Ich habe das OP aktualisiert, nachdem ich tiefer hineingegangen bin. – javadba
ja, aber es konvertieren in org.apache.spark.rdd.RDD [org.apache.spark.sql.Row], aber nicht org.apache.spark.rdd.RDD [string] –