Unhandled error in Deferred with a UNIX socket in Twisted

I am completely new to network programming and event-driven programming. Still, I managed to implement a pub-sub scheme using a TCP connection between my computer (the server) and client machines that I tested from the command line. What I actually need, however, is to use a UNIX socket with Twisted.

I get the following error message when the code runs:

Unhandled error in Deferred

Here is my code for pub_sub.py:

"""
A networking implementation of PubSub using Twisted.

=============
PubSub Server
=============

A PubSub server listens for subscription requests and publish commands, and,
when published to, sends data to subscribers. All incoming and outgoing
requests are encoded in JSON.

A Subscribe request looks like this:

    {
        "command": "subscribe",
        "topic": "hello"
    }

A Publish request looks like this:

    {
        "command": "publish",
        "topic": "hello",
        "data": {
            "world": "WORLD"
        }
    }

When the server receives a Publish request, it will send the 'data' object to
all subscribers of 'topic'.
"""

import argparse
import json
import logging

from collections import defaultdict

from twisted.internet import reactor
from twisted.python import log
from twisted.python.filepath import FilePath
from twisted.internet.endpoints import (
    UNIXClientEndpoint,
    UNIXServerEndpoint,
    connectProtocol,
)
from twisted.internet.protocol import Protocol, Factory


class PubSubProtocol(Protocol):

    def __init__(self, topics):
        self.topics = topics
        self.subscribed_topic = None

    def connectionLost(self, reason):
        print("Connection lost: {}".format(reason))
        if self.subscribed_topic:
            self.topics[self.subscribed_topic].remove(self)

    def dataReceived(self, data):
        print("Data received: {}".format(data))
        try:
            # The transport delivers bytes; decode before parsing the JSON.
            request = json.loads(data.decode('utf-8'))
        except ValueError:
            logging.debug("ValueError on decoding incoming data. "
                          "Data: {}".format(data), exc_info=True)
            self.transport.loseConnection()
            return

        if request['command'] == 'subscribe':
            self.handle_subscribe(request['topic'])
        elif request['command'] == 'publish':
            self.handle_publish(request['topic'], request['data'])

    def handle_subscribe(self, topic):
        print("Subscribed to topic: {}".format(topic))
        self.topics[topic].add(self)
        self.subscribed_topic = topic

    def handle_publish(self, topic, data):
        # Transports accept bytes only, so encode the JSON text before writing.
        request = json.dumps(data).encode('utf-8')

        for protocol in self.topics[topic]:
            protocol.transport.write(request)
        print("Publish sent for topic: {}".format(topic))


class PubSubFactory(Factory):

    def __init__(self):
        # Maps each topic name to the set of protocols subscribed to it.
        self.topics = defaultdict(set)

    def buildProtocol(self, addr):
        return PubSubProtocol(self.topics)


class PublisherProtocol(Protocol):
    """
    Publish protocol for sending data to client, i.e. front-end web GUI.
    """
    def __init__(self, topic, **kwargs):
        self.topic = topic
        self.kwargs = kwargs

    def connectionMade(self):
        request = json.dumps({
            'command': 'publish',
            'topic': self.topic,
            'data': self.kwargs,
        })

        # Transports accept bytes only, so encode the JSON text before writing.
        self.transport.write(request.encode('utf-8'))
        self.transport.loseConnection()


class SubscriberProtocol(Protocol):
    """
    Subscriber protocol for client sending a request to subscribe to a specific
    topic.
    """
    def __init__(self, topic, callback):
        self.topic = topic
        self.callback = callback

    def connectionMade(self):
        request = json.dumps({
            'command': 'subscribe',
            'topic': self.topic,
        })

        # Transports accept bytes only, so encode the JSON text before writing.
        self.transport.write(request.encode('utf-8'))

    def dataReceived(self, data):
        # The transport delivers bytes; decode before parsing the JSON.
        kwargs = json.loads(data.decode('utf-8'))

        self.callback(**kwargs)


class PubSub(object):

    def __init__(self, path='./.sock'):
        self.path = FilePath(path)
        self.reactor = reactor

    def _make_connection(self, protocol):
        # The endpoint takes the socket path as a string, hence self.path.path.
        endpoint = UNIXClientEndpoint(self.reactor, self.path.path)
        d = connectProtocol(endpoint, protocol)
        # Without an errback, a failed connection attempt is only reported as
        # "Unhandled error in Deferred"; log.err records the actual Failure.
        d.addErrback(log.err)
        return d

    def subscribe(self, topic, callback):
        """
        Subscribe 'callback' callable to 'topic'.
        """
        sub = SubscriberProtocol(topic, callback)
        self._make_connection(sub)

    def publish(self, topic, **kwargs):
        """
        Publish 'kwargs' to 'topic', calling all callables subscribed to 'topic'
        with the arguments specified in '**kwargs'.
        """
        pub = PublisherProtocol(topic, **kwargs)
        self._make_connection(pub)

    def run(self):
        """
        Convenience method to start the Twisted event loop.
        """
        self.reactor.run()


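def main():
    path = FilePath("./.sock")
    # The endpoint takes the socket path as a string, hence path.path.
    endpoint = UNIXServerEndpoint(reactor, path.path)
    d = endpoint.listen(PubSubFactory())
    # Log a failed listen() instead of leaving the Deferred unhandled.
    d.addErrback(log.err)

    reactor.run()


if __name__ == '__main__':
    main()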
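
The request format described in the module docstring can be tried out without opening a socket. The following is only a minimal sketch, assuming Python 3, where Twisted transports expect bytes; `encode_request` and `decode_request` are hypothetical helper names that do not appear in pub_sub.py.

```python
import json


def encode_request(command, topic, data=None):
    """Build a JSON request and return it as UTF-8 bytes (hypothetical helper)."""
    request = {"command": command, "topic": topic}
    if data is not None:
        # Only publish requests carry a 'data' object.
        request["data"] = data
    return json.dumps(request).encode("utf-8")


def decode_request(raw):
    """Parse UTF-8 JSON bytes back into a dict (hypothetical helper)."""
    return json.loads(raw.decode("utf-8"))


subscribe = encode_request("subscribe", "hello")
publish = encode_request("publish", "hello", {"world": "WORLD"})
```

Both values are byte strings, so they could be passed to `transport.write()` as they are.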

Any help figuring out what I am doing wrong would be greatly appreciated.

Thanks,

Brian

Answer

You appear to be running your software on Windows. Unfortunately, UNIX sockets are not available on Windows. If you want to use UNIX sockets, you need a POSIX-ish environment: Linux, *BSD, OS X, etc.
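
One way to make this platform difference explicit is to check for `socket.AF_UNIX` before choosing how to listen. This is only a sketch under assumptions: `unix_sockets_available` and `server_endpoint_description` are hypothetical helpers, and the TCP fallback on port 8000 bound to 127.0.0.1 is an illustrative choice rather than anything from the question.

```python
import socket


def unix_sockets_available():
    """Return True if this Python build exposes AF_UNIX sockets."""
    return hasattr(socket, "AF_UNIX")


def server_endpoint_description(path="./.sock", fallback_port=8000):
    """Build a Twisted endpoint description string (hypothetical helper).

    Uses a UNIX socket where the platform supports it, otherwise falls
    back to a TCP port bound to the loopback interface (an assumption).
    """
    if unix_sockets_available():
        return "unix:{}".format(path)
    return "tcp:{}:interface=127.0.0.1".format(fallback_port)


description = server_endpoint_description()
```

The resulting string could then be handed to `twisted.internet.endpoints.serverFromString(reactor, description)`, which returns an endpoint whose `listen()` method is called with the factory.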

Thank you! I noticed that last night, ran the same code on my Linux machine, and everything worked fine. I appreciate your help :) – Brian
