2016-09-08 1 views
0

Verwenden von Azure Service Bus als Transport, geplante Nachrichten funktionieren jedoch nur beim Aufruf von einem IConsumer aus.MassTransitStateMachine Zeitpläne gebrochen?

Ich verbrachte Stunden und Tage und habe immer noch wenig Ahnung was los ist.

Kann mir jemand erklären, was ich tun muss, um Fahrpläne von der Zustandsmaschine mit dem azure service bus zu bekommen? Und vielleicht, warum planen Sie planen, funktioniert von IConsumer Kontext, aber nirgendwo anders.

public class BatchCollector : MassTransitStateMachine<BufferSaga> 
{ 
    public BatchCollector(IBatchFactory batchFactory) 
    { 
     InstanceState(saga => saga.State); 
     Event(() => BufferedCommandDetected, 
      _ => _.CorrelateById(context => context.Message.GetBatchId())); 

     Schedule(() => WindowElapsed, x => x.BatchCompletionId, x => 
     { 
      x.Delay = TimeSpan.FromSeconds(5); 
      x.Received = e => e.CorrelateById(context => context.Message.CorrelationId); 
     }); 


     Initially(
      When(BufferedCommandDetected) 
       .Then(
        context => 
        { 
         context.Instance.CorrelationId = context.Data.GetBatchId(); 
         context.Instance.Id = Guid.NewGuid().ToString("N"); 
         context.Instance.Buffer.Add(context.Data); 
         context.Instance.BatchStartTime = DateTimeOffset.Now; 
         context.Instance.AbsoluteDeadLine = DateTimeOffset.Now + context.Data.AbsoluteWindowSpan; 
         context.Instance.SlidingDeadLine = DateTimeOffset.Now + context.Data.SlidingWindowSpan; 
        }) 
       .Schedule(WindowElapsed, 
        context => new WindowElapsed {CorrelationId = context.Instance.CorrelationId }, 
        delayProvider: scheduleDelayProvider => scheduleDelayProvider.Data.SlidingWindowSpan < scheduleDelayProvider.Data.AbsoluteWindowSpan ? scheduleDelayProvider.Data.SlidingWindowSpan : scheduleDelayProvider.Data.AbsoluteWindowSpan) 
       .TransitionTo(Waiting)); 

     During(Waiting, 
      When(BufferedCommandDetected) 
       .Then(context => 
       { 
        context.Instance.SlidingDeadLine += context.Data.SlidingWindowSpan; 
        context.Instance.Buffer.Add(context.Data); 
       }), 
      When(WindowElapsed.Received, context => context.Instance.SlidingDeadLine > DateTimeOffset.Now && context.Instance.AbsoluteDeadLine > DateTimeOffset.Now) 
       .Schedule(WindowElapsed, context => new WindowElapsed { CorrelationId = context.Instance.CorrelationId }), 
      When(WindowElapsed.Received, context => context.Instance.SlidingDeadLine <= DateTimeOffset.Now || context.Instance.AbsoluteDeadLine <= DateTimeOffset.Now) 
       //.Unschedule(WindowElapsed) 
       .Publish(context => new Batch() 
       { 
        BatchId = context.Instance.BatchCompletionId ?? Guid.NewGuid(), 
        Content = context.Instance.Buffer, 
        StartTime = context.Instance.BatchStartTime, 
        EndTime = DateTimeOffset.Now 
       }) 
       .Finalize() 
       .TransitionTo(BufferCompleted)); 

     SetCompletedWhenFinalized(); 
    } 

    public Event<BufferedCommand> BufferedCommandDetected { get; private set; } 


    public Schedule<BufferSaga, WindowElapsed> WindowElapsed { get; private set; } 

    public State Waiting { get; private set; } 

    public State BufferCompleted { get; private set; } 
} 

Der Bus init:

container.RegisterType<IBusControl>(
      new HierarchicalLifetimeManager(), 
      new InjectionFactory(c => 
      { 
       var bus = Bus.Factory.CreateUsingAzureServiceBus(
        cfg => 
        { 
         var azSbHost = cfg.Host(new Uri(CloudConfigurationManager.GetSetting("ServiceBus.Url")) 
          , host => 
          { 
           host.TokenProvider = TokenProvider 
            .CreateSharedAccessSignatureTokenProvider 
            (CloudConfigurationManager.GetSetting("ServiceBus.SharedAccessKeyName"), 
             CloudConfigurationManager.GetSetting("ServiceBus.AccessKey"), 
             TokenScope.Namespace); 
          }); 

         cfg.ReceiveEndpoint(
          azSbHost, 
          "Quartz.Scheduler", 
          sbConfig => 
           { 
            cfg.UseMessageScheduler(sbConfig.InputAddress); 
            sbConfig.Consumer(() => new ScheduleMessageConsumer(c.Resolve<IScheduler>())); 
           } 
         ); 

         cfg.ReceiveEndpoint(
          azSbHost, 
          Assembly.GetExecutingAssembly().GetName().Name, 
          sbConfig => 
          { 
           AllClasses.FromAssembliesInBasePath() 
            .Where(
             @class => 
              (@class?.Namespace?.StartsWith("bcn", 
               StringComparison.OrdinalIgnoreCase) ?? false) 
              && 
              @class.GetParentClasses() 
               .Any(
                parent => 
                  parent.Name.StartsWith("MassTransitStateMachine`1"))) 
            .ForEach(@class => 
            { 
             //dynamic cast to avoid having to deal with generic typing when type is not known until runtime.             
             dynamic stateMachineExtension = 
              new DynamicStaticWrapper(typeof(StateMachineSubscriptionExtensions)); 
             stateMachineExtension 
              .StateMachineSaga(
               sbConfig, 
               c.Resolve(@class), 
               c.Resolve(typeof(ISagaRepository<>).MakeGenericType(
                @class.GetParentClasses().First(parent => 
                   parent.Name.StartsWith("MassTransitStateMachine`1")) 
                 .GetGenericArguments().First()))); 
            }); 



           AllClasses.FromAssembliesInBasePath() 
            .Where(
             @class => 
              (@class?.Namespace?.StartsWith("bcn", StringComparison.OrdinalIgnoreCase) ?? 
              false) 
              && @class.GetInterfaces().Any(
               @interface => 
                @interface?.FullName?.StartsWith("MassTransit.IConsumer`1") ?? 
                false)) 
            .ForEach(@class => 
            { 
             var factoryType = typeof(UnityConsumerFactory<>).MakeGenericType(@class); 
             //Automatically register consumers. 
             dynamic consumerFactory = Activator.CreateInstance(
              factoryType, 
              container); 
             var consumingMethod = typeof(ConsumerExtensions). 
              GetMethods() 
              .First(
               m => 
                m.Name == "Consumer" && m.IsGenericMethod && 
                m.GetGenericArguments().Length == 1 && 
                m.GetParameters().Length == 3) 
              .MakeGenericMethod(@class) 
              .Invoke(null, new object[] {sbConfig, consumerFactory, null}); 

             //Automatically detect which payload contains message data. This message data is stored in blob. 
             @class.GetInterfaces().Where(
               @interface => 
                 @interface.FullName.StartsWith("MassTransit.IConsumer`1")) 
              .Select(@interface => @interface.GetGenericArguments().First()) 
              .Where(payload => payload.GetProperties() 
               .Any(prop => prop.PropertyType.Name.StartsWith("MessageData`1"))) 
              .ForEach(
               BlobType => 
                typeof(MessageDataConfiguratorExtensions) 
                 .GetMethods() 
                 .First(
                  method => 
                   method.GetParameters().First().ParameterType == 
                   typeof(IConsumePipeConfigurator) 
                   && 
                   method.GetParameters().Last().ParameterType == 
                   typeof(IMessageDataRepository)) 
                 .MakeGenericMethod(BlobType) 
                 .Invoke(null, 
                  new object[] 
                   {sbConfig, c.Resolve<IMessageDataRepository>()})); 
            }); 
          }); 

         cfg.UseServiceBusMessageScheduler(); 
         //azSbHost. 
        }); 

       return bus; 
      })); 
     container.RegisterType<IBus, IBusControl>(); 
     container.RegisterType<IBus, IBusControl>(new ContainerControlledLifetimeManager()); 

und begann dann:

var container = UnityConfig.GetConfiguredContainer(); 
     var bus = container.Resolve<IBusControl>(); 
     bus.Start(); 

     var scheduler = container.Resolve<IScheduler>(); 
     scheduler.Start(); 

     bus.Publish<BufferedCommand>(new BufferedCommandAdapter<decimal>(10m, TimeSpan.FromSeconds(5), 
      TimeSpan.FromSeconds(5))); 

Antwort

0

Sie setzen die Arbeit Fabrik für Quarz up? Werfen Sie einen Blick darauf, wie die QuartzIntegration Bibliothek das Setup:

https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.QuartzIntegration/QuartzIntegrationExtensions.cs

Verwenden Sie auch die Beobachter rund um den Bus, so dass Quarz gestartet/Wiedergabe/Pause inline mit dem Bus.

https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit.QuartzIntegration/Configuration/SchedulerBusObserver.cs

+0

Chris nach langer Zeit scheint, ist es die RavenDB Saga Anbieter gebrochen ist :(ich es. Es funktioniert gut in Erinnerung arbeiten nicht bekommen kann. Am Projekt sucht es in nicht aktualisiert wurde 4 Jahre ... das ist nicht vertrauenerweckend – Alwyn

+0

Ich denke, es gibt ein neueres vielleicht für MT3 entwickelt. –

+0

Es gibt dieses Projekt https://github.com/alexeyzimarev/MassTransit.RavenDbIntegration Das nugget-Paket ist beschädigt, so dass ich nur herunterladen Quelle und Referenz direkt, scheint bisher besser zu funktionieren, die Sperre ist immer noch langsamer als ich es möchte und daher die Tendenz, parallel zu laufen, wenn Ereignisse zusammengefügt werden sollten. – Alwyn