2015-07-10 3 views
15


Ich versuche, Spark Kafka Direct Stream Ansatz zu verwenden. Es vereinfacht die Parallelität, indem es so viele RDD-Partitionen wie die Kafka-Topic-Partition erstellt, wie in dieser doc angegeben. Basierend auf meinem Verständnis wird Spark einen Executor für jede RDD-Partition erstellen, um die Berechnung durchzuführen. SoSpark Kafka Direct DStream - Wie viele Executoren und RDD-Partitionen im Garn-Cluster-Modus, wenn num-Executors gesetzt ist?

wenn ich die Anwendung in Garn-Cluster-Modus einreichen, und geben Sie Option num-Testamentsvollstrecker auf einen anderen Wert für die Anzahl der Partitionen, wie viele Vollstrecker wird es sein?

Zum Beispiel gibt es einen kafka Thema mit 2-Partition ist, und ich angeben num-Testamentsvollstrecker-4:

export YARN_CONF_DIR=$HADOOP_HOME/client_conf 

./bin/spark-submit \ 
--class playground.MainClass \ 
--master yarn-cluster \ 
--num-executors 4 \ 
../spark_applications/uber-spark-streaming-0.0.1-SNAPSHOT.jar \ 
127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095 topic_1 

ich ihr einen Versuch geben und die Anzahl der Testamentsvollstrecker herauszufinden 4, und jeder Executor liest und verarbeitet Daten von Kafka. Warum? Es gibt nur 2 Partitionen im Kafka-Thema, Wie lesen 4 Executoren aus dem Kafka-Thema, das nur 2 Partitionen hat?

Im Folgenden finden Sie die Details der Funkenanwendung und Protokolle.

Meine Funken Anwendung, die empfangenen Nachrichten druckt (in flatMap-Methode) von kafka in jedem Vollstrecker:

... 
    String brokers = args[0]; 
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(args[1].split(","))); 
    kafkaParams.put("metadata.broker.list", brokers); 

    JavaPairInputDStream<String, String> messages = 
     KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, 
      kafkaParams, topicsSet); 

    JavaPairDStream<String, Integer> wordCounts = 
     messages.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() 
     { 
      public Iterable<String> call(Tuple2<String, String> tuple) throws Exception 
      { 
       System.out.println(String.format("[received from kafka] tuple_1 is %s, tuple_2 is %s", tuple._1(), 
        tuple._2())); // print the kafka message received in executor 
       return Arrays.asList(SPACE.split(tuple._2())); 
      } 

     }).mapToPair(new PairFunction<String, String, Integer>() 
     { 
      public Tuple2<String, Integer> call(String word) throws Exception 
      { 
       System.out.println(String.format("[word]: %s", word)); 
       return new Tuple2<String, Integer>(word, 1); 
      } 

     }).reduceByKey(new Function2<Integer, Integer, Integer>() 
     { 
      public Integer call(Integer v1, Integer v2) throws Exception 
      { 
       return v1 + v2; 
      } 

     }); 

    wordCounts.print(); 

    Runtime.getRuntime().addShutdownHook(new Thread(){ 
     @Override 
     public void run(){ 
      System.out.println("gracefully shutdown Spark!"); 
      jssc.stop(true, true); 
     } 
    }); 
    jssc.start(); 
    jssc.awaitTermination(); 

Mein Kafka Thema, mit 2 Partitionen. String "Hallo Hallo Wort 1", "Hallo Hallo Wort 2", "Hallo Hallo Wort 3", ... werden an das Thema gesendet.

Topic: topic_2 PartitionCount:2 ReplicationFactor:2 Configs: 
Topic: topic_2 Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1 
Topic: topic_2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 

Webconsle: enter image description here

Konsolausgabe von Exekutor 1:

... 
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 12 
[word]: hello 
[word]: hello 
[word]: world 
[word]: 12 
... 

Konsolausgabe von Exekutor 2:

... 
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 2 
[word]: hello 
[word]: hello 
[word]: world 
[word]: 2 
... 

Konsolenausgabe von Testamentsvollstrecker 3:

... 
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 3 
[word]: hello 
[word]: hello 
[word]: world 
[word]: 3 
... 
+0

Ich drucke die Anzahl der Partitionen in jeder RDD. Es hat den gleichen Wert wie die Partitionsnummern von kafka topic, was in meinem Fall 2 ist. Wie können 3 Executoren parallel eine Reihe RDDs verarbeiten, die insgesamt zwei Partitionen haben? Basierend auf der Konsolenausgabe jedes Executors verarbeiten alle Executors Daten von RDD. – yzandrew

+0

Da DStream eine Reihe von RDDs ist, werden die RDDs für einige Zeitfenster in 2 der 3 Executoren verarbeitet.Und in einem anderen Zeitfenster werden die RDDs in 2 weiteren der 3 Executoren bearbeitet? Habe ich recht? – yzandrew

Antwort

5

Jede Partition wird durch einen Testamentsvollstrecker zu einem Zeitpunkt betrieben (vorausgesetzt, Sie haben nicht die spekulative Ausführung eingeschaltet).

Wenn Sie mehr Executoren als Partitionen haben, werden nicht alle von ihnen an einer bestimmten RDD arbeiten. Aber wie du angemerkt hast, da jeder DStream eine Folge von RDDs ist, wird jeder Executor im Laufe der Zeit etwas arbeiten.

+0

Was passiert, wenn die Anzahl der Partitionen mehr als die Anzahl der Executoren beträgt? – Knight71

+0

Sobald ein Executor mit der Arbeit an einer Partition fertig ist, wird ihm eine weitere zugewiesen. –

+0

@CodyKoeninger Hallo, ich stoße auf ein Problem: Angenommen, es gibt 15 Kafka-Partitionen und 15 Executoren mit je 8 Kernen, manchmal (in den meisten Fällen funktioniert es gut) nur 3 Executoren bekommen Aufgaben wie 3 * 8> 15. Aber ich möchte damit jeder Executor sich um eine Partition von Kafka kümmert. Ist es möglich? (Auch wenn ich die rdd auf 196 umpartitioniere, bekommen andere Executoren keine Aufgabe. Ich benutze Spark 1.6.2) –

Verwandte Themen