2016-12-15 2 views
2

Wir müssen Warteschlangen in unserer Java EE-Anwendung verwenden und da es sich um eine Cloud-basierte Anwendung handelt (auf OpenShift Online), verwenden wir amazon sqs.Verwenden von Amazon sqs in einer @MessageDriven Bean - Pooling/Parallelverarbeitung

Wenn ich die Theorie des empfangenden Teils von JMS/Java EE richtig verstehe, wird eine Bean @MessageDriven vom Java EE Container verwaltet, so dass viele Bean Instanzen parallel erzeugt werden (entsprechend der maximalen Poolgröße) Anzahl der eingehenden Nachrichten ist hoch. Dies ist natürlich ein großer Vorteil, um hohe Lasten zu verarbeiten.

Allerdings sehe ich nicht, wie wir aws sqs auf diese Weise in einer Java EE-Anwendung integrieren können. Ich kenne die asynchronen Empfänger Beispiele aus http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-java-message-service-jms-client.html:

class MyListener implements MessageListener { 

    @Override 
    public void onMessage(Message message) { 
     try { 
      // Cast the received message as TextMessage and print the text to screen. 
      if (message != null) { 
       System.out.println("Received: " + ((TextMessage) message).getText()); 
      } 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

und dann:

// Create a consumer for the 'TestQueue'. 
MessageConsumer consumer = session.createConsumer(queue); 

// Instantiate and set the message listener for the consumer. 
consumer.setMessageListener(new MyListener()); 

// Start receiving incoming messages. 
connection.start(); 

Dies ist das offizielle asynchrone Empfänger Beispiel -, die keine @MessageDriven Bohne ist. Es ist offensichtlich, dass wir irgendwo die Anmeldeinformationen benötigen, um sich zu authentifizieren (indem wir eine SQSConnectionFactory, dann eine Verbindung und dann eine Sitzung erstellen - was auch im Beispiel gut beschrieben ist).
Aber ich nehme stark an, dass dieses Beispiel die Nachrichten nicht parallel verarbeiten wird - d. H. Nur eine Bean-Instanz verarbeitet die Warteschlange und dies ist keine gute Lösung für skalierbare, hoch ausgelastete Anwendungen.

a) Wie können wir den echten Java EE Weg mit Amazon SQS gehen? Ich finde nur ein paar Spring-Beispiele. Aber es muss Java EE 7 sein.

b) Wir verwenden Wildfly (derzeit 8.2.1). Wäre es auch möglich, Wildfly intern die Verbindung zu AWS und der Anwendung verwalten zu lassen, könnten wir die Warteschlange so verwenden, als wäre sie eine vom Application Server verwaltete Warteschlange (der gleiche Ansatz wie Datenquellen für den DB-Zugriff)?

Fazit bekam nach einer Antwort von stdunbar:
Es ist nicht in einem ‚richtigen Weg‘ möglich zu sein scheint, was ich tun möchte. Also was soll ich tun? Implementieren Sie eine ManagedExecutorService als stdunbar beschrieben, um die Warteschlange zu "wickeln"? - Allerdings bedeutet dies, dass es auch eine lokale Warteschlange gibt, und das ist keine gute Situation für eine Anwendung, die skalierbar sein sollte ?! Was ist mit Alternativen? Wir führen die Anwendung auf OpenShift Online aus. Es wäre wahrscheinlich besser, einen eigenen Gang mit z.B. ApacheMQ Cartridge ... es gibt natürlich viele Nachteile wie Kosten und dass wir für die "Infrastruktur" verantwortlich sind.

Um ehrlich zu sein, ich bin wirklich enttäuscht von AWS in diesem Fall ...

Antwort

2

Ich glaube nicht, dass meine Lösung JAVA EE richtig ist, aber in meinem Fall funktioniert es.

Konfiguration:

@Singleton 
public class SqsMessageManager 
{ 
    private Integer numberOfReceivers = 3; 

    public static SQSConnection connection = null; 
    public static Queue queue = null; 

    @Inject 
    SqsMessageReceiver sqsMessageReceiver; 

    public void init() 
    { 
     try 
     { 
      SQSConnectionFactory connectionFactory = 
        SQSConnectionFactory.builder() 
          .withRegion(Region.getRegion(Regions.EU_WEST_1)) 
          .withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider()) 
          .build(); 

      connection = connectionFactory.createConnection(); 

      queue = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue("myQueue"); 

      for (int i = 0; i < numberOfReceivers; i++) 
       connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue).setMessageListener(sqsMessageReceiver); 

      connection.start(); 
     } 
     catch (JMSException e) 
     { 
      e.getStackTrace(); 
     } 
    } 
} 

Dann wird der Absender:

@Dependent 
public class SqsMessageSender 
{ 
    MessageProducer producer = null; 
    Session senderSession = null; 

    @PostConstruct 
    public void createProducer(){ 
     try 
     { 
      // open new session and message producer 
      senderSession = SqsMessageManager.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
      producer = senderSession.createProducer(SqsMessageManager.queue); 
     }catch(JMSException | NullPointerException e){ 
      ; 
     } 
    } 

    @PreDestroy 
    public void destroy(){ 
     try 
     { 
      // close session 
      producer.close(); 
      senderSession.close(); 
     }catch(JMSException e){ 

     } 
    } 

    // sends a message to aws sqs queue 
    public void sendMessage(String txt) 
    { 
     try 
     { 
      TextMessage textMessage = senderSession.createTextMessage(txt); 
      producer.send(textMessage); 
     } 
     catch (JMSException e) 
     { 
      e.getStackTrace(); 
     } 
    } 
} 

Und der Empfänger:

@Dependent 
public class SqsMessageReceiver implements MessageListener 
{ 
    public void onMessage(Message inMessage) { 
     ... 
    } 
} 
+0

Ich sehe nicht genau, wie Ihre ** numberOfReceivers ** funktioniert. Sie erstellen mehrere Listener für dasselbe Objekt (Sie injizieren 'sqsMessageReceiver', was tatsächlich eine Instanz ist)? – badera

3

Nach einigen älteren docs I found

einem Behälter ermöglicht viele Instanzen eines Message-Driven Bean-Klasse zu sein gleichzeitig ausgeführt werden und somit die gleichzeitige Verarbeitung eines Nachrichtenstroms ermöglichen.

die Amazon JMS-Integration Durch die Verwendung in Verbindung mit einem deklarativen MDB, sollten Sie gut zu gehen. Ich würde die setMessageListener-Schnittstelle nicht verwenden. Sie können die deklarative Version von JMS verwenden, seit Sie auf Wildfly 8 sind.x/EE7:

@MessageDriven(activationConfig = { /* your config - i.e. queue name, etc */ }) 
public class MyListener implements MessageListener { 
    @Override 
    public void onMessage(Message message) { 
    } 
} 

Dies ist der Behälter so viele Instanzen erstellen kann je nach Bedarf. Beachten Sie, dass in Wildfly für die JMS-Parameter möglicherweise einige Anpassungen erforderlich sind.

Als eine Nebenbemerkung, lassen Sie die Amazon-Bibliotheken für das Lesen der SQS-Warteschlange sorgen. Ich habe angefangen, meinen eigenen Leser zu rollen, denkend, dass ich ihn fädeln könnte. Aber ich habe festgestellt, dass Sie die AWS Java-Bibliotheken nicht mit mehreren Threads verwenden können, die aus der Warteschlange lesen, da Sie fast immer doppelte Lesevorgänge erhalten. Ich hatte 4 Threads, die die SQS-Warteschlange lesen und würde 4 der gleichen Nachricht erhalten. Ich änderte schließlich zu einem einzelnen Leser, der die Mitteilung in ein LinkedBlockingDeque setzt, um durch einige andere Threads verbraucht zu werden.

Alles, was ich gezeigt habe, ist reines Java/EE.

EDIT
Nach dem Amazon SQS/JMS-Integration spielen um einige ich das Gefühl, dass Sie Ihre Zeit verschwenden, wenn Sie es verwenden. Es ist nur für JMS 1.1, so dass es auch die alte JMS-Syntax mit JNDI verwendet. Darüber hinaus funktioniert es nur für Warteschlangen, nicht auch für Themen.

Ich würde dringend empfehlen, Ihre eigene Implementierung zu erstellen. Ein ManagedExecutorService, der einen Warteschlangenlese-Thread mit einem kurzen SQS-Lesezeitlimit ausführt. Jede Schleife wird aus der SQS-Warteschlange gelesen und die Nachrichten in die JMS-Warteschlange oder das Thema eingefügt.

Entschuldigung, dass Sie sich Hoffnungen gemacht haben - Amazon hat es einfach nicht genug gepflegt, um sich zu lohnen.

+0

Vielen Dank, stdunbar. Ich bin froh zu hören, dass das funktionieren sollte - aber was ich nicht weiß ist, wie man die benötigten Parameter (Credentials) ins Spiel bringt. Wo stelle ich die AWS-Berechtigung ein? Und wie erreiche ich Ihre Notiz? Als Nebenbemerkung: Lassen Sie die Amazon-Bibliotheken sich um das Lesen der SQS-Warteschlange kümmern? - Ich denke, das Kommentieren mit '@ MessageDriven' wird den Anwendungsserver veranlassen, die Warteschlange zu lesen ?! - Ich weiß es zu schätzen, wenn Sie etwas konkreter darin sein könnten, wie Sie AWS SQS in Ihren Code integrieren. – badera

+0

Ich vermute, dass Sie Recht haben. Das sind keine guten Nachrichten. Wenn ich Sie richtig verstehe, ist es nur das SDK, das "nicht auf dem neuesten Stand" ist, und nicht der Dienst an sich? - Ich habe die Frage mit einer Schlussfolgerung und einer weiteren Frage aktualisiert. – badera

+0

Die mit b) gekennzeichnete Frage ist noch unbeantwortet. In der Hoffnung, noch ein paar Ideen zu bekommen, was zu tun ist, starte ich eine Bounty ... danke trotzdem so weit [+1]! – badera

0

Payara Cloud Connectors scheint ziemlich neu zu sein, aber sieht vielversprechend aus. Ich weiß nicht, ob das auch mit anderen Containern funktioniert. Soweit ich weiß, basiert es auf JCA-Adaptern.

Verwandte Themen