2016-06-12 4 views
1

Ich kann OpenNMS nicht dazu bringen, Nachrichten an einen AMQP-Endpunkt zu senden. Ich habe noch nie funktioniert und das ist das erste Mal, dass ich OpenNMS und AMQP benutzt habe, so dass meine Unerfahrenheit darunter leiden könnte.OpenNMS v18 AMQP Nachricht Senden Problem

Ich habe RabbitMQ 3.5.7 konfiguriert und getestet, wie dies question. Es funktioniert gut, wenn Sie einen externen QPID 0.32-Client verwenden und funktioniert auch gut, wenn Sie entweder Python oder Perl verwenden. Definition von funktioniert gut ist, dass die Kommunikation hergestellt wird und die Nachricht Payload in eine Börse übertragen und dann in eine Back-End-Warteschlange geliefert wird. Die Nachricht kann dann in der RabbitMQ Admin-GUI angezeigt werden.

In OpenNMS habe ich die Anweisungen here mit dem EventForwarder gefolgt und dann versuchte der AlarmNorthbounder beide eine NullPointerException folgen.

stelle ich Karaf wie folgt die Eigenschaften einstellen, diese Aussagen mit: -

opennms> config:edit org.opennms.features.amqp.alarmnorthbounder 
opennms> propset connectionUrl amqp://simon:[email protected]/test?brokerlist=\'localhost:5672\' 
opennms> propset destination "amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }" 
opennms> propset processorName default-alarm-northbounder-processor 
opennms> config:update 
opennms> config:list '(service.pid=org.opennms.features.amqp.alarmnorthbounder)' 
---------------------------------------------------------------- 
Pid:   org.opennms.features.amqp.alarmnorthbounder 
BundleLocation: mvn:org.opennms.features.amqp/org.opennms.features.amqp.alarm-northbounder/18.0.0 
Properties: 
    connectionUrl = amqp://simon:[email protected]/test?brokerlist='localhost:5672' 
    destination = amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} } 
    felix.fileinstall.filename = file:/usr/share/opennms/etc/org.opennms.features.amqp.alarmnorthbounder.cfg 
    processorName = default-alarm-northbounder-processor 
    service.pid = org.opennms.features.amqp.alarmnorthbounder 

Ich erhalte die folgende Fehlermeldung in den Protokollen

Message History 
--------------------------------------------------------------------------------------------------------------------------------------- 
RouteId    ProcessorId   Processor                  Elapsed (ms) 
[forwardAlarm  ] [forwardAlarm  ] [seda://forwardAlarm               ] [   8] 
[forwardAlarm  ] [convertBodyTo3 ] [convertBodyTo[org.opennms.netmgt.alarmd.api.NorthboundAlarm]     ] [   0] 
[forwardAlarm  ] [log3    ] [log                   ] [   1] 
[forwardAlarm  ] [bean3    ] [bean[ref:dynamicallyTrackedProcessor]           ] [   0] 
[forwardAlarm  ] [to3    ] [amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }     ] [   7] 

Exchange 
--------------------------------------------------------------------------------------------------------------------------------------- 
Exchange[ 
     Id     ID-ubuntu-1604-35241-1465752556843-2-60 
     ExchangePattern  InOnly 
     Headers    {breadcrumbId=ID-ubuntu-1604-35241-1465752556843-2-58, CamelRedelivered=false, CamelRedeliveryCounter=0} 
     BodyType   String 
     Body    NorthboundAlarm[id=3, uei='uei.opennms.org/generic/traps/EnterpriseDefault', nodeId=1] 
] 

Stacktrace 
--------------------------------------------------------------------------------------------------------------------------------------- 
java.lang.NullPointerException 
     at org.apache.qpid.client.BasicMessageProducer_0_8.declareDestination(BasicMessageProducer_0_8.java:63)[212:org.apache.servicemix.bundles.qpid:0.28.0.1] 
     at org.apache.qpid.client.BasicMessageProducer.<init>(BasicMessageProducer.java:136)[212:org.apache.servicemix.bundles.qpid:0.28.0.1] 
     at org.apache.qpid.client.BasicMessageProducer_0_8.<init>(BasicMessageProducer_0_8.java:55)[212:org.apache.servicemix.bundles.qpid:0.28.0.1] 
     at org.apache.qpid.client.AMQSession_0_8.createMessageProducer(AMQSession_0_8.java:559)[212:org.apache.servicemix.bundles.qpid:0.28.0.1] 
     at org.apache.qpid.client.AMQSession_0_8.createMessageProducer(AMQSession_0_8.java:62)[212:org.apache.servicemix.bundles.qpid:0.28.0.1] 

Meine Erwartung ist, dass es in der Lage sein sollte, Übermittle die Nachricht an die Warteschlange mit denselben Bibliotheken, die ich extern verwende.

Putting DEBUG auf für org.apache.qpid i erhalten: -

2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [[email protected]] 
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.framing.FieldTable: FieldTable::writeToBuffer: Writing encoded length of 254... 
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.framing.FieldTable: {instance=[LONG_STRING: ubuntu-16041465757105204], product=[LONG_STRING: qpid], version=[LONG_STRING: 0.28], platform=[LONG_STRING: Java(TM) SE Runtime Environment, 1.8.0_45-b14, Oracle Corporation, amd64, Linux, 4.4.0-22-generic, unknown], qpid.client_process=[LONG_STRING: Qpid Java Client], qpid.client_pid=[INT: 1318]} 
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ConnectionTuneBodyImpl: channelMax=0, frameMax=131072, heartbeat=60] 
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.handler.ConnectionTuneMethodHandler: ConnectionTune frame received 
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: State changing to AMQState: id = 3 name: CONNECTION_NOT_OPENED from old state AMQState: id = 2 name: CONNECTION_NOT_TUNED 
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [[email protected]] 
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ConnectionOpenOkBodyImpl: knownHosts=null] 
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: State changing to AMQState: id = 4 name: CONNECTION_OPEN from old state AMQState: id = 3 name: CONNECTION_NOT_OPENED 
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [[email protected]] 
2016-06-12 19:00:38,726 INFO org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Connection 44 now connected from /127.0.0.1:45388 to localhost/127.0.0.1:5672 
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Are we connected:true 
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Connected with ProtocolHandler Version:0-91 
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnectionDelegate_8_0: Write channel open frame for channel id 1 
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQSession: Created session:[email protected] 
2016-06-12 19:00:38,728 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ChannelOpenOkBodyImpl: channelId=null] 
2016-06-12 19:00:38,728 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [BasicQosOkBodyImpl: ] 
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQDestination: Based on onms3/Simon;{'create':'always','node':{'type':'topic'} } the selected destination syntax is ADDR 
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQSession: Closing session: [email protected] 
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.protocol.AMQProtocolSession: closeSession called on protocol session for session 1 

Es die Sitzung schließt, bevor alle Informationen zu schreiben.

Wenn ich tun, im Wesentlichen dasselbe von einem externen qpid Client - wie folgt ausgeführt: -

#!/bin/bash 

rm log.out 
java -Dqpid.amqp.version=0-91 -Dlog4j.debug -Dlog4j.configuration=file:./log4j.properties -cp "client/example/target/classes/:client/example/target/dependency/*:slf4j-1.7.21/slf4j-log4j12-1.7. 
21.jar:apache-log4j-1.2.17/log4j-1.2.17.jar" \ 
    org.apache.qpid.example.ListSender 

ich diese: -

142 [main] DEBUG org.apache.qpid.client.AMQConnection - Are we connected:true 
142 [main] DEBUG org.apache.qpid.client.AMQConnection - Connected with ProtocolHandler Version:0-91 
146 [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0 - Write channel open frame for channel id 1 
162 [main] DEBUG org.apache.qpid.client.AMQSession - Created session:[email protected] 
164 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ChannelOpenOkBody] 
165 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [BasicQosOkBodyImpl: ] 
173 [main] DEBUG org.apache.qpid.client.AMQDestination - Based on onms3/Simon;{create: always, node:{type: topic } } the selected destination syntax is ADDR 
177 [main] DEBUG org.apache.qpid.framing.FieldTable - FieldTable::writeToBuffer: Writing encoded length of 0... 
178 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ] 
179 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ] 
179 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ] 
180 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - MessageProducer [email protected] using publish mode : ASYNC_PUBLISH_ALL 
190 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content body frames to 'onms3'/'Simon'; { 
    'create': 'always', 
    'node': { 
    'type': 'topic' 
    } 
} 
190 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content header frame to 'onms3'/'Simon'; { 
    'create': 'always', 
    'node': { 
    'type': 'topic' 
    } 
} 
190 [main] DEBUG org.apache.qpid.framing.FieldTable - FieldTable::writeToBuffer: Writing encoded length of 90... 
191 [main] DEBUG org.apache.qpid.framing.FieldTable - {Id=[INT: 987654321], name=[LONG_STRING: WidgetSimon], price=[DOUBLE: 0.99], qpid.subject=[LONG_STRING: Simon], JMS_QPID_DESTTYPE=[INT: 2]} 
192 [main] DEBUG org.apache.qpid.client.AMQSession - Closing session: [email protected] 
192 [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolSession - closeSession called on protocol session for session 1 
194 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ChannelCloseOkBody] 
194 [IoReceiver - localhost/127.0.0.1:5672] INFO org.apache.qpid.client.handler.ChannelCloseOkMethodHandler - Received channel-close-ok for channel-id 1 
195 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ConnectionCloseOkBody] 
196 [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - Session closed called by client 

ListSender.java wie folgt: -

package org.apache.qpid.example; 

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 

import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.Message; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 

import org.apache.qpid.client.AMQAnyDestination; 
import org.apache.qpid.client.AMQConnection; 

import org.apache.qpid.framing.AMQShortString; 
import org.apache.qpid.jms.ListMessage; 


public class ListSender { 

    public static void main(String[] args) throws Exception 
    { 
     Connection connection = 
      new AMQConnection("amqp://simon:[email protected]/test?brokerlist='tcp://localhost:5672'"); 
                AMQShortString a1 = new AMQShortString(""); 
                AMQShortString a2 = new AMQShortString(""); 
     AMQShortString[] bindvars = new AMQShortString[]{a1,a2}; 
     boolean is_durable = true; 
/*  
     Destination queue = new AMQAnyDestination(new AMQShortString("onms2"), 
                new AMQShortString("direct"), 
                new AMQShortString("Simon"), 
                true,   
                true,   
                new AMQShortString(""), 
                false,  
                bindvars); 
     */ 

     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     //Destination queue = new AMQAnyDestination("onms3/Simon"); 
     Destination queue = new AMQAnyDestination("onms3/Simon;{create: always, node:{type: topic } }"); 
     //Destination queue = new AMQAnyDestination("onms3/Simon;%7Bcreate%3A%20always%2C%20node%3A%7Btype%3A%20topic%20%7D%20%7D"); 
     //Destination queue = new AMQAnyDestination("amqp:OpenNMSExchange/Taylor; {create: always}"); 
     //Destination queue = new AMQAnyDestination("OpenNMSExchange; {create: always}"); 
     MessageProducer producer = session.createProducer(queue); 

ListMessage m = ((org.apache.qpid.jms.Session)session).createListMessage(); 
     m.setIntProperty("Id", 987654321); 
     m.setStringProperty("name", "WidgetSimon"); 
     m.setDoubleProperty("price", 0.99); 

     List<String> colors = new ArrayList<String>(); 
     colors.add("red"); 
     colors.add("green"); 
     colors.add("white"); 
     m.add(colors); 

     Map<String,Double> dimensions = new HashMap<String,Double>(); 
     dimensions.put("length",10.2); 
     dimensions.put("width",5.1); 
     dimensions.put("depth",2.0); 
     m.add(dimensions); 
     List<List<Integer>> parts = new ArrayList<List<Integer>>(); 
     parts.add(Arrays.asList(new Integer[] {1,2,5})); 
     parts.add(Arrays.asList(new Integer[] {8,2,5})); 
     m.add(parts); 

     Map<String,Object> specs = new HashMap<String,Object>(); 
     specs.put("colours", colors); 
     specs.put("dimensions", dimensions); 
     specs.put("parts", parts); 
     m.add(specs); 

     producer.send((Message)m); 
     System.out.println("Sent: " + m); 
     connection.close(); 
    } 

} 

Meine Annahme war, dass dies ein Konnektivitätsproblem mit dem AMQP-Server einiger Beschreibung war. Nachdem ich bei der Fehlersuche auf eine Reihe von Problemen gestoßen bin, wenn ich das Problem mit dem externen Krug angegangen bin, wurde aus den Logs deutlich, was das Problem war. Ist das ein Problem mit OpenNMS? Hat jemand dies erfolgreich funktioniert? Irgendwelche Ideen?

Prost

Simon

Antwort

0

Nach Anleitung des OpenNMS Liste entschied ich jetzt die Qpid Broker V6.0.3 anstelle von RabbitMQ und Nachrichten fließen keine Probleme zu installieren.

+0

Entschuldigung, wie ich das Protokoll von OpenNMS org.opennms.features.amqp.alarmnorthbounder anzeigen kann? – sintetico82