Ich stehe vor einem seltsamen Problem, während ich versuche, die Felder von einem RDD[Array[String]]
auf die richtigen Werte in einem Schema für die Konvertierung in ein Spark SQL DataFrame
Dinamic konvertieren.Wie konvertiert man Array [String] in ein korrektes Schema?
Ich habe eine RDD[Array[String]]
und eine StructType
namens schema
, die die Typen für die verschiedenen Felder angibt. Was ich bisher getan ist:
sqlContext.createDataFrame(
inputLines.map(rowValues =>
RowFactory.create(rowValues.zip(schema.toSeq)
.map{ case (value, struct) =>
struct.dataType match {
case BinaryType => value.toCharArray().map(ch => ch.toByte)
case ByteType => value.toByte
case BooleanType => value.toBoolean
case DoubleType => value.toDouble
case FloatType => value.toFloat
case ShortType => value.toShort
case DateType => value
case IntegerType => value.toInt
case LongType => value.toLong
case _ => value
}
})), schema)
aber ich erhalte diese Ausnahme:
java.lang.RuntimeException: Failed to convert value [Ljava.lang.Object;@6e9ffad1 (class of class [Ljava.lang.Object;}) with the type of IntegerType to JSON
wenn die toJSON
Methode aufrufen ...
Haben Sie eine Vorstellung über den Grund haben Warum passiert das und was könnte ich tun, um es zu reparieren?
Als gefragt, hier haben wir ein Beispiel:
val schema = StructType(Seq(StructField("id",IntegerType),StructField("val",StringType)))
val inputLines=sc.parallelize(
Array("1","This is a line for testing"),
Array("2","The second line"))
eine Probe-Eingang ('schema',' inputLines') wäre hilfreich. –
Ich bekomme eine Ausnahme mit 'val inputLines = sc.parallelize (Array (" 1 "," Dies ist eine Zeile zum Testen "), Array (" 2 "," Die zweite Zeile "))' - Ich denke, es sollte be: 'val inputLines = sc.parallelize (Array ((" 1 "," Dies ist eine Zeile zum Testen "), (" 2 "," Die zweite Zeile "))) –