2017-05-31 1 views
1

In Apache Camel 2.19.0, möchte ich Nachrichten produzieren und das Ergebnis asynchron in einer konkurrierenden seda-Warteschlange konsumieren und gleichzeitig blockieren, wenn die Executoren in der seda-Warteschlange voll sind. Der Anwendungsfall dahinter: Ich muss große Dateien mit vielen Zeilen verarbeiten und muss dafür Stapel erstellen, da eine einzelne Nachricht für jede einzelne Zeile zu viel Aufwand bedeutet, während ich die gesamte Datei nicht in den Heap einpassen kann. Aber am Ende muss ich wissen, ob alle Chargen, die ich ausgelöst habe, erfolgreich abgeschlossen wurden. So effektiv, ich brauche einen Back-Pressure-Mechanismus, um die Warteschlange zu spammen und gleichzeitig Multithread-Verarbeitung zu nutzen.Apache Camel: Async-Betrieb und Gegendruck

Hier ist ein kurzes Beispiel in Camel und Spring. Der Weg, den ich konfiguriert:

package com.test; 

import org.apache.camel.builder.RouteBuilder; 
import org.springframework.stereotype.Component; 

@Component 
public class AsyncCamelRoute extends RouteBuilder { 

    public static final String ENDPOINT = "seda:async-queue?concurrentConsumers=2&size=2&blockWhenFull=true"; 

    @Override 
    public void configure() throws Exception { 
     from(ENDPOINT) 
       .process(exchange -> { 
        System.out.println("Processing message " + (String)exchange.getIn().getBody()); 
        Thread.sleep(10_000); 
       }); 
    } 
} 

Der Hersteller sieht wie folgt aus:

package com.test; 

import org.apache.camel.ProducerTemplate; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.context.event.ContextRefreshedEvent; 
import org.springframework.context.event.EventListener; 
import org.springframework.stereotype.Component; 

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.CompletableFuture; 

@Component 
public class AsyncProducer { 

    public static final int MAX_MESSAGES = 100; 

    @Autowired 
    private ProducerTemplate producerTemplate; 

    @EventListener 
    public void handleContextRefresh(ContextRefreshedEvent event) throws Exception { 
     new Thread(() -> { 
      // Just wait a bit so everything is initialized 
      try { 
       Thread.sleep(5_000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      List<CompletableFuture> futures = new ArrayList<>(); 

      System.out.println("Producing messages"); 
      for (int i = 0; i < MAX_MESSAGES; i++) { 
       CompletableFuture future = producerTemplate.asyncRequestBody(AsyncCamelRoute.ENDPOINT, String.valueOf(i)); 
       futures.add(future); 
      } 
      System.out.println("All messages produced"); 

      System.out.println("Waiting for subtasks to finish"); 
      futures.forEach(CompletableFuture::join); 
      System.out.println("Subtasks finished"); 
     }).start(); 

    } 
} 

Der Ausgang dieser Code wie folgt aussieht:

Producing messages 
All messages produced 
Waiting for subtasks to finish 
Processing message 6 
Processing message 1 
Processing message 2 
Processing message 5 
Processing message 8 
Processing message 7 
Processing message 9 
... 
Subtasks finished 

So scheint es, dass blockIfFull ignoriert und alle Nachrichten werden erstellt und vor der Verarbeitung in die Warteschlange gestellt.

Gibt es eine Möglichkeit, Nachrichten zu erstellen, so dass ich die Async-Verarbeitung in Camel verwenden kann, während gleichzeitig sichergestellt wird, dass Elemente in der Warteschlange blockiert werden, wenn zu viele unverarbeitete Elemente vorhanden sind?

+1

Können Sie 'requestBody (..)' anstelle von 'asyncRequestBody (..)' versuchen? Es kann sein, dass Sie am Ende viele blockierte Threads in einem Pool haben, der für die asynchrone Nachrichtenübertragung verwendet wird. Anstatt den Client-Thread zu blockieren. – Ralf

+0

Hallo @Ralf, ich verstehe Ihre Vorgehensweise nicht ganz - requestBody lässt den Client (Produzent) blockieren, bis der Consumer fertig ist. Während ich den Kunden blockieren möchte, wenn er die Verbraucher spammt, sollte er Nachrichten erstellen, solange es Verbraucher gibt. Ich löste es jedoch mit einem anderen Ansatz. –

+1

Das ist richtig. Aber wenn Sie etwas async machen, macht ein anderer Thread die Aufgabe, sich an seda zu wenden und auf die Antwort zu warten. Der Thread, in dem Sie die Schleife ausführen und asyncRequestBody (..) aufrufen, wird nicht blockiert, es sei denn, der Threadpool, der die asynchrone Task verarbeitet, ist erschöpft. Wenn jedoch Threads nach Bedarf im Pool erstellt werden, wird Ihr Looping-Thread nie blockiert. – Ralf

Antwort

0

Ich löste das Problem mit Streaming und einem benutzerdefinierten Splitter. Dadurch kann ich die Quellzeilen in Blöcke unterteilen, indem ich einen Iterator verwende, der eine Liste von Zeilen und nicht nur eine einzelne Zeile zurückgibt. Damit scheint mir, dass ich Camel nach Bedarf verwenden kann.

So enthält die Strecke den folgenden Abschnitt:

.split().method(new SplitterBean(), "splitBody").streaming().parallelProcessing().executorService(customExecutorService) 

Mit einem maßgeschneiderten Teiler mit dem wie oben Verhalten.