2015-06-30 3 views
7

Ich habe mehrere große Dateien (> 5M Zeilen von Daten), die nach einem eindeutigen Zeitstempel sortiert sind. Alle Dateien enthalten praktisch alle dieselben Zeitstempel mit Ausnahme einer Handvoll zufällig fehlender Zeilen (< 1000). Ich möchte die Daten aus allen Dateien effizient zu einem einzigen Datensatz mit einer Zeile pro Zeitstempel verbinden, vorzugsweise mit einem Generator.Python 3 Join Daten von großen Dateien, die sortiert sind

Mit Ausnahme der fehlenden Zeilen, konnte ich nur zip verwenden:

def get_data(list_of_iterables): 
    for data in zip(*list_of_iterables): 
     yield data 

Da es jedoch einige fehlende Zeilen, ich brauche zippen die Daten über Zeitstempel zu verbinden, anstatt einfach. Ich kann einfach alle Zeilen ignorieren, die keine übereinstimmenden Zeitstempel in jeder Datei haben.

Gibt es eine pythonische Möglichkeit, diese Funktionalität in ein paar Zeilen zu implementieren?

Mein Ansatz wäre es, jedes iterable nacheinander zu beschleunigen, bis sein Zeitstempel nicht länger als der maximale Zeitstempel für die Gruppe der iterablen Elemente ist. Wenn alle Zeitstempel übereinstimmen, ergeben Sie eine Zeile und verschieben Sie alle Iterablen. Aber die Logik scheint chaotisch zu sein, wenn ich versuche, diesen Ansatz zu implementieren.

Bearbeiten: Leistung.

Die Implementierung muss Zeilen zurückgeben, ohne zuerst alle Daten in den Speicher einzulesen. Es dauert eine Weile, alle Daten zu lesen, und oft müssen nur die ersten paar Zeilen untersucht werden.

+1

bitte fügen Sie einige Beispieldaten hinzu – synner

+0

Ich würde vorschlagen, dass Sie sich Pandas ansehen (http://pandas.pydata.org/). Es hat Tools, um genau diese Art von Zusammenführung zu tun, die Sie erwähnen. – Ivan

+0

@Ivan Ich bemerkte die Pandas-Bibliothek, als ich dieses Problem googelte. Die Bibliothek sieht für viele der Operationen, die ich mache, sehr nützlich aus. Ich verlasse mich momentan stark auf Numpy, aber ich werde Pandas untersuchen, wenn ich etwas Zeit habe. – RandomBits

Antwort

0

Meine erste Schätzung wäre, ein Wörterbuch mit Zeitstempeln als Schlüssel und den Rest der Daten in den Zeilen als Werte zu verwenden, und dann für jede Zeile in jeder Datei nur dann zum Wörterbuch hinzuzufügen, wenn ein Element denselben Zeitstempel hat (Schlüssel) ist nicht schon vorhanden.

Wenn Sie jedoch wirklich mit riesigen Datensätzen zu tun haben (in diesem Fall scheint es, als ob Sie in diesem Fall sind), dann wäre der Ansatz, den Sie in Ihrer ursprünglichen Frage erwähnen, die beste Option.

+0

Ich kann sehen, wie das funktionieren würde, aber scheint mein Kommentar zur Leistung, da diese Methode scheint, als müsste es zuerst alle Daten in den Speicher lesen. – RandomBits

+0

@RandomBits OK, danke für die zusätzlichen Informationen. Ich kann mir keinen besseren Weg vorstellen, um das Problem zu lösen als den, den Sie in Ihrer ursprünglichen Frage gegeben haben. –

0

ok, ich habe mich für das Problem interessiert (hatte in letzter Zeit ein ähnliches Problem) und habe ein wenig daran gearbeitet. Sie könnten versuchen, etwas wie folgt:

import io 
import datetime 
from csv import DictReader 

file0 = io.StringIO('''timestamp,data 
2015-06-01 10:00, data00 
2015-06-01 11:00, data01 
2015-06-01 12:00, data02 
2015-06-01 12:30, data03 
2015-06-01 13:00, data04 
''') 

file1 = io.StringIO('''timestamp,data 
2015-06-01 09:00, data10 
2015-06-01 10:30, data11 
2015-06-01 11:00, data12 
2015-06-01 12:30, data13 
''') 

class Data(object): 

    def __init__(self): 
     self.timestamp = None 
     self.data = None 

    @staticmethod 
    def new_from_dict(dct=None): 
     if dct is None: 
      return None 
     ret = Data() 
     ret.data = dct['data'].strip() 
     ret.timestamp = datetime.datetime.strptime(dct['timestamp'], 
                '%Y-%m-%d %H:%M') 
     return ret 

    def __lt__(self, other): 
     if other is None: 
      return False 
     return self.timestamp < other.timestamp 

    def __gt__(self, other): 
     if other is None: 
      return False 
     return self.timestamp > other.timestamp 

    def __str__(self): 
     ret = '{0.__class__.__name__}'.format(self) +\ 
       '(timestamp={0.timestamp}, data={0.data})'.format(self) 
     return ret 


def next_or_none(reader): 
    try: 
     return Data.new_from_dict(next(reader)) 
    except StopIteration: 
     return None 


def yield_in_order(reader0, reader1): 

    data0 = next_or_none(reader0) 
    data1 = next_or_none(reader1) 

    while not data0 == data1 == None: 

     if data0 is None: 
      yield None, data1 
      data1 = next_or_none(reader1) 
      continue 
     if data1 is None: 
      yield data0, None 
      data0 = next_or_none(reader0) 
      continue 

     while data0 < data1: 
      yield data0, None 
      data0 = next_or_none(reader0) 

     while data0 > data1: 
      yield None, data1 
      data1 = next_or_none(reader1) 

     if data0 is not None and data1 is not None: 
      if data0.timestamp == data1.timestamp: 
       yield data0, data1 
       data0 = next_or_none(reader0) 
       data1 = next_or_none(reader1) 

csv0 = DictReader(file0) 
csv1 = DictReader(file1) 

FMT = '{!s:50s} | {!s:50s}' 
print(FMT.format('file0', 'file1')) 
print(101*'-') 
for dta0, dta1 in yield_in_order(csv0, csv1): 
    print(FMT.format(dta0, dta1)) 

Dies ist nur für 2 Dateien.

1

Ich landete Schreiben Sie den folgenden Code, mein Problem zu lösen, das leichter zu sein, stellte sich heraus, als ich erwartet hatte:

def advance_values(iters): 
    for it in iters: 
     yield next(it) 

def align_values(iters, values, key): 
    for it, value in zip(iters, values): 
     while (value[0],value[1]) < key: 
      value = next(it) 
     yield value 

def merge_join(*iters): 
    values = list(advance_values(iters)) 
    while True: 
     if len(values) != len(iters): 
      return 
     tms = [(v[0],v[1]) for v in values] 
     max_tm = max(tms) 
     if all((v[0],v[1]) == max_tm for v in values): 
      yield values 
      values = list(advance_values(iters)) 
     else: 
      values = list(align_values(iters, values, max_tm)) 
+0

Entschuldigung. habe meine Antwort aktualisiert und jetzt nur gesehen, dass du selbst eine Lösung gepostet hast ... –

1

Wenn jeder iterable in list_of_iterables von timestamp sortiert wird, dann könnten Sie heapq.merge() verwenden sie zu fusionieren unter Berücksichtigung möglicher Lücken in den Daten und itertools.groupby() zu Gruppendatensätzen mit dem gleichen Datenstand:

from heapq import merge 
from itertools import groupby 
from operator import attrgetter 

for timestamp, group in groupby(merge(*list_of_iterables), 
           key=attrgetter('timestamp')): 
    print(timestamp, list(group)) # same timestamp 

die Umsetzung Gruppen, ohne zu lesen, alle Daten in mich ergibt mory zuerst.

Verwandte Themen