2009-06-24 11 views
8

Ich wollte einen Server schreiben, mit dem sich ein Client verbinden und regelmäßige Updates erhalten konnte, ohne abfragen zu müssen. Das Problem, das ich mit asyncore erlebt habe, ist, dass, wenn Sie nicht true zurückgeben, wenn dispatcher.writable() aufgerufen wird, Sie warten müssen, bis das asyncore.loop abgelaufen ist (Standard ist 30s).Pythons Asyncore, um Daten periodisch über ein Variablen-Timeout zu senden. Gibt es einen besseren Weg?

Die zwei Möglichkeiten, die ich versucht habe, um dies zu umgehen, ist 1) reduzieren Timeout auf einen niedrigen Wert oder 2) Abfragen Verbindungen für wann sie nächste Aktualisierung und generieren einen angemessenen Timeout-Wert. Wenn Sie jedoch auf "Select Law" in "man 2 select_tut" verweisen, heißt es: "Sie sollten immer select() ohne Timeout verwenden."

Gibt es einen besseren Weg, dies zu tun? Twisted vielleicht? Ich wollte versuchen, zusätzliche Threads zu vermeiden. Ich werde die Variable Timeout Beispiel hier ist:

#!/usr/bin/python 

import time 
import socket 
import asyncore 


# in seconds 
UPDATE_PERIOD = 4.0 

class Channel(asyncore.dispatcher): 

    def __init__(self, sock, sck_map): 
     asyncore.dispatcher.__init__(self, sock=sock, map=sck_map) 
     self.last_update = 0.0 # should update immediately 
     self.send_buf = '' 
     self.recv_buf = '' 

    def writable(self): 
     return len(self.send_buf) > 0 

    def handle_write(self): 
     nbytes = self.send(self.send_buf) 
     self.send_buf = self.send_buf[nbytes:] 

    def handle_read(self): 
     print 'read' 
     print 'recv:', self.recv(4096) 

    def handle_close(self): 
     print 'close' 
     self.close() 

    # added for variable timeout 
    def update(self): 
     if time.time() >= self.next_update(): 
      self.send_buf += 'hello %f\n'%(time.time()) 
      self.last_update = time.time() 

    def next_update(self): 
     return self.last_update + UPDATE_PERIOD 


class Server(asyncore.dispatcher): 

    def __init__(self, port, sck_map): 
     asyncore.dispatcher.__init__(self, map=sck_map) 
     self.port = port 
     self.sck_map = sck_map 
     self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.bind(("", port)) 
     self.listen(16) 
     print "listening on port", self.port 

    def handle_accept(self): 
     (conn, addr) = self.accept() 
     Channel(sock=conn, sck_map=self.sck_map) 

    # added for variable timeout 
    def update(self): 
     pass 

    def next_update(self): 
     return None 


sck_map = {} 

server = Server(9090, sck_map) 
while True: 
    next_update = time.time() + 30.0 
    for c in sck_map.values(): 
     c.update() # <-- fill write buffers 
     n = c.next_update() 
     #print 'n:',n 
     if n is not None: 
      next_update = min(next_update, n) 
    _timeout = max(0.1, next_update - time.time()) 

    asyncore.loop(timeout=_timeout, count=1, map=sck_map) 
+0

Nick: Was ist die kleine Änderung, damit es funktioniert? Könnten Sie den Code eingeben? Danke –

Antwort

4

Das „select Gesetz“ gilt nicht für Ihren Fall, da Sie nicht nur Client-triggered haben (reinen Server) Aktivitäten, sondern auch zeitgesteuerte Aktivitäten - Genau dafür ist das Select-Timeout vorgesehen. Was das Gesetz wirklich sagen sollte ist, "wenn Sie eine Zeitüberschreitung angeben, stellen Sie sicher, dass Sie etwas Nützliches tun müssen, wenn die Zeitüberschreitung eintritt". Das Gesetz soll vor vielbeschäftigtem Warten schützen; Ihr Code ist nicht beschäftigt - warten Sie.

Ich würde _timeout nicht auf das Maximum von 0,1 und die nächste Aktualisierungszeit setzen, sondern auf das Maximum von 0,0 und das nächste Timeout. IOW, wenn ein Aktualisierungszeitraum abgelaufen ist, während Sie Updates durchgeführt haben, sollten Sie dieses spezielle Update sofort durchführen.

Anstatt jedes Mal nach jedem Kanal zu fragen, ob es aktualisiert werden soll, können Sie alle Kanäle in einer Prioritätswarteschlange (sortiert nach nächster Aktualisierungszeit) speichern und dann nur für die ältesten Kanäle aktualisieren (bis Sie eine finden, deren Aktualisierungszeit ist nicht angekommen). Sie können das heapq-Modul dafür verwenden.

Sie können auch einige Systemaufrufe speichern, indem Sie nicht jeden Kanal nach der aktuellen Uhrzeit fragen, sondern nur einmal die aktuelle Uhrzeit abfragen und an update übergeben.

1

Ich würde verwenden Verdreht, seit langer Zeit benutzte ich asyncore aber ich denke, das ist das verdrehte gleichwertig sein sollte (aus dem Gedächtnis nicht getestet, geschrieben):

from twisted.internet import reactor, protocol 
import time 

UPDATE_PERIOD = 4.0 

class MyClient(protocol.Protocol): 

    def connectionMade(self): 
     self.updateCall = reactor.callLater(UPDATE_PERIOD, self.update) 

    def connectionLost(self, reason): 
     self.updateCall.cancel() 

    def update(self): 
     self.transport.write("hello %f\n" % (time.time(),)) 

    def dataReceived(self, data): 
     print "recv:", data 


f = protocol.ServerFactory() 
f.protocol = MyClient 

reactor.listenTCP(9090, f) 
reactor.run() 
+0

+1: mit verdrehten Code wird lesbar und einfach zu pflegen. – nosklo

+0

Code funktioniert (ziemlich gut, wenn aus dem Speicher!), Aber eine kleine Änderung benötigt, um reactor.callLater() in der update() Methode aufzurufen, um das nächste Update zu senden. Andernfalls erhalten Sie nur eine Nachricht und die updateCall.cancel() wird bei der Trennung fehlschlagen. Mein einziges Problem ist, dass Twist eine zusätzliche Abhängigkeit hinzufügt, aber ich muss dies gegen tatsächliche Produktivität und Lesbarkeit abwägen. –

4

Vielleicht sched.scheduler dies mit Ihnen zu tun, wie diese (nb nicht getestet):

import sched, asyncore, time 

# Create a scheduler with a delay function that calls asyncore.loop 
scheduler = sched.scheduler(time.time, lambda t: _poll_loop(t, time.time())) 

# Add the update timeouts with scheduler.enter 
# ... 

def _poll_loop(timeout, start_time): 
    asyncore.loop(timeout, count=1) 
    finish_time = time.time() 
    timeleft = finish_time - start_time 
    if timeleft > timeout: # there was a message and the timeout delay is not finished 
    _poll_loop(timeleft, finish_time) # so wait some more polling the socket 

def main_loop(): 
    while True: 
    if scheduler.empty(): 
     asyncore.loop(30.0, count=1) # just default timeout, use what suits you 
     # add other work that might create scheduled events here 
    else: 
     scheduler.run() 
+0

Während diese Antwort schön ist, werden Sie bald in einem RuntimeError mit der Rekursionstiefe in _poll_loop laufen. Schreibe es besser ohne Rekursion für reale Beispiele. ;-) –

2

Dies ist im Grunde Demiurgus 'Lösung mit den Ecken und Kanten rund gemacht. Er behält seine Grundidee bei, verhindert aber RuntimeErrors und Busy Loops und wird getestet. [Edit: behobenen Probleme mit Modifizieren die Scheduler während _delay]

class asynschedcore(sched.scheduler): 
    """Combine sched.scheduler and asyncore.loop.""" 
    # On receiving a signal asyncore kindly restarts select. However the signal 
    # handler might change the scheduler instance. This tunable determines the 
    # maximum time in seconds to spend in asycore.loop before reexamining the 
    # scheduler. 
    maxloop = 30 
    def __init__(self, map=None): 
     sched.scheduler.__init__(self, time.time, self._delay) 
     if map is None: 
      self._asynmap = asyncore.socket_map 
     else: 
      self._asynmap = map 
     self._abort_delay = False 

    def _maybe_abort_delay(self): 
     if not self._abort_delay: 
      return False 
     # Returning from this function causes the next event to be executed, so 
     # it might be executed too early. This can be avoided by modifying the 
     # head of the queue. Also note that enterabs sets _abort_delay to True. 
     self.enterabs(0, 0, lambda:None,()) 
     self._abort_delay = False 
     return True 

    def _delay(self, timeout): 
     if self._maybe_abort_delay(): 
      return 
     if 0 == timeout: 
      # Should we support this hack, too? 
      # asyncore.loop(0, map=self._asynmap, count=1) 
      return 
     now = time.time() 
     finish = now + timeout 
     while now < finish and self._asynmap: 
      asyncore.loop(min(finish - now, self.maxloop), map=self._asynmap, 
          count=1) 
      if self._maybe_abort_delay(): 
       return 
      now = time.time() 
     if now < finish: 
      time.sleep(finish - now) 

    def enterabs(self, abstime, priority, action, argument): 
     # We might insert an event before the currently next event. 
     self._abort_delay = True 
     return sched.scheduler.enterabs(self, abstime, priority, action, 
             argument) 

    # Overwriting enter is not necessary, because it is implemented using enter. 

    def cancel(self, event): 
     # We might cancel the next event. 
     self._abort_delay = True 
     return sched.scheduler.cancel(self, event) 

    def run(self): 
     """Runs as long as either an event is scheduled or there are 
     sockets in the map.""" 
     while True: 
      if not self.empty(): 
       sched.scheduler.run(self) 
      elif self._asynmap: 
       asyncore.loop(self.maxloop, map=self._asynmap, count=1) 
      else: 
       break 
+0

Habe nur begrenzte Tests gemacht, scheint aber super zu funktionieren! – konrad

0

Vielleicht verstehe ich nicht, was die OP zu erreichen versucht, aber ich gelöst gerade dieses Problem mit 1 Thread, der eine WeakRef jeden Kanals wird (asyncore.dispatcher) Objekt. Dieser Thread bestimmt sein eigenes Timing und sendet dem Kanal periodisch eine Aktualisierung unter Verwendung einer Warteschlange in diesem Kanal. Er ruft die Warteschlange vom Channel-Objekt ab, indem er getQueue aufruft.

Der Grund, warum ich eine Schwachstelle verwende, ist, weil Clients vorübergehend sind. Wenn der Kanal abstirbt, gibt die Schwachstelle None zurück. Auf diese Weise behält der Timing-Thread alte Objekte nicht am Leben, weil er sie referenziert.

Ich weiß, dass das OP Gewinde vermeiden wollte, aber diese Lösung ist sehr einfach.Es erstellt immer nur einen Thread und spricht mit allen Channels, die erstellt werden, wenn das Server-Objekt sie der Threads-Liste der zu überwachenden Objekte hinzufügt.

Verwandte Themen