2017-11-28 2 views
0

Ich möchte ro laufen einfache Arbeit ecample mit Apache Spark. Mit lokalen JAR-Dateien in $SPARK_HOME/jars es richtig läuft, aber Maven Abhängigkeiten es Fehler:Spark Version Mismatch mit Maven Abhängigkeiten

java.lang.NoSuchMethodError: org.apache.hadoop.fs.FileSystem$Statistics.getThreadStatistics()Lorg/apache/hadoop/fs/FileSystem$Statistics$StatisticsData; 
at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(SparkHadoopUtil.scala:149) 
at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(SparkHadoopUtil.scala:149) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) 
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
at scala.collection.AbstractTraversable.map(Traversable.scala:104) 
at org.apache.spark.deploy.SparkHadoopUtil$$anonfun$1.apply$mcJ$sp(SparkHadoopUtil.scala:149) 
at org.apache.spark.deploy.SparkHadoopUtil.getFSBytesReadOnThreadCallback(SparkHadoopUtil.scala:150) 
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:224) 
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:203) 
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:94) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) 
at org.apache.spark.scheduler.Task.run(Task.scala:108) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:748) 

Hier ist der Code:

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaPairRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import scala.Tuple2; 

import java.util.Arrays; 

public class SparkTest { 
    public static void main(String[] args){ 
     SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkTest"); 
     JavaSparkContext sc = new JavaSparkContext(conf); 

     JavaRDD<String> rdd = sc.textFile("file:///usr/local/spark/LICENSE"); 
     JavaPairRDD<String, Integer> counts = rdd 
       .flatMap(s -> Arrays.asList(s.split(" ")).iterator()) 
       .mapToPair(word -> new Tuple2<>(word, 1)) 
       .reduceByKey((a, b) -> a + b); 

     counts.coalesce(1).saveAsTextFile("file:///home/XXX/Desktop/Processing/spark"); 

    } 
} 

Hier ist POM.xml Datei:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>Processing</groupId> 
    <artifactId>Streaming</artifactId> 
    <version>1.0-SNAPSHOT</version> 
    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <configuration> 
        <source>1.8</source> 
        <target>1.8</target> 
       </configuration> 
      </plugin> 
     </plugins> 
    </build> 
    <dependencies> 
     <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-java</artifactId> 
      <version>1.3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-streaming-java_2.11</artifactId> 
      <version>1.3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-clients_2.11</artifactId> 
      <version>1.3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>0.10.0.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-connector-kafka-0.10_2.11</artifactId> 
      <version>1.3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.11</artifactId> 
      <version>2.2.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-client</artifactId> 
      <version>2.7.3</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-connector-filesystem_2.11</artifactId> 
      <version>1.3.2</version> 
     </dependency> 
    </dependencies> 
</project> 

Es ist auch enthalten einige andere Apache-Software wie Hadoop und Flink.

Spark-Version installiert: 2.2.0 Download-Link: https://www.apache.org/dyn/closer.lua/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz

Hadoop installde version = 2.7.3

Etwas hier stimmen nicht überein!

+0

Post Hadoop verwandte Abhängigkeiten von Ihrem Pom zu. – philantrovert

+0

@philantrovert mein Code ist abhängig von Hadoop und nach offiziellen Spark-Site 'Zusätzlich, wenn Sie auf einen HDFS-Cluster zugreifen möchten, müssen Sie eine Abhängigkeit von Hadoop-Client für Ihre Version von HDFS hinzufügen. – soheil

+0

@philantrovert Hadoop Abhängigkeit wird hinzugefügt, wird jedoch nicht verwendet. – soheil

Antwort

1

Anhand Ihrer Abhängigkeiten und der Darstellung, wie Java Ihre Klasse mit org.apache.hadoop.fs.FileSystem.class.getResource("FileSyste‌​m.class") lädt, scheint Ihr Jar von geladen zu sein. Wenn Abhängigkeitsbaum zeigt mit mvn dependency:tree sehen wir seine eine transitive Abhängigkeit von flink-java: und flink-streaming-java_2.11

[INFO] +- org.apache.flink:flink-java:jar:1.3.2:compile 
[INFO] | +- ... 
[INFO] | +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.2:compile 
[INFO] +- org.apache.flink:flink-streaming-java_2.11:jar:1.3.2:compile 
[INFO] | +- org.apache.flink:flink-runtime_2.11:jar:1.3.2:compile 
[INFO] | | +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.2:compile 

Dieses Glas enthält das gesamte org.apache.hadoop.fs Paket die richtige Definition überschrieben und Ihr Problem verursacht. Sie können versuchen, die flink-java-Abhängigkeit zu entfernen oder flink-shaded-hadoop2auszuschließen, aber das kann zu Problemen mit Ihrem Code führen, da andere erforderliche Flink-Klassen möglicherweise fehlen. Zum Beispiel:

 <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-java</artifactId> 
      <version>1.3.2</version> 
      <exclusions> 
       <exclusion> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-shaded-hadoop2</artifactId> 
       </exclusion> 
      </exclusions> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-streaming-java_2.11</artifactId> 
      <version>1.3.2</version> 
      <exclusions> 
       <exclusion> 
        <groupId>org.apache.flink</groupId> 
        <artifactId>flink-shaded-hadoop2</artifactId> 
       </exclusion> 
      </exclusions> 
     </dependency> 

Andernfalls müssen Sie eine andere Lösung je nach Projektanforderungen finden: mit Klassenladen spielen, um Ihre Klassen richtig, um sicherzustellen, geladen werden, Ihren Abhängigkeiten Versionen aktualisieren, so dass die Hadoop-Klassen mit Übereinstimmen Flink, etc.

0

Schließlich Erstellen eines anderen dedizierten Maven-Projekt für Spark mit nur spark-core maven Abhängigkeit funktioniert.

Kann jemand sagen warum?

+0

Verwenden Sie Ihre Abhängigkeiten und zeigen Sie, wie Java Ihre Klasse mit 'org.apache.hadoop.fs.FileSystem.class.getResource (" FileSystem.class ") lädt' es erscheint 'org.apache.flink: flink-java: jar: 1.3 .2' definiert eine transitive Abhängigkeit 'org.apache.flink: flink-shaded-hadoop2: jar: 1.3.2', die selbst das gesamte' org.apache.hadoop.fs'-Paket enthält ... Ihre App lädt die Klasse aus Dieses gefälschte Glas statt der richtigen, daher Ihr Fehler. Die Verwendung von Spark Core als Principal-Abhängigkeit scheint ein richtiger Workaround zu sein, aber wenn Sie 'flink-shaded-hadoop2' komplett ausschließen können ohne Probleme, die besser sind –

+0

@PierreB. Groß! Post eine Antwort und ich werde es richtig beantworten – soheil

+0

Froh, dass helfen würde, habe ich eine Antwort geschrieben –

0

Ab Flink 1.4 (Release ausstehend) kann Flink ohne jegliche Abhängigkeiten von hadoop laufen, und wenn Sie hadoop benötigen, ist es ausreichend, im Klassenpfad einen Hadoop zu haben. Dies sollte Ihr Leben erleichtern.