2016-04-28 5 views
1

Neu in MassTransit und immer noch mit einigen der Tutorial-Projekte herumspielen. Ich werde einen Service haben, der für vielleicht 20 Minuten läuft und ich muss etwas tun, wenn es fertig ist. Weil es so lange dauern kann, möchte ich dem Anfrage/Antwort-Muster nicht folgen und auf die Antwort warten, indem ich den Thread hochhebe. Ich denke, meine andere Option ist das Erstellen einer anderen Warteschlange nur für den Verbraucher zu veröffentlichen, wenn der Job erledigt ist. Ich habe diesen Beitrag angesehen: MassTransit3 how to make request from consumer, aber ich bin mir nicht sicher, wie dies zu implementieren ist. Meine Projekte, wieder aus this Tutorial, sieht wie folgt aus:MassTransit: Erstellen einer Rückrufwarteschlange in Consumer

Verlag:

static void Main(string[] args) 
{ 
    var bus = Bus.Factory.CreateUsingRabbitMq(x => 
     x.Host(new Uri("rabbitmq://localhost/"), h => {})); 
    var busHandle = bus.Start(); 
    var text = ""' 
    Console.WriteLine("Publisher"); 
    while(text != "quit") 
    { 
     Console.Write("Enter a message: "); 
     text = Console.ReadLine(); 

     var message = new SomethingHappenedMessage() 
     { 
      What = text, 
      When = DateTime.Now 
     } 
     bus.Publish(message); 
    } 
    busHandle.Stop(); 
} 

Abonnenten:

static void Main(string[] args) 
{ 
    var bus = Bus.Factory.CreateUsingRabbitMq(x => 
    { 
     var host = x.Host(new Uri("rabbitmq://localhost"), h => {}); 
     x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e => 
      e.Consumer<SomethingHappenedConsumer>()); 
    }); 
    Console.WriteLine("Subscriber"); 
    var busHandle = bus.Start(); 
    Console.ReadKey(); 
    busHandle.Stop(); 
} 

Verbraucher:

class SomethingHappenedConsumer : IConsumer<ISomethingHappened> 
{ 
    public Task Consume(ConsumeContext<ISomethingHappened> context) 
    { 
     Console.Write("TXT: " + context.Message.What); 
     Console.Write(" SENT: " + context.Message.When); 
     Console.Write(" PROCESSED: " + DateTime.Now); 
     Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")"); 
     return Task.FromResult(0); 
    } 
} 

Wie würde ich mich über eine Schaffung Rückruf-Warteschlange im Verbraucher?

Antwort

1

In Ihrem Verbraucher, nur Bus.Publish(new ResponseMessage()); (oder was auch immer Sie Ihre Antwort nennen) und lassen Sie Ihren Verleger einen Verbraucher für diesen Nachrichtentyp registrieren. Ihr Publisher scheint nicht an eine Warteschlange gebunden zu sein. Erstellen Sie einfach einen Warteschlangennamen und binden Sie ihn auch an eine Warteschlange.

1

Nochmals vielen Dank an @Travis für die Hilfe. Ich wollte nur den endgültigen Code zeigen, für den ich in der Zukunft jemanden gefunden habe. Das Messaging sieht für die Antwort lustig aus, es wird jedoch korrekt zurück an den Publisher gepostet.

Verlag:

static void Main(string[] args) 
{ 
    var bus = Bus.Factory.CreateUsingRabbitMq(x => 
    { 
     var host = x.Host(new Uri("rabbitmq://localhost/"), h => { }); 
     x.ReceiveEndpoint(host, "MtPubSubExample_TestPublisher", e => 
      e.Consumer<ResponseConsumer>()); 
    }); 
    var busHandle = bus.Start(); 
    var text = ""; 
    Console.WriteLine("Publisher"); 
    while(text != "quit") 
    { 
     Console.Write("Enter a message: "); 
     text = Console.ReadLine(); 

     var message = new SomethingHappenedMessage() 
     { 
      What = text, 
      When = DateTime.Now 
     }; 
     bus.Publish(message); 
    } 

    busHandle.Stop(); 
} 

Antwort Verbraucher:

class ResponseConsumer : IConsumer<IResponse> 
{ 
    public Task Consume(ConsumeContext<IResponse> context) 
    { 
     Console.WriteLine("RESPONSE MESSAGE: " + context.Message.Message); 
     return Task.FromResult(0); 
    } 
} 

Abonnenten:

static void Main(string[] args) 
{ 
    var bus = Bus.Factory.CreateUsingRabbitMq(x => 
    { 
     var host = x.Host(new Uri("rabbitmq://localhost/"), h => { }); 
     x.ReceiveEndpoint(host, "MtPubSubExample_TestSubscriber", e => 
      e.Consumer<SomethingHappenedConsumer>()); 
    }); 
    Console.WriteLine("Subscriber"); 
    var busHandle = bus.Start(); 
    Console.ReadKey(); 
    busHandle.Stop(); 
} 

Subscriber Verbraucher:

class SomethingHappenedConsumer : IConsumer<ISomethingHappened> 
{ 
    private IBusControl bus = Bus.Factory.CreateUsingRabbitMq(x => 
     x.Host(new Uri("rabbitmq://localhost/"), h => { })); 

    public Task Consume(ConsumeContext<ISomethingHappened> context) 
    { 
     var now = DateTime.Now; 
     Console.Write("TXT: " + context.Message.What); 
     Console.Write(" SENT: " + context.Message.When); 
     Console.Write(" PROCESSED: " + now); 
     Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")"); 

     var response = new ResponseMessage() 
     { 
      Message = "The request was processed at " + now 
     }; 

     bus.Publish(response); 
     return Task.FromResult(0); 
    } 
} 
Verwandte Themen