2017-07-13 4 views
0

Ich versuche, ein einfaches Thema "foo" von einem Eclipse Paho MQTT-Client zu abonnieren.Eclipse Kapua Broker: Nicht autorisiert, Thema zu abonnieren

Der Broker wird von Eclipse Kapua verwaltet und ist über tcp: // localhost: 1883 mit den Zugangsdaten "kapua-broker" und "kapua-password" zugänglich.

Ich VERöFFENTLICHE Wert auf diese Weise:

send(new Payload.Builder().put("testKey","testVal"),"foo");

Dieser sendet im Grunde eine Karte ("TestKey", "testVal") mit Thema "foo". Um zu diesem Thema zu abonnieren, ich habe den folgenden Code (host = "localhost", port = 1883):

String topic = "foo"; 
    String broker = "tcp://"+host+":"+Integer.toString(port); 
    String clientId = "supply-chain-control-simulation-listener"; 
    String username = "kapua-broker"; 
    String password = "kapua-password"; 

    try { 
     MqttClient client = new MqttClient(broker, clientId); 
     MqttConnectOptions connOpts = new MqttConnectOptions(); 
     connOpts.setCleanSession(true); 
     connOpts.setUserName(username); 
     connOpts.setPassword(password.toCharArray()); 
     connOpts.setCleanSession(true); 
     logger.info("Connecting to broker: "+broker); 
     client.setCallback(new MqttCallback() { 
      @Override 
      public void connectionLost(Throwable throwable) { 
       logger.info("Subscriptions stopped"); 
      } 

      @Override 
      public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { 
       logger.info(s); 
       logger.info(mqttMessage.getPayload().toString()); 
      } 

      @Override 
      public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { 

      } 
     }); 
     client.connect(connOpts); 
     if (client.isConnected()) 
      logger.info("Connected"); 
     else 
      logger.error(client.getDebug().toString()); 
     client.subscribe(topic,2); 
    } catch(MqttException me) { 
     logger.error("reason "+me.getReasonCode()); 
     logger.error("msg "+me.getMessage()); 
     logger.error("loc "+me.getLocalizedMessage()); 
     logger.error("cause "+me.getCause()); 
     logger.error("excep "+me); 
     me.printStackTrace(); 
    } 

Die Verbindung funktioniert, aber die Abonnements gibt diesen Fehler:

15:40 : 03.240 [ActiveMQ NIO Worker 0] WARN oekbcpKapuaSecurityBrokerFilter - Benutzer 1: kapua-broker (Supply-Chain-Control-Simulation-Listener - tcp: //172.17.0.1: 40888 - conn id 1734706196170193882) ist nicht zum Lesen autorisiert: Thema: //VirtualTopic.foo

Antwort

1

In Kapua können Sie gemäß Ihrer Benutzerberechtigung veröffentlichen/abonnieren.

Wenn Ihre Benutzer nur broker:connect Berechtigung hat, können Sie Publish/Subscribe nur zum Thema:

{account-name}/{connectionClientId}/{semanticTopic} 

In Ihrem speziellen Fall sollten Sie Publish/Subscribe zum Thema:

kapus-sys/supply-chain-control-simulation-listener/foo 

kapua-sys ist das Konto Name, zu dem der Benutzer kapua-broker gehört, , während supply-chain-control-simulation-listener die clientId ist, die zum Erstellen der Verbindung verwendet wird.

Bitte beachten Sie, dass der Benutzername für die Verbindung und der Kontoname zwei verschiedene Dinge in Kapua sind. Ein Konto hat mehrere Benutzer.

1

Do not subscribe sofort connect nach dem Aufruf, sondern diesen Anruf in den connectComplete Rückruf bewegen:

IMqttAsyncClient client = new MqttAsyncClient(broker, clientId); 
MqttConnectOptions connOpts = new MqttConnectOptions(); 
connOpts.setCleanSession(true); 
connOpts.setUserName(username); 
connOpts.setPassword(password.toCharArray()); 
connOpts.setCleanSession(true); 
logger.info("Connecting to broker: "+broker); 
client.setCallback(new MqttCallbackExtended() { 
    @Override 
    public void connectComplete(boolean reconnect, String brokerAddress) { 
     logger.info("Connected"); 
     client.subscribe(topic,2); 
    } 
    @Override 
    public void connectionLost(Throwable throwable) { 
     logger.info("Subscriptions stopped"); 
    } 

    @Override 
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { 
     logger.info(s); 
     logger.info(mqttMessage.getPayload().toString()); 
    } 

    @Override 
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { 

    } 
}); 
client.connect(connOpts); 

Das hieß, Ihre Fehler wahrscheinlich aus dem MQTT Broker kommen Sie verwenden, und Sie müssen es so konfigurieren, den Zugang zu diesem Thema ermöglichen .

+1

Danke für Ihre Antwort. Du hast recht, es ist sauberer, aber leider bekomme ich immer noch den gleichen Fehler. Das Problem ist, dass ich den Broker nicht kontrolliere, es ist ein Teil des Kapua-Projekts, und um es zu benutzen, starte ich einfach einen Andock-Container. –

Verwandte Themen