2017-03-01 4 views
0

Ich benutze Spring Cloud Stream mit Kafka-Binder. Es funktioniert gut, aber der Client erhält doppelte Nachrichten. Habe schon alle Kafka Consumer Properties ohne Ergebnis ausprobiert.Doppelte Nachrichtenverarbeitung mit Spring Cloud Stream mit Kafka

Überprüfen Sie 2 Klassen in meinem Beispiel der Anwendung - AggregateApplication und EventFilterApplication. Falls ich EventFilterApplication starte - habe ich nur 1 Nachricht, im Fall von AggregateApplication - 2 gleiche Nachrichten. Hier


ist mein Code unten:

1) Aggregator

import com.example.EventFilterApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder; 

@SpringBootApplication 
public class AggregateApplication { 
    public static void main(String[] args) { 
     new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args) 
      .from(EventFilterApplication.class) 
      .run(args); 
    } 
} 

2) EventFilterApplication

@SpringBootApplication 
@EnableBinding(EventFilterApplication.LiveProcessor.class) 
public class EventFilterApplication { 

    @Autowired 
    LiveProcessor source; 

    @StreamListener(LiveProcessor.INPUT) 
    public void handle(byte[] event) { 
     try { 

      System.out.println(new Date().getTime() + ": event was processed:" + Arrays.toString(event)); 

     } catch (Exception e) { 
      System.out.println(String.format("Error={%s} on processing message=%s", e.getMessage(), Arrays.toString(event))); 
     } 
    } 
    public static void main(String[] args) { 
     SpringApplication.run(EventFilterApplication.class, args); 
    } 

    interface LiveProcessor extends Source { 

     String INPUT = "liveSource"; 

     @Input(INPUT) 
     SubscribableChannel input(); 
    } 
} 

3) application.yml

spring: 
cloud: 
    stream: 
     kafka: 
      binder: 
       brokers: kafka-broker.example.com:9092 
       defaultBrokerPort: 9092 
       defaultZkPort: 2181 
       zkNodes: kafka-zookeeper.example.com 
     type: kafka 
     bindings: 
      liveSource: 
       binder: kafka 
       consumer: 
        headerMode: raw 
        autoCommitOffset: true 
       destination: topic_example_name 

4) build.gradle

buildscript { 
    ext { springBootVersion = '1.4.2.RELEASE' } 
    repositories { 
     jcenter() 
     maven { url 'http://repo.spring.io/plugins-release' } 
    } 
    dependencies { 
     classpath("org.springframework.build.gradle:propdeps-plugin:0.0.7") 
     classpath("org.springframework.boot:spring-boot-gradle-plugin:$springBootVersion") 
     classpath("io.spring.gradle:dependency-management-plugin:0.5.2.RELEASE") 
    } 
} 

ext['logstashLogbackEncoderV'] = '4.8' 
ext['springCloudV'] = 'Camden.SR1' 
ext['springCloudStreamV'] = 'Brooklyn.SR2' 
ext['springIntegrationKafkaV'] = '1.3.1.RELEASE' 

subprojects { 
    apply plugin: 'java' 
    apply plugin: 'propdeps' 
    apply plugin: 'propdeps-idea' 
    apply plugin: "io.spring.dependency-management" 

    sourceCompatibility = 1.8 

    dependencyManagement { 
     imports { 
      mavenBom "org.springframework.cloud:spring-cloud-dependencies:Camden.SR1" 
      mavenBom "org.springframework.cloud:spring-cloud-stream-dependencies:Brooklyn.SR2" 
      mavenBom "org.springframework.cloud.stream.app:spring-cloud-stream-app-dependencies:1.0.4.RELEASE" 
     } 
    } 

    dependencies { 
     compile("org.springframework.boot:spring-boot-starter-web:$springBootVersion") { 
      exclude module: "spring-boot-starter-tomcat" 
      exclude group: 'log4j' 
     } 

     compile("org.springframework.cloud:spring-cloud-starter-stream-kafka") 

     compile("org.springframework.integration:spring-integration-kafka:$springIntegrationKafkaV") { 
      exclude group: "org.slf4j" 
     } 

     compile("org.springframework.cloud:spring-cloud-stream:") 

     compile("org.springframework.cloud:spring-cloud-starter-sleuth") 

     compile("net.logstash.logback:logstash-logback-encoder:${logstashLogbackEncoderV}") 

     testCompile("org.springframework.boot:spring-boot-starter-test:$springBootVersion") { 
      exclude group: "org.slf4j" 
     } 
    } 
} 

Antwort

0

Die Vervielfältigung von EventFilterApplication als Mutter root verursacht wird:

public class AggregateApplication { 
    public static void main(String[] args) { 
     new AggregateApplicationBuilder(new Object[]{EventFilterApplication.class}, args) 
      .from(EventFilterApplication.class) 
      .run(args); 
    } 
} 

Das ist sehr wahrscheinlich, zwei Abonnements zu erstellen. Stattdessen EventFilterApplication als Wurzel hinzugefügt haben, können Sie einfach tun:

public class AggregateApplication { 
    public static void main(String[] args) { 
     new AggregateApplicationBuilder(args) 
      .from(EventFilterApplication.class) 
      // rest of the pipeline 
      .run(args); 
    } 
} 

Wenn Sie nicht ein Aggregat erstellen müssen, das genug sein sollte:

public static void main(String[] args) { 
     SpringApplication.run(EventFilterApplication.class, args); 
} 

Edit: hat ein zusätzliches Beispiel und geklärt Antworten.

+0

Eigentlich brauche ich Aggregation, um mehrere Komponenten meiner Anwendung (zusammen mit EventFilterApplication) zu verwenden. Das Beispiel wurde viel vereinfacht, es gibt auch EventCheckerApplication und viele andere, die bei AggregateApplication assembled sind. –

+0

Das ist in Ordnung, aber fügen Sie 'EventFilterApplication' nicht als Argument für' AggregateApplicationBuilder' hinzu. –

+0

Vielen Dank für Ihre Hilfe, es funktioniert. Es bleibt zu verstehen, warum es so ist) –

Verwandte Themen