2017-06-24 3 views
0

Ich habe ein Dropdown-Produkt und die Auswahl des Produkts verbindet sich mit einem Websocket und erhalten Sie die Feed-Nachrichten für dieses Produkt. Sobald die (1) Feed-Nachrichten kommen, muss ich dann (2) das Auftragsbuch bekommen und dann (3) die Feed-Nachrichten verarbeiten. Die erste und die letzte Aufgabe würden also asynchron ausgeführt werden. Dazu habe ich folgenden Code geschrieben:C# Daten verarbeiten nicht korrekt in Multithreading

void OnReceivingFeedMessage() 
{ 
    concurrentQueue.Enqueue(message); 

    if (!messageStreamStarted) // only first time get order book 
    { 
     messageStreamStarted = true; 
     GetOrderBookData(); 
    } 
} 

private void GetOrderBookData() 
{ 
    MarketData m = new MarketData(); 
    ProductOrderBook p = m.GetProductOrderBook(productId);   
    bidsList = p.bids; 
    asksList = p.asks; 
    isOrderBookUpdated = true; 

    Task task3 = Task.Run(() => KickStartToProcessQueue());   
} 

private void KickStartToProcessQueue() 
{ 
    while (threadProcessQueueExist) 
    { 
     int recordCountNew = concurrentQueue.Count(); 
     if (recordCountNew != 0) 
     { 
      if (isOrderBookUpdated) 
      { 
       ProcessQueueMessages(); 
      } 
     } 
    } 
} 

private void ProcessQueueMessages() 
{ 
    if (!concurrentQueue.IsEmpty) 
    { 
     string jsonString; 
     while (concurrentQueue.TryDequeue(out jsonString)) 
     { 
      // have to insert the record in existing order book 
     } 
    } 
} 

Das funktioniert perfekt zum ersten Mal. Aber wenn ich das Produkt ändere und die Dinge wieder verbinde, werden die Daten nicht richtig verarbeitet. Der Code auf Produkt geschrieben selectedIndex

private void CloseAndReconnectToGetWebsocketFeed() 
{ 
    w.CloseWebsocketConnection();     
    messageStreamStarted = false;    
    isOrderBookUpdated = false; 
    ConcurrentQueue<string> wssMessagesQueue = new ConcurrentQueue<string>(); 
    concurrentQueue = wssMessagesQueue; 

    ConnectAndGetWebsocketFeedMessages(); // this calls OnReceivingFeedMessage 
} 

: Ich bin so ändern, nicht sicher Multi-Threading, wenn wir sperren oder Asynchron verwenden müssen,/erwarten oder etwas anderes. Was mache ich falsch im obigen Code?

Es läuft gut, wenn Sie das erste Mal laufen, aber in dem Moment, in dem ich das Produkt ändere und die gleiche Verarbeitung wieder mache, gibt es Probleme. Kann jemand bitte beraten, wie kann ich alle Ressourcen löschen, bevor Sie die gleichen Schritte immer wieder tun

+1

Sie scheinen sehr unnötigen komplizierten Code zu haben. Verstehe ich Sie richtig, dass Sie die Nachrichten (parallel) einfach verarbeiten möchten, sobald sie eingehen? – georch

+0

Und Sie möchten die alten Nachrichten abbrechen, sobald neue Nachrichten eintreffen? – georch

+0

Nein, ich möchte das Auftragsbuch bekommen, sobald die Nachrichten kommen und dann, wenn ich das Auftragsbuch bekommen habe, muss ich die Nachrichten verarbeiten. So werden die ersten Nachrichten kommen dann bestellen und dann Verarbeitung – user1254053

Antwort

1

Ich denke, Sie schreiben unnötig komplizierten Code. Ich bin mir nicht 100% sicher, was dein Problem ist, aber hier sind einige Dinge, die dir helfen könnten.

Verwenden Sie ein BlockingCollection<T>

Mit dieser Klasse können Sie Ihre Verbraucher-Thread stoppen, bis neue Nachrichten in kommen Hier ist ein einfaches Beispiel, wie diese Dinge funktionieren.

BlockingCollection<string> collection = new BlockingCollection<string>(new ConcurrentQueue<string>()); 
Task t = Task.Run(() => 
{ 
    while (collection.TryTake(out string item, Timeout.Infinite)) 
    { 
     Console.WriteLine($"Started reading {item}..."); 
     Thread.Sleep(1000); //simulate intense work 
     Console.WriteLine($"Done reading {item}"); 
    } 
}); 
while (true) 
{ 
    //This could be your OnReceivingFeedMessage() 
    string input = Console.ReadLine(); 
    if (input == "stop") 
    { 
     Console.WriteLine("Stopping..."); 
     collection.CompleteAdding(); 
     break; 
    } 
    else 
    { 
     collection.Add(input); 
    } 
} 
t.Wait(); 

Die Aufgabe t wird warten, bis sich Artikel in collection befinden. Wenn Elemente "empfangen" werden (hier einfach über die Konsoleneingabe), werden sie zu Ihrer Liste hinzugefügt.

Versand neue Aufgaben am Eingang arbeiten

Ganz einfach:

while (true) 
{ 
    string item = Console.ReadLine(); 
    Task.Run(() => 
    { 
     Console.WriteLine($"Started reading {item}..."); 
     Thread.Sleep(1000); //simulate intense work 
     Console.WriteLine($"Done reading {item}"); 
    }); 
} 

Dies hat auch den Vorteil (oder Nachteil), dass die Aufgaben alle parallel laufen. Das bedeutet, dass Sie sich nicht auf die Reihenfolge verlassen können, in der sie bearbeitet werden, aber sie werden viel schneller verarbeitet.

Übrigens: Beide Ansätze haben den Vorteil, dass Sie nicht viel zu tun haben. Aus Ihrer Frage:

while (threadProcessQueueExist) 
{ 
    int recordCountNew = concurrentQueue.Count(); 
    if (recordCountNew != 0) 
    { 
     if (isOrderBookUpdated) 
     { 
      ProcessQueueMessages(); 
     } 
    } 
} 

Dieser Code wird beschäftigt warten so lange erstellen, wie nichts in der Warteschlange ist, was bedeutet, dass ein Kern des Prozessors bei sehr hohen Belastung sein wird, ohne tatsächlich etwas zu tun. Es wird als schlechte Praxis angesehen.