2014-05-13 14 views
6

Ich muss eine Sturmauslauf zum Lesen von Daten von einem Port schreiben. Wollte wissen, ob das logisch möglich war.Storm: Tülle zum Lesen von Daten von einem Port

In diesem Sinne hatte ich eine einfache Topologie für die gleiche mit einem Auslauf und einer Schraube entwickelt. Der Auslauf würde HTTP-Anfragen sammeln, die mit wget gesendet wurden, und die Schraube würde die Anfrage anzeigen. Genau das.

Meine Ausguss Struktur ist wie folgt:

public class ProxySpout extends BaseRichSpout{ 
     //The O/P collector 
     SpoutOutputCollector sc; 
     //The socket 
     Socket clientSocket; 
     //The server socket 
     ServerSocket sc; 

     public ProxySpout(int port){ 
      this.sc=new ServerSocket(port); 
      try{ 
       clientSocket=sc.accept(); 
      }catch(IOException ex){ 
       //Handle it 
      } 
     } 

     public void nextTuple(){ 
      try{ 
       InputStream ic=clientSocket.getInputStream(); 
       byte b=new byte[8196]; 
       int len=ic.read(b); 

       sc.emit(new Values(b)); 
       ic.close(); 
      }catch(//){ 
       //Handle it 
      }finally{ 
       clientSocket.close(); 
      } 
     } 
} 

ich den Rest der Methoden auch umgesetzt haben.

Wenn ich drehe diese in eine Topologie und führen Sie es, ich erhalte eine Fehlermeldung, wenn ich die erste Anfrage senden:

java.lang.RuntimeException: java.io.NotSerializableException: java.net.Socket

Ich muss nur wissen, ob etwas mit der Art und Weise, wie ich diesen Auslauf ausführe, nicht stimmt. Ist es sogar möglich, dass ein Auslauf Daten von einem Port sammelt? Oder für eine Tülle, die als Proxy-Instanz fungiert?

bearbeiten

habe es funktioniert.

Der Code ist:

public class ProxySpout extends BaseRichSpout{ 
     //The O/P collector 
     static SpoutOutputCollector _collector; 
     //The socket 
     static Socket _clientSocket; 
     static ServerSocket _serverSocket; 
     static int _port; 

     public ProxySpout(int port){ 
      _port=port; 
     } 

     public void open(Map conf,TopologyContext context, SpoutOutputCollector collector){ 
      _collector=collector; 
      _serverSocket=new ServerSocket(_port); 
     } 

     public void nextTuple(){ 
      _clientSocket=_serverSocket.accept(); 
      InputStream incomingIS=_clientSocket.getInputStream(); 
      byte[] b=new byte[8196]; 
      int len=b.incomingIS.read(b); 
      _collector.emit(new Values(b)); 
    } 
} 

Wie pro @ Shaw Vorschlag, versuchte _serverSocket in der open() Methode initialisiert und die _clientSocket läuft in nextTuple() Verfahren für auf Anfragen zu hören.

Keine Ahnung, die Leistung metrices von diesem, aber es funktioniert .. :-)

Antwort

6

Im Konstruktor nur die Variablen zuweisen. Versuchen Sie, ServerSocket in der prepare-Methode zu instanziieren, schreiben Sie keine neuen ... im Konstruktor. Und Variablen umbenennen, Sie haben zwei SC-Variablen.

public class ProxySpout extends BaseRichSpout{ 

    int port; 

    public ProxySpout(int port){ 
     this.port=port; 
    } 

    @Override 
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 
     //new ServerSocket 
    } 

    @Override 
    public void nextTuple() { 

    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer declarer) { 

    } 
} 

Wenn Sie setzen es Verfahren in Vorbereitung, dann wird es erst, wenn der Auslauf ist bereits im Einsatz aufgerufen werden, es braucht also nicht serialisiert werden, und es wird nur einmal pro Lebensdauer des Auslaufs genannt werden, Es ist also nicht ineffizient.

+0

Also ist es dann möglich? Um einen Auslauf wie einen Proxy zu machen? –

+4

Ja, aber nextTuple() wird jede kleine Zeit aufgerufen und du musst das verwalten, wenn Tülle nichts bekommt, Fehler ... – gasparms