2017-03-23 5 views
1

Ich empfange Nachrichten von einem Dienst (S), der jede einzelne Eigenschaftsänderung für eine Entität als separate Nachricht veröffentlicht. Ein konstruiertes Beispiel würde ein Unternehmen wie diese:Gruppe hat Nachrichten in RabbitMQ empfangen, vorzugsweise mit Spring AMQP?

Person { 
    id: 123 
    name: "Something", 
    address: {...} 
} 

Wenn Name und Adresse in der gleichen Transaktion aktualisiert werden dann (S) zwei Nachrichten veröffentlichen, PersonNameCorrected und PersonMoved. Das Problem ist auf der Empfangsseite, wo ich eine Projektion dieser Person Entität speichern und jede Eigenschaftsänderung verursacht ein Schreiben in die Datenbank. In diesem Beispiel gäbe es also zwei Schreibvorgänge in der Datenbank, aber wenn ich die Nachrichten für kurze Zeit stapelweise gruppieren und nach ID gruppieren könnte, müsste ich nur einen einzigen Schreibvorgang in der Datenbank durchführen.

Wie geht das normalerweise in RabbitMQ? Bietet Spring AMQP eine einfachere Abstraktion?

Beachten Sie, dass ich kurz auf prefetch geschaut habe, aber ich bin mir nicht sicher, ob dies der richtige Weg ist. Auch Prefetch, wenn ich es richtig verstehe, ist pro Verbindung Basis. Ich versuche, dies auf einer per-Warteschlange Basis zu erreichen, denn wenn Batching (und damit Latenz hinzugefügt) ist der Weg zu gehen möchte ich nicht diese Latenz zu allen Warteschlangen von meinem Dienst verbraucht hinzufügen (aber nur zu diejenigen, die die "Group-by-ID" -Funktionen benötigen.

Antwort

2

Prefetch wird für einen Fall wie diesem nicht helfen.

Verwenden Sie Spring Integration mit Adaptern, die auf Spring AMQP sitzen; Es bietet auch einen Aggregator, mit dem Nachrichten zusammengefasst werden können, bevor sie an die nächste Stufe in der Pipeline gesendet werden.

EDIT

Hier ist ein schnelles Boot-App zu demostrate ...

@SpringBootApplication 
public class So42969130Application implements CommandLineRunner { 

    public static void main(String[] args) { 
     SpringApplication.run(So42969130Application.class, args) 
      .close(); 
    } 

    @Autowired 
    private RabbitTemplate template; 

    @Autowired 
    private Handler handler; 

    @Override 
    public void run(String... args) throws Exception { 
     this.template.convertAndSend("so9130", new PersonNameChanged(123)); 
     this.template.convertAndSend("so9130", new PersonMoved(123)); 
     this.handler.latch.await(10, TimeUnit.SECONDS); 
    } 

    @Bean 
    public IntegrationFlow flow(ConnectionFactory connectionFactory) { 
     return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "so9130") 
         .messageConverter(converter())) 
       .aggregate(a -> a 
         .correlationExpression("payload.id") 
         .releaseExpression("false") // open-ended release, timeout only 
         .sendPartialResultOnExpiry(true) 
         .groupTimeout(2000)) 
       .handle(handler()) 
       .get(); 
    } 

    @Bean 
    public Jackson2JsonMessageConverter converter() { 
     return new Jackson2JsonMessageConverter(); 
    } 

    @Bean 
    public Handler handler() { 
     return new Handler(); 
    } 

    @Bean 
    public Queue queue() { 
     return new Queue("so9130", false, false, true); 
    } 

    public static class Handler { 

     private final CountDownLatch latch = new CountDownLatch(1); 

     @ServiceActivator 
     public void handle(Collection<?> aggregatedData) { 
      System.out.println(aggregatedData); 
      this.latch.countDown(); 
     } 

    } 

    public static class PersonNameChanged { 

     private int id; 

     PersonNameChanged() { 
     } 

     PersonNameChanged(int id) { 
      this.id = id; 
     } 

     public int getId() { 
      return this.id; 
     } 

     public void setId(int id) { 
      this.id = id; 
     } 

     @Override 
     public String toString() { 
      return "PersonNameChanged [id=" + this.id + "]"; 
     } 

    } 

    public static class PersonMoved { 

     private int id; 

     PersonMoved() { 
     } 

     PersonMoved(int id) { 
      this.id = id; 
     } 

     public int getId() { 
      return this.id; 
     } 

     public void setId(int id) { 
      this.id = id; 
     } 

     @Override 
     public String toString() { 
      return "PersonMoved [id=" + this.id + "]"; 
     } 

    } 

} 

Pom:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>com.example</groupId> 
    <artifactId>so42969130</artifactId> 
    <version>2.0.0-BUILD-SNAPSHOT</version> 
    <packaging>jar</packaging> 

    <name>so42969130</name> 
    <description>Demo project for Spring Boot</description> 

    <parent> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter-parent</artifactId> 
     <version>1.5.2.RELEASE</version> 
     <relativePath/> <!-- lookup parent from repository --> 
    </parent> 

    <properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 
     <java.version>1.8</java.version> 
    </properties> 

    <dependencies> 
     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-integration</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.integration</groupId> 
      <artifactId>spring-integration-amqp</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.integration</groupId> 
      <artifactId>spring-integration-java-dsl</artifactId> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework.boot</groupId> 
      <artifactId>spring-boot-starter-test</artifactId> 
      <scope>test</scope> 
     </dependency> 
    </dependencies> 

    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.springframework.boot</groupId> 
       <artifactId>spring-boot-maven-plugin</artifactId> 
      </plugin> 
     </plugins> 
    </build> 


</project> 

Ergebnis:

2017-03-23 09:56:57.501 INFO 75217 --- [ask-scheduler-2] .s.i.a.AbstractCorrelatingMessageHandler : 
    Expiring MessageGroup with correlationKey[123] 
[PersonNameChanged [id=123], PersonMoved [id=123]] 
+0

Danke für die p outer zur Frühlingsintegration. Wenn es nicht zu viel verlangt ist, würden Sie zufällig ein Beispiel kennen, das ich als Kickstarter für meinen Anwendungsfall verwenden könnte? – Johan

+1

Ich habe meine Antwort mit einem Beispiel aktualisiert, das ausreichen sollte, um loszulegen. –

+0

Ich weiß, dass ich SO kurz sein sollte, aber muss sagen, dass das großartig ist, ich hätte keine bessere Antwort verlangen können, also danke :). – Johan

Verwandte Themen