2017-04-05 8 views
0

Ich schrieb eine asynchrone Boost :: Asio TCP-Anwendung, die einen Pool von Threads verwendet als std::vector<std::thread> mIOServicePool deklariert. Diese Threads lesen und schreiben asynchron TCP-Daten auf einen Server. Der folgende Code stammt aus dem Ereignis-Handler der Startup-Schaltfläche der GUI.Neustart tcp Boost Asio io_service mit unvollständigen Abschlusshandler

// launch multiple asio service threads to 
// handle the protocol instances - effectively 
// thread pooling the ioservice 
//mpIOService->reset(); 
for (auto i=0; i<3; i++) { 
mIOServicePool.emplace_back(
    std::thread([this]() { mpIOService->run(); })); 
} 

Der Code ist Teil einer Anwendung Qt-basierte GUI mit dem mIOServicePool als Mitglied der Hauptfenster Klasse GUI gespeichert.

Das funktioniert gut, wenn ich die Anwendung starte und es laufen lasse, aber beim Versuch, die Verbindung zum Back-End-Server neu zu starten, beginnen die Dinge zu schief gehen. Das Problem ist höchstwahrscheinlich auf unvollständige Handler zurückzuführen, von denen ich dachte, dass sie gelöscht worden wären, wenn ich die io_service::work zurückgesetzt habe, die dem io_service zugeordnet ist (wenn die Taste zum Beenden der GUI gedrückt wird). Das Problem manifestiert sich, wenn ich versuche, die TCP-Kommunikation (zumindest unter Windows) über eine Zugriffsverletzung zu starten, während ich den Stream-Puffer des asio :: socket aus dem Speicher lese. Wie Sie aus der Stack-Trace unten sehen können, behandelt es einen Completion-Handler, der dem Lesebuchsen zugeordnet ist.

app739.exe!boost::asio::basic_streambuf<std::allocator<char> >::commit(unsigned __int64 n) Line 226 C++ 
app739.exe!boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >::operator()(const boost::system::error_code & ec, unsigned __int64 bytes_transferred, int start) Line 624 C++ 
app739.exe!boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>::operator()() Line 129 C++ 
app739.exe!boost::asio::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, ...) Line 70 C++ 
app739.exe!boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > & context) Line 39 C++ 
app739.exe!boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > > * this_handler) Line 685 C++ 
app739.exe!boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > > & context) Line 39 C++ 
app739.exe!boost::asio::detail::win_iocp_socket_recv_op<boost::asio::mutable_buffers_1,boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > > >::do_complete(boost::asio::detail::win_iocp_io_service * owner, boost::asio::detail::win_iocp_operation * base, const boost::system::error_code & result_ec, unsigned __int64 bytes_transferred) Line 97 C++ 
app739.exe!boost::asio::detail::win_iocp_operation::complete(boost::asio::detail::win_iocp_io_service & owner, const boost::system::error_code & ec, unsigned __int64 bytes_transferred) Line 47 C++ 
app739.exe!boost::asio::detail::win_iocp_io_service::do_one(bool block, boost::system::error_code & ec) Line 406 C++ 
app739.exe!boost::asio::detail::win_iocp_io_service::run(boost::system::error_code & ec) Line 164 C++ 
app739.exe!boost::asio::io_service::run() Line 59 C++ 
app739.exe!MainWindow::on_pushButtonStart_clicked::__l13::<lambda>() Line 943 C++ 

Diese answer zeigt an, dass das Problem mit io_service.reset() tun kann. Der Stack-Trace zum Zeitpunkt der Zugriffsverletzung zeigt, dass der Thread einen Asio-Completion-Handler verarbeitet hat. Ich denke aus dem Lesen anderer Beiträge, dass der Schlüssel zur Lösung dieses Problems io_service.stop() und io_service.reset() zu dem boost::asio::io_service Objekt ist, auch kann es wichtig sein, den Socket vor dem Stoppen der io_service oder Zurücksetzen der Sentinel Arbeit Objekt zurückgesetzt werden.

Der folgende Code zeigt, wie ich versuche, die io_service-Threads zu stoppen. Beim Debuggen des Codes sehe ich, dass alle Threads ihre Joins abschließen, also verstehe ich nicht, warum Completion-Handler outsanding sind.

void 
MainWindow::stopSys() 
{ 
    // make sure that we have no more work keeping services alive 
    mpWork.reset(); 
    // check to see if the protocol threads were started 
    if (mVCDUProtocol) { 
     // terminate protocol thread by setting the shared mShutdown atomic flag 
     mVCDUProtocol->shutdown(); 
     // Once each thread sees the shutdown flag, it will cleanly 
     // terminate so we can call join here to wait for the entire 
     // pool to finish 
     std::for_each(mIOServicePool.begin(), mIOServicePool.end(), 
      [](std::thread& rNext) { 
       rNext.join(); 
      }); 
     mIOServicePool.clear(); 
    } 
} 

Mein Code unten ist ziemlich einfach. Es startet eine asynchrone Auflösung - die in einem Lambda-Handler behandelt wird. Von dort ruft es die start_async_ops(endPointIter) an, um eine asynchrone connect() durchzuführen, und von diesem Lambda ruft der Code VCDUProtocol::do_read() auf, der eine boost::asio::async_read_until() ausführt, um auf die Daten vom Server zu warten.

void 
VCDUProtocol::prosimAsyncIOThreadFn() 
{ 
    static auto& gLogger = gpLogger->getLoggerRef(
     gUseSysLog ? Logger::LogDest::SysLog : 
     Logger::LogDest::EventLog); 
    try { 
     // convert the host-name/port to a usable endpoint 
     tcp::resolver resolver(*mpIOService); 
     tcp::resolver::query query(mProtocolConfig.getProsimHostName(), 
      std::to_string(mProtocolConfig.getProsimPortNum())); 
     const auto endPointIter = std::find_if(
      resolver.resolve(query), tcp::resolver::iterator(), 
      [](const tcp::endpoint& next) { 
       return next.protocol() == tcp::v4(); 
      }); 
     if (endPointIter != tcp::resolver::iterator()) { 
      mpSocket = std::make_unique<tcp::socket>(*mpIOService); 
      mpSocketTimer = std::make_unique<deadline_timer>(*mpIOService); 
      start_async_ops(endPointIter); 
     } 
    } catch (std::exception& rEx) { 
     LOG_ERROR(gLogger, gChannel) << boost::format(
      "%1%: %2%") 
      % __FUNCTION__ 
      % rEx.what(); 
    } 
} 

void 
VCDUProtocol::start_async_ops(tcp::resolver::iterator endpoint_iter) 
{ 
    // Start the connect actor. 
    do_connect(endpoint_iter); 

    // Start the deadline actor. You will note that we're not setting any 
    // particular deadline here. Instead, the connect and input actors will 
    // update the deadline prior to each asynchronous operation. 
    mpSocketTimer->async_wait(boost::bind(
     &VCDUProtocol::check_deadline, this, _1)); 
} 

void 
VCDUProtocol::do_connect(
    tcp::resolver::iterator endpoint_iter) 
{ 
    if (endpoint_iter != tcp::resolver::iterator()) { 
     // Set a deadline for the connect operation to complete. 
     mpSocketTimer->expires_from_now(boost::posix_time::seconds(5)); 
     boost::asio::async_connect(*mpSocket, endpoint_iter, 
      [this](boost::system::error_code ec, tcp::resolver::iterator) { 
       if (!mShutdownFlag && !ec) { 
        // successfully connected here - cancel the 
        // connect timer and kick off async write ops 
        mpSocketTimer->cancel(); 
        // kick off the prosim read operation 
        do_read(); 
       } 
      }); 
    } else { 
     // No more endpoints. Close the socket. 
     shutdown(); 
    } 
} 

void 
VCDUProtocol::do_read() 
{ 
    // Start or continue an asynchronous line reads. This will read at least 
    // up to a carriage return or line feed 
    async_read_until(*mpSocket, *mTLS->mSocketStreamBuf, "\r\n", 
     boost::bind(&VCDUProtocol::handle_read, this, _1)); 
} 

Dies ist die asynchronen Lese-Abschluss-Handler - dies muss abgesagt werden, ich irgendwo gelesen, dass die Steckdose einfach zu schließen unzureichend ist, da die Handler Fertig nicht aufgerufen. Soll ich den Stempel anrufen?

/** 
* Asynchronous read callback. 
* 
* @param ec  [in] Boost ASIO library error code. 
*/ 
void 
VCDUProtocol::handle_read(const boost::system::error_code& ec) 
{ 
    static auto& gLogger = gpLogger->getLoggerRef(
     gUseSysLog ? Logger::LogDest::SysLog : 
     Logger::LogDest::EventLog); 
    if (!mShutdownFlag) { 
     if (!ec) { 
      // Extract the newline-delimited message from the buffer. 
      std::string line; 
      std::istream is(mTLS->mSocketStreamBuf.get()); 
      while (std::getline(is, line)) { 
       // Critical Section 
       std::lock_guard<std::mutex> lock (gMutexGuard); 
       // handle partial line reads 
       if (is.eof()) { 
        mTLS->mPartialLine = std::move(line); 
        continue; 
       } else if (!mTLS->mPartialLine.empty()) { 
        line = std::move(mTLS->mPartialLine) + line; 
       } 
       . . . 
       // update GUI 
       mpListener->handlePageUpdate(
        mProtocolConfig.getCduID(), 
        mTLS->mVCDUPage, bRefreshCDU); 
       } 
       // not really required 
       line.clear(); 
      } 
      // keep reading 
      do_read(); 
     } else { 
      LOG_ERROR(gLogger, gChannel) << boost::format(
       "CDU_%1%: handle_read - error[%2%]") 
       % mProtocolConfig.getCduID() 
       % ec.message(); 
      shutdown(); 
     } 
    } 
} 

Antwort

1

Neustart tcp-Boost Asio io_service mit unvollendeten Abschluss Handler

AFAICT, das unmöglich ist.

Die Dokumentation legt klar fest, dass reset() aufgerufen werden muss, bevor run() erneut aufgerufen werden kann.

Ich denke, die einzige praktikable Alternative ist, Ihre eigene Ereignisschleife basierend auf z.B. poll_one() und verhindern damit, dass der Dienst überhaupt gestoppt werden muss.

Dies ist der asynchrone Lesevervollständigungs-Handler - DAS MUSS ABGESETZT WERDEN, ich habe irgendwo gelesen, dass das Schließen des Sockets nicht ausreicht, da die Vervollständigungshandler nicht aufgerufen werden.

Das stimmt nicht. Durch das Abbrechen eines Sockets werden die Vorgänge im Flug abgebrochen, und sie verursachen , dass die Beendigungshandler mit ec == operation_aborted aufgerufen werden. Das Schließen des Sockets führt wahrscheinlich zu unterschiedlichen Fehlercodes wie bad_socket.

+0

danke, aber nachdem ich den größten Teil des Tages damit verbracht habe, sieht es so aus, als wäre ich über eine Lösung gestolpert - es macht irgendwie Sinn - im Grunde scheint der Trick anmutig abzubrechen, herunterzufahren und den Socket VOR dem Aufruf von Reset zu schließen das Sentinel-Arbeitsobjekt. Dies führt dazu, dass der Handle_read-Handler mit einem Fehler aufwacht - in diesem Fall - Der I/O-Vorgang wurde entweder aufgrund eines Thread-Exits oder einer Anwendungsanforderung abgebrochen, und dann kann ich die Dinge erneut starten. – johnco3

+0

auch nach dem Aufrufen von Reset auf der Arbeit Sentinel Ich rief Reset auf dem io_service, um es komplett herunterzufahren (nicht sicher, wenn notwendig, wie der Thread-Pool trat kurz zuvor), dann vor dem erneuten Start, ich stellte sicher, dass io_service wurde zurückgesetzt (andernfalls es würde gestoppt bleiben und sofort von run()) beenden und den Sentinel mit dem io_service reinitialisieren. – johnco3

Verwandte Themen