Ich benutze Spring Cloud Stream mit Kafka-Binder. Es funktioniert gut, aber der Client erhält doppelte Nachrichten. Habe schon alle Kafka Consumer Properties ohne Ergebnis ausprobiert.Doppelte Nachrichtenverarbeitung mit Spring Cloud Stream mit Kafka
Überprüfen Sie 2 Klassen in meinem Beispiel der Anwendung - AggregateApplication und EventFilterApplication. Falls ich EventFilterApplication starte - habe ich nur 1 Nachricht, im Fall von AggregateApplication - 2 gleiche Nachrichten. Hier
ist mein Code unten:
1) Aggregator
import com.example.EventFilterApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;
@SpringBootApplication
public class AggregateApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args)
.from(EventFilterApplication.class)
.run(args);
}
}
2) EventFilterApplication
@SpringBootApplication
@EnableBinding(EventFilterApplication.LiveProcessor.class)
public class EventFilterApplication {
@Autowired
LiveProcessor source;
@StreamListener(LiveProcessor.INPUT)
public void handle(byte[] event) {
try {
System.out.println(new Date().getTime() + ": event was processed:" + Arrays.toString(event));
} catch (Exception e) {
System.out.println(String.format("Error={%s} on processing message=%s", e.getMessage(), Arrays.toString(event)));
}
}
public static void main(String[] args) {
SpringApplication.run(EventFilterApplication.class, args);
}
interface LiveProcessor extends Source {
String INPUT = "liveSource";
@Input(INPUT)
SubscribableChannel input();
}
}
3) application.yml
spring:
cloud:
stream:
kafka:
binder:
brokers: kafka-broker.example.com:9092
defaultBrokerPort: 9092
defaultZkPort: 2181
zkNodes: kafka-zookeeper.example.com
type: kafka
bindings:
liveSource:
binder: kafka
consumer:
headerMode: raw
autoCommitOffset: true
destination: topic_example_name
4) build.gradle
buildscript {
ext { springBootVersion = '1.4.2.RELEASE' }
repositories {
jcenter()
maven { url 'http://repo.spring.io/plugins-release' }
}
dependencies {
classpath("org.springframework.build.gradle:propdeps-plugin:0.0.7")
classpath("org.springframework.boot:spring-boot-gradle-plugin:$springBootVersion")
classpath("io.spring.gradle:dependency-management-plugin:0.5.2.RELEASE")
}
}
ext['logstashLogbackEncoderV'] = '4.8'
ext['springCloudV'] = 'Camden.SR1'
ext['springCloudStreamV'] = 'Brooklyn.SR2'
ext['springIntegrationKafkaV'] = '1.3.1.RELEASE'
subprojects {
apply plugin: 'java'
apply plugin: 'propdeps'
apply plugin: 'propdeps-idea'
apply plugin: "io.spring.dependency-management"
sourceCompatibility = 1.8
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:Camden.SR1"
mavenBom "org.springframework.cloud:spring-cloud-stream-dependencies:Brooklyn.SR2"
mavenBom "org.springframework.cloud.stream.app:spring-cloud-stream-app-dependencies:1.0.4.RELEASE"
}
}
dependencies {
compile("org.springframework.boot:spring-boot-starter-web:$springBootVersion") {
exclude module: "spring-boot-starter-tomcat"
exclude group: 'log4j'
}
compile("org.springframework.cloud:spring-cloud-starter-stream-kafka")
compile("org.springframework.integration:spring-integration-kafka:$springIntegrationKafkaV") {
exclude group: "org.slf4j"
}
compile("org.springframework.cloud:spring-cloud-stream:")
compile("org.springframework.cloud:spring-cloud-starter-sleuth")
compile("net.logstash.logback:logstash-logback-encoder:${logstashLogbackEncoderV}")
testCompile("org.springframework.boot:spring-boot-starter-test:$springBootVersion") {
exclude group: "org.slf4j"
}
}
}
Eigentlich brauche ich Aggregation, um mehrere Komponenten meiner Anwendung (zusammen mit EventFilterApplication) zu verwenden. Das Beispiel wurde viel vereinfacht, es gibt auch EventCheckerApplication und viele andere, die bei AggregateApplication assembled sind. –
Das ist in Ordnung, aber fügen Sie 'EventFilterApplication' nicht als Argument für' AggregateApplicationBuilder' hinzu. –
Vielen Dank für Ihre Hilfe, es funktioniert. Es bleibt zu verstehen, warum es so ist) –