2017-07-07 5 views
0

Ich versuche eine Datei in Spark zu laden. Wenn ich eine normale Textdatei in Funken laden wie unten:Wie fügt man einem Datensatz in Spark ein Schema hinzu?

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile") 

Das Ergebnis ist:

partFile: org.apache.spark.sql.Dataset[String] = [value: string] 

ich einen Datensatz in der Ausgabe sehen. Aber wenn ich eine Json-Datei laden:

val pfile = spark.read.json("hdfs://quickstart:8020/user/cloudera/pjson") 

Das Ergebnis ist ein Datenrahmen mit einem vorgefertigten Schema:

pfile: org.apache.spark.sql.DataFrame = [address: struct<city: string, state: string>, age: bigint ... 1 more field] 

Die Json/Parkett/ORC-Dateien-Schema. So kann ich verstehen, dass dies ein Feature der Spark-Version ist: 2x, was die Sache einfacher machte, da wir in diesem Fall direkt einen DataFrame bekommen und für eine normale Textdatei eine Datenmenge, wo es kein sinnvolles Schema gibt. Was ich gerne wissen würde ist, wie kann ich ein Schema zu einem Dataset hinzufügen, die eine Folge von Laden einer Textdatei in Spark ist. Für eine RDD gibt es die Option case class/StructType, um das Schema hinzuzufügen und in einen DataFrame zu konvertieren. Kann mir jemand sagen, wie kann ich es tun?

Antwort

4

Wenn Sie textFile, jede Zeile der Datei wird eine String-Zeile in Ihrem Dataset sein. Zur Umrechnung in Datenrahmen mit einem Schema, können Sie toDF verwenden:

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile") 

import sqlContext.implicits._ 
val df = partFile.toDF("string_column") 

In diesem Fall wird der Datenrahmen ein Schema einer einzigen Spalte vom Typ String.

Wenn Ihre Datei ein komplexeres Schema enthält, können Sie entweder die CSV-Leser (wenn die Datei in einem strukturierten CSV-Format ist):

val partFile = spark.read.option("header", "true").option("delimiter", ";").csv("hdfs://quickstart:8020/user/cloudera/partfile") 

Oder Sie können Ihre Dataset mit Karte verarbeiten, dann mit toDF in DataFrame konvertieren.Zum Beispiel: Angenommen, Sie eine Spalte das erste Zeichen der Zeile sein sollen (als Int) und die anderen Spalt das vierte Zeichen (auch als Int) zu sein:

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile") 

val processedDataset: Dataset[(Int, Int)] = partFile.map { 
    line: String => (line(0).toInt, line(3).toInt) 
} 

import sqlContext.implicits._ 
val df = processedDataset.toDF("value0", "value3") 

Sie können aber auch eine Definition Fall-Klasse, die das endgültige Schema für Ihren Datenrahmen darstellen:

case class MyRow(value0: Int, value3: Int) 

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile") 

val processedDataset: Dataset[MyRow] = partFile.map { 
    line: String => MyRow(line(0).toInt, line(3).toInt) 
} 

import sqlContext.implicits._ 
val df = processedDataset.toDF 

In beiden oben genannten Fällen df.printSchema Aufruf zeigen würde:

root 
|-- value0: integer (nullable = true) 
|-- value3: integer (nullable = true) 
+0

Basierend auf Ihrer Antwort, ich hatte es ein wenig zwicken. Teilen des Datasets basierend auf einem Delimiter: val partdata = partFile.map (p => p.split (",")) Ich musste auch diese Anweisung ändern: val prdt = partdata.map {line => rows (line (0) .toInt, Zeile (1) .toString, Zeile (2) .toInt, Zeile (3) .toString, Zeile (4) .toString)} weil die nicht-numerischen Daten im 'char' Format sind und ich hatte um sie in 'String' umzuwandeln. Es funktioniert jetzt. – Sidhartha

+1

@Sidhartha, Gut zu wissen, dass es funktioniert hat. Wenn es sich um eine kommagetrennte Datei handelt, könnte man meinen ersten Vorschlag, 'spark.read.csv' zu verwenden, vielleicht einfacher finden. –

0

case class eine dataset/dataframe zu erstellen ist sehr einfach

können sagen, dass Sie eine Textdatei haben Daten enthalten name, age als

x1,32 
x2,32 
x3,32 

Sie haben case class außerhalb der Haupt-Ausführung Klasse zu definieren, wie

case class Info(name: String, 
       age: Int) 

Dann r die Datei eading sparkContext.textFile und oben Fall-Klasse sollten wir eine wie unten

val data = sc.textFile("path to text file") 

    import sqlContext.implicits._ 
    data.map(line => line.split(",")).map(array => Info(array(0), array(1).toInt)).toDF.show(false) 

+----+---+ 
|name|age| 
+----+---+ 
|x1 |32 | 
|x2 |32 | 
|x3 |32 | 
+----+---+ 

schema Verwendung ist dataframe haben, wo Sie brauchen rdd[Row] und schema zu erstellen und sqlContext als

val data = sc.textFile("path to text file") 
    .map(line=> line.split(",")).map(array => Row(array(0), array(1).toInt)) 

val schema = StructType(
    Array(
    StructField("name", StringType, true), 
    StructField("age", IntegerType, true) 
) 
) 
sqlContext.createDataFrame(data, schema).show(false) 

Ausgabe ist die gleiche verwenden, wie oben

+----+---+ 
|name|age| 
+----+---+ 
|x1 |32 | 
|x2 |32 | 
|x3 |32 | 
+----+---+ 
Verwandte Themen