2017-09-21 1 views
0

Ich versuche, Heller für die Befehlsquellenermittlung zu verwenden. Ich habe eine Lösung, die einen .NET Core Web API-Dienst zum Einreihen von Nachrichten in eine Warteschlange und eine andere Lösung mit einem .NET Core-Konsolenprojekt zum Abrufen von Nachrichten aus der Warteschlange enthält. Die Dienste sind isoliert und nicht in der gleichen Lösung.Hellere Consumer-Nachrichten, die nicht an Handler weitergeleitet werden

Der Nachrichten-Dispatcher zieht die Nachrichten aus Rabbit und leitet sie an den MessageMapper weiter, aber die Nachricht findet ihren Weg zum Handler nicht, um verarbeitet zu werden.

Visual Studio 2015, .NET Core 1.1, Paramore.Brighter.MessagingGateway.RMQ 7.1.5, Paramore.Brighter.ServiceActivator 7.1.5, StructureMap.Microsoft.DependencyInjection 1.4.0.

Die Konfiguration in der Konsole App:

public static void Main(string[] args) 
{ 
    RetryPolicy retryPolicy = Policy.Handle<Exception>().WaitAndRetry(new List<TimeSpan>() 
    { 
     TimeSpan.FromMilliseconds(50), 
     TimeSpan.FromMilliseconds(100), 
     TimeSpan.FromMilliseconds(150) 
    }); 

    CircuitBreakerPolicy circuitBreakerPolicy = Policy.Handle<Exception>().CircuitBreaker(1, TimeSpan.FromMilliseconds(500)); 
    PolicyRegistry policyRegistry = new PolicyRegistry() { { CommandProcessor.RETRYPOLICY, retryPolicy }, { CommandProcessor.CIRCUITBREAKER, circuitBreakerPolicy } }; 

    var subscriberRegistry = new SubscriberRegistry(); 
    subscriberRegistry.Register<ApplicationUpdateCommand, ApplicationUpdateCommandHandler>(); 

    var rmqConnnection = new RmqMessagingGatewayConnection 
    { 
     AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:[email protected]:5672/%2f")), 
     Exchange = new Exchange("api.coverage.exchange"), 
    }; 

    var rmqMessageConsumerFactory = new RmqMessageConsumerFactory(rmqConnnection); 
    var rmqMessageProducerFactory = new RmqMessageProducerFactory(rmqConnnection); 

    Dispatcher dispatcher = null; 
    var container = new Container(); 
    container.Configure(config => 
    { 
     config.For<IHandleRequests<ApplicationUpdateCommand>>().Use<ApplicationUpdateCommandHandler>(); 
     var servicesMessageMapperFactory = new ServicesMessageMapperFactory(container); 
     var messageMapperRegistry = new MessageMapperRegistry(servicesMessageMapperFactory) 
     { 
      {typeof(ApplicationUpdateCommand), typeof(ApplicationUpdateCommandMessageMapper) } 
     }; 

     var servicesHandlerFactory = new ServicesHandlerFactory(container); 

     var commandProcessor = CommandProcessorBuilder.With() 
      .Handlers(new HandlerConfiguration(subscriberRegistry, servicesHandlerFactory)) 
      .Policies(policyRegistry) 
      .NoTaskQueues() 
      .RequestContextFactory(new InMemoryRequestContextFactory()) 
      .Build(); 

      dispatcher = DispatchBuilder.With() 
             .CommandProcessor(commandProcessor) 
             .MessageMappers(messageMapperRegistry) 
             .DefaultChannelFactory(new InputChannelFactory(rmqMessageConsumerFactory, rmqMessageProducerFactory)) 
             .Connections(new List<Connection>() 
             { 
              new Connection<ApplicationUpdateCommand> 
              (
               new ConnectionName("Application.Update"), 
               new ChannelName("Application.Update"), 
               new RoutingKey("Application.Update") 
              ) 
             }).Build(); 
     }); 

     dispatcher.Receive(); 

     Console.WriteLine("Press enter to stop ..."); 
     Console.ReadLine(); 

     dispatcher.End().Wait(); 
    } 

-Code für den MessageMapper, Befehl und Handler:

public class ApplicationUpdateCommandMessageMapper : IAmAMessageMapper<ApplicationUpdateCommand> 
{ 
    public Message MapToMessage(ApplicationUpdateCommand request) 
    { 
     var header = new MessageHeader(messageId: request.Id, topic: "Application.Update", messageType: MessageType.MT_EVENT); 
     var body = new MessageBody(JsonConvert.SerializeObject(request)); 
     var message = new Message(header, body); 
     return message; 
    } 

    public ApplicationUpdateCommand MapToRequest(Message message) 
    { 
     // dispatcher will route message here but that is it 
     ApplicationUpdateCommand command = JsonConvert.DeserializeObject<ApplicationUpdateCommand>(message.Body.Value); 
     return command; 
    } 
} 

public class ApplicationUpdateCommand : Command 
{ 
    public int ApplicationId { get; private set; } 
    public string ApplicantName { get; private set; } 

    public ApplicationUpdateCommand(Guid id, int applicationId, string applicantName) 
     : base(id) 
    { 
     ApplicationId = applicationId; 
     ApplicantName = applicantName; 
    } 
} 

public class ApplicationUpdateCommandHandler : RequestHandler<ApplicationUpdateCommand> 
{ 
    private readonly IAmACommandProcessor _commandProcessor; 

    public ApplicationUpdateCommandHandler(IAmACommandProcessor commandProcessor) 
    { 
     _commandProcessor = commandProcessor; 
    } 

    public override ApplicationUpdateCommand Handle(ApplicationUpdateCommand command) 
    { 
     // would like to get here to handle command 

     return base.Handle(command); 
    } 
} 

Antwort

1

Sie als MessageType.MT_EVENt im Header identifizieren, sondern Command sind abzuleiten. Die beiden sollten zustimmen, entweder von Event ableiten oder MT_COMMAND

verwenden
Verwandte Themen