2016-07-21 6 views
1

Ich verwende ein Programm, das Apache Spark verwendet, um Daten vom Apache Kafka-Cluster abzurufen und die Daten in eine Hadoop-Datei zu stellen. Mein Programm ist unter:Hadoop OutputFormat RunTimeException beim Ausführen von Apache Spark Kafka Stream

public final class SparkKafkaConsumer { 
    public static void main(String[] args) { 
     SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); 
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 
     Map<String, Integer> topicMap = new HashMap<String, Integer>(); 
     String[] topics = "Topic1, Topic2, Topic3".split(","); 
     for (String topic: topics) { 
      topicMap.put(topic, 3); 
     } 
     JavaPairReceiverInputDStream<String, String> messages = 
       KafkaUtils.createStream(jssc, "kafka.test.com:2181", "NameConsumer", topicMap); 
     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
      public String call(Tuple2<String, String> tuple2) { 
       return tuple2._2(); 
      } 
     }); 
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { 
      public Iterable<String> call(String x) { 
       return Lists.newArrayList(",".split(x)); 
      } 
     }); 
     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
       new PairFunction<String, String, Integer>() { 
        public Tuple2<String, Integer> call(String s) { 
         return new Tuple2<String, Integer>(s, 1); 
        } 
       }).reduceByKey(new Function2<Integer, Integer, Integer>() { 
        public Integer call(Integer i1, Integer i2) { 
         return i1 + i2; 
        } 
       }); 
     wordCounts.print(); 
     wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt"); 
     jssc.start(); 
     jssc.awaitTermination(); 
    } 
} 

ich diesen Befehl bin mit dem Antrag einzureichen: C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 --class "SparkKafkaConsumer" --master local[4] target\simple-project-1.0.jar

Ich erhalte diese Fehlermeldung: java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:2148)

Was diesen Fehler verursacht und wie löse ich es ?

+0

Das ist wie ein Problem in Funken sieht http://stackoverflow.com/questions/29007085/saveasnewapihadoopfile-giving-error-when-used- as-output-format .. –

+0

Könnten Sie 'saveAsHadoopFiles (" hdfs: // localhost: 8020/user/spark/stream/"," txt ", Text.class, IntWritable.class, TextOutputFormat.class)' stattdessen versuchen? –

+0

@Hawknight Was ist das vollständige Paket von 'Text.class' und' TextOutputFormat.class'? – khateeb

Antwort

2

Ich stimme zu, dass der Fehler nicht wirklich evokativ ist, aber es ist normalerweise besser, das Format der Daten anzugeben, die in einer der saveAsHadoopFile Methoden ausgegeben werden sollen, um sich vor dieser Art von Ausnahme zu schützen.

Hier ist der Prototyp Ihrer bestimmten Methode in der Dokumentation:

saveAsHadoopFiles(java.lang.String prefix, java.lang.String suffix, java.lang.Class<?> keyClass, java.lang.Class<?> valueClass, java.lang.Class<F> outputFormatClass) 

In Ihrem Beispiel, dass entsprechen würde:

wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt", Text.class, IntWritable.class, TextOutputFormat.class) 

Basierend auf dem Format Ihrer wordCounts PairDStream, wählte ich Text Der Schlüssel ist vom Typ String und , da der dem Schlüssel zugeordnete Wert vom Typ Integer ist.

Verwenden Sie TextOutputFormat, wenn Sie nur einfache Textdateien benötigen, aber Sie können in die Unterklassen FileOutputFormat nach weiteren Ausgabeoptionen suchen.

Da dies wurde auch gefragt, kommt die Text Klasse aus dem org.apache.hadoop.io Paket und die TextOutputFormat stammt aus dem org.apache.hadoop.mapred Paket.

1

Nur der Vollständigkeit halber (@ Jonathan die richtige Antwort gab)

import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.TextOutputFormat; 

... 
wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt", Text.class, IntWritable.class, TextOutputFormat.class) 
Verwandte Themen