Ich stehe vor einem Problem, das ich seit Jahren nicht mehr überwinden kann.Spark DataFrame, der das Schema nicht respektiert und alles als String betrachtet
1) Ich bin auf Spark 1.4 und Scala 2.10. Ich kann nicht in diesem Moment (große verteilte Infrastruktur) upgraden
2) Ich habe eine Datei mit ein paar hundert Spalten, von denen nur 2 String und Rest alle Long sind. Ich möchte diese Daten in einen Label/Features-Datenrahmen konvertieren.
3) Ich konnte es in das LibSVM-Format bekommen.
4) Ich kann es einfach nicht in ein Label/Features Format bekommen.
Der Grund dafür ist
a) ich in der Lage bin nicht die toDF() zu verwenden, wie hier https://spark.apache.org/docs/1.5.1/ml-ensembles.html
Daten val gezeigt = MLUtils.loadLibSVMFile (sc, „data/mllib/sample_libsvm_data.txt .toDF“)()
wie sie es unterstützt nicht in 1.4
b) Also habe ich konvertiert zuerst die txtfile in einen Datenrahmen, wo ich so etwas wie dieses
verwendetdef getColumnDType(columnName:String):StructField = {
if((columnName== "strcol1") || (columnName== "strcol2"))
return StructField(columnName, StringType, false)
else
return StructField(columnName, LongType, false)
}
def getDataFrameFromTxtFile(sc: SparkContext,staticfeatures_filepath: String,schemaConf: String) : DataFrame = {
val sfRDD = sc.textFile(staticfeatures_filepath)//
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// reads a space delimited string from application.properties file
val schemaString = readConf(Array(schemaConf)).get(schemaConf).getOrElse("")
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))
val data = sfRDD
.map(line => line.split(","))
.map(p => Row.fromSeq(p.toSeq))
var df = sqlContext.createDataFrame(data, schema)
//schemaString.split(" ").drop(4)
//.map(s => df = convertColumn(df, s, "int"))
return df
}
Wenn ich ein df.na.drop() df.printSchema()
mit diesem Datenrahmen zurück ich perfekt Schema Gefallen Sie diese
root
|-- rand_entry: long (nullable = false)
|-- strcol1: string (nullable = false)
|-- label: long (nullable = false)
|-- strcol2: string (nullable = false)
|-- f1: long (nullable = false)
|-- f2: long (nullable = false)
|-- f3: long (nullable = false)
and so on till around f300
Aber - der traurige Teil ist, was ich versuche, mit dem df zu tun (siehe unten), habe ich immer bin immer ein Classcast (java.lang.String kann nicht auf java.lang.Long gegossen werden)
val featureColumns = Array("f1","f2",....."f300")
assertEquals(-99,df.select("f1").head().getLong(0))
assertEquals(-99,df.first().get(4))
val transformeddf = new VectorAssembler()
.setInputCols(featureColumns)
.setOutputCol("features")
.transform(df)
so - das ist schlecht - auch wenn das Schema sagt Lange - die df noch intern alles als String berücksichtigen.
Könnt ihr bitte mir helfen, darüber hinwegzukommen?
Grüße
EDIT 1
ein einfaches Beispiel Hinzufügen
Sagen, ich habe eine Datei wie diese
1, A, 20, P, -99,1,0,0 8,1,1,1,1,131153,
1, B, 23, P, -99,0,1,0,7,1,1,0,1,65543
1, C, 24, P -99,0,1,0,9,1,1,1,1,262149
1, D, 7, P, -99,0,0,0,8,1,1,1, 1,458759
Und
sf-schema = f0 strCol1 f1 strCol2 f2 f3 f4 f5 f6 f7 f8 f9 f10 f11 (Spaltennamen wirklich nicht Sache so können Sie diese Details außer Acht lassen)
Alle Ich versuche zu erstellen ist ein Label/Features Art von Datenrahmen, wo meine 3. Spalte wird zu einem Label und die 5. bis 11. Spalten werden zu einem Feature [Vector] -Spalte. So dass ich den Rest der Schritte in https://spark.apache.org/docs/latest/ml-classification-regression.html#tree-ensembles folgen kann.
Ich habe die Spalten auch wie vorgeschlagen abgegebenen zero323
val types = Map("strCol1" -> "string", "strCol2" -> "string")
.withDefault(_ => "bigint")
df = df.select(df.columns.map(c => df.col(c).cast(types(c)).alias(c)): _*)
df = df.drop("f0")
df = df.drop("strCol1")
df = df.drop("strCol2")
Aber die assert und VectorAssembler immer noch nicht. featureColumns = Array ("F2", "F3", ..... "f11") Dies ist ganze Folge ich tun möchte, nachdem ich meine df
var transformeddf = new VectorAssembler()
.setInputCols(featureColumns)
.setOutputCol("features")
.transform(df)
transformeddf.show(2)
transformeddf = new StringIndexer()
.setInputCol("f1")
.setOutputCol("indexedF1")
.fit(transformeddf)
.transform(transformeddf)
transformeddf.show(2)
transformeddf = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.setMaxCategories(5)
.fit(transformeddf)
.transform(transformeddf)
Die Ausnahme Spur von ScalaIDE haben - nur wenn es die VectorAssembler Hits wie unten
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
at scala.math.Numeric$LongIsIntegral$.toDouble(Numeric.scala:117)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:436)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:75)
at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:56)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Hmm. Die Behauptung funktioniert auch nicht. Angenommen, ich habe eine einfache Datei wie diese: 1, A, 20, P, -99,1,0,0,8,1,1,1,1,131153 \ n 1, B, 23, P, -99, 0,1,0,7,1,1,0,1,65543 \ n 1, C, 24, P, -99,0,1,0,9,1,1,1,1,262149 \ n 1, D, 7, P, -99,0,0,0,8,1,1,1,1,458759 \ n. (4 Zeilen angegeben als zB) Alles, was ich versuche zu tun, ist die dritte Spalte als Label und die fünfte bis elfte Spalte als Features in einem Label/Features Art von Datenrahmen, so dass ich die in https: // spark genannten Schritte folgen kann. apache.org/docs/latest/ml-classification-regression.html#tree-ensembles und es scheint so schwer !!! . Gibt es einen Ausweg? Getestet das obige Beispiel - der gleiche Fehler. – Run2
zero323 Ich habe Informationen in der Frage hinzugefügt. Der Fehler ist immer noch gleich, auch wenn Sie die Schritte 1 und 2, die Sie erwähnt haben, verwendet haben. Hoffe bald von dir zu hören. Wird nach einiger Zeit zurückkommen. – Run2
Ich habe das gerade live auf 1.4.0 und Ihren Daten getestet und funktioniert ganz gut. Bitte überprüfen Sie Ihren Code/Einstellungen. – zero323