2016-03-26 8 views
0

Ich habe ein Problem mit dem Senden mehrerer Back-to-Back separate UDP-Puffer mit Boost Asio. Ich habe einen 1-Sekunden-Asio-Timer, der einen Callback auslöst, der eine 2 separate UDP-Datagrammstrukturen über UDP überträgt. Jede dieser Nachrichtenstrukturen wird über std :: unique_ptr zugewiesen, so dass sie nicht zu dem Zeitpunkt, zu dem der asynchrone Aufruf von CADaemon :: handle_send aufgerufen wird, den Gültigkeitsbereich verlassen sollten.mehrere Rücken an Rücken boost :: asio async_send_to Anrufe Ursache Pufferüberlauf

void 
CADaemon::heartBeatTimer(
    const milliseconds& rHeartBeatMs) 
{ 
    mpStatusTimer->expires_from_now(rHeartBeatMs); 
    mpStatusTimer->async_wait(boost::bind(
     &CADaemon::heartBeatTimer, 
     this, rHeartBeatMs)); 
    if (mpALBFSocket && mpALBFEndpoint) { 
     mpALBFSocket->async_send_to(
      buffer(mpStatusMessage.get(), 
       sizeof(MemberSystemStatusMessage)), 
      *mpALBFEndpoint, 
      boost::bind(&CADaemon::handle_send, this, 
       boost::asio::placeholders::error, 
       boost::asio::placeholders::bytes_transferred)); 


     // must insert delay to prevent buffer overwrites 
     std::this_thread::sleep_for(std::chrono::milliseconds(10); 

     // heartbeat messages are also sent to this socket/endpoint 
     mpALBFSocket->async_send_to(
      buffer(mpHeartbeatMessage.get(), 
       sizeof(CAServiceHeartbeatMessage)), 
      *mpALBFEndpoint, 
      boost::bind(&CADaemon::handle_send, this, 
       boost::asio::placeholders::error, 
       boost::asio::placeholders::bytes_transferred)); 
    } 
} 

Die empfangende Anwendung funktioniert, wenn ich eine kleine Verzögerung setzen zwischen der ersten Nachricht und die zweite Nachricht, aber senden, wenn ich sie schicken, wie sie sind, erscheint es, dass der zweite Puffer die erste von der Zeit überschreibt es kommt in der empfangenden Anwendung an.

Was mache ich falsch?

Ich habe auch versucht, mehrere Puffer mit dem Code unten zu senden, aber das verhält sich schlechter, da es beide Datagramme als ein einzelnes langes Datagramm zusammenführt.

void 
CADaemon::heartBeatTimer(
    const milliseconds& rHeartBeatMs) 
{ 
    mpStatusTimer->expires_from_now(rHeartBeatMs); 
    mpStatusTimer->async_wait(boost::bind(
     &CADaemon::heartBeatTimer, 
     this, rHeartBeatMs)); 
    if (mpALBFSocket && mpALBFEndpoint) { 
     std::vector<boost::asio::const_buffer> transmitBuffers; 
     transmitBuffers.push_back(buffer(
      mpStatusMessage.get(), 
      sizeof(MemberSystemStatusMessage))); 
     //transmitBuffers.push_back(buffer(
     // mpHeartbeatMessage.get(), 
     // sizeof(CAServiceHeartbeatMessage))); 
     mpALBFSocket->async_send_to(
      transmitBuffers, *mpALBFEndpoint, 
      boost::bind(&CADaemon::handle_send, this, 
       boost::asio::placeholders::error, 
       boost::asio::placeholders::bytes_transferred)); 
    } 
} 

Hier sind die Mitglieder der Klasse, die an der ASIO aus der zugehörigen Headerdatei beteiligt sind

// this message is transmitted @1HZ 
std::unique_ptr<MemberSystemStatusMessage> mpStatusMessage; 
// this message is transmitted @1HZ 
std::unique_ptr<CAServiceHeartbeatMessage> mpHeartbeatMessage; 
// this message is received @1HZ 
std::unique_ptr<WOperationalSupportMessage> mpOpSupportMessage; 
// this message is received @1HZ when valid 
std::unique_ptr<MaintenanceOTPMessage> mpOTPMessage; 

std::shared_ptr<boost::asio::io_service> mpIOService; 
std::unique_ptr<boost::asio::ip::udp::socket> mpALBFSocket; 
std::unique_ptr<boost::asio::ip::udp::endpoint> mpALBFEndpoint; 
std::unique_ptr<boost::asio::ip::udp::socket> mpServerSocket; 
std::unique_ptr<boost::asio::ip::udp::endpoint> mpServerEndpoint; 
std::unique_ptr<boost::asio::steady_timer> mpStatusTimer; 
std::unique_ptr<uint8_t[]> mpReceiveBuffer; 

Diese Der Handler

Rückruf ist
void 
CADaemon::handle_send(
    const boost::system::error_code& error, 
    std::size_t bytes_transferred) 
{ 
    static auto& gEvtLog = gpLogger->getLoggerRef(
     Logger::LogDest::EventLog); 
    if (!error || (error == boost::asio::error::message_size)) { 
     // Critical Section - exclusive write 
     boost::unique_lock<boost::shared_mutex> uniqueLock(gRWMutexGuard); 
     LOG_EVT_INFO(gEvtLog) << *mpStatusMessage; 
     LOG_EVT_INFO(gEvtLog) << *mpHeartbeatMessage; 
     LOG_EVT_INFO(gEvtLog) << "Sent " << bytes_transferred << " bytes"; 
     mpStatusMessage->incrementSequenceCounter(); 
    } else { 
     LOG_EVT_ERROR(gEvtLog) << "handle_send: asio error code[" 
      << error.value() << "]"; 
    } 
} 

EDIT: added bemerkt DIE DEN EMPFANG Java-Anwendung Code mit dem BUFFER KORRUPTION

Der folgende Code den Code in der empfangenden Java-Anwendung zeigt, dass die Größe des empfangenen Datagramms niemals beschädigt ist, nur der Inhalt, die Größe scheint immer die des längeren Datagramms zu sein. Hoffe, das ist nützlich, um das Problem aufzuspüren.

@Override 
    protected Task<Void> createTask() { 
     return new Task<Void>() { 
      @Override 
      protected Void call() throws Exception { 
       updateMessage("Running..."); 
       try { 
        DatagramSocket serverSocket = new DatagramSocket(mPortNum); 
        // allocate space for received datagrams 
        byte[] bytes = new byte[1024]; 
        DatagramPacket packet = new DatagramPacket(bytes, bytes.length);      
        while (!isCancelled()) {      
         serverSocket.receive(packet); 
         int bytesReceived = packet.getLength(); 
         MemberSystemStatusMessage statusMessage = 
          new MemberSystemStatusMessage(); 
         int statusMessageSize = statusMessage.size(); 
         CAServiceHeartbeatMessage heartbeatMessage = 
          new CAServiceHeartbeatMessage(); 
         int heartbeatMessageSize = heartbeatMessage.size(); 
         if (Platform.isFxApplicationThread()) { 
          if (bytesReceived == statusMessage.size()) { 
           statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0); 
           setMemberSystemMessage(statusMessage); 
          } else if (bytesReceived == heartbeatMessage.size()){ 
           heartbeatMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0); 
           setHeartbeatMessage(heartbeatMessage); 
          } else { 
           System.out.println("unexpected datagram"); 
          } 
         } else { // update later in FxApplicationThread 
          if (bytesReceived == statusMessage.size()) { 
           statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0); 
           Platform.runLater(() -> setMemberSystemMessage(statusMessage)); 
          } else if (bytesReceived == heartbeatMessage.size()){ 
           heartbeatMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0); 
           Platform.runLater(() -> setHeartbeatMessage(heartbeatMessage)); 
          } else { 
           System.out.println("unexpected datagram"); 
          } 
         } 
        } 
       } catch (Exception ex) { 
        System.out.println(ex.getMessage()); 
       } 
       updateMessage("Cancelled"); 
       return null; 
      } 
     }; 
    } 
} 

Antwort

2

Der Code sieht gut aus, solange die Größe der Puffer korrekt ist und der zugrunde liegende Speicher für die Puffer gültig bleibt, bis der Handler aufgerufen wird. Man kann mehrere nicht-zusammengesetzte asynchrone Operationen wie async_send_to() für ein bestimmtes E/A-Objekt sicher initiieren. Es ist jedoch nicht spezifiziert, in welcher Reihenfolge diese Operationen ausgeführt werden.

Die Empfängeranwendung hat ein einzelnes gemeinsames Byte-Array, in das Datagramme gelesen werden. Wenn zwei Datagramme empfangen werden und zwei Lesevorgänge stattfinden, enthält der Puffer den Inhalt des zuletzt gelesenen Datagramms.Auf der Grundlage des dargestellten Codes kann dies aufgrund der Runnable s, die zu einem nicht spezifizierten Zeitpunkt in der Zukunft aufgerufen wird, eine Wettlaufsituation erzeugen. Betrachten Sie beispielsweise das Szenario, in dem zwei Datagramme gesendet werden, wobei das erste eine Systemnachricht und das zweite eine Heartbeat-Nachricht enthält. In dem folgenden Code:

byte[] bytes = new byte[1024]; 
DatagramPacket packet = new DatagramPacket(bytes, bytes.length); 
while (...) 
{ 
    serverSocket.receive(packet); 
    int bytesReceived = packet.getLength(); 
    MemberSystemStatusMessage statusMessage = ...; 
    CAServiceHeartbeatMessage heartbeatMessage = ...; 
    if (bytesReceived == statusMessage.size()) 
    { 
    statusMessage.setByteBuffer(ByteBuffer.wrap(bytes), 0); 
    Platform.runLater(() -> setMemberSystemMessage(statusMessage)); 
    } 
    ... 
} 

nach der ersten Iteration der while-Schleife enthält bytes eine Statusmeldung und das statusMessage Objekt bezieht sich auf die bytes Puffer. Ein Runnable wurde zu einem unbestimmten Zeitpunkt in der Zukunft geplant. Beim Lesen des zweiten Datagramms enthält der Puffer bytes eine Heartbeat-Nachricht. Das Runnable wird jetzt ausgeführt und übergibt das Objekt statusMessage an setMemberSystemMessage(); Der zugrunde liegende Puffer enthält jedoch eine Heartbeat-Nachricht. Um dieses Problem zu beheben, sollten tief Kopieren den Byte-Array, wenn verzögerte Ausführung erfolgen muss:

if (bytesReceived == statusMessage.size()) 
{ 
    byte[] bytes_copy = Arrays.copyOf(bytes, bytesReceived); 
    statusMessage.setByteBuffer(ByteBuffer.wrap(bytes_copy), 0); 
    Platform.runLater(() -> setMemberSystemMessage(statusMessage)); 
} 

Alternativ könnte man einen neuen Puffer verwendet für jede Operation lesen.

Es kann auch Probleme mit den Erwartungen des zugrunde liegenden Protokolls sein. UDP wird als ein unzuverlässiges Protokoll bezeichnet, da es dem Sender keine Benachrichtigungen über die Zustellung des Datagramms bereitstellt. Jede Operation async_send_to() führt dazu, dass bis zu ein Datagramm gesendet wird. Der Status des Beendigungshandlers zeigt an, ob die Daten geschrieben wurden, und impliziert keinen Status, wenn das Datagramm empfangen wurde. Dies gilt auch dann, wenn mehrere Puffer über scatter-gather I/O bereitgestellt werden. Als solches wird das in der Frage beschriebene Szenario, bei dem zwei async_send_to() Operationen initiiert werden, aber der Empfänger nur ein einzelnes Datagramm empfängt, vom Protokoll zugelassen. Das Anwendungsprotokoll sollte dieses Verhalten berücksichtigen. Anstatt beispielsweise einen Fehler zu melden, nachdem ein einzelner Heartbeat-Termin versäumt wurde, kann der Empfänger einen Fehler melden, sobald eine aufeinanderfolgende Anzahl verpasster Heartbeat-Deadlines einen Schwellenwert überschritten hat. Das Hinzufügen einer kleinen Verzögerung zwischen Schreibvorgängen gibt keine Garantien hinsichtlich des Verhaltens des Protokolls.

+0

Wenn Boost.Asio wird der TR2-Vorschlag entsprechen, dann scheint die Reihenfolge angegeben werden. Ich stimme zu, dass das Protokoll Pakete fallen lassen darf. In diesem Fall ist jedoch auch ein Fehler in der empfangenden Anwendung möglich/wahrscheinlich. – janm

+0

@TannerSansbury danke für das Fortbestehen und das Finden, dass das Problem tatsächlich der Kunde war, gut gemacht! – johnco3

+0

@TannerSansbury Ich frage mich, ob Sie eine Follow-up-Frage http://stackoverflow.com/questions/36626870/java-nio-datagramchannel-corruption - Ich änderte meine Java-Code NIO verwenden und ich habe nie wirklich verstanden, warum ich hatte um die Bytes schnell zu kopieren, bevor die nächste Nachricht die vorhandenen Empfangspufferbytes beschädigte. Ich dachte, dass bei einem ausreichend großen Empfangspuffer der nächste Schreibvorgang in den Empfangspuffer den vorherigen Puffer nicht beschädigen würde – johnco3

1

Update:

Wie Tanner Sansbury erwähnt, diese Antwort ist wahrscheinlich falsch. Ich lasse es hier für Leute, die nach einer Antwort auf die Frage suchen, ob mehrere gleichzeitige Anrufe gleichzeitig gültig sind. Die Antwort scheint "Ja" zu sein.

Original:

Das Problem mit diesem Code ist, dass der zweite Aufruf von async_send_to() nicht für den ersten abgeschlossen ist zu warten. Sie sollten den zweiten Aufruf von async_send_to() vom ersten Beendigungshandler ausführen, vorausgesetzt, dass keine Fehler auftreten.

Das Zusammenführen der zwei Puffer in einem Datagramm in Ihrem zweiten Beispiel ist das erwartete Verhalten.

+1

Man kann mehrere 'async_send_to()' -Operationen auf einem einzelnen Socket sicher starten, während an demselben Socket ([demo] (http://coliru.stacked-crooked.com/a) Operationen async_send_to() ausstehen)/f29db29ad346031e)). –

+0

@TannerSansbury schöne Demo, aber ich dachte, dass es nicht gültig war zu tun, was ich tat (die man auch in der Demo zu tun), die mehrere Async senden Sie schreibt nacheinander ohne darauf zu warten jeder ersten empfangen werden. Ich habe etwas ähnliches wie Sie getan - es schien zu funktionieren, aber die Puffer waren beschädigt, wenn ich sie untersuchte, ich werde Ihre Demo auf Windows versuchen, um zu sehen, ob es gleich funktioniert (ich muss den Pufferinhalt ändern, wie sie nicht sind überprüft in Ihrer Demo). Meine Ergebnisse waren über Visual Studio 2015 mit Boost v1.60 – johnco3

+0

@TannerSansbury Interessant. Die Demo funktioniert möglicherweise auf Ihrer Plattform, belegt jedoch nicht das plattformübergreifende API-Verhalten. Ich habe gerade ein wenig Zeit damit verbracht, die Art des API-Versprechens zu ermitteln. Meine Antwort könnte falsch sein. Die einzige Erklärung, die ich bisher gefunden ist hier http://comments.gmane.org/gmane.comp.lib.boost.asio.user/5403, die besagt, „dass Denken Sie daran, wenn Sie async_send_to() verwenden Sie es nicht wieder aufrufen können auf dem gleichen Socket, bis die erste async_send_to() als abgeschlossen gemeldet wurde. " Dies entspricht meiner Verwendung von ASIO. Ich werde bei der Umsetzung einen kurzen Blick ... – janm