0

Ich bin sehr neu zu Spark Machine Learning Ich möchte mehrere Spalten zu Funktionen übergeben, in meinem unteren Code Ich bin nur übergeben die Spalte Datum zu Funktionen, aber jetzt möchte ich UserID übergeben und Datumsspalten für Features Ich habe versucht, Vektor zu verwenden, aber es nur Double-Datentyp unterstützen, aber in meinem Fall habe ich Int und StringWie man mehrere Spalten zu SetInputCol()

ich dankbar wäre, wenn jemand eine Anregung/Lösung oder ein Codebeispiel bereitzustellen, die meine Anforderung

erfüllen Code:

case class LabeledDocument(Userid: Double, Date: String, label: Double) 
val training = spark.read.option("inferSchema", true).csv("/root/Predictiondata3.csv").toDF("Userid","Date","label").toDF().as[LabeledDocument] 
import scala.beans.BeanInfo 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.ml.Pipeline 
import org.apache.spark.ml.classification.LogisticRegression 
import org.apache.spark.ml.feature.{HashingTF, Tokenizer} 
import org.apache.spark.mllib.linalg.Vector 
import org.apache.spark.sql.{Row, SQLContext} 
val tokenizer = new Tokenizer().setInputCol("Date").setOutputCol("words") 
val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features") 
import org.apache.spark.ml.regression.LinearRegression 
val lr = new LinearRegression().setMaxIter(100).setRegParam(0.001).setElasticNetParam(0.0001) 
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr)) 
val model = pipeline.fit(training.toDF()) 
case class Document(Userid: Integer, Date: String) 
val test = sc.parallelize(Seq(Document(4, "04-Jan-18"),Document(5, "01-Jan-17"),Document(2, "03-Jan-17"))) 
model.transform(test.toDF()).show() 

Eingabedaten mit Spalten

Userid,Date,SwipeIntime 
1, 1-Jan-2017,9.30 
1, 2-Jan-2017,9.35 
1, 3-Jan-2017,9.45 
1, 4-Jan-2017,9.26 
2, 1-Jan-2017,9.37 
2, 2-Jan-2017,9.35 
2, 3-Jan-2017,9.45 
2, 4-Jan-2017,9.46 
+0

Sie müssen vectorAssembler verwenden. Es erwartet Daten in numerischer, Vektor-, Boolescher Art. Sie können StringIndexer verwenden, um Strings in Indizes zu konvertieren. – hadooper

+0

danke, ich werde es ausprobieren und sehen – Bhavesh

+0

@hadooper können Sie einige Beispiel teilen Ich habe versucht, den untenstehenden Code val assembler1 = new VectorAssembler(). SetInputCols (Array ("Userid", "Date")). SetOutputCol ("vec1") val assembled1 = Assembler1.Transform (Training) java.lang.IllegalArgumentException: Der Datentyp StringType wird nicht unterstützt. – Bhavesh

Antwort

-1

Ich habe die Lösung, die ich konnte.

import scala.beans.BeanInfo 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.ml.Pipeline 
import org.apache.spark.ml.classification.LogisticRegression 
import org.apache.spark.ml.feature.{HashingTF, Tokenizer} 
import org.apache.spark.mllib.linalg.Vector 
import org.apache.spark.sql.{Row, SQLContext} 
import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.ml.attribute.NominalAttribute 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StructType,StructField,StringType} 
case class LabeledDocument(Userid: Double, Date: String, label: Double) 
val trainingData = spark.read.option("inferSchema", true).csv("/root/Predictiondata10.csv").toDF("Userid","Date","label").toDF().as[LabeledDocument] 
import org.apache.spark.ml.feature.StringIndexer 
import org.apache.spark.ml.feature.VectorAssembler 
val DateIndexer = new StringIndexer().setInputCol("Date").setOutputCol("DateCat") 
val indexed = DateIndexer.fit(trainingData).transform(trainingData) 
val assembler = new VectorAssembler().setInputCols(Array("DateCat", "Userid")).setOutputCol("rawfeatures") 
val output = assembler.transform(indexed) 
val rows = output.select("Userid","Date","label","DateCat","rawfeatures").collect() 
val asTuple=rows.map(a=>(a.getInt(0),a.getString(1),a.getDouble(2),a.getDouble(3),a(4).toString())) 
val r2 = sc.parallelize(asTuple).toDF("Userid","Date","label","DateCat","rawfeatures") 
val Array(training, testData) = r2.randomSplit(Array(0.7, 0.3)) 
import org.apache.spark.ml.feature.{HashingTF, Tokenizer} 
val tokenizer = new Tokenizer().setInputCol("rawfeatures").setOutputCol("words") 
val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features") 
import org.apache.spark.ml.regression.LinearRegression 
val lr = new LinearRegression().setMaxIter(100).setRegParam(0.001).setElasticNetParam(0.0001) 
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr)) 
val model = pipeline.fit(training.toDF()) 
model.transform(testData.toDF()).show() 
Verwandte Themen