2016-03-25 11 views
1

Ich stehe vor einem seltsamen Problem, während ich versuche, die Felder von einem RDD[Array[String]] auf die richtigen Werte in einem Schema für die Konvertierung in ein Spark SQL DataFrame Dinamic konvertieren.Wie konvertiert man Array [String] in ein korrektes Schema?

Ich habe eine RDD[Array[String]] und eine StructType namens schema, die die Typen für die verschiedenen Felder angibt. Was ich bisher getan ist:

sqlContext.createDataFrame(
    inputLines.map(rowValues => 
          RowFactory.create(rowValues.zip(schema.toSeq) 
                .map{ case (value, struct) => 
                struct.dataType match { 
                case BinaryType => value.toCharArray().map(ch => ch.toByte) 
                case ByteType => value.toByte 
                case BooleanType => value.toBoolean 
                case DoubleType => value.toDouble 
                case FloatType => value.toFloat 
                case ShortType => value.toShort 
                case DateType => value 
                case IntegerType => value.toInt 
                case LongType => value.toLong 
                case _ => value 
                } 
               })), schema) 

aber ich erhalte diese Ausnahme:

java.lang.RuntimeException: Failed to convert value [Ljava.lang.Object;@6e9ffad1 (class of class [Ljava.lang.Object;}) with the type of IntegerType to JSON 

wenn die toJSON Methode aufrufen ...

Haben Sie eine Vorstellung über den Grund haben Warum passiert das und was könnte ich tun, um es zu reparieren?

Als gefragt, hier haben wir ein Beispiel:

val schema = StructType(Seq(StructField("id",IntegerType),StructField("val",StringType))) 
val inputLines=sc.parallelize(
     Array("1","This is a line for testing"), 
     Array("2","The second line")) 
+0

eine Probe-Eingang ('schema',' inputLines') wäre hilfreich. –

+0

Ich bekomme eine Ausnahme mit 'val inputLines = sc.parallelize (Array (" 1 "," Dies ist eine Zeile zum Testen "), Array (" 2 "," Die zweite Zeile "))' - Ich denke, es sollte be: 'val inputLines = sc.parallelize (Array ((" 1 "," Dies ist eine Zeile zum Testen "), (" 2 "," Die zweite Zeile "))) –

Antwort

3

Du Array als einzigen Parameter zu RowFactory.create vorbei.

Wenn Sie seine Methode Unterschrift sehen:

public static Row create(Object ... values) 

es erwartet eine varargs Liste.

So müssen Sie nur das Array zu varargs Liste, mit :_* Syntax konvertieren.

sqlContext.createDataFrame(inputLines.map(rowValues => 
    Row(    // RowFactory.create is java api, use Row.apply instead 
     rowValues.zip(schema.toSeq) 
       .map{ case (value, struct) => struct.dataType match { 
        case BinaryType => value.toCharArray().map(ch => ch.toByte) 
        case ByteType => value.toByte 
        case BooleanType => value.toBoolean 
        case DoubleType => value.toDouble 
        case FloatType => value.toFloat 
        case ShortType => value.toShort 
        case DateType => value 
        case IntegerType => value.toInt 
        case LongType => value.toLong 
        case _ => value 
        } 
       } : _*   // <-- make varargs here 
    )), 
    schema) 

Im Code oben, ich habe RowFactory.create mit Row.apply ersetzt und als Argument übergeben varargs.

Alternativ verwenden Sie Row.fromSeq Methode.

Refactoring ein bisschen:

def convertTypes(value: String, struct: StructField): Any = struct.dataType match { 
    case BinaryType => value.toCharArray().map(ch => ch.toByte) 
    case ByteType => value.toByte 
    case BooleanType => value.toBoolean 
    case DoubleType => value.toDouble 
    case FloatType => value.toFloat 
    case ShortType => value.toShort 
    case DateType => value 
    case IntegerType => value.toInt 
    case LongType => value.toLong 
    case _ => value 
} 

val schema = StructType(Seq(StructField("id",IntegerType), 
          StructField("val",StringType))) 

val inputLines = sc.parallelize(Array(Array("1","This is a line for testing"), 
             Array("2","The second line"))) 

val rowRdd = inputLines.map{ array => 
    Row.fromSeq(array.zip(schema.toSeq) 
        .map{ case (value, struct) => 
          convertTypes(value, struct) }) 
} 

val df = sqlContext.createDataFrame(rowRdd, schema) 

df.toJSON.collect 
// Array({"id":1,"val":"This is a line for testing"}, 
//  {"id":2,"val":"The second line"}) 
Verwandte Themen