2016-06-19 10 views
1

Ich möchte Daten von einem Thread zu einem anderen bewegen, aber mein Code funktioniert nur für den ersten Wert ich die ersten 5 Werte in der Liste aus und drucken sie aus danach hier ist mein Code anstelle des Sparens passieren:Wie gebe ich Daten zwischen Threads weiter?

private readonly ConcurrentQueue<int> _queue = new ConcurrentQueue<int>(); 
    private readonly AutoResetEvent _signal = new AutoResetEvent(false); 

    public void Thread1() 
    { 
     List<int> values = new List<int>(); 
     int lastInput; 
     StringBuilder sb = new StringBuilder(); 

     while (values.Count < 5) 
     { 
      _signal.WaitOne(); 
      _queue.TryDequeue(out lastInput); 

      values.Add(lastInput); 
     } 
     for (int i = 0; i < values.Count; i++) 
     { 
      sb.Append(String.Format("{0}\n", values[i])); 
     } 
     MessageBox.Show(sb.ToString()); 
    } 

    private void button1_Click(object sender, EventArgs e) 
    { 
     Thread th1 = new Thread(Thread1); 
     th1.Start(); 

     for (int i = 0; i < 8; i++) 
     { 
      _queue.Enqueue(i); 
      _signal.Set(); 
     } 
    } 
+2

Haben Sie die "wichtigen" Hinweise im Abschnitt Anmerkungen [hier] (https://msdn.microsoft.com/en-us/library/system.threading.eventwaithandle.set (v = vs.110) gefunden? aspx)? Ich vermute, dass die meisten Ihrer 'Set()' Anrufe nichts tun –

+0

Es sieht so aus, als ob Sie versuchen, Ihre eigene Implementation von [BlockingCollection] zu erstellen (https://msdn.microsoft.com/en-us/library/dd267312 (v = vs.110) .aspx), benutze einfach den eingebauten. –

Antwort

3

Ich sehe, was Sie versuchen zu tun, und @ MarcGravell's Kommentar ist richtig, und auch was @mariosangiorgio sagt, ist wahr. Was Sie als Workaround tun können, verwenden Sie stattdessen einen Monitor Wait/Pulse Mechanismus. Versuchen Sie Folgendes:

private readonly Queue<int> _queue = new Queue<int>(); 
    private readonly object _locker = new object(); 

    public void Thread1() 
    { 
     List<int> values = new List<int>(); 
     int lastInput; 
     StringBuilder sb = new StringBuilder(); 

     while (values.Count < 5) 
     { 
      lock (this._locker) 
      { 
       // wait until there is something in the queue 
       if (this._queue.Count == 0) 
       { 
        Monitor.Wait(this._locker); 
       } 

       // get the item from the queue 
       _queue.Dequeue(out lastInput); 

       // add the item to the list 
       values.Add(lastInput); 
      } 
     } 

     for (int i = 0; i < values.Count; i++) 
     { 
      sb.Append(String.Format("{0}\n", values[i])); 
     } 
     MessageBox.Show(sb.ToString()); 
    } 

    private void button1_Click(object sender, EventArgs e) 
    { 
     Thread th1 = new Thread(Thread1); 
     th1.Start(); 

     for (int i = 0; i < 8; i++) 
     { 
      lock (this._locker) 
      { 
       // put something in the queue 
       _queue.Enqueue(i); 

       // notify that there is something in the queue 
       Monitor.Pulse(this._locker); 
      } 
     } 
    } 

Also, im Wesentlichen, was Sie tun werden ist eine Schleife aufrufen, die wird versuchen, und insgesamt 5 Elemente zu verbrauchen. Wenn der Consumer-Thread erkennt, dass keine Elemente in der Warteschlange zu konsumieren sind, wartet er, bis der Producer einige Elemente in die Warteschlange stellt. Sobald der Produzent die Artikel in die Warteschlange stellt, teilt er dem wartenden Consumer-Thread mit, dass er bereit ist zu gehen! Der Consumer-Thread wird dann die Blockierung aufheben und das Objekt verbrauchen, das sich in der Warteschlange befindet.

Jetzt zusätzlich, wenn Sie @ mariosangiorgio kommentieren, verwenden Sie tatsächlich eine gleichzeitige Sammlung. Also hat er recht, es gibt eigentlich keine Notwendigkeit zu blockieren. Wenn Sie also Ihr eigenes Blockierungs-/Entsperrexperiment durchführen möchten, können Sie mit meiner Implementierung fortfahren und einfach ein normales Queue (nicht gleichzeitig) verwenden. Oder, wie @mariosangiorgio sagte, entfernen Sie einfach die AutoResetEvent und lassen Sie die ConcurrentQueue ihr Ding machen.

Denken Sie daran, dass wenn Sie nicht blockieren, Sie kontinuierlich die CPU durchlaufen und laufen lassen, bis etwas tatsächlich Dequeue 'd.

0

Ich denke, das Problem ist in dir _signal.Set(); und _signal.WaitOne();.

Das funktioniert so, wie Sie nur wollen, wenn Sie ein Interleaving haben, wobei WaitOne immer vor einem Set aufgerufen wird. Ich vermute, dass in Ihrem Fall, dass Sie das folgende Ereignis Verschachtelung haben:

_signal.WaitOne(); // This waits for the first set 
_signal.Set(); // This notifies the first `WaitOne` 
_signal.Set(); // This doesn't notify anything 
_signal.Set(); // This doesn't notify anything 
_signal.Set(); // This doesn't notify anything 
_signal.Set(); // This doesn't notify anything 
_signal.WaitOne(); // Nothing is going to set this 

Da Sie eine ConcurrentQueue verwenden Sie müssen keine AutoResetEvent verwenden. Einfach entfernen und alles sollte funktionieren.

+1

Wenn sie nur das 'AutoResetEvent' entfernen, werden sie nicht looping und nur CPU essen bis etwas in die Warteschlange kommt? – Snoopy

+0

Ja, du hast recht – mariosangiorgio

+1

Als ich zum ersten Mal von Multi-Threading erfuhr, schrie mich ein Typ an und sagte, ich habe nichts gehört, was er gesagt habe ... Ich bin nicht stolz darauf, aber es blieb bei mir, denke ich ... – Snoopy

1

Ich dachte, ich würde eine andere Lösung für das Problem anbieten. Sie könnten sicherlich AutoResetEvent und ConcurrentQueue verwenden, aber es macht den Code schwer zu verstehen, bekommen Recht und Grund über.

Sie sollten im Allgemeinen versuchen, eine Bibliothek mit einer einfacheren Abstraktion zu verwenden. Ich mag das Reactive Framework von Microsoft (NuGet "Rx-Main", "Rx-WinForms" oder "Rx-WPF").

Mit Rx können Sie eine LINQ-ähnliche Pipeline von asynchron ausgeführten Operationen erstellen, in der Sie die Scheduler (Threading) angeben können, die Sie verwenden möchten.

Dies entspricht Ihren Code:

IDisposable subscription = 
    Observable 
     .Range(0, 5, Scheduler.Default) 
     .ToArray() 
     .Select(xs => String.Join(Environment.NewLine, xs)) 
     .ObserveOn(this) 
     .Subscribe(x => MessageBox.Show(x)); 

Die Verwendung von Scheduler.Default für eine Windows-Anwendung, die Berechnung auf einen neuen Thread schiebt. Die String-Generierung erfolgt also außerhalb des UI-Threads. Die .ObserveOn(this) schiebt die Berechnung zurück auf die Benutzeroberfläche (this bezieht sich auf das aktuelle Formular - Sie könnten ein beliebiges UI-Element anstelle von this verwenden).

subscription ist ein IDisposable, so dass Sie subscription.Dispose() jederzeit anrufen kann die Berechnung abzukürzen, wenn es lange Lauf ist und Sie wollen es zu stoppen.

Die Rx-Bibliothek ist sehr leistungsfähig und bietet Ihnen viele Operatoren, um einige sehr komplexe Berechnungen in einer relativ einfachen Form durchzuführen.

+1

Auch ein großer Fan von Rx! Ich denke du hast mir mit einer meiner Fragen einmal über Rx geholfen. – Snoopy

Verwandte Themen