2016-06-13 8 views
1

Ich brauche wirklich eine Antwort auf diese Frage, weshalb ich es bearbeite.Java Qpid Proton - ActiveMQ-Broker Kann angeforderte Adresse nicht zuweisen: bind

Ich habe einen Apache ActiveMQ Broker in meiner Verbindung aufgebaut mit diesem Code

Broker.java

public class Broker {

private BrokerService broker; 

public Broker(String connector) { 
    this.broker = new BrokerService(); 
    this.broker.setUseJmx(true); 
    try { 
     this.broker.addConnector(connector); 

    } catch (URISyntaxException e) { 
     e.printStackTrace(); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

public void addConnector(String connector){ 
    try { 
     this.broker.addConnector(connector); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

public void start() { 
    try { 
     this.broker.start(); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
} 

public BrokerService getBroker() { 
    return broker; 
} 

public void setBroker(BrokerService broker) { 
    this.broker = broker; 
} 

}

Hier ist mein Problem

Ich benutze die Qpid Proton-Bibliothek (hier verfügbar: Qpid Proton). Ich habe eine Klasse, Daten zu lesen, die fast das Beispiel ist sie Ihnen auf qpid Webiste

package messaging; 

import java.io.IOException; 

import org.apache.qpid.proton.Proton; 
import org.apache.qpid.proton.amqp.messaging.AmqpValue; 
import org.apache.qpid.proton.engine.BaseHandler; 
import org.apache.qpid.proton.engine.Delivery; 
import org.apache.qpid.proton.engine.Event; 
import org.apache.qpid.proton.engine.Receiver; 
import org.apache.qpid.proton.message.Message; 
import org.apache.qpid.proton.reactor.FlowController; 
import org.apache.qpid.proton.reactor.Handshaker; 

public class AMQPSubscriber extends BaseHandler { 

    private String broker; 
    private String topic; 
    private String port; 

    public AMQPSubscriber(String broker, String port, String topic) { 
     this.broker = broker; 
     this.port = port; 
     this.topic = topic; 
     this.add(new Handshaker()); 
     this.add(new FlowController()); 

    } 

    @Override 
    public void onReactorInit(Event event) { 
     try { 
      event.getReactor().acceptor(broker, Integer.parseInt(port), new AMQPSubscriber(broker, port, topic)); 
     } catch (NumberFormatException e) { 
      e.printStackTrace(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    public void onDelivery(Event event) { 
     System.out.println("---------Message Received--------"); 
     Receiver recv = (Receiver) event.getLink(); 
     Delivery delivery = recv.current(); 
     if (delivery.isReadable() && !delivery.isPartial()) { 
      int size = delivery.pending(); 
      byte[] buffer = new byte[size]; 
      int read = recv.recv(buffer, 0, buffer.length); 
      recv.advance(); 

      Message msg = Proton.message(); 
      msg.decode(buffer, 0, read); 
      System.out.println("Subject : " + msg.getProperties().getSubject()); 
      System.out.println("Text : " + ((AmqpValue) msg.getBody()).getValue()); 
     } 
    } 

} 

Diese Klasse im Haupt genannt wird:

public static void main (String[]args) throws IOException, TimeoutException, InterruptedException{ 

    Broker broker = new Broker("amqp://" + host + ":" + AMQPport); 
    broker.start(); 

AMQPSubscriber receiv = new AMQPSubscriber(host, "5672", topic); 
     Reactor r; 
     try { 
      r = Proton.reactor(receiv); 
      r.run(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 


} 

Aber wenn ich diesen Code ausführen, bekomme ich ein

INFO | Loaded the Bouncy Castle security provider. 
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\KahaDB] 
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi 
INFO | KahaDB is version 6 
INFO | Recovering from the journal @1:61115 
INFO | Recovery replayed 11 operations from the journal in 0.014 seconds. 
INFO | PListStore:[C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\tmp_storage] started 
INFO | Apache ActiveMQ 5.13.3 (localhost, ID:DESKTOP-UK0JIC4-52783-1467025817901-0:1) is starting 
INFO | Listening for connections at: amqp://127.0.0.1:5672 
INFO | Connector amqp://127.0.0.1:5672 started 
INFO | Apache ActiveMQ 5.13.3 (localhost, ID:DESKTOP-UK0JIC4-52783-1467025817901-0:1) started 
INFO | For help or more information please see: http://activemq.apache.org 
WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\KahaDB only has 7792 mb of usable space. - resetting to maximum available disk space: 7792 mb 
WARN | Temporary Store limit is 51200 mb (current store usage is 0 mb). The data directory: C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost only has 7792 mb of usable space. - resetting to maximum available disk space: 7792 mb 
java.net.BindException: Address already in use: bind 
    at sun.nio.ch.Net.bind0(Native Method) 
    at sun.nio.ch.Net.bind(Unknown Source) 
    at sun.nio.ch.Net.bind(Unknown Source) 
    at sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source) 
    at java.nio.channels.ServerSocketChannel.bind(Unknown Source) 
    at org.apache.qpid.proton.reactor.impl.AcceptorImpl.<init>(AcceptorImpl.java:102) 
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.acceptor(ReactorImpl.java:477) 
    at messaging.AMQPSubscriber.onReactorInit(AMQPSubscriber.java:33) 
    at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:209) 
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108) 
    at org.apache.qpid.proton.engine.impl.EventImpl.delegate(EventImpl.java:129) 
    at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:114) 
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:307) 
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:275) 
    at org.apache.qpid.proton.reactor.impl.ReactorImpl.run(ReactorImpl.java:343) 
    at messaging.Main.main(Main.java:98) 

Dieser Broker funktioniert gut, wenn ich MQTT und Paho verwenden, ich möchte, dass es auch mit AMQP funktioniert. Ich weiß, Bindung bedeutet, dass der Port bereits verwendet wird, aber ich kann nicht herausfinden, wie ich an einem Port hören konnte, wo keine Daten gesendet werden.

Vielen Dank für Ihre Hilfe.

Alexi

+0

ist '192.168.100.47' angeblich Ihre lokale IP? Wenn ja, bist du * sicher *? Überprüfen Sie auch Ihre 'hosts' -Datei, um zu sehen, ob es einen Eintrag für' localhost' gibt, der nicht '127.0.0.1' ist. – Bohemian

+0

Diese IP ist die Serveradresse, die ein anderer Computer im Netzwerk ist. –

Antwort

0

Ich habe eine Lösung gefunden.

Wenn ich einen Connector zum integrierten Broker activeMQ hinzufüge, fügt er ihn als TCP hinzu, der nur jeweils eine Verbindung zulässt.

So erstelle ich einen Stecker als UDP wie folgt aus: broker.addConnector("udp://"+host+":"+AMQPport);

Diese Lösung arbeitet für mich Ich hoffe, es anderen Entwicklern in der Zukunft helfen können.

Prost, Alexi

0

Sie diesen Fehler erhalten, wenn es eine andere Anwendung, die einen Server-Socket lauscht auf den Zielport erstellt hat anzubieten. Sie müssen überprüfen, ob auf diesem Port keine weitere Broker-Instanz ausgeführt wird oder ob eine Firewall vorhanden ist, die den Zugriff blockiert.

+0

Ich habe 'netstat -a -n -o | versucht finde "5672" '' und wenn mein Programm beendet ist, habe ich nichts mehr übrig. Ich habe auch meine Firewall überprüft, die Verbindungen darauf erlauben. Und der einzige Prozess, der diesen Port in meiner App verwendet, ist mein eingebetteter Broker, den ich für den AMQP-Transport unbedingt benötige. Ich schätze, der Fehler ist irgendwo anders –

Verwandte Themen