2016-07-01 7 views
0

Ich versuche, einige Tests zu schreiben, wo ich eine Nachricht an eine Warteschlange erstellen und sehen, ob die Nachricht in der Anwendung ordnungsgemäß verbraucht und verarbeitet wird.So verwenden Sie den In-Memory-Transport für Komponententests richtig

Dafür spiele ich mit der Kombu-Bibliothek und vor allem die In-Memory-Transport-Implementierung.

Noch kann ich es nicht funktionieren, dass die produzierte Nachricht verbraucht wird. daher

ist meine Fragen, ob jemand eine einfache Unit-Test zur Verfügung stellen kann, die erzeugt und verbraucht eine Nachricht im Speicher

Antwort

2

Sie finden diese auf den Code anpassen möchten Sie zu testen sind versucht, aber die grundlegende Was Sie suchen, ist die amqp URI für den In-Memory-Controller, die "Speicher: //" ist. Als ein wirklich einfaches Beispiel:

conn = kombu.Connection("memory://") 
queue = conn.SimpleQueue('myqueue') 
queue.put('test') 
msg = queue.get(timeout=1) 
msg.ack() 
print(msg.payload) 
Verwandte Themen