2016-03-13 13 views
7

Ich habe versucht, PCA auf meine Daten anzuwenden und dann RandomForest auf die transformierten Daten anzuwenden. PCA.transform (data) hat mir jedoch einen DataFrame gegeben, aber ich brauche einen mllib LabeledPoints um mein RandomForest zu füttern. Wie kann ich das machen? Mein Code:Wie konvertiert man Funke DataFrame zu RDD mllib LabeledPoints?

import org.apache.spark.mllib.util.MLUtils 
    import org.apache.spark.{SparkConf, SparkContext} 
    import org.apache.spark.mllib.tree.RandomForest 
    import org.apache.spark.mllib.tree.model.RandomForestModel 
    import org.apache.spark.ml.feature.PCA 
    import org.apache.spark.mllib.regression.LabeledPoint 
    import org.apache.spark.mllib.linalg.Vectors 


    val dataset = MLUtils.loadLibSVMFile(sc, "data/mnist/mnist.bz2") 

    val splits = dataset.randomSplit(Array(0.7, 0.3)) 

    val (trainingData, testData) = (splits(0), splits(1)) 

    val trainingDf = trainingData.toDF() 

    val pca = new PCA() 
    .setInputCol("features") 
    .setOutputCol("pcaFeatures") 
    .setK(100) 
    .fit(trainingDf) 

    val pcaTrainingData = pca.transform(trainingDf) 

    val numClasses = 10 
    val categoricalFeaturesInfo = Map[Int, Int]() 
    val numTrees = 10 // Use more in practice. 
    val featureSubsetStrategy = "auto" // Let the algorithm choose. 
    val impurity = "gini" 
    val maxDepth = 20 
    val maxBins = 32 

    val model = RandomForest.trainClassifier(pcaTrainingData, numClasses, categoricalFeaturesInfo, 
     numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins) 


    error: type mismatch; 
    found : org.apache.spark.sql.DataFrame 
    required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] 

habe ich versucht, die beiden folgenden möglichen Lösungen, aber sie hat nicht funktioniert:

scala> val pcaTrainingData = trainingData.map(p => p.copy(features = pca.transform(p.features))) 
<console>:39: error: overloaded method value transform with alternatives: 
    (dataset: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame <and> 
    (dataset: org.apache.spark.sql.DataFrame,paramMap: org.apache.spark.ml.param.ParamMap)org.apache.spark.sql.DataFrame <and> 
    (dataset: org.apache.spark.sql.DataFrame,firstParamPair: org.apache.spark.ml.param.ParamPair[_],otherParamPairs: org.apache.spark.ml.param.ParamPair[_]*)org.apache.spark.sql.DataFrame 
    cannot be applied to (org.apache.spark.mllib.linalg.Vector) 

Und:

 val labeled = pca 
    .transform(trainingDf) 
    .map(row => LabeledPoint(row.getDouble(0), row(4).asInstanceOf[Vector[Int]])) 

    error: type mismatch; 
    found : scala.collection.immutable.Vector[Int] 
    required: org.apache.spark.mllib.linalg.Vector 

(Ich habe importiert org.apache.spark .mllib.linalg.Vectors im obigen Fall)

Irgendwelche Hilfe?

+1

Ihr Code funktioniert nur für mich in Ordnung (wie sie ist, ohne dass die beiden Lösungsansätze). Ich nehme an, vielleicht haben Sie einen der Importe falsch verstanden? Ich verwende 'import org.apache.spark.ml.feature.PCA',' import org.apache.spark.mllib.util.MLUtils'. Ich lief es mit dieser Datei: https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass/mnist.bz2 –

+0

@TzachZohar Oh ich habe die gleichen Importe wie Ihre und ich bearbeitet meine Frage von Hinzufügen von ihnen. Ich habe auch die gleiche Datei verwendet. War es, weil ich eher in Shell als in Funke lief, also funktionierte es nicht? –

+2

Warum alle downvotes? Scheint eine vernünftige Frage. – javadba

Antwort

10

Der richtige Ansatz hier ist der zweite, den Sie versucht haben - Mapping jeder in eine LabeledPoint, um eine RDD[LabeledPoint] zu erhalten. Sie hat jedoch zwei Fehler:

  1. Die richtige Vector Klasse (org.apache.spark.mllib.linalg.Vector) nicht Typ Argumente (z Vector[Int]) - so, obwohl Sie das Recht, den Import hatte, der Compiler festgestellt, dass Sie scala.collection.immutable.Vector gemeint, die der Fall ist.
  2. Die Datenrahmen zurück von PCA.fit() 3 Spalten haben, und Sie haben versucht, Spaltennummer 4. Zum Beispiel zu extrahieren, zeigen erste 4 Zeilen:

    +-----+--------------------+--------------------+ 
    |label|   features|   pcaFeatures| 
    +-----+--------------------+--------------------+ 
    | 5.0|(780,[152,153,154...|[880.071111851977...| 
    | 1.0|(780,[158,159,160...|[-41.473039034112...| 
    | 2.0|(780,[155,156,157...|[931.444898405036...| 
    | 1.0|(780,[124,125,126...|[25.5114585648411...| 
    +-----+--------------------+--------------------+ 
    

    Um diesen einfacher zu machen - ich lieber mit dem Spalte Namen anstelle ihrer Indizes.

Also hier ist die Transformation Sie brauchen:

val labeled = pca.transform(trainingDf).rdd.map(row => LabeledPoint(
    row.getAs[Double]("label"), 
    row.getAs[org.apache.spark.mllib.linalg.Vector]("pcaFeatures") 
)) 
Verwandte Themen