2017-12-10 3 views
0

Ich entwickle ein Programm, das Nachrichten von einem MQTT-Thema konsumiert, und mein Ziel ist, dass ich mehrere Nachrichten asynchron verarbeiten und verarbeiten kann.Mehrere Nachrichten mit Mqttclient asynchron und gleichzeitig verarbeiten

ich das Eclipse-Clients bin mit: https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttClient.html https://www.eclipse.org/paho/files/javadoc/org/eclipse/paho/client/mqttv3/MqttAsyncClient.html

Das Problem ist, dass mehrere Nachrichten werden nicht zur gleichen Zeit verarbeitet werden, sie sind alle im selben Thread ausgeführt. Ich verstehe den Unterschied zwischen der Verwendung von MqttClient und MqttAsyncClient nicht sehr gut. Die javadoc sagt:

MqttClient

Leichtbau-Client für das Gespräch zu einem MQTT Server unter Verwendung von Methoden, die Block, bis ein Vorgang abgeschlossen ist.

MqttAsyncClient

Leichtbau-Client für das Gespräch zu einem MQTT Server mit nicht-blockierenden Methoden, die einer Operation ermöglichen im Hintergrund laufen zu lassen.

Weder ich habe sehr deutlich den Unterschied zwischen der Verwendung der Methode "subscribe" oder "setCallback". Nur mit „subscribe“ können Sie mehrere Zuhörer erklären: setCallback

einen Rückruf-Listener für Ereignisse verwenden Sets, die asynchron geschehen. abonnieren zu einem Thema abonnieren ...

Es hat versucht, zehn Nachrichten zur gleichen Zeit zu senden.Meine Tests sind die folgenden:

public class FooListener implements IMqttMessageListener { 
    @Override 
    public void messageArrived(String topic, MqttMessage message) throws Exception { 
     System.out.println("Thread [ " + Thread.currentThread().getName() + 
       "], Topic[ "+ topic + "], Message [" + message +"] "); 
    } 
} 

public class FooCallbackListener implements MqttCallback { 

    @Override 
    public void connectionLost(Throwable e) { 
     e.printStackTrace(); 
    } 

    @Override 
    public void deliveryComplete(IMqttDeliveryToken token) { 
     //TODO:emtpy 
    } 

    @Override 
    public void messageArrived(String topic, MqttMessage message) throws Exception { 
     System.out.println("Thread [ " + Thread.currentThread().getName() + 
       "], Topic[ "+ topic + "], Message [" + message +"] "); 
    } 

} 

MqttClient und abonnieren:

public class FooMqttClient { 

    public static void main(String[] args) { 
     MqttConnectOptions connOpt = new MqttConnectOptions(); 
     connOpt.setCleanSession(true); 
     connOpt.setKeepAliveInterval(30); 
     String serverUri = "tcp://iot.eclipse.org:1883"; 
     String clientId = UUID.randomUUID().toString(); 

     try { 
      MqttClient myClient = new MqttClient(serverUri, clientId); 
      myClient.connect(connOpt); 
      myClient.subscribe("topic/foo", new FooListener()); 
     } catch (MqttException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

Ergebnisse:

Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[topic/foo], Message [Foo 0] 
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 1] 
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 2] 
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 3] 
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 4] 
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 5] 
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 6] 
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 7] 
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 8] 
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 9] 

MqttClient und setCallback:

public class FooMqttCallbackClient { 

    public static void main(String[] args) { 
     MqttConnectOptions connOpt = new MqttConnectOptions(); 
     connOpt.setCleanSession(true); 
     connOpt.setKeepAliveInterval(30); 
     String serverUri = "tcp://iot.eclipse.org:1883"; 
     String clientId = UUID.randomUUID().toString(); 

     try { 
      MqttClient myClient = new MqttClient(serverUri, clientId); 
      myClient.connect(connOpt); 
      myClient.subscribe("topic/foo"); 
      myClient.setCallback(new FooCallbackListener()); 
     } catch (MqttException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

Ergebnisse:

Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 0] 
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 1] 
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 2] 
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 3] 
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 4] 
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 5] 
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 6] 
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 7] 
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 8] 
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 9] 

MqttAsyncClient und abonnieren:

public class FooAsyncMqttClient { 
    public static void main(String[] args) { 
     MqttConnectOptions connOpt = new MqttConnectOptions(); 
     connOpt.setCleanSession(true); 
     connOpt.setKeepAliveInterval(30); 
     String serverUri = "tcp://iot.eclipse.org:1883"; 
     String clientId = UUID.randomUUID().toString(); 

     try { 
      MqttAsyncClient myClient = new MqttAsyncClient(serverUri, clientId); 
      myClient.connect(connOpt); 
      Thread.sleep(1000); 
      myClient.subscribe("topic/foo", 1, new FooListener()); 
     } catch (MqttException e) { 
      e.printStackTrace(); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

Ergebnisse:

MqttAsyncClient und setCallback

public class FooAsyncMqttCallbackClient { 

    public static void main(String[] args) { 
     MqttConnectOptions connOpt = new MqttConnectOptions(); 
     connOpt.setCleanSession(true); 
     connOpt.setKeepAliveInterval(30); 
     String serverUri = "tcp://iot.eclipse.org:1883"; 
     String clientId = UUID.randomUUID().toString(); 

     try { 
      MqttAsyncClient myClient = new MqttAsyncClient(serverUri, clientId); 
      myClient.connect(connOpt); 
      Thread.sleep(1000); 
      myClient.subscribe("topic/foo", 1); 
      myClient.setCallback(new FooCallbackListener()); 
     } catch (MqttException e) { 
      e.printStackTrace(); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

    } 
} 

Ergebnisse:

Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 0] 
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 1] 
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 2] 
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 3] 
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 4] 
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 5] 
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 6] 
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 7] 
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 8] 
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 9] 

In all meinen Tests werden die Listener in demselben Thread und nicht gleichzeitig ausgeführt. Wie kann ich die Nachrichten gleichzeitig und gleichzeitig verarbeiten? Was ist der Unterschied zwischen MqttClient und MqttAsyncClient?

Lösung:

public class FooExecutorListener implements IMqttMessageListener { 

    private ExecutorService pool = Executors.newFixedThreadPool(10); 

    class MessageHandler implements Runnable { 
     MqttMessage message; 
     String topic; 

     public MessageHandler(String topic, MqttMessage message) { 
      this.message = message; 
      this.topic = topic; 
     } 

     public void run() { 
      System.out.println("Thread [ " + Thread.currentThread().getName() + 
        "], Topic[ "+ topic + "], Message [" + message +"] "); 
     } 
    } 

    @Override 
    public void messageArrived(String topic, MqttMessage message) throws Exception { 
     pool.execute(new MessageHandler(topic, message)); 
    } 

} 

Ergebnisse:

Thread [ pool-2-thread-1], Topic[ topic/foo], Message [Foo 0] 
Thread [ pool-2-thread-2], Topic[ topic/foo], Message [Foo 1] 
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 4] 
Thread [ pool-2-thread-4], Topic[ topic/foo], Message [Foo 3] 
Thread [ pool-2-thread-7], Topic[ topic/foo], Message [Foo 6] 
Thread [ pool-2-thread-6], Topic[ topic/foo], Message [Foo 5] 
Thread [ pool-2-thread-8], Topic[ topic/foo], Message [Foo 7] 
Thread [ pool-2-thread-3], Topic[ topic/foo], Message [Foo 2] 
Thread [ pool-2-thread-1], Topic[ topic/foo], Message [Foo 10] 
Thread [ pool-2-thread-2], Topic[ topic/foo], Message [Foo 11] 
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 12] 
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 13] 
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 14] 
Thread [ pool-2-thread-9], Topic[ topic/foo], Message [Foo 8] 
Thread [ pool-2-thread-10], Topic[ topic/foo], Message [Foo 9] 

Antwort

1

Der Unterschied zwischen der 2-Version des Clients ist mit Anschluss/Publishing nicht abonniert zu tun. Die Async-Version verbindet und veröffentlicht, ohne zu blockieren.

Die Abonnementbehandlung wird in beiden Fällen auf dem Hintergrundnetzwerkprofil ausgeführt.

Wenn Sie eingehende Nachrichten parallel behandeln möchten, müssen Sie einen eigenen Threadpool implementieren und die eingehenden Nachrichten an den Pool verteilen.

Der einfachste Weg, dies zu tun ist mit Javas ExecutorService Klasse. z.B.

public class FooListener implements IMqttMessageListener { 

    ExecutorService pool = new ExecutorService(10); 

    class MessageHandler implements Runnable { 
    MqttMessage message; 
    String topic; 

    public MessageHandler(String topic; MqttMessage message) { 
     this.message = message; 
    } 

    public void run() { 
     //process message 
    } 
    } 

    @Override 
    public void messageArrived(String topic, MqttMessage message) throws Exception { 
    pool.execute(new MessageHandler(topic,message)); 
    } 
} 
+0

Danke, diese Implementierung funktioniert. – oscar

Verwandte Themen