2015-06-24 3 views
5

Ich habe den folgenden Code in C# geschrieben, aber nach dem würde ich 4-5 Tage brauchen, um die Daten von der Oracle-Datenbank nach Elasticsearch zu migrieren. Ich füge die Datensätze in 100er-Schritten ein. Gibt es eine andere Möglichkeit, die Migration der 4 Millionen Datensätze schneller durchzuführen (wahrscheinlich in weniger als einem Tag, wenn möglich)?Wie fügen Sie 4 Millionen Datensätze von Oracle zur Elasticsearch-Tabelle schneller mit C# ein?

public static void Selection() 
     { 
      for(int i = 1; i < 4000000; i += 1000) 
      { 
       for(int j = i; j < (i+1000); j += 100) 
       { 
        OracleCommand cmd = new OracleCommand(BuildQuery(j), 
                oracle_connection); 
        OracleDataReader reader = cmd.ExecuteReader(); 
        List<Record> list=CreateRecordList(reader); 
        insert(list); 
       } 
      } 
     } 

    private static List<Record> CreateRecordList(OracleDataReader reader) 
     { 
      List<Record> l = new List<Record>(); 
      string[] str = new string[7]; 
      try 
      { 
       while (reader.Read()) 
       { 
        for (int i = 0; i < 7; i++) 
        { 
         str[i] = reader[i].ToString(); 
        } 

        Record r = new Record(str[0], str[1], str[2], str[3],        
           str[4], str[5], str[6]); 
        l.Add(r); 
       } 
      } 
      catch (Exception er) 
      { 
       string msg = er.Message; 
      } 
      return l; 
     } 

    private static string BuildQuery(int from) 
     { 
      int to = from + change - 1; 
      StringBuilder builder = new StringBuilder(); 
      builder.AppendLine(@"select * from"); 
      builder.AppendLine("("); 
      builder.AppendLine("select FIELD_1, FIELD_2, 
      FIELD_3, FIELD_4, FIELD_5, FIELD_6, 
      FIELD_7, "); 
      builder.Append(" row_number() over(order by FIELD_1) 
      rn"); 
      builder.AppendLine(" from tablename"); 
      builder.AppendLine(")"); 
      builder.AppendLine(string.Format("where rn between {0} and {1}", 
      from, to)); 
      builder.AppendLine("order by rn"); 
      return builder.ToString(); 
     } 

    public static void insert(List<Record> l) 
     { 
      try 
      { 
       foreach(Record r in l) 
        client.Index<Record>(r, "index", "type"); 
      } 
      catch (Exception er) 
      { 
       string msg = er.Message; 
      } 
     } 
+0

Ersetzen 'client.Index' mit 'client.IndexMany (..)' und versuchen Sie herauszufinden, optimale Chunk-Größe für Bulk einfügen https://www.elastic.co/guide/en/elasticsearch/guide/current/bulk.html#_how_big_is_too_big – Rob

+1

* würde es Nimm mich 4-5 Tage * .. hast du ran und sehen, ob es wirklich 4/5 Tage dauert, 4M Reihen zu migrieren? – Rahul

+2

Die 'ROW_NUMBER()' Funktion wird sich negativ auf die Leistung auswirken, und Sie führen es tausende Male aus. Sie verwenden bereits einen 'OracleDataReader' - er wird nicht alle vier Millionen Zeilen gleichzeitig auf Ihren Rechner laden, sondern im Prinzip einen oder mehrere gleichzeitig streamen. Sie sollten stattdessen eine einzige Abfrage haben, und während Sie Ihre 'Record' Objekte erstellen, alle 100 oder 500 oder 1000 (zB behalten Sie einen' count', der jede Schleife inkrementiert), committen Sie sie (zB bei 'count% 500 == 0'). Dies muss in Minuten oder Stunden machbar sein, nicht in Tagen. –

Antwort

4

Die ROW_NUMBER() Funktion negativ auf die Leistung auswirken wird, und du bist es tausende Male ausgeführt wird. Sie verwenden bereits eine - es wird nicht alle vier Millionen Zeilen auf einmal auf Ihre Maschine übertragen, es wird im Prinzip nur eine oder mehrere gleichzeitig gestreamt.

Dies muss in Minuten oder Stunden, nicht Tagen, machbar sein - wir haben mehrere Prozesse, die Millionen von Datensätzen zwischen einem Sybase- und SQL-Server in ähnlicher Weise verschieben und es dauert weniger als fünf Minuten.

Vielleicht ein Schuss:

OracleCommand cmd = new OracleCommand("SELECT ... FROM TableName", oracle_connection); 
int batchSize = 500;  
using (OracleDataReader reader = cmd.ExecuteReader()) 
{ 
    List<Record> l = new List<Record>(batchSize); 
    string[] str = new string[7]; 
    int currentRow = 0; 

    while (reader.Read()) 
    { 
     for (int i = 0; i < 7; i++) 
     { 
      str[i] = reader[i].ToString(); 
     } 

     l.Add(new Record(str[0], str[1], str[2], str[3], str[4], str[5], str[6])); 

     // Commit every time batchSize records have been read 
     if (++currentRow == batchSize) 
     { 
      Commit(l); 
      l.Clear(); 
      currentRow = 0; 
     } 
    } 

    // commit remaining records 
    Commit(l); 
} 

Hier ist, was Commit aussehen könnte:

public void Commit(IEnumerable<Record> records) 
{ 
    // TODO: Use ES's BULK features, I don't know the exact syntax 

    client.IndexMany<Record>(records, "index", "type"); 
    // client.Bulk(b => b.IndexMany(records))... something like this 
} 
+0

Vielen Dank für Ihre Hilfe! Ich habe die ES Bulk-Funktionen wie erwähnt verwendet. Es begann mit dem Einfügen von Datensätzen mit sehr hoher Geschwindigkeit, aber nach einiger Zeit gab es den Argument kann nicht leer Fehler sein. Ich habe die Frage entsprechend bearbeitet. –

+0

@AakritiMittal: Sie sollten Ihre Frage nicht überschreiben - wenn Sie ein neues Problem haben, stellen Sie bitte eine neue Frage (beziehen Sie sich auf diese, wenn Sie möchten). Dies ist eine Frage-und-Antwort-Website, und wenn Sie Ihre Frage ändern, machen Sie andere Antworten ungültig. Ich habe Ihre Änderungen zurückgesetzt. –

+0

Sorry wegen der Überschreibung, aber ich habe diesen Fehler behoben.Ich möchte Sie auch fragen, wie Ihre Anwendung Millionen von Datensätzen in weniger als 5 Minuten zwischen einem Sybase- und SQL-Server verschoben hat. Meine neue Anwendung benötigt noch 2 Tage, um die 4 Millionen von Oracle nach Elasticsearch zu verschieben, da sie eine große Anzahl von Spalten in der Tabelle hat. –

3

Aber Sie einfügen nicht in Chargen von 100
Am Ende Sie ein einfügen an eine Zeit
(und das ist möglicherweise nicht einmal der richtige Code, um einen einzufügen)

foreach(Record r in l) 
    client.Index<Record>(r, "index", "type"); 

All diese girations beim Lesen nichts zu tun, wenn der Einsatz eine Zeile zu einem Zeitpunkt
Sie ist ist nur Verzögerung einzuführen, während Sie das das nächste Batch
lesen ist (fast) immer schneller als

schreiben
OracleCommand cmd = new OracleCommand(BuildQuery(all), oracle_connection); 
OracleDataReader reader = cmd.ExecuteReader(); 
while (reader.Read()) 
{ 
    client.Index<Record>(new Record(reader.GetSting(0), 
         reader.GetSting(1), reader.GetSting(2), reader.GetSting(3),  
         reader.GetSting(4), reader.GetSting(5), reader.GetSting(6), 
         "index", "type"); 
} 
reader.Close(); 

Sie könnten eine Blocking verwenden, wenn Sie parallel
Aber verwenden, um eine maximale Größe nicht zu lesen möchten, lesen und schreiben bekommt weit voraus Schreib

Verwandte Themen