Ich versuche, Spark Kafka Direct Stream Ansatz zu verwenden. Es vereinfacht die Parallelität, indem es so viele RDD-Partitionen wie die Kafka-Topic-Partition erstellt, wie in dieser doc angegeben. Basierend auf meinem Verständnis wird Spark einen Executor für jede RDD-Partition erstellen, um die Berechnung durchzuführen. SoSpark Kafka Direct DStream - Wie viele Executoren und RDD-Partitionen im Garn-Cluster-Modus, wenn num-Executors gesetzt ist?
wenn ich die Anwendung in Garn-Cluster-Modus einreichen, und geben Sie Option num-Testamentsvollstrecker auf einen anderen Wert für die Anzahl der Partitionen, wie viele Vollstrecker wird es sein?
Zum Beispiel gibt es einen kafka Thema mit 2-Partition ist, und ich angeben num-Testamentsvollstrecker-4:
export YARN_CONF_DIR=$HADOOP_HOME/client_conf
./bin/spark-submit \
--class playground.MainClass \
--master yarn-cluster \
--num-executors 4 \
../spark_applications/uber-spark-streaming-0.0.1-SNAPSHOT.jar \
127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095 topic_1
ich ihr einen Versuch geben und die Anzahl der Testamentsvollstrecker herauszufinden 4, und jeder Executor liest und verarbeitet Daten von Kafka. Warum? Es gibt nur 2 Partitionen im Kafka-Thema, Wie lesen 4 Executoren aus dem Kafka-Thema, das nur 2 Partitionen hat?
Im Folgenden finden Sie die Details der Funkenanwendung und Protokolle.
Meine Funken Anwendung, die empfangenen Nachrichten druckt (in flatMap-Methode) von kafka in jedem Vollstrecker:
...
String brokers = args[0];
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(args[1].split(",")));
kafkaParams.put("metadata.broker.list", brokers);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topicsSet);
JavaPairDStream<String, Integer> wordCounts =
messages.flatMap(new FlatMapFunction<Tuple2<String, String>, String>()
{
public Iterable<String> call(Tuple2<String, String> tuple) throws Exception
{
System.out.println(String.format("[received from kafka] tuple_1 is %s, tuple_2 is %s", tuple._1(),
tuple._2())); // print the kafka message received in executor
return Arrays.asList(SPACE.split(tuple._2()));
}
}).mapToPair(new PairFunction<String, String, Integer>()
{
public Tuple2<String, Integer> call(String word) throws Exception
{
System.out.println(String.format("[word]: %s", word));
return new Tuple2<String, Integer>(word, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>()
{
public Integer call(Integer v1, Integer v2) throws Exception
{
return v1 + v2;
}
});
wordCounts.print();
Runtime.getRuntime().addShutdownHook(new Thread(){
@Override
public void run(){
System.out.println("gracefully shutdown Spark!");
jssc.stop(true, true);
}
});
jssc.start();
jssc.awaitTermination();
Mein Kafka Thema, mit 2 Partitionen. String "Hallo Hallo Wort 1", "Hallo Hallo Wort 2", "Hallo Hallo Wort 3", ... werden an das Thema gesendet.
Topic: topic_2 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: topic_2 Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: topic_2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Webconsle:
Konsolausgabe von Exekutor 1:
...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 12
[word]: hello
[word]: hello
[word]: world
[word]: 12
...
Konsolausgabe von Exekutor 2:
...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 2
[word]: hello
[word]: hello
[word]: world
[word]: 2
...
Konsolenausgabe von Testamentsvollstrecker 3:
...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 3
[word]: hello
[word]: hello
[word]: world
[word]: 3
...
Ich drucke die Anzahl der Partitionen in jeder RDD. Es hat den gleichen Wert wie die Partitionsnummern von kafka topic, was in meinem Fall 2 ist. Wie können 3 Executoren parallel eine Reihe RDDs verarbeiten, die insgesamt zwei Partitionen haben? Basierend auf der Konsolenausgabe jedes Executors verarbeiten alle Executors Daten von RDD. – yzandrew
Da DStream eine Reihe von RDDs ist, werden die RDDs für einige Zeitfenster in 2 der 3 Executoren verarbeitet.Und in einem anderen Zeitfenster werden die RDDs in 2 weiteren der 3 Executoren bearbeitet? Habe ich recht? – yzandrew