Wir verwenden seit Jahren asio in der Produktion und vor kurzem haben wir einen kritischen Punkt erreicht, wenn unsere Server gerade genug geladen werden, um ein mysteriöses Problem zu bemerken.boost :: asio Argumentation hinter num_implementations für io_service :: strang
In unserer Architektur verwendet jede separate Entität, die unabhängig ausgeführt wird, ein persönliches Objekt strand
. Einige der Entitäten können eine lange Arbeit ausführen (Lesen von einer Datei, Ausführen einer MySQL-Anfrage, usw.). Offensichtlich wird die Arbeit in Handlern ausgeführt, die mit einem Strang umwickelt sind. Alles hört sich gut und schön an und sollte einwandfrei funktionieren, bis wir bemerken, dass unmögliche Dinge, wie Timer, Sekunden später auslaufen, obwohl Threads "auf Arbeit warten" und ohne erkennbaren Grund arbeiten. Es sah so aus, als hätte lange Arbeit in einem Strang Auswirkungen auf andere nicht verwandte Stränge gehabt, nicht auf alle, sondern auf die meisten.
Unzählige Stunden wurden verbracht, um das Problem zu lokalisieren. Die Spur hat zu dem Weg geführt, strand
Objekt wird erstellt: strand_service::construct
(here).
Aus irgendeinem Grund entschieden sich Entwickler für eine begrenzte Anzahl von strand
Implementierungen. Das bedeutet, dass einige Objekte, die keine Beziehung zueinander haben, eine einzige Implementierung gemeinsam nutzen und daher Engpässe aufweisen.
In der eigenständigen (nicht Boost) Asio Bibliothek ähnlicher Ansatz wird verwendet. Aber anstelle von geteilten Implementierungen ist jede Implementierung nun unabhängig, kann jedoch ein mutex
Objekt mit anderen Implementierungen teilen (here).
Worum geht es? Ich habe noch nie von einer Begrenzung der Anzahl der Mutexe im System gehört. Oder irgendwelche Gemeinkosten in Bezug auf ihre Schaffung/Zerstörung. Obwohl das letzte Problem leicht gelöst werden könnte, indem man Mutexe recycelt, anstatt sie zu zerstören.
Ich habe einen einfachen Testfall um zu zeigen, wie dramatisch eine Leistungsverschlechterung ist:
#include <boost/asio.hpp>
#include <atomic>
#include <functional>
#include <iostream>
#include <thread>
std::atomic<bool> running{true};
std::atomic<int> counter{0};
struct Work
{
Work(boost::asio::io_service & io_service)
: _strand(io_service)
{ }
static void start_the_work(boost::asio::io_service & io_service)
{
std::shared_ptr<Work> _this(new Work(io_service));
_this->_strand.get_io_service().post(_this->_strand.wrap(std::bind(do_the_work, _this)));
}
static void do_the_work(std::shared_ptr<Work> _this)
{
counter.fetch_add(1, std::memory_order_relaxed);
if (running.load(std::memory_order_relaxed)) {
start_the_work(_this->_strand.get_io_service());
}
}
boost::asio::strand _strand;
};
struct BlockingWork
{
BlockingWork(boost::asio::io_service & io_service)
: _strand(io_service)
{ }
static void start_the_work(boost::asio::io_service & io_service)
{
std::shared_ptr<BlockingWork> _this(new BlockingWork(io_service));
_this->_strand.get_io_service().post(_this->_strand.wrap(std::bind(do_the_work, _this)));
}
static void do_the_work(std::shared_ptr<BlockingWork> _this)
{
sleep(5);
}
boost::asio::strand _strand;
};
int main(int argc, char ** argv)
{
boost::asio::io_service io_service;
std::unique_ptr<boost::asio::io_service::work> work{new boost::asio::io_service::work(io_service)};
for (std::size_t i = 0; i < 8; ++i) {
Work::start_the_work(io_service);
}
std::vector<std::thread> workers;
for (std::size_t i = 0; i < 8; ++i) {
workers.push_back(std::thread([&io_service] {
io_service.run();
}));
}
if (argc > 1) {
std::cout << "Spawning a blocking work" << std::endl;
workers.push_back(std::thread([&io_service] {
io_service.run();
}));
BlockingWork::start_the_work(io_service);
}
sleep(5);
running = false;
work.reset();
for (auto && worker : workers) {
worker.join();
}
std::cout << "Work performed:" << counter.load() << std::endl;
return 0;
}
Bauen diesen Befehl:
g++ -o asio_strand_test_case -pthread -I/usr/include -std=c++11 asio_strand_test_case.cpp -lboost_system
Testlauf in üblichen Weise:
time ./asio_strand_test_case
Work performed:6905372
real 0m5.027s
user 0m24.688s
sys 0m12.796s
Testlauf mit einer langen Blockierungsarbeit:
time ./asio_strand_test_case 1
Spawning a blocking work
Work performed:770
real 0m5.031s
user 0m0.044s
sys 0m0.004s
Der Unterschied ist dramatisch. Was passiert, ist, dass jede neue nicht blockierende Arbeit ein neues Objekt strand
erstellt, bis es die gleiche Implementierung mit strand
der Blockierungsarbeit teilt. Wenn das passiert, ist es eine Sackgasse, bis die Arbeit beendet ist.
bearbeiten: Reduzierte parallele Arbeiten bis auf die Anzahl der Arbeitsfäden (1000
-8
) und aktualisierte Testlauf ausgegeben. Ist dies der Fall, weil, wenn beide Zahlen in der Nähe sind, das Problem besser sichtbar ist.
* "Meine Vermutung für solch eine Sache wäre, das OS nicht zu überlasten, indem man viele, viele und viele Mutexe erstellt. Das wäre schlecht." * Warum? Welcher Overhead gibt es abgesehen von kleinen konstanten (pro Mutex) Betragsspeicher? –
@yurikilochek Sie sind Mutexe. Per Definition sind sie nutzlos, wenn sie nicht zum Synchronisieren verwendet werden. Dadurch werden große Sammlungen von Synchronisationsgrundelementen gleichzeitig abgewartet. ':: WaitForMultipleObjectsEx' stört vielleicht nicht, aber das ist ein Kontextwechsel, es sind nicht nur ein paar Bytes Speicher. Unter Linux gibt es keinen solchen Aufruf AFAIK. – sehe
@Arunmu Unabhängig von der Anzahl der Implementierungen wird das Problem bestehen bleiben, da es im Design ist. Das Erhöhen der Anzahl kann einige Zeit gewinnen, aber nur in gewissem Maße. In Echtzeit wird dies nie funktionieren. Probieren Sie mein Beispiel mit 'work objects' gleich der Anzahl der Threads, also' 8' statt '1000'. In diesem Fall helfen '1024' Implementierungen kaum (' Work performed: 8331'). – GreenScape