4

Ich benutze Spark Streaming mit Kafka Thema. Thema wird mit 5 Partitionen erstellt. Alle meine Nachrichten werden unter Verwendung von Tabellenname als Schlüssel im kafka-Thema veröffentlicht. Gegeben, ich nehme an, alle Nachrichten für diese Tabelle sollten zu der gleichen Partition gehen. Aber ich merke in den Spark-Log-Nachrichten für die gleiche Tabelle geht manchmal zu Executor-Knoten-1 und irgendwann geht zum Executor-Knoten-2.Kafka Topic Partition und Spark Executor Mapping

Ich verwende Code in Garn-Cluster-Modus folgenden Befehl:

spark-submit --name DataProcessor --master yarn-cluster --files /opt/ETL_JAR/executor-log4j-spark.xml,/opt/ETL_JAR/driver-log4j-spark.xml,/opt/ETL_JAR/application.properties --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=driver-log4j-spark.xml" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=executor-log4j-spark.xml" --class com.test.DataProcessor /opt/ETL_JAR/etl-all-1.0.jar 

und diese Vorlage erstellt 1-Fahrer sagen auf node-1 und 2 Testamentsvollstrecker auf node-1 und Knoten-2 lassen.

Ich möchte nicht Node-1 und Node-2-Executoren die gleiche Partition lesen. aber das passiert

Auch versucht folgende Konfiguration, um Verbrauchergruppe, aber keinen Unterschied zu spezifizieren.

kafkaParams.put("group.id", "app1"); 

Dies ist, wie wir den Strom mit createDirectStream Methode * Nicht durch zookeeper erstellen.

HashMap<String, String> kafkaParams = new HashMap<String, String>(); 
    kafkaParams.put("metadata.broker.list", brokers); 
    kafkaParams.put("auto.offset.reset", "largest"); 
    kafkaParams.put("group.id", "app1"); 

     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
       jssc, 
       String.class, 
       String.class, 
       StringDecoder.class, 
       StringDecoder.class, 
       kafkaParams, 
       topicsSet 
     ); 

kompletter Code:

import java.io.Serializable; 
import java.util.Arrays; 
import java.util.HashMap; 
import java.util.HashSet; 
import java.util.Iterator; 

import org.apache.commons.lang3.StringUtils; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.VoidFunction; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import kafka.serializer.StringDecoder; 
import scala.Tuple2; 

public class DataProcessor2 implements Serializable { 
    private static final long serialVersionUID = 3071125481526170241L; 

    private static Logger log = LoggerFactory.getLogger("DataProcessor"); 

    public static void main(String[] args) { 
     final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR); 
     DataProcessorContextFactory3 factory = new DataProcessorContextFactory3(); 
     JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(sparkCheckPointDir, factory); 

     // Start the process 
     jssc.start(); 
     jssc.awaitTermination(); 
    } 

} 

class DataProcessorContextFactory3 implements JavaStreamingContextFactory, Serializable { 
    private static final long serialVersionUID = 6070911284191531450L; 

    private static Logger logger = LoggerFactory.getLogger(DataProcessorContextFactory.class); 

    DataProcessorContextFactory3() { 
    } 

    @Override 
    public JavaStreamingContext create() { 
     logger.debug("creating new context..!"); 

     final String brokers = ApplicationProperties.getProperty(Consts.KAFKA_BROKERS_NAME); 
     final String topic = ApplicationProperties.getProperty(Consts.KAFKA_TOPIC_NAME); 
     final String app = "app1"; 
     final String offset = ApplicationProperties.getProperty(Consts.KAFKA_CONSUMER_OFFSET, "largest"); 

     logger.debug("Data processing configuration. brokers={}, topic={}, app={}, offset={}", brokers, topic, app, 
       offset); 
     if (StringUtils.isBlank(brokers) || StringUtils.isBlank(topic) || StringUtils.isBlank(app)) { 
      System.err.println("Usage: DataProcessor <brokers> <topic>\n" + Consts.KAFKA_BROKERS_NAME 
        + " is a list of one or more Kafka brokers separated by comma\n" + Consts.KAFKA_TOPIC_NAME 
        + " is a kafka topic to consume from \n\n\n"); 
      System.exit(1); 
     } 
     final String majorVersion = "1.0"; 
     final String minorVersion = "3"; 
     final String version = majorVersion + "." + minorVersion; 
     final String applicationName = "DataProcessor-" + topic + "-" + version; 
     // for dev environment 
     SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName(applicationName); 
     // for cluster environment 
     //SparkConf sparkConf = new SparkConf().setAppName(applicationName); 
     final long sparkBatchDuration = Long 
       .valueOf(ApplicationProperties.getProperty(Consts.SPARK_BATCH_DURATION, "10")); 

     final String sparkCheckPointDir = ApplicationProperties.getProperty(Consts.SPARK_CHECKPOINTING_DIR); 

     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(sparkBatchDuration)); 
     logger.debug("setting checkpoint directory={}", sparkCheckPointDir); 
     jssc.checkpoint(sparkCheckPointDir); 

     HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(","))); 

     HashMap<String, String> kafkaParams = new HashMap<String, String>(); 
     kafkaParams.put("metadata.broker.list", brokers); 
     kafkaParams.put("auto.offset.reset", offset); 
     kafkaParams.put("group.id", "app1"); 

//   @formatter:off 
      JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc, 
        String.class, 
        String.class, 
        StringDecoder.class, 
        StringDecoder.class, 
        kafkaParams, 
        topicsSet 
      ); 
//   @formatter:on 
     processRDD(messages, app); 
     return jssc; 
    } 

    private void processRDD(JavaPairInputDStream<String, String> messages, final String app) { 
     JavaDStream<MsgStruct> rdd = messages.map(new MessageProcessFunction()); 

     rdd.foreachRDD(new Function<JavaRDD<MsgStruct>, Void>() { 

      private static final long serialVersionUID = 250647626267731218L; 

      @Override 
      public Void call(JavaRDD<MsgStruct> currentRdd) throws Exception { 
       if (!currentRdd.isEmpty()) { 
        logger.debug("Receive RDD. Create JobDispatcherFunction at HOST={}", FunctionUtil.getHostName()); 
        currentRdd.foreachPartition(new VoidFunction<Iterator<MsgStruct>>() { 

         @Override 
         public void call(Iterator<MsgStruct> arg0) throws Exception { 
          while(arg0.hasNext()){ 
           System.out.println(arg0.next().toString()); 
          } 
         } 
        }); 
       } else { 
        logger.debug("Current RDD is empty."); 
       } 
       return null; 
      } 
     }); 
    } 
    public static class MessageProcessFunction implements Function<Tuple2<String, String>, MsgStruct> { 
     @Override 
     public MsgStruct call(Tuple2<String, String> data) throws Exception { 
      String message = data._2(); 
      System.out.println("message:"+message); 
      return MsgStruct.parse(message); 
     } 

    } 
    public static class MsgStruct implements Serializable{ 
     private String message; 
     public static MsgStruct parse(String msg){ 
      MsgStruct m = new MsgStruct(); 
      m.message = msg; 
      return m; 
     } 
     public String toString(){ 
      return "content inside="+message; 
     } 
    } 

} 

Antwort

3

Mit dem DirectStream Ansatz ist es eine richtige Annahme, dass Nachrichten an eine Kafka-Partition gesendet in der gleichen Spark-Partition landen werden.

Wir können nicht davon ausgehen, dass jede Spark-Partition jedes Mal von demselben Spark-Mitarbeiter bearbeitet wird. In jedem Batchintervall wird die Spark-Task für jede Partition erstellt und an den Cluster zur Verarbeitung gesendet, wobei sie auf einem verfügbaren Worker landet.

Was suchen Sie nach Partitionslokalität? Der einzige partition locality that the direct kafka consumer supports ist der Kafka-Host, der den Versatzbereich enthält, der verarbeitet wird, wenn Spark- und Kafka-Deployments zusammengelegt werden; aber das ist eine Deployment-Topologie, die ich nicht oft sehe.

Für den Fall, dass Ihre Anforderungen das Vorhandensein eines Hostlokals vorschreiben, sollten Sie in Apache Samza oder Kafka Streams schauen.

3

Gemäß Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) können Sie eine explizite Zuordnung von Partitionen zu Hosts angeben.

Angenommen, Sie haben zwei Hosts (h1 und h2), und das Kafka-Thema topic-name hat drei Partitionen. Der folgende kritische Code zeigt Ihnen, wie Sie eine angegebene Partition einem Host in Java zuordnen.

Map<TopicPartition, String> partitionMapToHost = new HashMap<>(); 
// partition 0 -> h1, partition 1 and 2 -> h2 
partitionMapToHost.put(new TopicPartition("topic-name", 0), "h1"); 
partitionMapToHost.put(new TopicPartition("topic-name", 1), "h2"); 
partitionMapToHost.put(new TopicPartition("topic-name", 2), "h2"); 
List<String> topicCollection = Arrays.asList("topic-name"); 
Map<String, Object> kafkaParams = new HasMap<>(); 
kafkaParams.put("bootstrap.servers", "10.0.0.2:9092,10.0.0.3:9092"); 
kafkaParams.put("group.id", "group-id-name"); 
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
JavaInputDStream<ConsumerRecord<String, String>> records = KafkaUtils.createDirectStream(jssc, 
    LocationStrategies.PreferFixed(partitionMapToHost), // PreferFixed is the key 
    ConsumerStrategies.Subscribe(topicCollection, kafkaParams)); 

Sie können auch LocationStrategies.PreferConsistent(), die Partitionen zu verteilen verwenden gleichmäßig über verfügbar Testamentsvollstrecker, und sicherzustellen, dass eine angegebene Partition nur um einen bestimmten Testamentsvollstrecker verbraucht wird.

Verwandte Themen