2017-02-01 1 views
0

Ich habe ein kleines Dilemma. Ich habe eine Funktion, die eine Liste von Abfragen in einem Dataset mit ihrer eigenen Verbindungszeichenfolge durchläuft (könnte gleich oder verschieden sein) und füllt alle DataTables aus und gibt ein gefülltes DataSet mit allen ausgefüllten Abfragen zurück.C# Multithread-Datenbank füllen Tabelle

Momentan Dieser Prozess wird nacheinander ausgeführt. Wenn eine der Abfragen 10 Minuten dauert und die anderen 3 Abfragen jeweils 2 Minuten dauern, beträgt die Laufzeit 16 Minuten.

Ich frage mich, ob es in dieser Situation möglich ist, Multithreading zu verwenden. Dies sollte FillTable alle auf separaten Thread aufrufen und sollte die Laufzeit auf 10 Minuten herunter. Dies sind explizit nur Fill DataTable-Aufrufe (es wird keine Aktualisierung geben oder Anrufe löschen).

Das ist, was ich bisher habe:

public void FillDataSet(ref DataSet Source) 
    { 
     foreach (var r in Source.Tables["queries"].Rows) 
     { 
      string query = r["QueryStatement"].ToString(); 
      string qSource = r["QuerySource"].ToString(); 
      string tableName = r["TableName"].ToString(); 

      DBConnection db = new DBConnection(); 
      var TempSource = Source; 
      taskConnection = Task.Factory.StartNew(() => callDB(db, query, tableName, ref TempSource)); 
      Source = TempSource; 
     } 
     Task.WaitAll(taskConnection); 
    } 

    private void callDB(DBConnection db, string query, string tableName, ref DataSet Source) 
    { 
     using (var sql = new SqlConnection(db.ConnectionString)) 
     { 
      sql.Open(); 
      using (var adp = new SqlDataAdapter(query, sql)) 
      { 
       adp.SelectCommand.CommandTimeout = 0; 
       adp.Fill(Source, tableName); 
      } 
     } 
    } 

ich einen TempSource zu schaffen hatte, weil Lambda-Ausdruck in ref der Parameter nicht vorbei mag (ich das nicht ändern kann). Derzeit funktioniert das nicht, nicht sicher, was ich falsch mache.

+0

Ein ** Dilemma ** ist eine Wahl zwischen zwei gleich schlechten (oder gleich guten) Ergebnissen. Du scheinst kein Dilemma zu haben, nur ein Problem, das du lösen willst. – Enigmativity

+1

In jedem Fall ist es unwahrscheinlich, dass Sie beim Threading eine Leistungssteigerung erzielen, da Datenbankaufrufe IO sind und Threads am besten für Daten im Arbeitsspeicher funktionieren. Sie sollten uns wirklich fragen, ob es eine Möglichkeit gibt, Ihren bestehenden Code zu beschleunigen und uns nicht zu fragen, ob das Threading eine Situation verbessern könnte, die wir nicht sehen können. – Enigmativity

+0

Ich habe keine Kontrolle über die Abfragen. Lange Rede, kurzer Sinn Ich möchte, dass jeder Thread seine eigene Abfrage zur gleichen Zeit ausführt, damit sie nicht im Grunde darauf warten, dass alle Abfragen abgeschlossen sind. Zum Beispiel, wenn ich 5 Abfragen habe, die jeweils 5 Minuten durch Threading dauerte, sollte in der Lage sein, dies innerhalb von 5 Minuten vs 25 Minuten zu tun –

Antwort

1

Hier ist ein Grundbaustein, den Sie verwenden können. in der Bit-Füllen, wo ich einen Kommentar verfasst habe:

// Set your connectionstring and execute the query and fill your data here 

Das ist einfach - ich habe Threads statt Threadpool verwendet, weil auf die Menge der Arbeit, im Vergleich getan der Aufwand einen neuen Thread von Laich minimal ist. Sie können dies erweitern, wenn Sie möchten, indem Sie die Threads verfolgen und Thread-Signale usw. verwenden, um ein weit fortgeschritteneres Verhalten zu implementieren.

Wenn Sie zusätzliche Parameter an das Codeelement übergeben möchten, das die Arbeit ausführt, fügen Sie diese zur Arbeitselementdefinitionsklasse hinzu.

Hinweis: Dies unterstützt nicht mehrere parallele Ausführungen der RunParallel-Hauptmethode, aber Sie können es problemlos erweitern, um dies zu tun.

public static class RunParallel 
{ 
    const int NumThreadsToRunInParallel = 8;// Tune this for your DB server performance characteristics 
    public static void FillDataSet(ref DataSet Source) 
    { 
     WorkItemDefinition Work; 
     foreach (DataRow r in Source.Tables["queries"].Rows) 
     { 
      Work = new WorkItemDefinition(); 
      Work.Query = r["QueryStatement"].ToString(); 
      Work.QSource = r["QuerySource"].ToString(); 
      Work.TableName = r["TableName"].ToString(); 
      EnQueueWork(Work); 
     } 
     System.Threading.ThreadStart NewThreadStart; 
     NewThreadStart = new System.Threading.ThreadStart(ProcessPendingWork); 
     for (int I = 0; I < NumThreadsToRunInParallel; I ++) 
     { 
      System.Threading.Thread NewThread; 
      NewThread = new System.Threading.Thread(NewThreadStart); 
      //NewThread.IsBackground = true; //Do this if you want to allow the application to quit before these threads finish all their work and exit 
      ThreadCounterInc(); 
      NewThread.Start(); 
     } 
     while (ThreadCounterValue > 0) 
     { 
      System.Threading.Thread.Sleep(1000); 
     } 
    } 

    private static void ProcessPendingWork() 
    { 
     try 
     { 
      WorkItemDefinition Work; 
      Work = DeQueueWork(); 
      while (Work != null) 
      { 
       Work = DeQueueWork(); 
       DbConnection db = new OdbcConnection(); 
       // Set your connectionstring and execute the query and fill your data here 
      } 
     } 
     finally 
     { 
      ThreadCounterDec(); 
     } 
    } 

    private static int ThreadCounter = 0; 
    private static void ThreadCounterInc() 
    { 
     lock(SyncRoot) 
     { 
      ThreadCounter += 1; 
     } 
    } 
    private static void ThreadCounterDec() 
    { 
     lock (SyncRoot) 
     { 
      ThreadCounter -= 1; 
     } 
    } 
    private static int ThreadCounterValue 
    { 
     get 
     { 
      lock (SyncRoot) 
      { 
       return ThreadCounter; 
      } 
     } 
    } 

    private static object SyncRoot = new object(); 
    private static Queue<WorkItemDefinition> m_PendingWork = new Queue<WorkItemDefinition>(); 
    private static Queue<WorkItemDefinition> PendingWork 
    { 
     get 
     { 
      return m_PendingWork; 
     } 
    } 

    private static WorkItemDefinition DeQueueWork() 
    { 
     lock (SyncRoot) 
     { 
      if (PendingWork.Count > 0) // Catch exception overhead is higher 
      { 
       return PendingWork.Dequeue(); 
      } 
     } 
     return null; 
    } 

    private static void EnQueueWork(WorkItemDefinition Work) 
    { 
     lock (SyncRoot) 
     { 
      PendingWork.Enqueue(Work); 
     } 
    } 

    public class WorkItemDefinition 
    { 
     public string Query { get; set; } 
     public string QSource { get; set; } 
     public string TableName { get; set; } 
    } 
} 
+0

Das hat funktioniert !! Danke vielmals! –

+0

@ civic.sir Ihre Begrüßung. Nur aus Neugier, wie viel Beschleunigung konnten Sie bekommen? Denn am Ende sind Physical IO und einige interne SQL-Ressourcen wie Hash-Buckets das harte Limit. Beachten Sie außerdem, dass SQL die IO-Concurrency-Ebene der Plattenspeicherschicht nicht immer kennt, insbesondere schnelle Arrays und SSDs. Dadurch können Sie noch mehr Leistung erzielen, indem Sie Tabellen über mehrere Dateien hinweg partitionieren, wodurch SQL gezwungen wird, separate IO-Threads zu verwenden. Wenn also SQL ausgereizt ist, aber der Performance Monitor zeigt, dass Ihr Datenträger über genügend freie Kapazität verfügt, können Sie diesen Ansatz versuchen. – tcwicks

+0

Die Abfragen sind nicht so gut, vor dieser Implementierung dauerte es etwa 45 Minuten, um die Anwendung auszuführen, da es sich um einen Synchronisierungsprozess handelte. Nach dieser Implementierung kam es zu einer enormen Leistungssteigerung von 15 Minuten. Nochmals vielen Dank –