5

Ich stehe vor einem Problem, das ich seit Jahren nicht mehr überwinden kann.Spark DataFrame, der das Schema nicht respektiert und alles als String betrachtet

1) Ich bin auf Spark 1.4 und Scala 2.10. Ich kann nicht in diesem Moment (große verteilte Infrastruktur) upgraden

2) Ich habe eine Datei mit ein paar hundert Spalten, von denen nur 2 String und Rest alle Long sind. Ich möchte diese Daten in einen Label/Features-Datenrahmen konvertieren.

3) Ich konnte es in das LibSVM-Format bekommen.

4) Ich kann es einfach nicht in ein Label/Features Format bekommen.

Der Grund dafür ist

a) ich in der Lage bin nicht die toDF() zu verwenden, wie hier https://spark.apache.org/docs/1.5.1/ml-ensembles.html

Daten val gezeigt = MLUtils.loadLibSVMFile (sc, „data/mllib/sample_libsvm_data.txt .toDF“)()

wie sie es unterstützt nicht in 1.4

b) Also habe ich konvertiert zuerst die txtfile in einen Datenrahmen, wo ich so etwas wie dieses

verwendet
def getColumnDType(columnName:String):StructField = { 

     if((columnName== "strcol1") || (columnName== "strcol2")) 
      return StructField(columnName, StringType, false) 
     else 
      return StructField(columnName, LongType, false) 
    } 
def getDataFrameFromTxtFile(sc: SparkContext,staticfeatures_filepath: String,schemaConf: String) : DataFrame = { 
     val sfRDD = sc.textFile(staticfeatures_filepath)// 
     val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
     // reads a space delimited string from application.properties file 
     val schemaString = readConf(Array(schemaConf)).get(schemaConf).getOrElse("") 

     // Generate the schema based on the string of schema 
     val schema = 
      StructType(
      schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName))) 

     val data = sfRDD 
     .map(line => line.split(",")) 
     .map(p => Row.fromSeq(p.toSeq)) 

     var df = sqlContext.createDataFrame(data, schema) 

     //schemaString.split(" ").drop(4) 
     //.map(s => df = convertColumn(df, s, "int")) 

     return df 
    } 

Wenn ich ein df.na.drop() df.printSchema() mit diesem Datenrahmen zurück ich perfekt Schema Gefallen Sie diese

root 
|-- rand_entry: long (nullable = false) 
|-- strcol1: string (nullable = false) 
|-- label: long (nullable = false) 
|-- strcol2: string (nullable = false) 
|-- f1: long (nullable = false) 
|-- f2: long (nullable = false) 
|-- f3: long (nullable = false) 
and so on till around f300 

Aber - der traurige Teil ist, was ich versuche, mit dem df zu tun (siehe unten), habe ich immer bin immer ein Classcast (java.lang.String kann nicht auf java.lang.Long gegossen werden)

val featureColumns = Array("f1","f2",....."f300") 
assertEquals(-99,df.select("f1").head().getLong(0)) 
assertEquals(-99,df.first().get(4)) 
val transformeddf = new VectorAssembler() 
     .setInputCols(featureColumns) 
     .setOutputCol("features") 
     .transform(df) 

so - das ist schlecht - auch wenn das Schema sagt Lange - die df noch intern alles als String berücksichtigen.

Könnt ihr bitte mir helfen, darüber hinwegzukommen?

Grüße

EDIT 1

ein einfaches Beispiel Hinzufügen

Sagen, ich habe eine Datei wie diese

1, A, 20, P, -99,1,0,0 8,1,1,1,1,131153,

1, B, 23, P, -99,0,1,0,7,1,1,0,1,65543

1, C, 24, P -99,0,1,0,9,1,1,1,1,262149

1, D, 7, P, -99,0,0,0,8,1,1,1, 1,458759

Und

sf-schema = f0 strCol1 f1 strCol2 f2 f3 f4 f5 f6 f7 f8 f9 f10 f11 (Spaltennamen wirklich nicht Sache so können Sie diese Details außer Acht lassen)

Alle Ich versuche zu erstellen ist ein Label/Features Art von Datenrahmen, wo meine 3. Spalte wird zu einem Label und die 5. bis 11. Spalten werden zu einem Feature [Vector] -Spalte. So dass ich den Rest der Schritte in https://spark.apache.org/docs/latest/ml-classification-regression.html#tree-ensembles folgen kann.

Ich habe die Spalten auch wie vorgeschlagen abgegebenen zero323

val types = Map("strCol1" -> "string", "strCol2" -> "string") 
     .withDefault(_ => "bigint") 
df = df.select(df.columns.map(c => df.col(c).cast(types(c)).alias(c)): _*) 
df = df.drop("f0") 
df = df.drop("strCol1") 
df = df.drop("strCol2") 

Aber die assert und VectorAssembler immer noch nicht. featureColumns = Array ("F2", "F3", ..... "f11") Dies ist ganze Folge ich tun möchte, nachdem ich meine df

var transformeddf = new VectorAssembler() 
    .setInputCols(featureColumns) 
    .setOutputCol("features") 
    .transform(df) 

    transformeddf.show(2) 

    transformeddf = new StringIndexer() 
    .setInputCol("f1") 
    .setOutputCol("indexedF1") 
    .fit(transformeddf) 
    .transform(transformeddf) 

    transformeddf.show(2) 

    transformeddf = new VectorIndexer() 
    .setInputCol("features") 
    .setOutputCol("indexedFeatures") 
    .setMaxCategories(5) 
    .fit(transformeddf) 
    .transform(transformeddf) 

Die Ausnahme Spur von ScalaIDE haben - nur wenn es die VectorAssembler Hits wie unten

java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long 
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110) 
    at scala.math.Numeric$LongIsIntegral$.toDouble(Numeric.scala:117) 
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364) 
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364) 
    at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:436) 
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118) 
    at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75) 
    at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:75) 
    at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:56) 
    at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72) 
    at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70) 
    at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960) 
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118) 
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68) 
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) 
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) 
    at org.apache.spark.scheduler.Task.run(Task.scala:70) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

Antwort

5

Sie erhalten ClassCastException weil dies genau das, was geschehen soll. Schema-Argument wird nicht für automatisches Casting verwendet (einige DataSources können Schema auf diese Weise verwenden, aber keine Methoden wie createDataFrame). Es erklärt nur, welche Arten von Werten in den Zeilen gespeichert sind. Es liegt in Ihrer Verantwortung, Daten zu übergeben, die mit dem Schema übereinstimmen, und nicht umgekehrt.

Während DataFrame zeigt Schema Sie haben deklariert, es ist nur zur Laufzeit validiert, daher die Laufzeit Ausnahme.Wenn Sie Daten in bestimmte umwandeln möchten, haben Sie cast Daten explizit.

  1. zuerst alle Spalten als StringType lesen:

    val rows = sc.textFile(staticfeatures_filepath) 
        .map(line => Row.fromSeq(line.split(","))) 
    
    val schema = StructType(
        schemaString.split(" ").map(
        columnName => StructField(columnName, StringType, false) 
    ) 
    ) 
    
    val df = sqlContext.createDataFrame(rows, schema) 
    
  2. Next gegossen ausgewählten Spalten gewünschten Typ:

    import org.apache.spark.sql.types.{LongType, StringType} 
    
    val types = Map("strcol1" -> StringType, "strcol2" -> StringType) 
        .withDefault(_ => LongType) 
    
    val casted = df.select(df.columns.map(c => col(c).cast(types(c)).alias(c)): _*) 
    
  3. Verwenden Assembler:

    val transformeddf = new VectorAssembler() 
        .setInputCols(featureColumns) 
        .setOutputCol("features") 
        .transform(casted) 
    

können Sie die Schritte 1 und 2 einfach mit spark-csv:

// As originally 
val schema = StructType(
    schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName))) 


val df = sqlContext 
    .read.schema(schema) 
    .format("com.databricks.spark.csv") 
    .option("header", "false") 
    .load(staticfeatures_filepath) 

Siehe auch Correctly reading the types from file in PySpark

+0

Hmm. Die Behauptung funktioniert auch nicht. Angenommen, ich habe eine einfache Datei wie diese: 1, A, 20, P, -99,1,0,0,8,1,1,1,1,131153 \ n 1, B, 23, P, -99, 0,1,0,7,1,1,0,1,65543 \ n 1, C, 24, P, -99,0,1,0,9,1,1,1,1,262149 \ n 1, D, 7, P, -99,0,0,0,8,1,1,1,1,458759 \ n. (4 Zeilen angegeben als zB) Alles, was ich versuche zu tun, ist die dritte Spalte als Label und die fünfte bis elfte Spalte als Features in einem Label/Features Art von Datenrahmen, so dass ich die in https: // spark genannten Schritte folgen kann. apache.org/docs/latest/ml-classification-regression.html#tree-ensembles und es scheint so schwer !!! . Gibt es einen Ausweg? Getestet das obige Beispiel - der gleiche Fehler. – Run2

+0

zero323 Ich habe Informationen in der Frage hinzugefügt. Der Fehler ist immer noch gleich, auch wenn Sie die Schritte 1 und 2, die Sie erwähnt haben, verwendet haben. Hoffe bald von dir zu hören. Wird nach einiger Zeit zurückkommen. – Run2

+0

Ich habe das gerade live auf 1.4.0 und Ihren Daten getestet und funktioniert ganz gut. Bitte überprüfen Sie Ihren Code/Einstellungen. – zero323

Verwandte Themen