2016-08-31 4 views
0

Ich bin derzeit ein bisschen hilflos. Ich habe viele Beiträge über asynchrone Threads gelesen, aber niemand passt zu meiner Situation und ich verstehe es nicht zum vollständigen Bild..NET Async Queue Processing mit ein paar Extras

Ich habe eine DB mit einer Tabelle "Jobs", die kontinuierlich durch eine Anwendung "Importeur" wächst. Diese Tabelle hat in diesem Beispiel drei Spalten (Id, customer_id, DateOfEntry). Testdaten:

1,A,Date 
2,A,Date 
3,B,Date 
4,C,Date 
5,B,Date 
... 

Ich habe zweite Anwendung „JobWorker“, dass die Arbeitsplätze arbeiten sollte. ABER mit den folgenden Einschränkungen.

Ich möchte so viele async Aktionen in der Anwendung "JobWorker" starten, wie es Kunden gibt, in diesem Beispiel 3. Es wird immer schwieriger. Diese asynchronen Arbeiter sollten eine eigene Warteschlange haben, die immer parallel (synchron) an einem Job arbeiten kann. 1) Sie müssen den ältesten Job aus der Tabelle laden. 2) Die Arbeiten an sie 3) aus der Tabelle den nächsten Laden

Jetzt kommt der schwierige Teil, in Wirklichkeit gibt es 100 Kunden, sondern schicken sie nicht kontinuierlich Arbeitsplätze (Aber ich weiß nicht), aber ich möchte Arbeiten an maximal 10 Arbeitsplätzen parallel. (Aber vergessen Sie nicht nur 1 pro Kunde).

Wie kann ich dies erreichen? Ich weiß, dass die Puzzleteile (SemaphoreSlim, ActionBlock), aber ich mag es nicht zu einem fertigen Puzzle bekommen; (

[EDIT] Mein aktueller Versuch:

public class FakeJob 
{ 
    public int Id { get; set; } 

    public string ProjectName { get; set; } 

    public int Duration { get; set; } 
} 


public class JobMaster 
{ 
    private IConfigurationRoot _configuration; 

    private BufferBlock<ActionBlock<FakeJob>> _mainQueue; 

    private Dictionary<string, ActionBlock<FakeJob>> _projectQueues; 

    private Dictionary<Guid, CancellationTokenSource> _projectCancellationTokens; 


    public JobMaster() 
    { 
     _mainQueue = new BufferBlock<ActionBlock<FakeJob>>(new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 }); 
     _projectQueues = new Dictionary<string, ActionBlock<FakeJob>>(); 
    } 


    public async Task WorkOnJobs() 
    { 
     List<string> projectIds = new List<string>() { "Testkunde 1", "Testkunde 2", "Testkunde 3" }; 
     List<Task> producerTasks = new List<Task>(); 
     List<FakeJob> jobs = new List<FakeJob>(); 


     jobs.Add(new FakeJob { Duration = 10, Id = 1, ProjectName = projectIds[0] }); 
     jobs.Add(new FakeJob { Duration = 10, Id = 2, ProjectName = projectIds[0] }); 
     jobs.Add(new FakeJob { Duration = 10, Id = 3, ProjectName = projectIds[0] }); 
     jobs.Add(new FakeJob { Duration = 4, Id = 4, ProjectName = projectIds[1] }); 
     jobs.Add(new FakeJob { Duration = 4, Id = 5, ProjectName = projectIds[1] }); 
     jobs.Add(new FakeJob { Duration = 4, Id = 6, ProjectName = projectIds[1] }); 
     jobs.Add(new FakeJob { Duration = 2, Id = 7, ProjectName = projectIds[2] }); 
     jobs.Add(new FakeJob { Duration = 2, Id = 8, ProjectName = projectIds[2] }); 
     jobs.Add(new FakeJob { Duration = 2, Id = 9, ProjectName = projectIds[2] }); 


     foreach (var loopProjectId in projectIds) 
     { 
      producerTasks.Add(WorkOnJobsForForProject(loopProjectId, jobs)); 
     } 


     await Task.WhenAll(producerTasks); 
    } 


    private async Task WorkOnJobsForForProject(string projectId, List<FakeJob> jobDB) 
    { 
     var consumerOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }; 

     while (true) 
     { 
      foreach (var loopJob in jobDB.Where(x => x.ProjectName == projectId)) 
      { 
       var consumer = new ActionBlock<FakeJob>(StartJob, consumerOptions); 
       _projectQueues[projectId] = consumer; 

       await _mainQueue.SendAsync(_projectQueues[projectId]); 
       await _projectQueues[projectId].SendAsync(loopJob); 
       await Task.WhenAll(_projectQueues[projectId].Completion); 
      } 

      break; 
     } 
    } 


    private async Task StartJob(FakeJob job) 
    { 
     Log.Logger.Information("Start job [{A}] for [{B}]", job.Id, job.ProjectName); 
     await Task.Delay(job.Duration * 1000); 
     _projectQueues[job.ProjectName].Complete(); 
     Log.Logger.Information("Finished job [{A}]", job.Id); 
    } 
} 

[EDIT 2] Mein aktuellen Versuch arbeitet mit MaxDegreeOfParallelism = 3 für _mainQueue Aber nicht für 2;. ( Wenn ich es 2, 9 Job eingestellt wird nicht ausgeführt; (

public class FakeJob 
{ 
    public int Id { get; set; } 

    public string ProjectName { get; set; } 

    public int Duration { get; set; } 

    public bool IsComplete { get; set; } 
} 


public class JobMaster_BackUp 
{ 
    private ActionBlock<CustomerQueue> _mainQueue; 

    private Dictionary<string, ActionBlock<FakeJob>> _projectQueues; 

    public static List<FakeJob> FakeJobDB = new List<FakeJob>(); 


    public JobMaster_BackUp() 
    { 
     _mainQueue = new ActionBlock<CustomerQueue>(MainQueueJob, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 }); 
     _projectQueues = new Dictionary<string, ActionBlock<FakeJob>>(); 
    } 


    public async Task WorkOnJobs() 
    { 
     List<string> projectIds = new List<string>() { "Testkunde 1", "Testkunde 2", "Testkunde 3" }; 
     List<Task> producerTasks = new List<Task>(); 



     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 1, ProjectName = projectIds[0] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 2, ProjectName = projectIds[0] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 3, ProjectName = projectIds[0] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 4, ProjectName = projectIds[1] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 5, ProjectName = projectIds[1] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 6, ProjectName = projectIds[1] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 7, ProjectName = projectIds[2] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 8, ProjectName = projectIds[2] }); 
     FakeJobDB.Add(new FakeJob { Duration = 3, Id = 9, ProjectName = projectIds[2] }); 


     foreach (var loopProjectId in projectIds) 
     { 
      CancellationTokenHandler.ProjectCancellationTokens[loopProjectId] = new CancellationTokenSource(); 
      producerTasks.Add(WorkOnJobsForForProject(loopProjectId, CancellationTokenHandler.ProjectCancellationTokens[loopProjectId].Token)); 
     } 


     await Task.WhenAll(producerTasks); 
    } 


    private FakeJob GetNextJob(string projectId) 
    { 
     FakeJob nextJob = FakeJobDB.Where(x => x.ProjectName == projectId && x.IsComplete == false).OrderBy(x => x.Id).FirstOrDefault(); 

     if (nextJob != null) 
     { 
      Log.Logger.Information("GetNextJob [" + nextJob.Id + "]"); 
     } 

     return nextJob; 
    } 


    private async Task WorkOnJobsForForProject(string projectId, CancellationToken cancellationToken) 
    { 
     while (!cancellationToken.IsCancellationRequested) 
     { 
      FakeJob loopJob = GetNextJob(projectId); 

      if (loopJob != null) 
      { 
       CustomerQueue customerQueue = new CustomerQueue(loopJob); 

       await _mainQueue.SendAsync(customerQueue); 

       await customerQueue.WaitForCompletion(); 
      } 
     } 
    } 





    private async Task MainQueueJob(CustomerQueue consumer) 
    { 
     consumer.Start(); 
     await Task.WhenAll(consumer.WaitForCompletion()); 
    } 
} 


public class CustomerQueue 
{ 
    private ActionBlock<FakeJob> _queue; 

    private FakeJob _job; 


    public CustomerQueue(FakeJob job) 
    { 
     _job = job; 

     var consumerOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }; 
     _queue = new ActionBlock<FakeJob>(StartJob, consumerOptions); 
    } 


    public void Start() 
    { 
     _queue.SendAsync(_job); 
    } 


    public async Task WaitForCompletion() 
    { 
     await Task.WhenAll(_queue.Completion); 
    } 


    private async Task StartJob(FakeJob job) 
    { 
     //Log.Logger.Information("Start job [{A}] for [{B}]", job.Id, job.ProjectName); 
     await Task.Delay(job.Duration * 1000); 

     JobMaster_BackUp.FakeJobDB.Single(x => x.Id == job.Id).IsComplete = true; 

     _queue.Complete(); 

     Log.Logger.Information("Finished job [{A}]", job.Id); 
    } 
} 
+0

Sie können sich die 'Parallel.ForEach()' anschauen. –

+1

Sie sollten _something_ versuchen. Ihre "Puzzleteile" implizieren, dass Sie sich bereits für einen bestimmten Teil der .NET-API für die Implementierung entschieden haben, im Gegensatz zu anderen Mechanismen, die dasselbe erreichen könnten. Also, lass es uns sehen.Stellen Sie eine gute [mcve] zur Verfügung, die zeigt, wie weit Sie gekommen sind, und erklären Sie genau, welchen _speziellen_ Teil des Problems Sie gerade festmachen. –

+0

Die TPL Dataflow-API kann zwar Ihren Anforderungen entsprechen, aber ich bin nicht sehr damit vertraut. So würde ich wahrscheinlich einzelne Warteschlangen für jeden Kunden und dann eine einzelne Warteschlange dieser Warteschlangen verwalten. Da Sie zuerst die ältesten Jobs ausführen möchten, können die Warteschlangen Prioritätswarteschlangen sein (d. H. Sortiert), so dass Sie immer zuerst die ältesten aus der Warteschlange nehmen. Dann haben Sie einfach nicht mehr als 10 aktive Warteschlangen, die zur gleichen Zeit ausgeführt werden. Wenn Sie mit der Verarbeitung eines Jobs fertig sind, müssen Sie die Warteschlange des Kunden in der Primärwarteschlange (wiederum in der Reihenfolge des Auftragsalters) erneut anfordern und die nächste Warteschlange für die Verarbeitung aus der Warteschlange nehmen. –

Antwort

1

Dies tut eine Sache richtig. Es führt nur einen Job pro Kunde parallel aus. Aber die max 2 Jobs insgesamt funktioniert nicht.

Ich habe nicht ein klares Bild von genau das, was Sie erreichen wollen, aber ich denke, , was Sie fehlt Koordination der verschiedenen Blöcke Datenfluß ist.

Es gibt mehrere Möglichkeiten, dies zu erreichen. Eines ist zu verwenden SemaphoreSlim, wie Sie bereits bemerkt haben. Sie würden eine einzige SemaphoreSlim mit einer maximalen Anzahl von 10 erstellen, übergeben Sie sie in den CustomerQueue Konstruktor und lassen Sie Ihre StartJob eine await WaitAsync am Anfang und eine Release am Ende.

Der andere Weg ist durch die Aktionsblocks eines Planer zu geben - insbesondere die Concurrent Hälfte eineine ConcurrentExclusiveSchedulerPair mit der Gleichzeitigkeit auf 10. Sie würden dies in das ConsumerQueue Konstruktor übergeben und auf dem Block Optionen.

+0

DANKE SEHR SEHR! Das war die Idee, die ich brauchte, mein Versuch war viel zu kompliziert! :) Mann, das hat mir Zeit und Nerven gekostet :) Danke nochmal. Mit freundlichen Grüßen – SharpNoiZy