Ich schrieb eine asynchrone Boost :: Asio TCP-Anwendung, die einen Pool von Threads verwendet als std::vector<std::thread> mIOServicePool
deklariert. Diese Threads lesen und schreiben asynchron TCP-Daten auf einen Server. Der folgende Code stammt aus dem Ereignis-Handler der Startup-Schaltfläche der GUI.Neustart tcp Boost Asio io_service mit unvollständigen Abschlusshandler
// launch multiple asio service threads to
// handle the protocol instances - effectively
// thread pooling the ioservice
//mpIOService->reset();
for (auto i=0; i<3; i++) {
mIOServicePool.emplace_back(
std::thread([this]() { mpIOService->run(); }));
}
Der Code ist Teil einer Anwendung Qt-basierte GUI mit dem mIOServicePool
als Mitglied der Hauptfenster Klasse GUI gespeichert.
Das funktioniert gut, wenn ich die Anwendung starte und es laufen lasse, aber beim Versuch, die Verbindung zum Back-End-Server neu zu starten, beginnen die Dinge zu schief gehen. Das Problem ist höchstwahrscheinlich auf unvollständige Handler zurückzuführen, von denen ich dachte, dass sie gelöscht worden wären, wenn ich die io_service::work
zurückgesetzt habe, die dem io_service zugeordnet ist (wenn die Taste zum Beenden der GUI gedrückt wird). Das Problem manifestiert sich, wenn ich versuche, die TCP-Kommunikation (zumindest unter Windows) über eine Zugriffsverletzung zu starten, während ich den Stream-Puffer des asio :: socket aus dem Speicher lese. Wie Sie aus der Stack-Trace unten sehen können, behandelt es einen Completion-Handler, der dem Lesebuchsen zugeordnet ist.
app739.exe!boost::asio::basic_streambuf<std::allocator<char> >::commit(unsigned __int64 n) Line 226 C++
app739.exe!boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >::operator()(const boost::system::error_code & ec, unsigned __int64 bytes_transferred, int start) Line 624 C++
app739.exe!boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>::operator()() Line 129 C++
app739.exe!boost::asio::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, ...) Line 70 C++
app739.exe!boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > & context) Line 39 C++
app739.exe!boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > > * this_handler) Line 685 C++
app739.exe!boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64>,boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > > >(boost::asio::detail::binder2<boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > >,boost::system::error_code,unsigned __int64> & function, boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const &>,boost::_bi::list2<boost::_bi::value<VCDUProtocol *>,boost::arg<1> > > > & context) Line 39 C++
app739.exe!boost::asio::detail::win_iocp_socket_recv_op<boost::asio::mutable_buffers_1,boost::asio::detail::read_until_delim_string_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp,boost::asio::stream_socket_service<boost::asio::ip::tcp> >,std::allocator<char>,boost::_bi::bind_t<void,boost::_mfi::mf1<void,VCDUProtocol,boost::system::error_code const & __ptr64>,boost::_bi::list2<boost::_bi::value<VCDUProtocol * __ptr64>,boost::arg<1> > > > >::do_complete(boost::asio::detail::win_iocp_io_service * owner, boost::asio::detail::win_iocp_operation * base, const boost::system::error_code & result_ec, unsigned __int64 bytes_transferred) Line 97 C++
app739.exe!boost::asio::detail::win_iocp_operation::complete(boost::asio::detail::win_iocp_io_service & owner, const boost::system::error_code & ec, unsigned __int64 bytes_transferred) Line 47 C++
app739.exe!boost::asio::detail::win_iocp_io_service::do_one(bool block, boost::system::error_code & ec) Line 406 C++
app739.exe!boost::asio::detail::win_iocp_io_service::run(boost::system::error_code & ec) Line 164 C++
app739.exe!boost::asio::io_service::run() Line 59 C++
app739.exe!MainWindow::on_pushButtonStart_clicked::__l13::<lambda>() Line 943 C++
Diese answer zeigt an, dass das Problem mit io_service.reset()
tun kann. Der Stack-Trace zum Zeitpunkt der Zugriffsverletzung zeigt, dass der Thread einen Asio-Completion-Handler verarbeitet hat. Ich denke aus dem Lesen anderer Beiträge, dass der Schlüssel zur Lösung dieses Problems io_service.stop()
und io_service.reset()
zu dem boost::asio::io_service
Objekt ist, auch kann es wichtig sein, den Socket vor dem Stoppen der io_service oder Zurücksetzen der Sentinel Arbeit Objekt zurückgesetzt werden.
Der folgende Code zeigt, wie ich versuche, die io_service-Threads zu stoppen. Beim Debuggen des Codes sehe ich, dass alle Threads ihre Joins abschließen, also verstehe ich nicht, warum Completion-Handler outsanding sind.
void
MainWindow::stopSys()
{
// make sure that we have no more work keeping services alive
mpWork.reset();
// check to see if the protocol threads were started
if (mVCDUProtocol) {
// terminate protocol thread by setting the shared mShutdown atomic flag
mVCDUProtocol->shutdown();
// Once each thread sees the shutdown flag, it will cleanly
// terminate so we can call join here to wait for the entire
// pool to finish
std::for_each(mIOServicePool.begin(), mIOServicePool.end(),
[](std::thread& rNext) {
rNext.join();
});
mIOServicePool.clear();
}
}
Mein Code unten ist ziemlich einfach. Es startet eine asynchrone Auflösung - die in einem Lambda-Handler behandelt wird. Von dort ruft es die start_async_ops(endPointIter)
an, um eine asynchrone connect()
durchzuführen, und von diesem Lambda ruft der Code VCDUProtocol::do_read()
auf, der eine boost::asio::async_read_until()
ausführt, um auf die Daten vom Server zu warten.
void
VCDUProtocol::prosimAsyncIOThreadFn()
{
static auto& gLogger = gpLogger->getLoggerRef(
gUseSysLog ? Logger::LogDest::SysLog :
Logger::LogDest::EventLog);
try {
// convert the host-name/port to a usable endpoint
tcp::resolver resolver(*mpIOService);
tcp::resolver::query query(mProtocolConfig.getProsimHostName(),
std::to_string(mProtocolConfig.getProsimPortNum()));
const auto endPointIter = std::find_if(
resolver.resolve(query), tcp::resolver::iterator(),
[](const tcp::endpoint& next) {
return next.protocol() == tcp::v4();
});
if (endPointIter != tcp::resolver::iterator()) {
mpSocket = std::make_unique<tcp::socket>(*mpIOService);
mpSocketTimer = std::make_unique<deadline_timer>(*mpIOService);
start_async_ops(endPointIter);
}
} catch (std::exception& rEx) {
LOG_ERROR(gLogger, gChannel) << boost::format(
"%1%: %2%")
% __FUNCTION__
% rEx.what();
}
}
void
VCDUProtocol::start_async_ops(tcp::resolver::iterator endpoint_iter)
{
// Start the connect actor.
do_connect(endpoint_iter);
// Start the deadline actor. You will note that we're not setting any
// particular deadline here. Instead, the connect and input actors will
// update the deadline prior to each asynchronous operation.
mpSocketTimer->async_wait(boost::bind(
&VCDUProtocol::check_deadline, this, _1));
}
void
VCDUProtocol::do_connect(
tcp::resolver::iterator endpoint_iter)
{
if (endpoint_iter != tcp::resolver::iterator()) {
// Set a deadline for the connect operation to complete.
mpSocketTimer->expires_from_now(boost::posix_time::seconds(5));
boost::asio::async_connect(*mpSocket, endpoint_iter,
[this](boost::system::error_code ec, tcp::resolver::iterator) {
if (!mShutdownFlag && !ec) {
// successfully connected here - cancel the
// connect timer and kick off async write ops
mpSocketTimer->cancel();
// kick off the prosim read operation
do_read();
}
});
} else {
// No more endpoints. Close the socket.
shutdown();
}
}
void
VCDUProtocol::do_read()
{
// Start or continue an asynchronous line reads. This will read at least
// up to a carriage return or line feed
async_read_until(*mpSocket, *mTLS->mSocketStreamBuf, "\r\n",
boost::bind(&VCDUProtocol::handle_read, this, _1));
}
Dies ist die asynchronen Lese-Abschluss-Handler - dies muss abgesagt werden, ich irgendwo gelesen, dass die Steckdose einfach zu schließen unzureichend ist, da die Handler Fertig nicht aufgerufen. Soll ich den Stempel anrufen?
/**
* Asynchronous read callback.
*
* @param ec [in] Boost ASIO library error code.
*/
void
VCDUProtocol::handle_read(const boost::system::error_code& ec)
{
static auto& gLogger = gpLogger->getLoggerRef(
gUseSysLog ? Logger::LogDest::SysLog :
Logger::LogDest::EventLog);
if (!mShutdownFlag) {
if (!ec) {
// Extract the newline-delimited message from the buffer.
std::string line;
std::istream is(mTLS->mSocketStreamBuf.get());
while (std::getline(is, line)) {
// Critical Section
std::lock_guard<std::mutex> lock (gMutexGuard);
// handle partial line reads
if (is.eof()) {
mTLS->mPartialLine = std::move(line);
continue;
} else if (!mTLS->mPartialLine.empty()) {
line = std::move(mTLS->mPartialLine) + line;
}
. . .
// update GUI
mpListener->handlePageUpdate(
mProtocolConfig.getCduID(),
mTLS->mVCDUPage, bRefreshCDU);
}
// not really required
line.clear();
}
// keep reading
do_read();
} else {
LOG_ERROR(gLogger, gChannel) << boost::format(
"CDU_%1%: handle_read - error[%2%]")
% mProtocolConfig.getCduID()
% ec.message();
shutdown();
}
}
}
danke, aber nachdem ich den größten Teil des Tages damit verbracht habe, sieht es so aus, als wäre ich über eine Lösung gestolpert - es macht irgendwie Sinn - im Grunde scheint der Trick anmutig abzubrechen, herunterzufahren und den Socket VOR dem Aufruf von Reset zu schließen das Sentinel-Arbeitsobjekt. Dies führt dazu, dass der Handle_read-Handler mit einem Fehler aufwacht - in diesem Fall - Der I/O-Vorgang wurde entweder aufgrund eines Thread-Exits oder einer Anwendungsanforderung abgebrochen, und dann kann ich die Dinge erneut starten. – johnco3
auch nach dem Aufrufen von Reset auf der Arbeit Sentinel Ich rief Reset auf dem io_service, um es komplett herunterzufahren (nicht sicher, wenn notwendig, wie der Thread-Pool trat kurz zuvor), dann vor dem erneuten Start, ich stellte sicher, dass io_service wurde zurückgesetzt (andernfalls es würde gestoppt bleiben und sofort von run()) beenden und den Sentinel mit dem io_service reinitialisieren. – johnco3