Für meine Demo-Anwendung muss ich einen Ruhe-Controller erstellen, um die Nachricht in der Kafka-Warteschlange zurückgeben. Ich habe die feder kafka Referenzhandbuch gelesen und implementiert, um die Verbraucher Konfiguration und erstellt Bohnen wie untenRuhe-Controller, um Datensätze in kafka über Frühling kafka
@Configuration
@EnableKafka
public class ConsumerConfiguration {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
// allows a pool of processes to divide the work of consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, "trx");
return props;
}
@Bean
public ConsumerFactory<String, Transaction> transactionConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
consumerConfigs(),new StringDeserializer(),new JsonDeserializer<>(Transaction.class)
);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Transaction>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(transactionConsumerFactory());
return factory;
}
@Bean
public Consumer consumer() {
return new Consumer();
}
}
und eine andere Klasse Verbraucher wie unter
public class Consumer {
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = "${kafka.topic.name}")
public void receive(Transaction transaction) {
latch.countDown();
}
}
Wie kann ich implementieren nun die logische Transaktion abzurufen aus der Verbraucher auf jedem wird auf dem Controller getroffen.
Vielen Dank im Voraus.
Dank Artem für Ihre Antwort, können Sie mich bitte auf einige Anleitung oder Dokumentation für die gleiche verweisen. – maverick
Vielleicht das: http://cloudutable.com/blog/kafka-tutorial-kafka-consumer/index.html? –
Danke, ich habe mir den Code angeschaut und es scheint, dass er kein Spring-Kafka benutzt und direkt die Kafka API benutzt. Habe ich recht? – maverick