2016-05-20 3 views
0

Wenn ich gefolgt Code ausführen, fand ich, dass ich Nachricht aus der Warteschlange zu bekommen, aber die callback nichtPython Kombu Verbraucher get Nachrichten-Warteschlange aber der Rückruf auslösen kann nicht

ausgelöst
from kombu.mixins import ConsumerMixin 
from kombu import Exchange, Queue 

task_exchange = Exchange('nginx', type='direct') 
task_queues = [Queue(exchange=task_exchange, routing_key='nginx')] 

class Worker(ConsumerMixin): 

    def __init__(self, connection): 
     self.connection = connection 


    def get_consumers(self, Consumer, channel): 
     return [Consumer(queues=task_queues, 
         callbacks=[self.task] 
         )] 

    def task(self, body, message): 
     print body 
     message.ack() 
if __name__ == '__main__': 
    from kombu import Connection 
    with Connection('amqp://test:[email protected]:5672/test') as conn: 
     worker = Worker(conn) 
     worker.run() 

Ich versuche zu laufen python -m pdb test.py

170 ->  def run(self, _tokens=1): 
171    restart_limit = self.restart_limit 
172    errors = (self.connection.connection_errors + 
173      self.connection.channel_errors) 
174    while not self.should_stop: 
175     try: 
(Pdb) l 
176      if restart_limit.can_consume(_tokens): 
177       for _ in self.consume(limit=None): # pragma: no cover 
178        pass 
179      else: 
180       sleep(restart_limit.expected_time(_tokens)) 
181     except errors: 
182      warn(W_CONN_LOST, exc_info=1) 

Es Schleife bei

for _ in self.consume(limit=None): # pragma: no cover 
      pass 

Antwort

0

Run python -m pdb test.py, erhalten in den connection.drain_events() stellte die content.body Codierung binär ist,

   if (content and 
309      channel.auto_decode and 
310      hasattr(content, 'content_encoding')): 
311 ->    try: 
312      content.body = content.body.decode(content.content_encoding) #here get a error 
313     except Exception: 
314      pass 

fix it

def get_consumers(self, Consumer, channel): 
    return [Consumer(queues=task_queues, 
        accept=['json', 'pickle'], 
        callbacks=[self.task] 
        )] 
Verwandte Themen