2017-04-24 3 views
0

verarbeitet Ich möchte einen Verbraucher schaffen, die ankommenden Nachrichten stapelt und wartet:Wie empfangenen Nachrichten stapeln und im Batch-

  • bis n Nachrichten eingetroffen sind.
  • t Sekunden sind verstrichen.

, um den gesamten Nachrichtenstapel zu verarbeiten.

Pre-fetching ist nicht das, was ich suche. Was ich wirklich brauche, ist Prozess Nachrichten zusammen.

class MyListener(stomp.ConnectionListener): 

    def on_message(self, headers, body): 

     print ("Just received ONE message\n" 
       "I should wait for n-1 others\n" 
       "or t seconds before processing") 
+0

Ich denke, was Sie brauchen, ist eine Warteschlange Implementierung in Ihrem Verbraucher, ich bin nicht sicher, wie es in Python getan wird, sondern versuchen, für Warteschlange Implementierung suchen und verwenden Sie es als pro Ihre Anforderung. – hagrawal

Antwort

0

hier ein Beispiel

import java.util.LinkedList; 
import java.util.List; 

import javax.jms.Connection; 
import javax.jms.JMSException; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.ActiveMQMessageConsumer; 

public class SimpleConsumerClientAcknowledge { 

    public static void main(String[] args) throws JMSException { 
     List<TextMessage> messages = new LinkedList<>(); 
     Connection conn = null; 
     try { 
      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
        "tcp://localhost:61617?jms.prefetchPolicy.all=200"); 
      conn = cf.createConnection("admin", "admin"); 
      Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
      ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session 
        .createConsumer(session.createQueue("Q")); 
      conn.start(); 
      TextMessage msg = null; 
      // MAX_MESSAGES have to be < prefetchSize/2 --> 
      // jms.prefetchPolicy.all=200 
      // Once the broker has dispatched a prefetch limit number of 
      // messages to a consumer it will not dispatch any more messages to 
      // that consumer until the consumer has acknowledged at least 50% of 
      // the prefetched messages 
      int MAX_MESSAGES = 100; 
      long MAX_WAIT = 60000; 
      long millis = System.currentTimeMillis(); 
      while ((msg = (TextMessage) consumer.receive(5000)) != null) { 
       if (msg != null) { 
        messages.add(msg); 
       } 
       if (messages.size() == MAX_MESSAGES || (System.currentTimeMillis() - millis >= MAX_WAIT)) { 
        millis = System.currentTimeMillis(); 
        treatMessages(messages); 
        // because session is created with 
        // Session.CLIENT_ACKNOWLEDGE as an acknowledgeMode consumer 
        // need to acknowledge manually received messages 
        consumer.acknowledge(); 
       } 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      if (conn != null) { 
       try { 
        conn.close(); 
       } catch (Exception e) { 
       } 
      } 
     } 
    } 

    private static void treatMessages(List<TextMessage> messages) { 
     // TODO Auto-generated method stub 
     messages.clear(); 
    } 
} 
Verwandte Themen