
I am getting a compile error when converting the pre-LDA transformation output into a DataFrame using Scala in Spark 2.0: value toDF is not a member of org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)]. The specific code that throws the error is:

val documents = PreLDAmodel.transform(mp_listing_lda_df) 
    .select("docId","features") 
    .rdd 
    .map{ case Row(row_num: Long, features: MLVector) => (row_num, features) } 
    .toDF() 

The complete compilation error is:

Error:(132, 8) value toDF is not a member of org.apache.spark.rdd.RDD[(Long, org.apache.spark.ml.linalg.Vector)] 
possible cause: maybe a semicolon is missing before `value toDF'? 
     .toDF() 

The complete code:

import java.io.FileInputStream 
import java.sql.{DriverManager, ResultSet} 
import java.util.Properties 

import org.apache.spark.SparkConf 
import org.apache.spark.ml.Pipeline 
import org.apache.spark.ml.clustering.LDA 
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel, RegexTokenizer, StopWordsRemover} 
import org.apache.spark.ml.linalg.{Vector => MLVector} 
import org.apache.spark.mllib.clustering.{LDA => oldLDA} 
import org.apache.spark.rdd.JdbcRDD 
import org.apache.spark.sql.types.{StringType, StructField, StructType} 
import org.apache.spark.sql.{Row, SparkSession} 

object MPClassificationLDA { 
    /*Start: Configuration variable initialization*/ 
    val props = new Properties 
    val fileStream = new FileInputStream("U:\\JIRA\\MP_Classification\\target\\classes\\mpclassification.properties") 
    props.load(fileStream) 
    val mpExtract = props.getProperty("mpExtract").toString 
    val shard6_db_server_name = props.getProperty("shard6_db_server_name").toString 
    val shard6_db_user_id = props.getProperty("shard6_db_user_id").toString 
    val shard6_db_user_pwd = props.getProperty("shard6_db_user_pwd").toString 
    val mp_output_file = props.getProperty("mp_output_file").toString 
    val spark_warehouse_path = props.getProperty("spark_warehouse_path").toString 
    val rf_model_file_path = props.getProperty("rf_model_file_path").toString 
    val windows_hadoop_home = props.getProperty("windows_hadoop_home").toString 
    val lda_vocabulary_size = props.getProperty("lda_vocabulary_size").toInt 
    val pre_lda_model_file_path = props.getProperty("pre_lda_model_file_path").toString 
    val lda_model_file_path = props.getProperty("lda_model_file_path").toString 
    fileStream.close() 
    /*End: Configuration variable initialization*/ 

    val conf = new SparkConf().set("spark.sql.warehouse.dir", spark_warehouse_path) 

    def main(arg: Array[String]): Unit = { 
    //SQL Query definition and parameter values as parameter upon executing the Object 
    val cont_id = "14211599" 
    val top = "100000" 
    val start_date = "2016-05-01" 
    val end_date = "2016-06-01" 

    val mp_spark = SparkSession 
     .builder() 
     .master("local[*]") 
     .appName("MPClassificationLoadLDA") 
     .config(conf) 
     .getOrCreate() 
    MPClassificationLDACalculation(mp_spark, cont_id, top, start_date, end_date) 
    mp_spark.stop() 
    } 

    private def MPClassificationLDACalculation 
    (mp_spark: SparkSession 
    ,cont_id: String 
    ,top: String 
    ,start_date: String 
    ,end_date: String 
): Unit = { 

    //DB connection definition 
    def createConnection() = { 
     Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver").newInstance(); 
     DriverManager.getConnection("jdbc:sqlserver://" + shard6_db_server_name + ";user=" + shard6_db_user_id + ";password=" + shard6_db_user_pwd); 
    } 

    //DB Field Names definition 
    def extractvalues(r: ResultSet) = { 
     Row(r.getString(1),r.getString(2)) 
    } 

    //Prepare SQL Statement with parameter value replacement 
    val query = """SELECT docId = audt_id, text = auction_title FROM brands6.dbo.uf_ds_marketplace_classification_listing(@cont_id, @top, '@start_date', '@end_date') WHERE ? < ? OPTION(RECOMPILE);""" 
     .replaceAll("@cont_id", cont_id) 
     .replaceAll("@top", top) 
     .replaceAll("@start_date", start_date) 
     .replaceAll("@end_date", end_date) 
     .stripMargin 

    //Connect to Source DB and execute the Prepared SQL Steatement 
    val mpDataRDD = new JdbcRDD(mp_spark.sparkContext 
     ,createConnection 
     ,query 
     ,lowerBound = 0 
     ,upperBound = 10000000 
     ,numPartitions = 1 
     ,mapRow = extractvalues) 

    val schema_string = "docId,text" 
    val fields = StructType(schema_string.split(",") 
     .map(fieldname => StructField(fieldname, StringType, true))) 

    //Create Data Frame using format identified through schema_string 
    val mpDF = mp_spark.createDataFrame(mpDataRDD, fields) 
    mpDF.collect() 

    val mp_listing_tmp = mpDF.selectExpr("cast(docId as long) docId", "text") 
    mp_listing_tmp.printSchema() 
    println(mp_listing_tmp.first) 

    val mp_listing_lda_df = mp_listing_tmp.withColumn("docId", mp_listing_tmp("docId")) 
    mp_listing_lda_df.printSchema() 

    val tokenizer = new RegexTokenizer() 
     .setInputCol("text") 
     .setOutputCol("rawTokens") 
     .setMinTokenLength(2) 

    val stopWordsRemover = new StopWordsRemover() 
     .setInputCol("rawTokens") 
     .setOutputCol("tokens") 

    val vocabSize = 4000 

    val countVectorizer = new CountVectorizer() 
     .setVocabSize(vocabSize) 
     .setInputCol("tokens") 
     .setOutputCol("features") 

    val PreLDApipeline = new Pipeline() 
     .setStages(Array(tokenizer, stopWordsRemover, countVectorizer)) 

    val PreLDAmodel = PreLDApipeline.fit(mp_listing_lda_df) 
    //comment out after saving it the first time 
    PreLDAmodel.write.overwrite().save(pre_lda_model_file_path) 

    val documents = PreLDAmodel.transform(mp_listing_lda_df) 
     .select("docId","features") 
     .rdd 
     .map{ case Row(row_num: Long, features: MLVector) => (row_num, features) } 
     .toDF() 

    //documents.printSchema() 
    val numTopics: Int = 20 
    val maxIterations: Int = 100 

    //note the FeaturesCol need to be set 
    val lda = new LDA() 
     .setOptimizer("em") 
     .setK(numTopics) 
     .setMaxIter(maxIterations) 
     .setFeaturesCol(("_2")) 

    val vocabArray = PreLDAmodel.stages(2).asInstanceOf[CountVectorizerModel].vocabulary 
    } 
} 

I think it may be related to conflicts in the imports section of the code. Appreciate any help.

Answer


Two things need to be done:

Import the implicits: note that this should only be done after an instance of org.apache.spark.sql.SQLContext has been created, outside of the method:

val sqlContext= new org.apache.spark.sql.SQLContext(sc) 

import sqlContext.implicits._ 

Move the case class outside the method: the case class you use to define the schema of the DataFrame should be declared outside the method that needs it. You can read more about this here: https://issues.scala-lang.org/browse/SI-6649. A combined sketch of both fixes, adapted to the code above, follows below.
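
For reference, a minimal sketch of how both changes might look in the Spark 2.0 code from the question (Document is a hypothetical case class name; mp_spark is the existing SparkSession, whose implicits play the same role as the SQLContext import above):

// Top level of the object, outside MPClassificationLDACalculation:
// a case class describing the row schema of the resulting DataFrame
case class Document(docId: Long, features: org.apache.spark.ml.linalg.Vector)

// Inside the method, once mp_spark (the SparkSession) is in scope:
import mp_spark.implicits._   // brings toDF() into scope for RDDs

val documents = PreLDAmodel.transform(mp_listing_lda_df)
  .select("docId", "features")
  .rdd
  .map { case Row(docId: Long, features: MLVector) => Document(docId, features) }
  .toDF()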


Also, if you already have a SparkSession, you can reference it: import sc.sqlContext.implicits._ – Michael
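
In the question's code the session is named mp_spark, so the comment's suggestion would presumably translate to one of the following (a sketch, not tested against the asker's setup):

// Spark 2.0 style, directly from the SparkSession ...
import mp_spark.implicits._
// ... or via the session's SQLContext, as the comment suggests
import mp_spark.sqlContext.implicits._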
