2014-12-30 4 views
9

Ich habe gerade mit Spark Streaming begonnen und ich versuche, eine Beispielanwendung zu bauen, die Wörter aus einem Kafka-Stream zählt. Obwohl es kompiliert mit sbt package, wenn ich es ausführen, bekomme ich NoClassDefFoundError. Dieses post scheint das gleiche Problem zu haben, aber die Lösung ist für Maven und ich konnte es nicht mit sbt reproduzieren.KafkaUtils-Klasse nicht gefunden in Spark-Streaming

KafkaApp.scala:

import org.apache.spark._ 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka._ 

object KafkaApp { 
    def main(args: Array[String]) { 

    val conf = new SparkConf().setAppName("kafkaApp").setMaster("local[*]") 
    val ssc = new StreamingContext(conf, Seconds(1)) 
    val kafkaParams = Map(
     "zookeeper.connect" -> "localhost:2181", 
     "zookeeper.connection.timeout.ms" -> "10000", 
     "group.id" -> "sparkGroup" 
    ) 

    val topics = Map(
     "test" -> 1 
    ) 

    // stream of (topic, ImpressionLog) 
    val messages = KafkaUtils.createStream(ssc, kafkaParams, topics, storage.StorageLevel.MEMORY_AND_DISK) 
    println(s"Number of words: %{messages.count()}") 
    } 
} 

build.sbt:

name := "Simple Project" 

version := "1.1" 

scalaVersion := "2.10.4" 

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.1.1", 
    "org.apache.spark" %% "spark-streaming" % "1.1.1", 
    "org.apache.spark" %% "spark-streaming-kafka" % "1.1.1" 
) 

resolvers += "Akka Repository" at "http://repo.akka.io/releases/" 

Und ich eintragen mit:

bin/spark-submit \ 
    --class "KafkaApp" \ 
    --master local[4] \ 
    target/scala-2.10/simple-project_2.10-1.1.jar 

Fehler:

14/12/30 19:44:57 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://[email protected]:65077/user/HeartbeatReceiver 
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ 
    at KafkaApp$.main(KafkaApp.scala:28) 
    at KafkaApp.main(KafkaApp.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$ 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 

Antwort

14

spark-submit legt das Paket mit KafkaUtils nicht automatisch ab. Sie müssen in Ihrem Projekt JAR haben. Dazu müssen Sie ein All-Inclusive-Uber-Glas mit sbt assembly erstellen. Hier ist ein Beispiel build.sbt.

https://github.com/tdas/spark-streaming-external-projects/blob/master/kafka/build.sbt

Sie natürlich müssen auch die Montage Plugin SBT hinzuzufügen.

https://github.com/tdas/spark-streaming-external-projects/tree/master/kafka/project

+0

Ich bin auch gleiche Problem bekommen, während ich bin mit Maven. Danach habe ich "org.apache.maven.plugins" in meine pom.xml aufgenommen, aber das Problem ist ungelöst. Irgendwelche anderen Parameter muss ich überprüfen? –

+0

mit der Änderung, wenn ich Stb-Paket ausführen, habe ich Fehler. : Fehler: nicht gefunden: Objekt AssemblyKeys import AssemblyKeys._ ^ [Fehler] Geben Sie Fehler in Ausdruck – johnsam

+0

@johnsam Lassen Sie einfach die erste Import-Zeile und die Zeile "AssemblySettings", funktioniert für mich. – pederpansen

6

Bitte versuchen Sie es durch alle Abhängigkeits Gläser einschließlich während antrag:

./spark-submit --name "SampleApp" --deploy-mode client--master spark://host:7077 --class com.stackexchange.SampleApp --jars $SPARK_INSTALL_DIR/spark-streaming-kafka_2.10-1.3.0.jar,$KAFKA_INSTALL_DIR/libs/kafka_2.10-0.8.2.0.jar,$KAFKA_INSTALL_DIR/libs/metrics-core-2.2.0.jar,$KAFKA_INSTALL_DIR/libs/zkclient-0.3.jar spark-example-1.0-SNAPSHOT.jar

2

Nach build.sbt für mich gearbeitet. Dazu müssen Sie auch das sbt-assembly-Plugin in eine Datei unter dem Verzeichnis projects/ einfügen.

build.sbt

name := "NetworkStreaming" // https://github.com/sbt/sbt-assembly/blob/master/Migration.md#upgrading-with-bare-buildsbt 

libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-streaming_2.10" % "1.4.1", 
    "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.4.1",   // kafka 
    "org.apache.hbase" % "hbase" % "0.92.1", 
    "org.apache.hadoop" % "hadoop-core" % "1.0.2", 
    "org.apache.spark" % "spark-mllib_2.10" % "1.3.0" 
) 

mergeStrategy in assembly := { 
    case m if m.toLowerCase.endsWith("manifest.mf")   => MergeStrategy.discard 
    case m if m.toLowerCase.matches("meta-inf.*\\.sf$")  => MergeStrategy.discard 
    case "log4j.properties"         => MergeStrategy.discard 
    case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines 
    case "reference.conf"         => MergeStrategy.concat 
    case _             => MergeStrategy.first 
} 

Projekt/plugins.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

0

erfüllen das gleiche Problem, ich löste es durch das Glas mit Abhängigkeiten aufzubauen.

fügen Sie den Code unten

<build> 
    <sourceDirectory>src/main/java</sourceDirectory> 
    <testSourceDirectory>src/test/java</testSourceDirectory> 
    <plugins> 
     <!-- 
        Bind the maven-assembly-plugin to the package phase 
     this will create a jar file without the storm dependencies 
     suitable for deployment to a cluster. 
     --> 
     <plugin> 
     <artifactId>maven-assembly-plugin</artifactId> 
     <configuration> 
      <descriptorRefs> 
      <descriptorRef>jar-with-dependencies</descriptorRef> 
      </descriptorRefs> 
      <archive> 
      <manifest> 
       <mainClass></mainClass> 
      </manifest> 
      </archive> 
     </configuration> 
     <executions> 
      <execution> 
      <id>make-assembly</id> 
      <phase>package</phase> 
      <goals> 
       <goal>single</goal> 
      </goals> 
      </execution> 
     </executions> 
     </plugin> 
    </plugins> 
</build>  

mvn Paket senden Sie das "example-jar-with-dependencies.jar"

0

Hinzugefügt wurde die Abhängigkeit von außen, Projekt pom.xml -> Eigenschaften- -> Java Build Path -> Bibliotheken -> Externe Jars hinzufügen und das benötigte jar hinzufügen.

das löste mein Problem.

0

Mit Spark-1.6 für mich die Arbeit erledigen, ohne den Aufwand so viele externe Gläser Handhabung ... Kann ganz verwalten bekommen erschweren ...