2017-05-14 5 views
0

Ich habe die Bibliothek als Bus freigegeben, und ich versuche, Nachrichten von rabbitmq aber ConsumerOnReceived erhalten nie ausgelöst werden.Empfangen wird nicht ausgelöst

namespace Bus 
{ 
    public class MessageListener 
    { 

    private static IConnection _connection; 
    private static IModel _channel; 

    public void Start(string hostName, int port, string queueName) 
    { 
     var factory = new ConnectionFactory() { HostName = hostName, Port = port }; 
     using (var connection = factory.CreateConnection()) 
     using (var channel = connection.CreateModel()) 
     { 
      channel.QueueDeclare(queue: queueName, 
           durable: false, 
           exclusive: false, 
           autoDelete: false, 
           arguments: null); 

      var consumer = new EventingBasicConsumer(channel); 
      consumer.Received += ConsumerOnReceived; 

      channel.BasicConsume(queue: queueName, 
           noAck: true, 
           consumer: consumer); 
     } 
    } 

    public static void Stop() 
    { 
     _channel.Close(200, "Goodbye"); 
     _connection.Close(); 
    } 

    public virtual void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea) 
    { 
     var body = ea.Body; 
     var message = Encoding.UTF8.GetString(body); 
    } 
} 

public static class MessageSender 
{ 
    public static void Send(string hostName, int port, string queueName, string message) 
    { 
     var factory = new ConnectionFactory() { HostName = hostName, Port = port }; 
     using (var connection = factory.CreateConnection()) 
     using (var channel = connection.CreateModel()) 
     { 
      channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); 

      var body = Encoding.UTF8.GetBytes(message.ToString()); 

      channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body); 
     } 
    } 
} 

} 

Kern

namespace Core 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      new MessageListener().Start("localhost", 5672, "MakePayment"); 

      Console.WriteLine("Core Service"); 
      string line = Console.ReadLine(); 
     } 

    } 
} 

namespace Core 
{ 
    public class MessageListener : Bus.MessageListener 
    { 
     public override void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea) 
     { 
      var body = ea.Body; 
      var message = Encoding.UTF8.GetString(body); 
     } 
    } 

} 

Antwort

1

Das Problem ist hier

channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer); 

jedoch BasicConsume daher keine Sperr Methode ist, wenn Sie anrufen Start Sie eine Verbindung und einen Kanal erstellen, aber dann wird es sofort entsorgt.

Nachstehend ist NICHT eine Lösung, aber Sie können, indem Sie folgende bestätigen:

channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer); 
Console.ReadKey();//←Added Line 

Ihr Programm funktioniert auf diese Weise.

Dies ist meine vorgeschlagene Lösung. Bitte beachten Sie, dass _channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer); auf einem anderen Thread starten, so müssen Sie while(...)

using System; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 
using RabbitMQ.Client; 
using RabbitMQ.Client.Events; 

namespace Bus { 

    public abstract class BaseMessageListener { 
     private static IModel _channel; 
     private static IConnection _connection; 

     public abstract void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea); 

     public void Start(string hostName, int port, string queueName) { 
      var factory = new ConnectionFactory() { HostName = hostName, Port = port }; 
      _connection = factory.CreateConnection(); 
      _channel = _connection.CreateModel(); 
      _channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false); 
      var consumer = new EventingBasicConsumer(_channel); 
      consumer.Received += ConsumerOnReceived; 
      _channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);//This will start another thread! 
     } 

     public void Stop() { 
      _channel.Close(200, "Goodbye"); 
      _connection.Close(); 
     } 
    } 
} 

namespace StackOverfFLow.RabbitMQSolution { 

    using Bus; 

    public class MessageListener : BaseMessageListener { 

     public override void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea) { 
      var body = ea.Body; 
      var message = Encoding.UTF8.GetString(body); 
      Console.WriteLine(message); 
     } 
    } 

    internal class Program { 

     private static void Main(string[] args) { 
      var listener = new MessageListener(); 
      listener.Start("localhost", 5672, "MakePayment"); 
      Console.WriteLine("Core Service Started!"); 
      Console.ReadKey(); 
      listener.Stop(); 
     } 
    } 
} 
+0

in die Tat Dies funktioniert aber eine bessere Praxis ist es nicht verwenden, dann ReadKey zu handhaben? – Mert

+0

Ich löste dies mit "while (_channel.IsOpen) {}" – Mert

+1

@Mert, ich bin nicht genau sicher, wo Sie 'while (_channel.IsOpen) {}' setzen, aber wenn Sie es hinter 'channel.BasicConsume (queue: queueName, noAck: true, consumer: consumer); 'diese Zeile; Ich glaube nicht, dass du das brauchst. Bitte sehen Sie meine aktualisierte Antwort. –

Verwandte Themen