2015-03-12 8 views
38

Ich brauche dieWie konvertiert man ein DataFrame wieder in normale RDD in Pyspark?

(rdd.)partitionBy(npartitions, custom_partitioner) 

Methode zu verwenden, die auf dem Datenrahmen nicht zur Verfügung steht. Alle DataFrame-Methoden beziehen sich nur auf DataFrame-Ergebnisse. Wie kann man dann eine RDD aus den Daten von DataFrame erstellen?

Hinweis: Dies ist eine Änderung (in 1.3.0) von 1.2.0.

Update aus der Antwort von @dpangmao: Die Methode ist .rdd. Ich war interessiert zu verstehen, ob (a) es öffentlich war und (b) was die Auswirkungen auf die Leistung sind.

Well (a) ist ja und (b) - gut können Sie hier sehen, dass es erhebliche perf Auswirkungen: eine neue RDD muss durch Aufrufen mapPartitions erstellt werden:

In dataframe.py (beachten Sie den Dateinamen als auch (war sql.py) geändert:

@property 
def rdd(self): 
    """ 
    Return the content of the :class:`DataFrame` as an :class:`RDD` 
    of :class:`Row` s. 
    """ 
    if not hasattr(self, '_lazy_rdd'): 
     jrdd = self._jdf.javaToPython() 
     rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer())) 
     schema = self.schema 

     def applySchema(it): 
      cls = _create_cls(schema) 
      return itertools.imap(cls, it) 

     self._lazy_rdd = rdd.mapPartitions(applySchema) 

    return self._lazy_rdd 

Antwort

74

Verwenden Sie die Methode .rdd wie folgt aus:

rdd = df.rdd 
+1

ja Sie richtig sind. Ich habe das OP aktualisiert, nachdem ich tiefer hineingegangen bin. – javadba

+14

ja, aber es konvertieren in org.apache.spark.rdd.RDD [org.apache.spark.sql.Row], aber nicht org.apache.spark.rdd.RDD [string] –

38

@ dapangmao Antwort funktioniert, aber es gibt nicht die regelmäßige Funken RDD, es gibt ein Row-Objekt zurück. Wenn Sie das reguläre RDD Format haben wollen.

Versuchen Sie folgendes:

rdd = df.rdd.map(tuple) 

oder

rdd = df.rdd.map(list) 
+1

Dies sollte das Standardverhalten imo beim Aufruf sein 'df.rdd' –

Verwandte Themen