2015-01-25 4 views
7

Ich versuche, einen Ereignisstrom von der Kubernetes API mit dem requests-Modul zur Verfügung gestellt zu konsumieren. Ich habe in, was wie ein Pufferungsproblem scheint: das requests Modul scheint um ein Ereignis zu verzögern.Streaming-HTTP-Antwort mit Python "Anfragen" Bibliothek lesen

Ich habe Code, der etwa wie folgt aussieht:

r = requests.get('http://localhost:8080/api/v1beta1/watch/services', 
       stream=True) 

for line in r.iter_lines(): 
    print 'LINE:', line 

Als Kubernetes Ereignisbenachrichtigungen sendet, wird dieser Code nur Anzeige das letzte Ereignis emittiert wird, wenn ein neues Ereignis kommt, das es fast macht völlig nutzlos für Code, der auf den Dienst antworten muss Ereignisse hinzufügen/löschen.

Ich habe dies durch curl in einem Subprozess Laichen gelöst statt mit die requests Bibliothek:

p = subprocess.Popen(['curl', '-sfN', 
         'http://localhost:8080/api/watch/services'], 
        stdout=subprocess.PIPE, 
        bufsize=1) 

for line in iter(p.stdout.readline, b''): 
    print 'LINE:', line 

Dies funktioniert, aber auf Kosten einer gewissen Flexibilität. Gibt es eine Möglichkeit, dieses Pufferproblem mit der Bibliothek requests zu vermeiden?

Antwort

5

Dieses Verhalten ist aufgrund einer fehlerhaften Implementierung der iter_lines -Methode in der requests-Bibliothek.

iter_lines iteriert über den Antwortinhalt in chunk_size Blöcke von Daten mit dem Iterator. Wenn es weniger als chunk_size Bytes von Daten für von dem entfernten Server Lesen (die in der Regel der Fall sein wird, wenn die letzte Zeile des Ausgangs Lesen), wird der Lesevorgang blockiert, bis chunk_size Bytes Daten verfügbar sind.

Ich habe meine eigene iter_lines Routine geschrieben, die richtig arbeitet:

import os 


def iter_lines(fd, chunk_size=1024): 
    '''Iterates over the content of a file-like object line-by-line.''' 

    pending = None 

    while True: 
     chunk = os.read(fd.fileno(), chunk_size) 
     if not chunk: 
      break 

     if pending is not None: 
      chunk = pending + chunk 
      pending = None 

     lines = chunk.splitlines() 

     if lines and lines[-1]: 
      pending = lines.pop() 

     for line in lines: 
      yield line 

    if pending: 
     yield(pending) 

für einen Puffer zu füllen Das funktioniert, weil os.read kehren weniger als chunk_size Bytes von Daten, anstatt zu warten.

+0

Es kann argumentiert werden, welche Implementierung korrekt ist - Ihre wird einen falschen "logischen Zeilenumbruch" einfügen, wenn mehr Daten verfügbar sind. Der richtige Ansatz scheint darin zu bestehen, die Gesamtgröße der Daten herauszufinden (wobei eine eine Voraussetzung für TCP-Kommunikationen ist) und nur partielles Lesen am bekannten Ende zu verwenden. –

+0

Ich glaube nicht, dass Sie argumentieren können, dass die bestehende Implementierung korrekt ist. Meins wurde nicht strengen Tests unterzogen, aber es funktioniert sicherlich besser. Eine korrektere Implementierung - idealerweise als Upstream-Patch - wäre sehr nützlich. – larsks

+0

@ivan_pozdeev * "Der richtige Ansatz scheint darin zu bestehen, die Gesamtgröße der Daten herauszufinden (eine Angabe ist eine Voraussetzung für die TCP-Kommunikation)" * - Nein, TCP ist ein * Stream * und könnte unendlich lang sein. Ich bin mir nicht sicher, wo Sie das gehört haben, aber es ist grundsätzlich falsch. –