2016-10-11 3 views
2

How do I implement an MQTT server with Spring Integration?

When I run the Outbound Channel Adapter example for MQTT, it throws an error:

Executing command line: /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java -classpath /Users/afarber/src/spring-newbie/MqttOutbound/target/classes:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot/1.4.1.RELEASE/spring-boot-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-starter/1.4.1.RELEASE/spring-boot-starter-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/1.4.1.RELEASE/spring-boot-autoconfigure-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-starter-logging/1.4.1.RELEASE/spring-boot-starter-logging-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/ch/qos/logback/logback-classic/1.1.7/logback-classic-1.1.7.jar:/Users/afarber/.m2/repository/ch/qos/logback/logback-core/1.1.7/logback-core-1.1.7.jar:/Users/afarber/.m2/repository/org/slf4j/jcl-over-slf4j/1.7.21/jcl-over-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/slf4j/jul-to-slf4j/1.7.21/jul-to-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/slf4j/log4j-over-slf4j/1.7.21/log4j-over-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/yaml/snakeyaml/1.17/snakeyaml-1.17.jar:/Users/afarber/.m2/repository/org/slf4j/slf4j-api/1.7.16/slf4j-api-1.7.16.jar:/Users/afarber/.m2/repository/org/springframework/spring-context/4.3.2.RELEASE/spring-context-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-aop/4.3.2.RELEASE/spring-aop-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-beans/4.3.2.RELEASE/spring-beans-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-expression/4.3.2.RELEASE/spring-expression-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-core/4.3.2.RELEASE/spring-core-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/commons-logging/commons-logging/1.2/commons-logging-1.2.jar:/Users/afarber/.m2/repository/org/springframework/integration/spring-integrati
on-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-messaging/4.3.3.RELEASE/spring-messaging-4.3.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-tx/4.3.3.RELEASE/spring-tx-4.3.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/retry/spring-retry/1.1.3.RELEASE/spring-retry-1.1.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-mqtt/4.3.2.RELEASE/spring-integration-mqtt-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/org.eclipse.paho.client.mqttv3-1.0.2.jar de.afarber.mqttoutbound.MqttJavaApplication 

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.4.1.RELEASE)

2016-10-11 21:53:36.811 INFO 2102 --- [   main] d.a.mqttoutbound.MqttJavaApplication  : Starting MqttJavaApplication on mba.local with PID 2102 (/Users/afarber/src/spring-newbie/MqttOutbound/target/classes started by afarber in /Users/afarber/src/spring-newbie/MqttOutbound) 
2016-10-11 21:53:36.816 INFO 2102 --- [   main] d.a.mqttoutbound.MqttJavaApplication  : No active profile set, falling back to default profiles: default 
2016-10-11 21:53:36.960 INFO 2102 --- [   main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@35a50a4c: startup date [Tue Oct 11 21:53:36 CEST 2016]; root of context hierarchy 
2016-10-11 21:53:37.724 INFO 2102 --- [   main] o.s.b.f.config.PropertiesFactoryBean  : Loading properties file from URL [jar:file:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar!/META-INF/spring.integration.default.properties] 
2016-10-11 21:53:37.729 INFO 2102 --- [   main] o.s.i.config.IntegrationRegistrar  : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 
2016-10-11 21:53:37.933 INFO 2102 --- [   main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 
2016-10-11 21:53:37.947 INFO 2102 --- [   main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 
2016-10-11 21:53:38.143 INFO 2102 --- [   main] o.s.b.f.config.PropertiesFactoryBean  : Loading properties file from URL [jar:file:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar!/META-INF/spring.integration.default.properties] 
2016-10-11 21:53:38.148 INFO 2102 --- [   main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationGlobalProperties' of type [class org.springframework.beans.factory.config.PropertiesFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 
2016-10-11 21:53:38.177 INFO 2102 --- [   main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationGlobalProperties' of type [class java.util.Properties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 
2016-10-11 21:53:38.592 INFO 2102 --- [   main] o.s.s.c.ThreadPoolTaskScheduler   : Initializing ExecutorService 'taskScheduler' 
2016-10-11 21:53:39.064 INFO 2102 --- [   main] o.s.j.e.a.AnnotationMBeanExporter  : Registering beans for JMX exposure on startup 
2016-10-11 21:53:39.077 INFO 2102 --- [   main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase -2147483648 
2016-10-11 21:53:39.078 INFO 2102 --- [   main] o.s.i.endpoint.EventDrivenConsumer  : Adding {message-handler:mqttJavaApplication.mqttOutbound.serviceActivator} as a subscriber to the 'mqttOutboundChannel' channel 
2016-10-11 21:53:39.078 INFO 2102 --- [   main] o.s.integration.channel.DirectChannel : Channel 'application.mqttOutboundChannel' has 1 subscriber(s). 
2016-10-11 21:53:39.079 INFO 2102 --- [   main] o.s.i.endpoint.EventDrivenConsumer  : started mqttJavaApplication.mqttOutbound.serviceActivator 
2016-10-11 21:53:39.079 INFO 2102 --- [   main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0 
2016-10-11 21:53:39.079 INFO 2102 --- [   main] ProxyFactoryBean$MethodInvocationGateway : started mqttJavaApplication$MyGateway 
2016-10-11 21:53:39.079 INFO 2102 --- [   main] GatewayCompletableFutureProxyFactoryBean : started mqttJavaApplication$MyGateway 
2016-10-11 21:53:39.080 INFO 2102 --- [   main] o.s.i.endpoint.EventDrivenConsumer  : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 
2016-10-11 21:53:39.080 INFO 2102 --- [   main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s). 
2016-10-11 21:53:39.080 INFO 2102 --- [   main] o.s.i.endpoint.EventDrivenConsumer  : started _org.springframework.integration.errorLogger 
2016-10-11 21:53:39.093 INFO 2102 --- [   main] d.a.mqttoutbound.MqttJavaApplication  : Started MqttJavaApplication in 2.962 seconds (JVM running for 3.669) 
Exception in thread "main" org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.MessagingException: Failed to connect; nested exception is Unable to connect to server (32103) - java.net.ConnectException: Connection refused 
    at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135) 
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:375) 
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477) 
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429) 
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420) 
    at org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean.invoke(GatewayCompletableFutureProxyFactoryBean.java:65) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) 
    at com.sun.proxy.$Proxy40.sendToMqtt(Unknown Source) 
    at de.afarber.mqttoutbound.MqttJavaApplication.main(MqttJavaApplication.java:27) 
Caused by: org.springframework.messaging.MessagingException: Failed to connect; nested exception is Unable to connect to server (32103) - java.net.ConnectException: Connection refused 
    at org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler.checkConnection(MqttPahoMessageHandler.java:180) 
    at org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler.publish(MqttPahoMessageHandler.java:189) 
    at org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler.handleMessageInternal(AbstractMqttMessageHandler.java:150) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.config.annotation.ServiceActivatorAnnotationPostProcessor$ReplyProducingMessageHandlerWrapper.handleRequestMessage(ServiceActivatorAnnotationPostProcessor.java:98) 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    ... 19 more 
Caused by: Unable to connect to server (32103) - java.net.ConnectException: Connection refused 
    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79) 
    at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.ConnectException: Connection refused 
    at java.net.PlainSocketImpl.socketConnect(Native Method) 
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345) 
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) 
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
    at java.net.Socket.connect(Socket.java:589) 
    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70) 
    ... 2 more 
2016-10-11 21:53:39.203 INFO 2102 --- [  Thread-1] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@35a50a4c: startup date [Tue Oct 11 21:53:36 CEST 2016]; root of context hierarchy 
2016-10-11 21:53:39.207 INFO 2102 --- [  Thread-1] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 0 
2016-10-11 21:53:39.209 INFO 2102 --- [  Thread-1] ProxyFactoryBean$MethodInvocationGateway : stopped mqttJavaApplication$MyGateway 
2016-10-11 21:53:39.210 INFO 2102 --- [  Thread-1] GatewayCompletableFutureProxyFactoryBean : stopped mqttJavaApplication$MyGateway 
2016-10-11 21:53:39.210 INFO 2102 --- [  Thread-1] o.s.i.endpoint.EventDrivenConsumer  : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 
2016-10-11 21:53:39.210 INFO 2102 --- [  Thread-1] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s). 
2016-10-11 21:53:39.210 INFO 2102 --- [  Thread-1] o.s.i.endpoint.EventDrivenConsumer  : stopped _org.springframework.integration.errorLogger 
2016-10-11 21:53:39.210 INFO 2102 --- [  Thread-1] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase -2147483648 
2016-10-11 21:53:39.210 INFO 2102 --- [  Thread-1] o.s.i.endpoint.EventDrivenConsumer  : Removing {message-handler:mqttJavaApplication.mqttOutbound.serviceActivator} as a subscriber to the 'mqttOutboundChannel' channel 
2016-10-11 21:53:39.210 INFO 2102 --- [  Thread-1] o.s.integration.channel.DirectChannel : Channel 'application.mqttOutboundChannel' has 0 subscriber(s). 
2016-10-11 21:53:39.211 INFO 2102 --- [  Thread-1] o.s.i.endpoint.EventDrivenConsumer  : stopped mqttJavaApplication.mqttOutbound.serviceActivator 
2016-10-11 21:53:39.211 INFO 2102 --- [  Thread-1] o.s.j.e.a.AnnotationMBeanExporter  : Unregistering JMX-exposed beans on shutdown 
2016-10-11 21:53:39.212 INFO 2102 --- [  Thread-1] o.s.s.c.ThreadPoolTaskScheduler   : Shutting down ExecutorService 'taskScheduler' 
------------------------------------------------------------------------ 
BUILD FAILURE 

The failing code from the MqttJavaApplication.java file is copied below:

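package de.afarber.mqttoutbound;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo"); // THROWS THE ABOVE EXCEPTION
    }

    // Client factory: points at a broker that must already be listening on localhost:1883
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs("tcp://localhost:1883");
        return factory;
    }

    // Outbound adapter: publishes messages from mqttOutboundChannel to the broker
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    // Gateway: sendToMqtt() puts the payload on mqttOutboundChannel
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {
        void sendToMqtt(String data);
    }
}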

and it appears to be an MQTT client anyway...

How do I implement an MQTT server (broker) as a bean for Java Spring Integration? Where should I start?

Answer

6

Spring Integration does not provide a broker; it provides clients for sending and receiving messages.

+0

Gary, Spring Integration provides an HTTP server, so I assumed that creating an MQTT broker would be possible as well?

+1

Spring Integration does __not__ provide an HTTP server either; it can run inside a web container such as Tomcat and exposes HTTP endpoints that way.

+0

Are these HTTP endpoints "clients", or can they be "servers"? (Probably a very dumb question on my part)

2

The key part of the error message is:

java.net.ConnectException: Connection refused 

This means that the host you have configured as the broker (localhost) is either not running a broker, or a firewall is blocking the connection.

A firewall is unlikely to block connections to localhost, so you need to check whether the broker is actually running.
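One quick way to check this from Java is to try a plain TCP connection to the address used in the question's `factory.setServerURIs("tcp://localhost:1883")`. The sketch below is only an illustration: the class name `BrokerCheck` and the method `isReachable` are made up for this example, and a successful connection only shows that something is listening on the port, not that it speaks MQTT.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Spring Integration or Paho.
class BrokerCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers java.net.ConnectException ("Connection refused") and connect timeouts
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port taken from the server URI in the question
        boolean up = isReachable("localhost", 1883, 1000);
        System.out.println(up
                ? "Something is listening on localhost:1883"
                : "Nothing is listening on localhost:1883; start a broker first");
    }
}
```

If this reports that nothing is listening, the `Connection refused` in the stack trace is expected, and the fix is to start a broker rather than to change the Spring configuration.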

If you don't have a broker, you need to install one; the easiest is probably mosquitto, available here:

https://mosquitto.org/download/

Other brokers are available; a list can be found here:

https://github.com/mqtt/mqtt.github.io/wiki/servers

+1

> How to implement an MQTT server

Spring Integration does not provide broker functionality. It is a client.
