Ich brauche wirklich eine Antwort auf diese Frage, weshalb ich es bearbeite.Java Qpid Proton - ActiveMQ-Broker Kann angeforderte Adresse nicht zuweisen: bind
Ich habe einen Apache ActiveMQ Broker in meiner Verbindung aufgebaut mit diesem Code
Broker.java
public class Broker {
private BrokerService broker;
public Broker(String connector) {
this.broker = new BrokerService();
this.broker.setUseJmx(true);
try {
this.broker.addConnector(connector);
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
public void addConnector(String connector){
try {
this.broker.addConnector(connector);
} catch (Exception e) {
e.printStackTrace();
}
}
public void start() {
try {
this.broker.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public BrokerService getBroker() {
return broker;
}
public void setBroker(BrokerService broker) {
this.broker = broker;
}
}
Hier ist mein Problem
Ich benutze die Qpid Proton-Bibliothek (hier verfügbar: Qpid Proton). Ich habe eine Klasse, Daten zu lesen, die fast das Beispiel ist sie Ihnen auf qpid Webiste
package messaging;
import java.io.IOException;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.reactor.FlowController;
import org.apache.qpid.proton.reactor.Handshaker;
public class AMQPSubscriber extends BaseHandler {
private String broker;
private String topic;
private String port;
public AMQPSubscriber(String broker, String port, String topic) {
this.broker = broker;
this.port = port;
this.topic = topic;
this.add(new Handshaker());
this.add(new FlowController());
}
@Override
public void onReactorInit(Event event) {
try {
event.getReactor().acceptor(broker, Integer.parseInt(port), new AMQPSubscriber(broker, port, topic));
} catch (NumberFormatException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onDelivery(Event event) {
System.out.println("---------Message Received--------");
Receiver recv = (Receiver) event.getLink();
Delivery delivery = recv.current();
if (delivery.isReadable() && !delivery.isPartial()) {
int size = delivery.pending();
byte[] buffer = new byte[size];
int read = recv.recv(buffer, 0, buffer.length);
recv.advance();
Message msg = Proton.message();
msg.decode(buffer, 0, read);
System.out.println("Subject : " + msg.getProperties().getSubject());
System.out.println("Text : " + ((AmqpValue) msg.getBody()).getValue());
}
}
}
Diese Klasse im Haupt genannt wird:
public static void main (String[]args) throws IOException, TimeoutException, InterruptedException{
Broker broker = new Broker("amqp://" + host + ":" + AMQPport);
broker.start();
AMQPSubscriber receiv = new AMQPSubscriber(host, "5672", topic);
Reactor r;
try {
r = Proton.reactor(receiv);
r.run();
} catch (IOException e) {
e.printStackTrace();
}
}
Aber wenn ich diesen Code ausführen, bekomme ich ein
INFO | Loaded the Bouncy Castle security provider.
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\KahaDB]
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
INFO | KahaDB is version 6
INFO | Recovering from the journal @1:61115
INFO | Recovery replayed 11 operations from the journal in 0.014 seconds.
INFO | PListStore:[C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\tmp_storage] started
INFO | Apache ActiveMQ 5.13.3 (localhost, ID:DESKTOP-UK0JIC4-52783-1467025817901-0:1) is starting
INFO | Listening for connections at: amqp://127.0.0.1:5672
INFO | Connector amqp://127.0.0.1:5672 started
INFO | Apache ActiveMQ 5.13.3 (localhost, ID:DESKTOP-UK0JIC4-52783-1467025817901-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\KahaDB only has 7792 mb of usable space. - resetting to maximum available disk space: 7792 mb
WARN | Temporary Store limit is 51200 mb (current store usage is 0 mb). The data directory: C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost only has 7792 mb of usable space. - resetting to maximum available disk space: 7792 mb
java.net.BindException: Address already in use: bind
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Unknown Source)
at sun.nio.ch.Net.bind(Unknown Source)
at sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
at java.nio.channels.ServerSocketChannel.bind(Unknown Source)
at org.apache.qpid.proton.reactor.impl.AcceptorImpl.<init>(AcceptorImpl.java:102)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.acceptor(ReactorImpl.java:477)
at messaging.AMQPSubscriber.onReactorInit(AMQPSubscriber.java:33)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:209)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.engine.impl.EventImpl.delegate(EventImpl.java:129)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:114)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:307)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:275)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.run(ReactorImpl.java:343)
at messaging.Main.main(Main.java:98)
Dieser Broker funktioniert gut, wenn ich MQTT und Paho verwenden, ich möchte, dass es auch mit AMQP funktioniert. Ich weiß, Bindung bedeutet, dass der Port bereits verwendet wird, aber ich kann nicht herausfinden, wie ich an einem Port hören konnte, wo keine Daten gesendet werden.
Vielen Dank für Ihre Hilfe.
Alexi
ist '192.168.100.47' angeblich Ihre lokale IP? Wenn ja, bist du * sicher *? Überprüfen Sie auch Ihre 'hosts' -Datei, um zu sehen, ob es einen Eintrag für' localhost' gibt, der nicht '127.0.0.1' ist. – Bohemian
Diese IP ist die Serveradresse, die ein anderer Computer im Netzwerk ist. –