1

Ich habe viele Artikel gelesen, aber nicht gefunden, wie Producer, die Thema mit mehreren Partition (Thema zur Laufzeit erstellt) mit Spring Integration Kafka.Wie kafka Produzent Thema mit mehr als einer Partition mit Spring Integration kafka konfigurieren

Ich verwende , um Kafka für meine Anwendung zu verstehen und zu konfigurieren.

bieten Bitte die Lösung für die gleiche

Eine weitere Sache, was die Verwendung von kafkaHeader.messageKey ist.

Update: Im Folgenden finden Sie Testcode, den ich
`public class OutboundTests { @Test public void mytest() entwickeln throws Exception {

KafkaProducerContext<String, SMSNotificationVO> ctx = new KafkaProducerContext<String, SMSNotificationVO>(); 
    ProducerMetadata<String, SMSNotificationVO> meta = new ProducerMetadata<String, SMSNotificationVO>("sms_topic"); 
    meta.setValueClassType(SMSNotificationVO.class); 
    meta.setKeyClassType(String.class); 
    meta.setValueEncoder(new SMSObjectSerializer()); 

    ProducerMetadata<String, SMSNotificationVO> meta1 = new ProducerMetadata<String, SMSNotificationVO>("sms_topic_10"); 
    meta1.setValueClassType(SMSNotificationVO.class); 
    meta1.setKeyClassType(String.class); 
    meta1.setValueEncoder(new SMSObjectSerializer()); 
    Properties producerProperties = new Properties(); 
    producerProperties.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG, "1");//"message.send.max.retries" 
    ProducerFactoryBean<String, SMSNotificationVO> producer = new ProducerFactoryBean<String, SMSNotificationVO>(meta,"192.168.1.147:9092"); 
    ProducerFactoryBean<String, SMSNotificationVO> producer1 = new ProducerFactoryBean<String, SMSNotificationVO>(meta1,"192.168.1.147:9092", producerProperties); 


    ProducerConfiguration<String, SMSNotificationVO> p = new ProducerConfiguration<String, SMSNotificationVO>(meta, producer.getObject()); 
    ProducerConfiguration<String, SMSNotificationVO> p1 = new ProducerConfiguration<String, SMSNotificationVO>(meta1, producer1.getObject()); 

    Map<String, ProducerConfiguration<String, SMSNotificationVO>> map = new HashMap<String, ProducerConfiguration<String, SMSNotificationVO>>(); 
    map.put("sms_topic", p); 
    map.put("sms_topic_10", p1); 

    ctx.setProducerConfigurations(map); 

    // java code to send message 
    Map<String, String> params = new HashMap<>(); 
    params.put("Topic", "TEST MULTIPLE TOPIC : this is sms_topic_1"); 
    List<String> smsRecipients = new ArrayList<>(); 
    smsRecipients.add("9953225211"); 


    String message = "This Test message from junit topic sms_topic_1"; 
    Integer messageType =1; 

    SMSNotificationVO vo = new SMSNotificationVO(); 
    vo.setMessage(message); 
    vo.setMessageType(messageType); 
    vo.setParams(params); 
    vo.setSmsRecipients(smsRecipients); 


    try{ 
    KafkaProducerMessageHandler<String, SMSNotificationVO> handler = new KafkaProducerMessageHandler<String, SMSNotificationVO>(ctx); 
    handler.setMessageKeyExpression(new LiteralExpression("sms_topic_10")); 
    handler.handleMessage(MessageBuilder.withPayload(vo) 
      //if i remove the below comments I will get null pointer exception 
      //.setHeader(KafkaHeaders.MESSAGE_KEY, "some key") 
      //.setHeader(KafkaHeaders.PARTITION_ID, "1") 
       .setHeader(KafkaHeaders.TOPIC, "sms_topic_10") 
       .build()); 

    }catch(Exception e){ 
     e.printStackTrace(); 
     Assert.fail("Kafka SMS Producer fail"); 
    } 

} 
}` 

Ich erhalte Null-Zeiger-Ausnahme. Unten ist die Erwähnung log:

`org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springf[email protected]7b94089b]; nested exception is java.lang.NullPointerException 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84) 
at dd.kafka.producer.OutboundTests.mytest(OutboundTests.java:85) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) 
at java.lang.reflect.Method.invoke(Unknown Source) 
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) 
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) 
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) 
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) 
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) 
Caused by: java.lang.NullPointerException at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:130) 
at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:127) 
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:127) 
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:53) 
at kafka.producer.Producer.send(Producer.scala:77) 
at kafka.javaapi.producer.Producer.send(Producer.scala:33) 
at org.springframework.integration.kafka.support.ProducerConfiguration.send(ProducerConfiguration.java:70) 
at org.springframework.integration.kafka.support.KafkaProducerContext.send(KafkaProducerContext.java:197) 
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:81) 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78) 
... 24 more` 

Dank

Antwort

1

Thema ist nicht auf die Producer verwandt. Sie sollten das Thema im Vorfeld vor dem Senden oder Empfangen von Nachrichten an/von ihm vorkonfiguriert haben.

Die Tatsache, dass Sie das Thema zur Laufzeit mit AdminUtils.createTopic() über die Spring Integration High-Level-API erstellen können, ist nur zum Testen Bequemlichkeit und sollte für die Produktion vermieden werden.

Die KafkaHeaders.messageKey ist vollständig der Norm Kafka messageKey abgebildet und ja verwendet werden kann, das Ziel partition im topic zu bestimmen.

Also, Sie sollten auf jeden Fall auf die offiziellen Apache Kafka Dokumentation gehen zu verstehen, wie Thema mit „mehreren Partitionen“ zu schaffen, und das, was von der Producer Seite zu tun in Bezug auf partition und messageKey Parameter.

+0

Danke für die Antwort Es gibt nur wenige Anfragen in meinem Kopf, die wie folgt lauten: 1. NachrichtTa zur Bestimmung der Partition im Thema zu verwenden ist (etwas wie Hashing), dann, wie man diese Taste zur Auswahl als i-Taste wählen will zufällig, so dass jedes Mal, wenn die Nachricht veröffentlicht wird, es in andere Partition geht, um Parallelität zu nutzen. 2. In der Link Erwähnung, "Message-Key-Ausdruck" und "Partition-ID-Ausdruck" auf, welche Grundlagen dieser Wert ist erwähnen 3. Wenn Message-Key-Ausdruck erwähnt wird, dann kann nur ich KafkaHeaders.messageKey verwenden. Danke – rahul

+0

Wenn Sie die bestimmte 'Partition' beim Senden angeben, wird der' messageKey' vom Partitionsbestimmungsalgorithmus umgangen. Dieser Ausdruck wird genau für die Kafka 'Producer' Operation verwendet. Siehe seine API. In der Tat kann der 'KafkaHeaders.messageKey' anstelle von 'Nachrichtenschlüsselausdruck' verwendet werden. –

+0

Aber wenn Sie meinen Code sehen, gab 'setHeader (KafkaHeaders.MESSAGE_KEY, "einige Schlüssel") 'Null Zeiger Ausnahme – rahul