2016-10-19 4 views
1

Meine Daten auf HDFS sind im Sequenzdateiformat. Ich bin mit PySpark (Funke 1.6) und versuchen, 2 Dinge zu erreichen:Erhalte den HDFS-Dateipfad in PySpark für Dateien im Sequenzdateiformat

  1. Datenpfad einen Zeitstempel enthält in yyyy/mm/dd/hh Format, das Ich mag würde sich in den Daten bringen. Ich habe SparkContext.wholeTextFiles ausprobiert, aber ich denke, dass das Sequence-Dateiformat nicht unterstützt wird.

  2. Wie gehe ich mit dem obigen Punkt um, wenn ich Daten für einen Tag verarbeiten will und das Datum in die Daten einbringen will? In diesem Fall würde ich Daten wie JJJJ/MM/TT/* Format laden.

Alle Zeiger zu schätzen wissen.

Antwort

1

Wenn gespeicherte Typen mit SQL-Typen kompatibel sind und Sie Spark 2.0 verwenden, ist das sehr einfach. Import input_file_name:

from pyspark.sql.functions import input_file_name 

lesen Datei und konvertieren zu einem DataFrame:

df = sc.sequenceFile("/tmp/foo/").toDF() 

Dateinamen hinzufügen:

df.withColumn("input", input_file_name()) 

Wenn diese Lösung nicht anwendbar in Ihrem Fall ist dann universell ist zu listet Dateien direkt auf (für HDFS können Sie die Bibliothek hdfs3 verwenden):

files = ... 

lesen nacheinander Zugabe Dateiname:

def read(f): 
    """Just to avoid problems with late binding""" 
    return sc.sequenceFile(f).map(lambda x: (f, x)) 

rdds = [read(f) for f in files] 

und Vereinigung:

sc.union(rdds) 
+0

Dank user6910411. Ich bin auf Spark 1.6 und benutze pyspark. – Arnkrishn

Verwandte Themen