2013-11-04 4 views
6

Ein Projekt, an dem ich arbeite, erfordert das Lesen von Nachrichten von SQS, und ich beschloss, Akka zu verwenden, um die Verarbeitung dieser Nachrichten zu verteilen.Consumer Poll Rate mit Akka, SQS und Camel

Da SQS von Camel unterstützt wird und die Funktionalität für Akka in der Consumer-Klasse integriert ist, dachte ich, es wäre am besten, den Endpunkt zu implementieren und Nachrichten auf diese Weise zu lesen, obwohl ich nicht viele Beispiele dafür gesehen habe Leute, die das tun.

Mein Problem ist, dass ich meine Warteschlange nicht schnell genug abfragen kann, um meine Warteschlange leer oder fast leer zu halten. Was ich ursprünglich dachte war, dass ich einen Consumer dazu bringen konnte, Nachrichten über Camel von SQS mit einer Rate von X/s zu empfangen. Von dort konnte ich einfach mehr Consumers erstellen, um die Geschwindigkeit zu erreichen, mit der ich die verarbeiteten Nachrichten benötigte.

Mein Verbraucher:

import akka.camel.{CamelMessage, Consumer} 
import akka.actor.{ActorRef, ActorPath} 

class MyConsumer() extends Consumer { 
    def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)" 
    var count = 0 

    def receive = { 
    case msg: CamelMessage => { 
     count += 1 
    } 
    case _ => { 
     println("Got something else") 
    } 
    } 

    override def postStop(){ 
    println("Count for actor: " + count) 
    } 
} 

Wie gezeigt, habe ich delay=1 sowie &maxMessagesPerPoll=10 setze die Rate von Nachrichten zu verbessern, aber ich bin nicht in der Lage mehr Verbraucher mit dem gleichen Endpunkt, um zu laichen.

las ich in der Dokumentation, dass By default endpoints are assumed not to support multiple consumers. und ich glaube, dass dies auch für die SQS-Endpunkte gilt, als mehrere Verbraucher Laichen wird mir nur einen Verbraucher geben, wo nach dem System für eine Minute ausgeführt wird, ist die Ausgangsnachricht Count for actor: x statt dem andere, die Count for actor: 0 ausgeben.

Wenn das überhaupt sinnvoll ist; Ich kann ungefähr 33 Nachrichten/Sekunde mit dieser aktuellen Implementierung auf dem einzelnen Verbraucher lesen.

Ist dies der richtige Weg, Nachrichten aus einer SQS-Warteschlange in Akka zu lesen? Wenn ja, kann ich das nach außen skalieren, so dass ich meine Nachrichtenverbrauchsrate näher an die von 900 Nachrichten/Sekunde erhöhen kann?

Antwort

5

Leider unterstützt Camel derzeit nicht den parallelen Verbrauch von Nachrichten auf SQS.

http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

Um dem abzuhelfen ich meine eigenen Schauspieler geschrieben habe mit dem aws-java-sdk Batch-Nachrichten SQS abzufragen.

def receive = { 
    case BeginPolling => { 
     // re-queue sending asynchronously 
     self ! BeginPolling 
     // traverse the response 
     val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry] 
     val messages = sqs.receiveMessage(receiveMessageRequest).getMessages 
     messages.toList.foreach { 
     node => { 
      deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle)) 
      //log.info("Node body: {}", node.getBody) 
      filterSupervisor ! node.getBody 
     } 
     } 
     if(deleteEntryList.size() > 0){ 
     val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList) 
     sqs.deleteMessageBatch(deleteMessageBatchRequest) 
     } 
    } 

    case _ => { 
     log.warning("Unknown message") 
    } 
    } 

Obwohl ich nicht sicher bin, ob dies die beste Umsetzung ist, und es könnte natürlich verbessert werden, auf, so dass Anfragen nicht ständig eine leere Warteschlange schlagen, es ist meine aktuellen Anforderungen entspricht die Lage zu pollen Nachrichten aus derselben Warteschlange.

Von diesem Punkt 133 (Nachrichten/Sekunde)/Akteur von SQS.

1

Camel 2.15 unterstützt concurrentConsumers, obwohl nicht sicher, wie nützlich das ist, in dem ich nicht weiß, ob akka camel support 2.15 und ich weiß nicht, ob ein Consumer-Akteur einen Unterschied macht, selbst wenn es mehrere Verbraucher gibt.