0

Für meine Demo-Anwendung muss ich einen Ruhe-Controller erstellen, um die Nachricht in der Kafka-Warteschlange zurückgeben. Ich habe die feder kafka Referenzhandbuch gelesen und implementiert, um die Verbraucher Konfiguration und erstellt Bohnen wie untenRuhe-Controller, um Datensätze in kafka über Frühling kafka

@Configuration 
@EnableKafka 
public class ConsumerConfiguration { 

    @Value("${kafka.bootstrap-servers}") 
    private String bootstrapServers; 

    @Bean 
    public Map<String, Object> consumerConfigs() { 
     Map<String, Object> props = new HashMap<>(); 
     // list of host:port pairs used for establishing the initial connections to the Kakfa cluster 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); 
     // allows a pool of processes to divide the work of consuming and processing records 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, "trx"); 

     return props; 
    } 

    @Bean 
    public ConsumerFactory<String, Transaction> transactionConsumerFactory() { 
     return new DefaultKafkaConsumerFactory<>(
       consumerConfigs(),new StringDeserializer(),new JsonDeserializer<>(Transaction.class) 
     ); 
    } 

    @Bean 
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Transaction>> kafkaListenerContainerFactory() { 
     ConcurrentKafkaListenerContainerFactory<String, Transaction> factory = 
       new ConcurrentKafkaListenerContainerFactory<>(); 
     factory.setConsumerFactory(transactionConsumerFactory()); 

     return factory; 
    } 

    @Bean 
    public Consumer consumer() { 
     return new Consumer(); 
    } 

} 

und eine andere Klasse Verbraucher wie unter

public class Consumer { 

    private CountDownLatch latch = new CountDownLatch(1); 

    public CountDownLatch getLatch() { 
     return latch; 
    } 

    @KafkaListener(topics = "${kafka.topic.name}") 
    public void receive(Transaction transaction) { 
     latch.countDown(); 
    } 
} 

Wie kann ich implementieren nun die logische Transaktion abzurufen aus der Verbraucher auf jedem wird auf dem Controller getroffen.

Vielen Dank im Voraus.

Antwort

2

Nun, die @KafkaListener produziert unabhängige langlebige Prozess zum Streamen von Datensätzen von Kafka auf den Rückruf. Da Sie über das REST GET-Ereignis sprechen, haben Sie keine Wahl, wenn Sie nicht die KafkaConsumer von ConsumerFactory abrufen und poll() manuell von der Controller-Methode aufrufen.

+0

Dank Artem für Ihre Antwort, können Sie mich bitte auf einige Anleitung oder Dokumentation für die gleiche verweisen. – maverick

+0

Vielleicht das: http://cloudutable.com/blog/kafka-tutorial-kafka-consumer/index.html? –

+0

Danke, ich habe mir den Code angeschaut und es scheint, dass er kein Spring-Kafka benutzt und direkt die Kafka API benutzt. Habe ich recht? – maverick

Verwandte Themen