1

Ich habe eine SQL-Tabelle Accounts in SQL Server, wo Daten in Bezug auf Datum sortiert ist. Die Tabelle enthält 20 Spalten einschließlich AccountId.
Ich möchte die Datensätze für jeden Tag (Rund 200K Datensätze) lesen. Auf diese Weise müsste ich die Daten von 6 Monaten für jeden Tag lesen.So lesen Sie eine große Anzahl von Datensätzen aus SQL-Tabelle in Stapeln mit Task Parallele Bibliothek

Was ich tun muss, ist Datensätze aus Accounts Tabelle für 6 Monate Daten abzurufen. Also plante ich meinen Code, um die Daten von SQL Server Accounts Tabelle innerhalb einer Do While-Schleife für jeden Tag zu bekommen.

Jetzt besteht jeder Tag aus dem Abrufen von 200K-Datensätzen aus der Datenbank. Also breche ich diese 200K-Datensätze für einen Tag in Chargen (sagen wir 10000 oder 20000 Datensätze in einem Lesevorgang, was ungefähr 10 Chargen von Datensätzen für einen Tag ergibt). Sobald ich diese 10k oder 20k Datensätze bekomme, möchte ich diese Werte aus der Datenbank holen und diese in eine CSV-Datei konvertieren und die CSV-Datei an einen Ort exportieren.

Jetzt ist mein Problem, dass diese Prozedur zu viel Zeit in Anspruch nimmt (ca. 50 Minuten für das Abrufen von Datensätzen für einen Tag und ich muss die Datensätze für 6 Monate Daten abrufen. So können Sie sich vorstellen, wie viel Zeit es dauern wird).

Ich denke, TPL verwenden, um den Code und die Verarbeitung in Aufgaben zu brechen, aber nicht sicher, wie es geht.

Bitte schlagen Sie vor, wie verwende ich Task parallele Bibliothek, um die Leistung zu verbessern, so dass ich leicht 6 Monate Daten erhalten kann.

My C# Code sieht wie folgt:

public void Main() 
{ 
    do 
    { 
     done = true; 
     var accountsTableRecors = ReadsDatabaseForADay(lastId); 
     foreach (var accountsHistory in accountsTableRecors) 
     { 
      if (accountsHistory.accountsId != null) lastId = (long)accountsHistory.accountsId; 
      done = false; 
      recordCount++; 
     } 
     var flatFileDataList = ProcessRecords(); 
    } while (!done); 
} 

ProcessRecords Das obige Verfahren in Main() parst einige XML konvertiert und an abgerufenen Daten in CSV.

private IEnumerable<AccountsTable> ReadsDatabaseForADay(long lastId) 
{ 
    var transactionDataRecords = DatabaseHelper.GetTransactions(lastId, 10000); 
    var accountsTableData = transactionDataRecords as IList<AccountsTable> ?? transactionDataRecords.ToList(); 
    ListAccountsTable.AddRange(accountsTableData); 
    return accountsTableData; 
} 

DatabaseHelperClass:

internal static IEnumerable<AccountsTable> GetTransactions(long lastTransactionId, int count) 
{ 
    const string sql = "SELECT TOP(@count) [accounts_id],[some_columns],[some_other_columns]. .....[all_other_columns] " 
        + "FROM AccountsTable WHERE [accounts_id] > @LastTransactionId AND [creation_dt] > DATEADD(DAY,-1, GETDATE())" + 
        " ORDER BY [accounts_id]"; 
    return GetData(connection => 
    { 
     var command = new SqlCommand(sql, connection); 
     command.Parameters.AddWithValue("@count", count); 
     command.Parameters.AddWithValue("@LastTransactionId", lastTransactionId); 
     return command; 
    }, DataRecordToTransactionHistory); 
} 

private static IEnumerable<T> GetData<T>(Func<SqlConnection, SqlCommand> commandBuilder, Func<IDataRecord, T> dataFunc) 
{ 
    using (var connection = GetOpenConnection()) 
    { 
     using (var command = commandBuilder(connection)) 
     { 
      command.CommandTimeout = 0; 
      using (IDataReader reader = command.ExecuteReader()) 
      { 
       while (reader.Read()) 
       { 
        var record = dataFunc(reader); 
        yield return record; 
       } 
      } 
     } 
    } 
} 
+0

Sollten Sie dies nicht mit etwas wie SSIS anstelle von Code tun? Das Verwenden von Tasks löst das Problem möglicherweise nicht, da Sie den Server möglicherweise überlasten. Sind Sie nach unten gegangen, um zu sehen, ob die Abfragen mithilfe von Indizes usw. optimiert sind? Oder können Sie den Export in CSV auf eine ganz andere Art und Weise angehen? –

+0

Sicher sollte ich Peter, aber hier ist das Problem zweifach. Die Daten befinden sich auf dem Produktionsserver, auf den ich keinen direkten Zugriff habe, und zweitens gibt es viele Spalten, die nach dem Abrufen eine Datenmanipulation benötigen. Daher wählte ich den Code Weg –

+0

Holen 200k Zeilen sollte Sekunden dauern nicht eine Stunde, so dass Sie das Problem beheben müssen, bevor Sie beginnen, TPL zu werfen. – Phil

Antwort

0

Ihr Code hat in es einige Probleme, was zu einer schlechten Performance führen kann. Zunächst erstellen Sie jedes Mal eine Liste von IEnumerable, wenn Sie einen weiteren Stapel von Account-Tabellen erhalten. List<T> verwendet intern ein Array, das standardmäßig zum großen Objekt-Heap wechselt, und dieser Heap wird nicht häufig gecallt, sodass Sie nach einiger Zeit Probleme mit dem Speicher und der Leistung haben werden. Das zweite Problem ist, dass Ihr Code stark I/O-orientiert ist, aber Sie verwenden nicht die Methoden async, so dass Ihr Thread blockiert wird, indem Sie auf die Antwort der Datenbank warten. Sie sollten auf async Version wechseln und die Blockierungsmethoden nur innerhalb des Main Einstiegspunkts verwenden, aber in diesem Fall wären Sie nicht in der Lage yield Ihre Ergebnisse, da async Task<IEnumerable> kein Iterator sein kann und Sie Ihre DataRecordToTransactionHistory Funktion bearbeiten müssen oder erstellen Sie eine Liste lokal in Ihrem GetData Funktion:

private static async Task GetData(Func<SqlConnection, SqlCommand> commandBuilder, 
    Action<IDataRecord> dataFunc) 
{ 
    using (var connection = new SqlConnection()) 
    { 
     await connection.OpenAsync(); 
     using (var command = commandBuilder(connection)) 
     { 
      command.CommandTimeout = 0; 
      using (var reader = await command.ExecuteReaderAsync()) 
      { 
       while (await reader.ReadAsync()) 
       { 
        dataFunc(reader); 
       } 
      } 
     } 
    } 
} 
// OR 
private static async Task<IEnumerable<T>> GetData<T>(Func<SqlConnection, SqlCommand> commandBuilder, 
    Func<IDataRecord, T> dataFunc) 
{ 
    using (var connection = new SqlConnection()) 
    { 
     await connection.OpenAsync(); 
     using (var command = commandBuilder(connection)) 
     { 
      command.CommandTimeout = 0; 
      using (var reader = await command.ExecuteReaderAsync()) 
      { 
       // linked list to avoid allocation of an array 
       var result = new LinkedList<T>(); 
       while (await reader.ReadAsync()) 
       { 
        result.AddLast(dataFunc(reader)); 
       } 
       return result; 
      } 
     } 
    } 
} 

jedoch beide Optionen ihre Nachteile.Was mich betrifft, sollten Sie einen Versuch der TPL Dataflow Bibliothek geben Pipeline wie dies bei der Verarbeitung:

// default linking options 
var options = new DataflowLinkOptions { PropagateCompletion = true }; 

// store all the ids you need to process 
var idBlock = new BufferBlock<long>(); 

// transform id to list of AccountsTable 
var transformBlock = new TransformBlock<long, Task<IEnumerable<AccountsTable>>>(async id => 
{ 
    return await ReadsDatabaseForADay(id); 
}); 

// connect the blocks between each other 
idBlock.LinkTo(transformBlock, options); 

// flatten the already completed task to enumerable 
var flattenBlock = new TransformManyBlock<Task<IEnumerable<AccountsTable>>, AccountsTable>(async tables => 
{ 
    return (await tables).Select(t => t); 
}); 
transformBlock.LinkTo(flattenBlock, options); 

// gather results in batches of 10000 
var batchBlock = new BatchBlock<AccountsTable>(10000); 
flattenBlock.LinkTo(batchBlock); 

// export resulting array to csv file 
var processor = new ActionBlock<AccountsTable[]>(a => 
{ 
    ExportToCSV(a); 
}); 

// populate the pipeline with needed ids 
foreach (var id in GetAllIds()) 
{ 
    // await the adding for each id 
    await idBlock.SendAsync(id); 
} 
// notify the pipeline that all the data has been sent 
idBlock.Complete(); 

// await whole ids to be processed 
await processor.Completion; 

TPL Dataflow in Thread-Pool standardmäßig ausführt und nutzt alle Vorteile davon. Sie können die Blöcke with MaxDegreeOfParrallelism anpassen, so dass es mehr als eine ID gleichzeitig verarbeiten kann.

Also sollten Sie async Methoden verwenden und Sie sollten nicht zu viele Pufferlisten/Arrays nur zum Speichern der Daten erstellen. Stellen Sie Ihre Pipeline zusammen, um die vollen Vorteile von Iteratoren und async/await Funktionen zu nutzen.

Verwandte Themen