0

Ich benutze azure Service Bus Thema. wie ich große Nachricht habe, so spalte ich große Nachricht und sende sie in kleine - kleine Nachrichten mit sessionid und split-Reihenfolge. Ich möchte, dass mein Receiver eine ereignisgesteuerte Architektur hat. da ich alle Nachrichten mit der gleichen SessionID empfangen muss und sie mit der richtigen Split-Order aggregieren muss. aber nur beim ersten Mal bekomme ich eine Nachricht von meinem Code. in der zweiten Nachricht Timeouts.Azure Service Bus Thema empfangen Nachricht mit Sitzung mit ereignisgesteuerter Architektur Modell

  public class CRMESBListener : RoleEntryPoint 
      { 
       private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); 
       private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false); 

      public override void Run() 
      { 
       Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is running"); 
       try 
       { 
        DBMessageListener dbMessageListener = DBMessageListener.GetDBMessageListner(); 
        dbMessageListener.Listen(); 
        runCompleteEvent.WaitOne(); 
        //this.RunAsync(this.cancellationTokenSource.Token).Wait(); 
       } 
       finally 
       { 
        this.runCompleteEvent.Set(); 
       } 
      } 

      public override bool OnStart() 
      { 
       // Set the maximum number of concurrent connections 
       ServicePointManager.DefaultConnectionLimit = 12; 

       // For information on handling configuration changes 
       // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357. 


       bool result = base.OnStart(); 
       Bootstrapper.Init(); 

       Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has been started"); 

       return result; 
      } 

      public override void OnStop() 
      { 
       Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is stopping"); 

       this.cancellationTokenSource.Cancel(); 
       this.runCompleteEvent.WaitOne(); 

       base.OnStop(); 

       Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has stopped"); 
      } 

      private async Task RunAsync(CancellationToken cancellationToken) 
      { 
       // TODO: Replace the following with your own logic. 
       while (!cancellationToken.IsCancellationRequested) 
       { 
        Trace.TraceInformation("Working"); 
        await Task.Delay(1000); 
       } 
      } 
     } 













     public class DBMessageListener 
     { 
      #region Member Variables 

      private static DBMessageListener dbMessageListner; 
      private static object lockObject = new object(); 
      private TopicSubscribeClientWrapper accountTopicClient; 

      private NamespaceManager namespaceManager; 
      private OnMessageOptions eventDrivenMessagingOptions; 

      private int crmIntegrationUserID = Common.CrmCurrentUser.UserID; 

      #endregion Member Variables 

      #region Constructors 

      private DBMessageListener() 
      { 
       string subscriptionName = "AllMessages"; 
       namespaceManager = new NamespaceManager(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString); 

       if (!namespaceManager.SubscriptionExists(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName)) 
       { 
        namespaceManager.CreateSubscription(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName); 
       } 
       accountTopicClient = new TopicSubscribeClientWrapper(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString, ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath); 
       accountTopicClient.SubscriptionName = subscriptionName; 



       eventDrivenMessagingOptions = new OnMessageOptions 
       { 
        AutoComplete = true 
       }; 

       eventDrivenMessagingOptions.ExceptionReceived += OnExceptionReceived; 
       eventDrivenMessagingOptions.MaxConcurrentCalls = 5; 
      } 

      #endregion Constructors 

      #region Methods 

      private async System.Threading.Tasks.Task OnMessageArrived(BrokeredMessage message) 
      { 
       if (message != null) 
       { 
        try 
        { 
         await ProcessDBMessage(message.GetBody<ServiceBusMessage>()); 
        } 
        catch (Exception ex) 
        { 
         //log exception 
        } 
       } 

      } 

      private void OnExceptionReceived(object sender, ExceptionReceivedEventArgs e) 
      { 
       if (e != null && e.Exception != null) 
       { 

       } 
      } 

      private async System.Threading.Tasks.Task ProcessDBMessage(ServiceBusMessage message) 
      { 

    //process message   
      } 

      public static DBMessageListener GetDBMessageListner() 
      { 
       if (dbMessageListner == null) 
       { 
        lock (lockObject) 
        { 
         if (dbMessageListner == null) 
         { 
          dbMessageListner = new DBMessageListener(); 
         } 
        } 
       } 

       return dbMessageListner; 
      } 

      public void Listen() 
      { 
       accountTopicClient.OnMessageAsync(async message => await OnMessageArrived(message), eventDrivenMessagingOptions); 

      } 

      #endregion Methods 
     } 


public class TopicSubscribeClientWrapper : IServiceBusClientWrapper 
    { 
     #region Member Variables 

     private readonly string _connectionString; 
     private readonly string _topicName; 
     private readonly TopicClient _topicClient; 
     private SubscriptionClient _subscriptionClient; 

     #endregion Member Variables 

     #region Properties 

     public string SubscriptionName { get; set; } 

     #endregion Properties 

     #region Constructors 

     public TopicSubscribeClientWrapper(string connectionString, string topicName) 
     { 
      _connectionString = connectionString; 
      _topicName = topicName; 
      _topicClient = TopicClient.CreateFromConnectionString(connectionString, topicName); 
     } 

     #endregion Constructors 

     #region Event Handlers 

     public void OnMessageAsync(Func<BrokeredMessage, Task> onMessageCallback, OnMessageOptions onMessageOptions) 
     { 

      _subscriptionClient = SubscriptionClient.CreateFromConnectionString(_connectionString, _topicName, SubscriptionName); 

      // _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions); 

      MemoryStream largeMessageStream = new MemoryStream(); 
      MessageSession session = _subscriptionClient.AcceptMessageSession(); 

      while (true) 
      { 
       BrokeredMessage subMessage = session.Receive(TimeSpan.FromSeconds(5)); 

       if (subMessage != null) 
       { 
        Stream subMessageStream = subMessage.GetBody<Stream>(); 
        subMessageStream.CopyTo(largeMessageStream); 

        subMessage.Complete(); 
        //Console.Write("."); 
       } 
       else 
       { 
        //Console.WriteLine("Done!"); 
        break; 
       } 
      } 

      BrokeredMessage largeMessage = new BrokeredMessage(largeMessageStream, true); 
      var message = onMessageCallback.Method.GetParameters(); 
      message.SetValue(largeMessage, 1); 
      _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions); 

     } 

     #endregion Event Handlers 

     #region Methods 

     public Task SendAsync(BrokeredMessage message) 
     { 
      return _topicClient.SendAsync(message); 
     } 

     public void Close() 
     { 
      if (_subscriptionClient != null) 
      { 
       _subscriptionClient.Close(); 
      } 

      _topicClient.Close(); 
     } 

     #endregion Methods 
    } 

Antwort

1

Ich würde vorschlagen, einen anderen Weg zu nehmen. Anstatt zu versuchen, eine Nachrichtensitzung zu erstellen, die eine große Nachricht weitergibt, verwenden Sie claim check pattern, die speziell dieses Problem anspricht - große Anhänge. Schreiben Sie Daten in ein Speicher-Blob und lassen Sie den URI mit der Nachricht senden. Es wäre viel einfacher, ein Blob zu speichern/wiederherzustellen, als zu versuchen, die Nutzdaten in Blöcken zu senden. Außerdem ist es so einfacher, Ihr System zu überwachen (eine fehlgeschlagene Nachricht mit einem oder mehreren Blobs). Ich muss Sitzungen oder etwas Besonderes verwenden.

Verwandte Themen