2017-05-22 2 views
0

Ich habe ziemlich große CSV-Dateien, die ich Zeile für Zeile ändern/ändern muss (da jede Zeile andere Änderungsregeln erfordern), dann schreibe sie auf eine andere csv mit dem richtigen Formatierung.Lesen, formatieren und dann große CSV-Dateien schreiben

Derzeit habe ich:

import multiprocessing 

def read(buffer): 
    pool = multiprocessing.Pool(4) 
    with open("/path/to/file.csv", 'r') as f: 
     while True: 
      lines = pool.map(format_data, f.readlines(buffer)) 
      if not lines: 
       break 
      yield lines 

def format_data(row): 
    row = row.split(',') # Because readlines() returns a string 
    # Do formatting via list comprehension 
    return row 

def main(): 
    buf = 65535 
    rows = read(buf) 
    with open("/path/to/new.csv",'w') as out: 
     writer = csv.writer(f, lineterminator='\n') 
     while rows: 
      try: 
       writer.writerows(next(rows)) 
      except StopIteration: 
       break 

Auch wenn ich Multiprozessing über map bin mit und Speicherüberlastung mit einem Generator zu verhindern, es nimmt mich immer noch weit über 2 min 40.000 Zeilen zu verarbeiten. Es sollte ehrlich gesagt nicht so viel nehmen. Ich habe sogar eine verschachtelte Liste von den Generatorausgängen erzeugt und versucht, die Daten als eine große Datei auf einmal zu schreiben, eine Stück für Stück-Methode, und es dauert immer noch so lange. Was mache ich hier falsch?

Antwort

0

Ich habe es herausgefunden.

Zuerst war das Problem in meiner format_data() Funktion. Er rief eine Datenbankverbindung an, die bei jeder Ausführung die Datenbankverbindung erstellte und sie bei jeder Iteration schloss.

Ich reparierte es, indem ich eine grundlegende Zuordnung über ein Wörterbuch für eine exponentiell schnellere Nachschlagetabelle erstellte, die Multithreading unterstützt.

So sieht mein Code wie folgt aus:

import multiprocessing 

def read(buffer): 
    pool = multiprocessing.Pool(4) 
    with open("/path/to/file.csv", 'r') as f: 
     while True: 
      lines = pool.map(format_data, f.readlines(buffer)) 
      if not lines: 
       break 
      yield lines 

def format_data(row): 
    row = row.split(',') # Because readlines() returns a string 
    # Do formatting via list comprehension AND a dictionary lookup 
    # vice a database connection 
    return row 

def main(): 
    rows = read(1024*1024) 
    with open("/path/to/new.csv",'w') as out: 
     while rows: 
      try: 
       csv.writer(f, lineterminator='\n').writerows(next(rows)) 
      except StopIteration: 
       break 

ich in der Lage war, eine ~ 150 MB-Datei in weniger als 30 Sekunden zu analysieren. Einige Lektionen, die hier für andere gelernt wurden, um hoffentlich davon zu lernen.

Verwandte Themen