2012-06-06 13 views
14

Wie implementiere ich einen FIFO-Puffer, dem ich beliebig große Brocken von Bytes zum Kopf hinzufügen kann und von dem ich beliebig große Stücke von Bytes aus dem Schwanz effizient platzen kann ?Effiziente FIFO-Warteschlange für beliebig große Byte-Stücke in Python

Hintergrund:

Ich habe eine Klasse, die Bytes aus dateiähnlichen Objekten in Stücke beliebiger Größe liest und ist selbst eine dateiähnliche Objekt, von dem die Kunden die Bytes in Stücke beliebiger Größe lesen kann. Die Art, wie ich dies implementiert habe, ist, dass, wann immer ein Client einen Teil der Bytes lesen will, die Klasse wiederholt von den zugrunde liegenden dateiähnlichen Objekten liest (mit Chunk-Größen, die für diese Objekte geeignet sind) und die Bytes zu den Bytes hinzufügt Kopf einer FIFO-Warteschlange, bis genügend Bytes in der Warteschlange vorhanden sind, um dem Client einen Teil der angeforderten Größe zu liefern. Dann werden diese Bytes vom Ende der Warteschlange entfernt und an den Client zurückgegeben.

Ich habe ein Leistungsproblem, das auftritt, wenn die Chunk-Größe für die zugrundeliegenden dateiähnlichen Objekte viel größer ist als die Chunk-Größe, die Clients beim Lesen aus der Klasse verwenden.

Angenommen, die Chunk-Größe für die zugrunde liegenden dateiähnlichen Objekte beträgt 1 MiB, und die Chunk-Größe, mit der der Client liest, beträgt 1 KiB. Wenn der Client zum ersten Mal 1 KiB anfordert, muss die Klasse 1 MiB lesen und zur FIFO-Warteschlange hinzufügen. Dann muss die Klasse für diese Anfrage und die nachfolgenden 1023 Anfragen 1 KiB aus dem Ende der FIFO-Warteschlange holen, die allmählich in der Größe von 1 MiB auf 0 Bytes abnimmt, zu welcher Zeit der Zyklus erneut beginnt.

Ich habe dies derzeit mit einem StringIO-Objekt implementiert. Das Schreiben neuer Bytes am Ende des StringIO-Objekts ist schnell, aber das Entfernen von Bytes vom Anfang an ist sehr langsam, da ein neues StringIO-Objekt erstellt werden muss, das eine Kopie des gesamten vorherigen Puffers minus dem ersten Byte enthält.

SO Fragen, die mit ähnlichen Problemen beschäftigen, neigen dazu, auf den Container zu verweisen. Deque wird jedoch als doppelt verknüpfte Liste implementiert. Das Schreiben eines Chunks in die Deque würde das Teilen des Chunks in Objekte erfordern, die jeweils ein einzelnes Byte enthalten. Die Deque würde dann zwei Zeiger zu jedem Objekt zum Speichern hinzufügen, was wahrscheinlich die Speicheranforderungen um mindestens eine Größenordnung im Vergleich zu den Bytes erhöht. Außerdem würde es lange dauern, die verknüpfte Liste zu durchlaufen und jedes Objekt zu behandeln, um Stücke in Objekte aufzuteilen und Objekte in Stücke zu verbinden.

Antwort

13

ich dies derzeit mit einem StringIO Objekt implementiert haben. Das Schreiben neuer Bytes an das Ende des StringIO-Objekts ist schnell, aber das Entfernen von Bytes von Anfang an ist sehr langsam, weil ein neues StringIO-Objekt, das eine Kopie des gesamten vorherigen Puffers abzüglich des ersten Chunks von Bytes enthält erstellt werden.

eigentlich die typische Art und Weise FIFO der Implementierung ist zwei Verwendung Wrap-around-Puffer mit zwei Zeigern als solche:

enter image description hereimage source

Nun, Sie, dass mit StringIO() mit .seek() implementieren lesen/schreibe von einem geeigneten Ort.

+1

Ooh, +1 für den Wraparound. Daran hatte ich nicht gedacht. Sie müssen jedoch die maximale Größe im Voraus wissen; tatsächlich, ich nehme an, es könnte nach Bedarf angebaut werden ... – Cameron

+0

Danke! Das sieht perfekt aus. Ich habe ein Experiment mit StringIO durchgeführt, das anzeigt, dass es sich automatisch ausdehnt, um dies zu berücksichtigen. Wenn beispielsweise die aktuelle Größe des StringIO-Objekts 10 Byte und PUTPT (der Suchort) den Index 5 aufweist, wird beim Schreiben eines 20-Byte-Chunks das StringIO-Objekt automatisch auf 25 Byte erweitert, wobei die ersten 5 Byte beibehalten und der Rest überschrieben wird. Wenn GETPT jedoch nach PUTPT ist, ist etwas mehr Logik erforderlich. –

+0

Ich habe diese Idee in meiner Antwort unten implementiert. Prost! – Cameron

3

Können Sie annehmen, irgendetwas über die erwarteten Lese-/Schreib-Mengen?

Chunking die Daten in, zum Beispiel, 1024 Byte Fragmente und mit deque [1] könnte dann besser funktionieren; Sie könnten nur N volle Blöcke lesen, dann einen letzten Teil, um ihn zu teilen und den Rest zurück auf den Anfang der Warteschlange zu setzen.

1) collections.deque

class collections.deque([iterable[, maxlen]])

Gibt ein neues Deque-Objekt initialisiert links-nach-rechts (APPEND()) mit den Daten aus iterable. Wenn iterable nicht angegeben ist, ist die neue Deque leer.

Deques sind eine Verallgemeinerung von Stacks und Queues (der Name wird "Deck" ausgesprochen und ist die Abkürzung für "double-ended queue"). Deques unterstützt thread-safe, speichereffiziente Attends und Pops von jeder Seite der Deque mit ungefähr der gleichen O (1) -Leistung in beiden Richtungen. ...

9

aktualisieren: Hier ist eine Implementierung der kreisförmigen Puffertechnik von vartec's answer (aufbauend auf meiner ursprünglichen Antwort, unten für diejenigen neugierig erhalten):

from cStringIO import StringIO 

class FifoFileBuffer(object): 
    def __init__(self): 
     self.buf = StringIO() 
     self.available = 0 # Bytes available for reading 
     self.size = 0 
     self.write_fp = 0 

    def read(self, size = None): 
     """Reads size bytes from buffer""" 
     if size is None or size > self.available: 
      size = self.available 
     size = max(size, 0) 

     result = self.buf.read(size) 
     self.available -= size 

     if len(result) < size: 
      self.buf.seek(0) 
      result += self.buf.read(size - len(result)) 

     return result 


    def write(self, data): 
     """Appends data to buffer""" 
     if self.size < self.available + len(data): 
      # Expand buffer 
      new_buf = StringIO() 
      new_buf.write(self.read()) 
      self.write_fp = self.available = new_buf.tell() 
      read_fp = 0 
      while self.size <= self.available + len(data): 
       self.size = max(self.size, 1024) * 2 
      new_buf.write('0' * (self.size - self.write_fp)) 
      self.buf = new_buf 
     else: 
      read_fp = self.buf.tell() 

     self.buf.seek(self.write_fp) 
     written = self.size - self.write_fp 
     self.buf.write(data[:written]) 
     self.write_fp += len(data) 
     self.available += len(data) 
     if written < len(data): 
      self.write_fp -= self.size 
      self.buf.seek(0) 
      self.buf.write(data[written:]) 
     self.buf.seek(read_fp) 

durch die ursprüngliche Antwort (ersetzt oben):

Sie können einen Puffer verwenden und den Startindex verfolgen (Dateizeiger lesen) und ihn gelegentlich komprimieren, wenn er zu l wird groß (dies sollte eine ziemlich gute amortisierte Leistung erbringen).

Zum Beispiel wickeln ein StringIO Objekt wie folgt:

from cStringIO import StringIO 
class FifoBuffer(object): 
    def __init__(self): 
     self.buf = StringIO() 

    def read(self, *args, **kwargs): 
     """Reads data from buffer""" 
     self.buf.read(*args, **kwargs) 

    def write(self, *args, **kwargs): 
     """Appends data to buffer""" 
     current_read_fp = self.buf.tell() 
     if current_read_fp > 10 * 1024 * 1024: 
      # Buffer is holding 10MB of used data, time to compact 
      new_buf = StringIO() 
      new_buf.write(self.buf.read()) 
      self.buf = new_buf 
      current_read_fp = 0 

     self.buf.seek(0, 2) # Seek to end 
     self.buf.write(*args, **kwargs) 

     self.buf.seek(current_read_fp) 
+3

+1 Das ist großartig. Danke für die vollständige Umsetzung. –

+0

@Roger: Kein Problem. Ich dachte, es könnte eines Tages nützlich sein ;-) – Cameron

+0

Nur aus Neugier, ist es schneller? –

Verwandte Themen