2016-06-16 19 views
0

https://github.com/mdevilliers/SignalR.RabbitMq/issues/43 – SignalR + RabbitMQ Backplane: SignalR-Client empfängt dieselbe Nachricht wiederholt

Ich habe wirklich Schwierigkeiten, dies zum Laufen zu bringen. Kann bitte jemand einen Blick darauf werfen und prüfen, ob am Setup etwas falsch ist?

Hier ist mein Test:

Ich starte zwei selbst gehostete Server, die beide mit derselben RabbitMqScaleoutConfiguration konfiguriert sind:

// Integration-style test: starts two SubsciberTestServerNode instances, publishes
// TestPayload through a WcDispatchPublisher, waits, and then expects each node to
// have recorded a non-empty message.
// Log, ConnectionString, TestPayload and Exception are members declared outside this snippet.
// NOTE(review): the snippet ends after the finally block; the method's closing brace is not shown.
[Test] 
    public void BasicBackplaneTest() 
    { 
     SubsciberTestServerNode nodeA = null; 
     SubsciberTestServerNode nodeB = null; 

     // Captured inside the try so the assertions in finally can inspect them.
     string messageA = null; 
     string messageB = null; 
     try 
     { 
      Log("Given I have a WorkCenter Dispatch Publisher"); 
      var publisher = new WcDispatchPublisher(ConnectionString); 

      Log("And I have multiple server nodes subscribed to the backplane"); 
      // .Result blocks the test thread until each node's async Start() has completed.
      nodeA = new SubsciberTestServerNode("nodeA").Start().Result; 
      nodeB = new SubsciberTestServerNode("nodeB").Start().Result; 

      Log("And I wait 5 seconds"); 
      Thread.Sleep(5000); 

      Log("When I publish a message: {0}", TestPayload); 
      publisher.Publish(TestPayload); 

      // Fixed delay: nothing here signals when the nodes have actually received the message.
      Log("And I wait 60 seconds"); 
      Thread.Sleep(TimeSpan.FromSeconds(60)); 

      // Only the last message each node received is read; MessageList is not inspected.
      messageA = nodeA.Message; 
      messageB = nodeB.Message; 
     } 
     catch (AggregateException exception) 
     { 
      // Flatten() collapses nested AggregateExceptions before the message is logged.
      Log("Exception Occurred: {0}", exception.Flatten().Message); 
      Exception = exception; 
     } 
     catch (Exception exception) 
     { 
      Log("Exception Occurred: {0}", exception.Message); 
      Exception = exception; 
     } 
     finally 
     { 
      // Null-conditional calls: a node is still null if its Start() threw.
      nodeA?.Dispose(); 
      nodeB?.Dispose(); 

      // The assertions run in finally, i.e. after the try body or after either catch handler.
      Log("Then no exceptions should have been thrown."); 
      Exception.Should().BeNull(); 

      Log("Then the message should have been added to the Message Queue"); 
      messageA.Should().NotBeNullOrWhiteSpace(); 
      messageB.Should().NotBeNullOrWhiteSpace(); 
     } 

Server:

/// <summary>
/// Test helper that self-hosts a web app at <c>http://localhost:9999/{nodeName}</c>,
/// registers the RabbitMQ scale-out configuration on <c>GlobalHost</c>, and connects a
/// <see cref="WcDispatchSubscriber"/> to that URL, recording every message it receives.
/// </summary>
internal class SubsciberTestServerNode : IDisposable
{
    private readonly string _nodeName;
    private readonly string _url;
    private WcDispatchSubscriber _subscriber;
    private IDisposable _webApp;

    /// <summary>Creates a node; nothing is started until <see cref="Start"/> is called.</summary>
    /// <param name="nodeName">Name used in the host URL and in the recorded message text.</param>
    public SubsciberTestServerNode(string nodeName)
    {
        _nodeName = nodeName;
        _url = $"http://localhost:9999/{nodeName}";
        MessageList = new List<string>();
    }

    /// <summary>The most recently received message (formatted), or null if none arrived.</summary>
    public string Message { get; set; }

    /// <summary>Every received message (formatted), in arrival order.</summary>
    public List<string> MessageList { get; set; }

    /// <summary>
    /// Stops the web app and the subscriber. Safe to call when <see cref="Start"/> never ran,
    /// failed part-way (so only one of the two resources exists), or when called repeatedly.
    /// </summary>
    public void Dispose()
    {
        try
        {
            _webApp?.Dispose();
        }
        finally
        {
            // Runs even if stopping the web app throws, so the subscriber is never leaked.
            _webApp = null;
            _subscriber?.Dispose();
            _subscriber = null;
        }
    }

    /// <summary>
    /// Starts the self-hosted web app, then subscribes to its hub.
    /// </summary>
    /// <returns>This instance, once the subscriber's connection has been started.</returns>
    public async Task<SubsciberTestServerNode> Start()
    {
        _webApp = WebApp.Start(_url, app =>
        {
            new Startup(_nodeName).Configuration(app);
            Thread.Sleep(TimeSpan.FromSeconds(5));

            var factory = new ConnectionFactory
            {
                UserName = "guest",
                Password = "guest",
                HostName = "localhost"
            };

            var exchangeName = "WC_LeadDispatch_Exchange";

            // NOTE(review): GlobalHost is accessed statically, and WcDispatchPublisher also calls
            // UseRabbitMq on the same resolver. Every node created in one process therefore
            // re-registers the backplane on shared state — verify whether this is related to the
            // repeated message delivery seen in the test output.
            var configuration = new RabbitMqScaleoutConfiguration(factory, exchangeName);
            GlobalHost.DependencyResolver.UseRabbitMq(configuration);
            GlobalHost.Configuration.TransportConnectTimeout = TimeSpan.FromSeconds(10);

            Thread.Sleep(TimeSpan.FromSeconds(5));
        });

        _subscriber = new WcDispatchSubscriber();
        await _subscriber.Subscribe(_url, msg =>
        {
            string message = $"Message received at Node: {_nodeName}. Message: {msg}.";
            Console.WriteLine(message);
            Message = message;
            MessageList.Add(message);
        });
        return this;
    }
}

Subscriber (Abonnent):

/// <summary>
/// SignalR client that connects to the <c>DispatchUpdateHub</c> hub and forwards every
/// <c>addMessage</c> event to a caller-supplied callback.
/// </summary>
public class WcDispatchSubscriber : IDisposable
{
    private const string HubName = "DispatchUpdateHub";
    private const string MessageEventName = "addMessage";

    // Used when the "SignalRConnectionLimit" app setting is missing, unparsable, or not positive.
    private const int FallbackConnectionLimit = 100;

    private readonly int _connectionLimitInt;
    private HubConnection _hubConnection;
    private IDisposable _hubProxySubscription;

    /// <summary>
    /// Reads the <c>SignalRConnectionLimit</c> app setting, falling back to 100.
    /// </summary>
    public WcDispatchSubscriber()
    {
        string connectionLimit = ConfigurationManager.AppSettings.Get("SignalRConnectionLimit");

        // ServicePointManager.DefaultConnectionLimit rejects values <= 0, so a negative
        // setting is treated the same as a missing one instead of failing later in Subscribe.
        int parsedLimit;
        _connectionLimitInt = int.TryParse(connectionLimit, out parsedLimit) && parsedLimit > 0
            ? parsedLimit
            : FallbackConnectionLimit;
    }

    /// <summary>
    /// Removes the event subscription and disposes the hub connection. Safe to call when
    /// <see cref="Subscribe"/> was never called or did not complete, and safe to call twice.
    /// </summary>
    public void Dispose()
    {
        _hubProxySubscription?.Dispose();
        _hubProxySubscription = null;

        _hubConnection?.Dispose();
        _hubConnection = null;
    }

    /// <summary>
    /// Connects to the hub at <paramref name="hubConnectionString"/> and registers
    /// <paramref name="messageReceived"/> for the <c>addMessage</c> event.
    /// </summary>
    /// <param name="hubConnectionString">URL of the SignalR host.</param>
    /// <param name="messageReceived">Callback invoked with each received message.</param>
    /// <returns>A task that completes when the connection has been started.</returns>
    public async Task Subscribe(string hubConnectionString, Action<string> messageReceived)
    {
        var hubConnection = new HubConnection(hubConnectionString);

        // Keep the connection before starting it so Dispose can clean up even if Start() fails.
        _hubConnection = hubConnection;

        IHubProxy dispatchHubProxy = hubConnection.CreateHubProxy(HubName);
        _hubProxySubscription = dispatchHubProxy.On(MessageEventName, messageReceived);
        ServicePointManager.DefaultConnectionLimit = _connectionLimitInt;
        await hubConnection.Start();
    }
}

Publisher:

/// <summary>
/// Sends payloads to all clients of <c>DispatchUpdateHub</c> through a hub context obtained
/// from <c>GlobalHost</c> after the RabbitMQ scale-out configuration has been registered.
/// </summary>
public class WcDispatchPublisher
{
    private const string ExchangeName = "WC_LeadDispatch_Exchange";
    private readonly IHubContext _hubContext;

    /// <summary>
    /// Registers the RabbitMQ scale-out configuration and resolves the hub context.
    /// </summary>
    /// <param name="connectionString">
    /// RabbitMQ connection string, e.g.
    /// <c>host=&lt;host&gt;:5672;publisherConfirms=true;username=&lt;user&gt;;password=&lt;password&gt;;virtualhost=&lt;vhost&gt;</c>.
    /// </param>
    public WcDispatchPublisher(string connectionString)
    {
        // The other RabbitMqScaleoutConfiguration constructors may need to be exposed
        // through overloads here as well.
        var scaleoutConfiguration = new RabbitMqScaleoutConfiguration(connectionString, ExchangeName);
        GlobalHost.DependencyResolver.UseRabbitMq(scaleoutConfiguration);

        _hubContext = GlobalHost.ConnectionManager.GetHubContext<DispatchUpdateHub>();
    }

    /// <summary>
    /// Invokes <c>addMessage</c> on all connected clients and blocks until the task that
    /// issues the call has finished.
    /// </summary>
    /// <param name="payload">Message text handed to the clients' <c>addMessage</c> handler.</param>
    public void Publish(string payload)
    {
        Task broadcast = Task.Factory.StartNew(() =>
        {
            _hubContext.Clients.All.addMessage(payload);
        });

        // Wait() surfaces any failure from the task wrapped in an AggregateException.
        broadcast.Wait();
    }
}

Das funktioniert etwa bei jedem zwölften Lauf; normalerweise bekomme ich jedoch Folgendes:

Given I have a WorkCenter Dispatch Publisher 
And I have multiple server nodes subscribed to the backplane 
And I wait 5 seconds 
When I publish a message: test-payload 
And I wait 60 seconds 
Message received at Node: nodeA. Message: test-payload. 
Message received at Node: nodeB. Message: test-payload. 
Message received at Node: nodeB. Message:   {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":" Leads Dispatch","DispatchDateTime":"2016-06- 16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}. 
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":" Leads Dispatch","DispatchDateTime":"2016-06- 16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}. 
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":" Leads Dispatch","DispatchDateTime":"2016-06- 16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}. 
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":" Leads Dispatch","DispatchDateTime":"2016-06- 16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}. 
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}. 
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}. 

... UND SO GEHT ES ENDLOS WEITER

Antwort

1

Aus dem Stegreif würde ich sagen, versuche Folgendes:

// Task.Wait() returns void and cannot be assigned; GetAwaiter().GetResult() blocks until
// Start() completes and yields the started node.
nodeA = new SubsciberTestServerNode("nodeA").Start().GetAwaiter().GetResult(); 
nodeB = new SubsciberTestServerNode("nodeB").Start().GetAwaiter().GetResult(); 

oder

// await is only permitted inside an async method, so the enclosing test must be declared async.
nodeA = await new SubsciberTestServerNode("nodeA").Start(); 
nodeB = await new SubsciberTestServerNode("nodeB").Start(); 

`.Result` ist ein Aufruf, der den Thread blockiert; wahrscheinlich hindert er dadurch nodeA daran, seine Ereignisse auszulösen.

+0

Auch mit dieser Änderung zeigt sich das gleiche Verhalten, obwohl ich auf Ihren await-Code umgestellt habe. Danke – Chazt3n

+0

Hmm ... interessant. Konntest du den Code überhaupt schrittweise im Debugger durchlaufen? –

+0

Oh ja, es scheint nur ein Problem zu sein, zwei Server in einem Testszenario zu betreiben – Chazt3n