0

ich Kafka bin mit mit Frühlings-boot:Kafka Offset nicht erhöht

Kafka Produzent Klasse:

@Service 
public class MyKafkaProducer { 

    @Autowired 
    private KafkaTemplate<String, String> kafkaTemplate; 

    private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class); 

    // Send Message 
    public void sendMessage(String topicName, String message) throws Exception { 
     LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message); 
     ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message); 
     result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { 
      @Override 
      public void onSuccess(SendResult<String, String> result) { 
       LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset()); 
      } 

      @Override 
      public void onFailure(Throwable ex) { 
       LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage()); 
      } 
     }); 
    } 
} 

Kafka-Konfiguration:

spring.kafka.producer.retries=0 
spring.kafka.producer.batch-size=100000 
spring.kafka.producer.request.timeout.ms=30000 
spring.kafka.producer.linger.ms=10 
spring.kafka.producer.acks=0 
spring.kafka.producer.buffer-memory=33554432 
spring.kafka.producer.max.block.ms=5000 
spring.kafka.bootstrap-servers=192.168.1.161:9092,192.168.1.162:9093 

Problem:

Ich habe 5 Partitionen eines Themas, sagen wir my-topic.

Was passiert ist, bekomme ich Erfolg (d. H. Nachricht wird erfolgreich an Kafka gesendet) Protokolle aber Offset von keiner Partition des Themas my-topic erhalten erhöht. Wie Sie oben sehen können, habe ich die Protokolle onSuccess und onFailure hinzugefügt. Was ich erwarte ist, wenn Kafka keine Nachricht an Kafka senden kann, sollte ich einen Fehler bekommen, aber in diesem Fall bekomme ich keine Fehlermeldung.

Das obige Verhalten von Kafka passiert bei einem Verhältnis von 100: 5 (d. H. Nach jeder 100 erfolgreichen Nachricht, die an kafka gesendet wurde).

Edit1: für einen erfolgreichen Fall Kafka Produzent Protokolle hinzufügen (dh Nachricht auf Verbraucherseite erfolgreich empfangen)

ProducerConfig - logAll:180] ProducerConfig values: 
    acks = 0 
    batch.size = 1000 
    block.on.buffer.full = false 
    bootstrap.servers = [10.20.1.19:9092, 10.20.1.20:9093, 10.20.1.26:9094] 
    buffer.memory = 33554432 
    client.id = 
    compression.type = none 
    connections.max.idle.ms = 540000 
    interceptor.classes = null 
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer 
    linger.ms = 10 
    max.block.ms = 5000 
    max.in.flight.requests.per.connection = 5 
    max.request.size = 1048576 
    metadata.fetch.timeout.ms = 60000 
    metadata.max.age.ms = 300000 
    metric.reporters = [] 
    metrics.num.samples = 2 
    metrics.sample.window.ms = 30000 
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 
    receive.buffer.bytes = 32768 
    reconnect.backoff.ms = 50 
    request.timeout.ms = 60000 
    retries = 0 
    retry.backoff.ms = 100 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    sasl.kerberos.min.time.before.relogin = 60000 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    sasl.mechanism = GSSAPI 
    security.protocol = PLAINTEXT 
    send.buffer.bytes = 131072 
    ssl.cipher.suites = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    ssl.endpoint.identification.algorithm = null 
    ssl.key.password = null 
    ssl.keymanager.algorithm = SunX509 
    ssl.keystore.location = null 
    ssl.keystore.password = null 
    ssl.keystore.type = JKS 
    ssl.protocol = TLS 
    ssl.provider = null 
    ssl.secure.random.implementation = null 
    ssl.trustmanager.algorithm = PKIX 
    ssl.truststore.location = null 
    ssl.truststore.password = null 
    ssl.truststore.type = JKS 
    timeout.ms = 30000 
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer 

2017-10-24 14:30:09, [INFO] [karma-unified-notification-manager - ProducerConfig - logAll:180] ProducerConfig values: 
    acks = 0 
    batch.size = 1000 
    block.on.buffer.full = false 
    bootstrap.servers = [10.20.1.19:9092, 10.20.1.20:9093, 10.20.1.26:9094] 
    buffer.memory = 33554432 
    client.id = producer-1 
    compression.type = none 
    connections.max.idle.ms = 540000 
    interceptor.classes = null 
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer 
    linger.ms = 10 
    max.block.ms = 5000 
    max.in.flight.requests.per.connection = 5 
    max.request.size = 1048576 
    metadata.fetch.timeout.ms = 60000 
    metadata.max.age.ms = 300000 
    metric.reporters = [] 
    metrics.num.samples = 2 
    metrics.sample.window.ms = 30000 
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 
    receive.buffer.bytes = 32768 
    reconnect.backoff.ms = 50 
    request.timeout.ms = 60000 
    retries = 0 
    retry.backoff.ms = 100 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    sasl.kerberos.min.time.before.relogin = 60000 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    sasl.mechanism = GSSAPI 
    security.protocol = PLAINTEXT 
    send.buffer.bytes = 131072 
    ssl.cipher.suites = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    ssl.endpoint.identification.algorithm = null 
    ssl.key.password = null 
    ssl.keymanager.algorithm = SunX509 
    ssl.keystore.location = null 
    ssl.keystore.password = null 
    ssl.keystore.type = JKS 
    ssl.protocol = TLS 
    ssl.provider = null 
    ssl.secure.random.implementation = null 
    ssl.trustmanager.algorithm = PKIX 
    ssl.truststore.location = null 
    ssl.truststore.password = null 
    ssl.truststore.type = JKS 
    timeout.ms = 30000 
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer 

Antwort

1

Ihr Code funktioniert gut für mich ...

@SpringBootApplication 
public class So46892185Application { 

    public static void main(String[] args) { 
     SpringApplication.run(So46892185Application.class, args); 
    } 

    private static final Logger LOGGER = LoggerFactory.getLogger(So46892185Application.class); 

    @Bean 
    public ApplicationRunner runner(KafkaTemplate<String, String> template) { 
     return args -> { 
      for (int i = 0; i < 10; i++) { 
       send(template, "foo" + i); 
      } 
     }; 
    } 

    public void send(KafkaTemplate<String, String> template, String message) { 
     ListenableFuture<SendResult<String, String>> result = template.send(topic().name(), message); 
     result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { 

      @Override 
      public void onSuccess(SendResult<String, String> result) { 
       LOGGER.info("sent message='{}'" 
         + " to partition={}" 
         + " with offset={}", message, result.getRecordMetadata().partition(), 
         result.getRecordMetadata().offset()); 
      } 

      @Override 
      public void onFailure(Throwable ex) { 
       LOGGER.error("Ex : " + ex.getMessage()); 
      } 

     }); 
    } 

    @Bean 
    public NewTopic topic() { 
     return new NewTopic("so46892185-3", 5, (short) 1); 
    } 

} 

Ergebnis

2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application   
: sent message='foo3' to partition=1 with offset=0 
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application   
: sent message='foo8' to partition=1 with offset=1 
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application   
: sent message='foo1' to partition=2 with offset=0 
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application   
: sent message='foo6' to partition=2 with offset=1 
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application   
: sent message='foo0' to partition=0 with offset=0 
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application   
: sent message='foo5' to partition=0 with offset=1 
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application   
: sent message='foo4' to partition=3 with offset=0 
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application   
: sent message='foo9' to partition=3 with offset=1 
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application   
: sent message='foo2' to partition=4 with offset=0 
2017-10-23 11:12:05.907 INFO 86390 --- [ad | producer-1] com.example.So46892185Application   
: sent message='foo7' to partition=4 with offset=1 
+0

Das Problem kommt nicht jedes Mal. Ich kann auch erfolgreich eine Nachricht an Kafka senden. Aber sagen wir nach 2 bis 3 Stunden, die Nachricht wird erfolgreich von Kafka gesendet, aber keiner der Partitionsoffsets wird erhöht –

+0

Dann müssen Sie in Ihrer Problembeschreibung viel genauer sein. Sie müssen auch Beweise vorlegen; Worte allein sind nicht genug. Da Sie die Partition nicht in Ihre Protokollnachricht einschließen, ist es schwer zu erkennen, wie Sie ein Problem feststellen können. –

+0

Ich habe eine Nachricht in Kafka erzeugt, für die ich diese Protokolle erhalten habe, Erfolgsprotokolle 'onSuccess: 36] Nachricht gesendet = '' meine Kafka Nachricht 'an Partition = 4 mit Offset = -1' aber ich habe es auf meinem Consumer nicht erhalten wenn ich den Offset-Zähler der Partition 4 überprüfte, wird er nicht inkrementiert. Ich benutze Yahoo kafka-manager als Überwachungswerkzeug. –

1

Es zeigt nicht die Fehler, weil Sie habe spring.kafka.producer.acks als 0 gesetzt. Setze es auf 1 und deine Callback-Funktion sollte funktionieren. Dann können Sie sehen, ob der Offset erhöht wird oder nicht.

+0

Ihre Antwort war wirklich hilfreich. Ich danke dir sehr. –

Verwandte Themen