2016-10-18 5 views
0

Ich versuche, DataFrame mit Spark sqlContext zu erstellen. Ich habe Spark 1.6.3 und Scala 2.10.5 verwendet. Unten ist mein Code zum Erstellen von DataFrames.Spark: Erstellen von DataFrame gibt Ausnahme

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import org.apache.spark.sql.SQLContext 
import com.knoldus.pipeline.KMeansPipeLine 

object SimpleApp{ 

    def main(args:Array[String]){ 

    val conf = new SparkConf().setAppName("Simple Application") 
    val sc = new SparkContext(conf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

    import sqlContext.implicits._ 

    val kMeans = new KMeansPipeLine() 
    val df = sqlContext.createDataFrame(Seq(
     ("[email protected]", 12000,"M"), 
     ("[email protected]", 43000,"M"), 
     ("[email protected]", 5000,"F"), 
     ("[email protected]", 60000,"M") 
    )).toDF("email", "income","gender") 

    val categoricalFeatures = List("gender","email") 
    val numberOfClusters = 2 
    val iterations = 10 
    val predictionResult = kMeans.predict(sqlContext,df,categoricalFeatures,numberOfClusters,iterations) 
    } 
} 

Es gibt mir die folgende Ausnahme. Welchen Fehler mache ich? Kann mir jemand helfen, das zu lösen?

Exception in thread "main" java.lang.NoSuchMethodError: 
    org.apache.spark.sql.SQLContext.createDataFrame(Lscala/collection/Seq;Lscala/ref lect/api/TypeTags$TypeTag;)Lorg/apache/spark/sql/Dataset; 
    at SimpleApp$.main(SimpleApp.scala:24) 
    at SimpleApp.main(SimpleApp.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

Die Abhängigkeiten ich verwendet habe, sind:

scalaVersion := "2.10.5" 
libraryDependencies ++= Seq( 
"org.apache.spark" % "spark-core_2.10" % "2.0.0" % "provided", 
"org.apache.spark" % "spark-sql_2.10" % "2.0.0" % "provided", 
"org.apache.spark" % "spark-mllib_2.10" % "2.0.0" % "provided", 
"knoldus" % "k-means-pipeline" % "0.0.1") 
+0

Ihr Code funktioniert gut für mich. Ich nehme an, dass Ihre Spark-Binärdateien mit Scala 2.11 kompiliert wurden, so dass sie nicht mit Ihrem Code mit Spark 2.10 laufen können, das umgekehrte Problem des hier beschriebenen: http://stackoverflow.com/questions/27728731/scala-code- throw-exception-in-spark –

+0

@TzachZohar Wie kann ich das beheben? – Balkrushn

+2

Erstens - Ihre Abhängigkeiten zeigen, dass Sie Spark 2.0.0 nicht 1.6.3 verwenden, wie Sie oben angeben. Spark 2.0.0 verwendet Scala 2.11 standardmäßig, soweit ich weiß, wenn Sie es mit Scala 2.10 verwenden möchten, müssen Sie es selbst erstellen, siehe http://spark.apache.org/docs/latest/building- spark.html # Gebäude-für-Scala-210. Verwenden Sie also entweder Scala 2.11 oder verwenden Sie eine Spark-Version, die gemäß dieser Anleitung kompiliert wurde. –

Antwort

1

Wie ich sehe in Ihrem createDataFrame zweite Argument verfehlt. Verfahren Muster hier beschrieben: https://spark.apache.org/docs/1.6.1/api/scala/index.html#[email protected](org.apache.spark.api.java.JavaRDD,%20java.lang.Class)

In Ihrem Fall wird es

def createDataFrame[A <: Product](data: Seq[A])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[A]): DataFrame 

:: Experimental sein :: Erstellt eine Datenrahmen von einem lokalen Seq von Produkt.

ODER Converting Seq in List/RDD und mit der Methode Muster mit zwei Argumenten

+3

ist überhaupt nicht wahr - das zweite Argument ist implizit, also sollte man es nicht explizit angeben müssen. Der Code in Frage stellt tatsächlich _works_ die richtigen Abhängigkeiten. –

+0

Hier sind meine Abhängigkeiten: – Balkrushn

+0

scalaVersion: = "2.10.5" libraryDependencies ++ = Seq ( "org.apache.spark"% "spark-core_2.10"% "2.0.0"% "bereitgestellt", "org.apache.spark"% "spark-sql_2.10"% "2.0.0"% "bereitgestellt", "org.apache.spark"% "spark-mllib_2.10"% "2.0.0"% "bereitgestellt", "knoldus"% "k-means-pipeline"% "0.0.1" ) – Balkrushn