2017-04-13 11 views
1

kombinieren Ich möchte Boost.Asio Strang und priorisierte Wrapper in der gleichen Zeit verwenden.Wie Strang Wrapper und Priorität Wrapper Boost Asio

Bevor ich meinen Code zu schreiben, habe ich die folgenden Informationen lesen:

Boost asio priority and strand

boost::asio and Active Object

http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531

Why do I need strand per connection when using boost::asio?

Ich mag würde Wrapper-Ansatz verwenden , weil ich verschiedene Async-APIs wie async_read verwenden möchte, a sync_write und async_connect. Nach der http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531 scheint es, dass die Priorität Wrapper und Strang Wrapper kombiniert werden können.

Also schrieb ich den Code basierend auf dem folgenden Beispiel:

http://www.boost.org/doc/libs/1_63_0/doc/html/boost_asio/example/cpp03/invocation/prioritised_handlers.cpp

Hier ist mein Code:

#include <iostream> 
#include <functional> 
#include <queue> 
#include <vector> 
#include <thread> 
#include <mutex> 

#include <boost/asio.hpp> 
#include <boost/optional.hpp> 

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 

class handler_priority_queue { 
public: 
    template <typename Handler> 
    void add(int priority, Handler&& handler) { 
     std::cout << "add(" << priority << ")" << std::endl; 
     std::lock_guard<std::mutex> g(mtx_); 
     handlers_.emplace(priority, std::forward<Handler>(handler)); 
    } 

    void execute_all() { 
     auto top = [&]() -> boost::optional<queued_handler> { 
      std::lock_guard<std::mutex> g(mtx_); 
      if (handlers_.empty()) return boost::none; 
      boost::optional<queued_handler> opt = handlers_.top(); 
      handlers_.pop(); 
      return opt; 
     }; 
     while (auto h_opt = top()) { 
      h_opt.get().execute(); 
     } 
    } 

    template <typename Handler> 
    class wrapped_handler { 
    public: 
     wrapped_handler(handler_priority_queue& q, int p, Handler h) 
      : queue_(q), priority_(p), handler_(std::move(h)) 
     { 
     } 

     template <typename... Args> 
     void operator()(Args&&... args) { 
      std::cout << "operator() " << std::endl; 
      handler_(std::forward<Args>(args)...); 
     } 

     //private: 
     handler_priority_queue& queue_; 
     int priority_; 
     Handler handler_; 
    }; 

    template <typename Handler> 
    wrapped_handler<Handler> wrap(int priority, Handler&& handler) { 
     return wrapped_handler<Handler>(*this, priority, std::forward<Handler>(handler)); 
    } 

private: 
    class queued_handler { 
    public: 
     template <typename Handler> 
     queued_handler(int p, Handler&& handler) 
      : priority_(p), function_(std::forward<Handler>(handler)) 
     { 
      std::cout << "queued_handler()" << std::endl; 
     } 

     void execute() { 
      std::cout << "execute(" << priority_ << ")" << std::endl; 
      function_(); 
     } 

     friend bool operator<(
      queued_handler const& lhs, 
      queued_handler const & rhs) { 
      return lhs.priority_ < rhs.priority_; 
     } 

    private: 
     int priority_; 
     std::function<void()> function_; 
    }; 

    std::priority_queue<queued_handler> handlers_; 
    std::mutex mtx_; 
}; 

// Custom invocation hook for wrapped handlers. 
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

//---------------------------------------------------------------------- 

int main() { 
    int const num_of_threads = 4; 
    int const num_of_tasks = 5; 

    boost::asio::io_service ios; 
    boost::asio::strand strand(ios); 


    handler_priority_queue pq; 

    for (int i = 0; i != num_of_tasks; ++i) { 
     ios.post(
#if ENABLE_STRAND 
      strand.wrap(
#endif 
#if ENABLE_PRIORITY 
       pq.wrap(
        i, 
#endif 
        [=] { 
         std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl; 
        } 
#if ENABLE_PRIORITY 
       ) 
#endif 
#if ENABLE_STRAND 
      ) 
#endif 
     ); 
    } 

    std::vector<std::thread> pool; 
    for (int i = 0; i != num_of_threads; ++i) { 
     pool.emplace_back([&]{ 
       std::cout << "before run_one()" << std::endl; 
       while (ios.run_one()) { 
        std::cout << "before poll_one()" << std::endl; 
        while (ios.poll_one()) 
         ; 
        std::cout << "before execute_all()" << std::endl; 
        pq.execute_all(); 
       } 
      } 
     ); 
    } 
    for (auto& t : pool) t.join(); 
} 

Die Wrapper durch die folgenden Makros aktiviert sind:

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 

Wenn beide Makros aktiviert sind, habe ich folgende Ergebnisse erhalten t:

before run_one() 
asio_handler_invoke 
add(0) 
queued_handler() 
before poll_one() 
asio_handler_invoke 
add(1) 
queued_handler() 
asio_handler_invoke 
add(2) 
queued_handler() 
asio_handler_invoke 
add(3) 
queued_handler() 
asio_handler_invoke 
add(4) 
queued_handler() 
before execute_all() 
execute(4) 
execute(3) 
execute(2) 
execute(1) 
execute(0) 
before run_one() 
before run_one() 
before run_one() 

Ich erwarte, dass ich

[called] priority,thread_id 

Ausgabe als

[called] 1,140512649541376 

bekam aber ich habe es nicht.

Es scheint, dass in der Funktion execute(), function_() aufgerufen wird, aber wrapped_handler::operator() nicht aufgerufen wird. (Die Funktion execute() aus pq.execute_all(); in meinem Code genannt.)

void execute() { 
    std::cout << "execute(" << priority_ << ")" << std::endl; 
    function_(); // It is called. 
} 

template <typename Handler> 
class wrapped_handler { 
public: 

    template <typename... Args> 
    void operator()(Args&&... args) { // It is NOT called 
     std::cout << "operator() " << std::endl; 
     handler_(std::forward<Args>(args)...); 
    } 

ich die Sequenz zurückgeführt, nachdem function_() genannt wird.

https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L191 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L76 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/strand.hpp#L158 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.hpp#L55 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L94

dann in der Funktion bool strand_service::do_dispatch(implementation_type& impl, operation* op) der Betrieb op nicht aufgerufen wird, sondern in die Warteschlange int der folgenden Zeile geschoben::

Die folgenden Funktionen werden aufgerufen

https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L111

Ich bin mir nicht sicher, warum die function_() an strand_service versandt wird.Ich denke, dass Strang Wrapper bereits am folgenden Punkt in meinem Code ausgepackt worden:

template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

Wenn ich nur Priorität Wrapper aktiviert ist, ich folgendes Ergebnis bekam. Es scheint so zu funktionieren, wie ich es erwartet habe.

Wenn ich nur Strang Wrapper aktiviert, habe ich das folgende Ergebnis. Es scheint so zu funktionieren, wie ich es erwartet habe.

before run_one() 
[called] 0,140127385941760 
before poll_one() 
[called] 1,140127385941760 
[called] 2,140127385941760 
[called] 3,140127385941760 
[called] 4,140127385941760 
before execute_all() 
before run_one() 
before run_one() 
before run_one() 

Irgendwelche Ideen?

Antwort

1

Ich löste das Problem.

Ich bin nicht sicher, warum die Funktion_() an strang_service versandt wird. Ich denke, dass Strang Wrapper bereits am folgenden Punkt in meinem Code ausgepackt worden:

template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, std::forward<Function>(f)); 
} 

Der Parameter f der ursprüngliche Handler ist. Das bedeutet, dass die Warteschlange mit der Priorität "Queue" umwickelt und mit dem Strang umwickelt wird Die Strangverpackung ist draußen. Wenn also f aufgerufen wird, wird es an strang_service gesendet. Dieser Prozess findet im selben strang_service statt, sodass der Handler nicht aufgerufen wird.

Um dieses Problem zu lösen, fügen h->handler_ in der priorisierten Warteschlange anstelle von f wie folgt:

// Custom invocation hook for wrapped handlers. 
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, h->handler_); 
} 

handler_ Mitglied Variable der Klassenvorlage ist wrapped_handler. Es enthält den Handler, der nicht eingepackt ist. Hier

ist der vollständige Code:

#include <iostream> 
#include <functional> 
#include <queue> 
#include <vector> 
#include <thread> 
#include <mutex> 

#include <boost/asio.hpp> 
#include <boost/optional.hpp> 

#define ENABLE_STRAND 1 
#define ENABLE_PRIORITY 1 

class handler_priority_queue { 
public: 
    template <typename Handler> 
    void add(int priority, Handler&& handler) { 
     std::cout << "add(" << priority << ")" << std::endl; 
     std::lock_guard<std::mutex> g(mtx_); 
     handlers_.emplace(priority, std::forward<Handler>(handler)); 
    } 

    void execute_all() { 
     auto top = [&]() -> boost::optional<queued_handler> { 
      std::lock_guard<std::mutex> g(mtx_); 
      if (handlers_.empty()) return boost::none; 
      boost::optional<queued_handler> opt = handlers_.top(); 
      handlers_.pop(); 
      return opt; 
     }; 
     while (auto h_opt = top()) { 
      h_opt.get().execute(); 
     } 
    } 

    template <typename Handler> 
    class wrapped_handler { 
    public: 
     template <typename HandlerArg> 
     wrapped_handler(handler_priority_queue& q, int p, HandlerArg&& h) 
      : queue_(q), priority_(p), handler_(std::forward<HandlerArg>(h)) 
     { 
     } 

     template <typename... Args> 
     void operator()(Args&&... args) { 
      std::cout << "operator() " << std::endl; 
      handler_(std::forward<Args>(args)...); 
     } 

     //private: 
     handler_priority_queue& queue_; 
     int priority_; 
     Handler handler_; 
    }; 

    template <typename Handler> 
    wrapped_handler<Handler> wrap(int priority, Handler&& handler) { 
     return wrapped_handler<Handler>(*this, priority, std::forward<Handler>(handler)); 
    } 

private: 
    class queued_handler { 
    public: 
     template <typename Handler> 
     queued_handler(int p, Handler&& handler) 
      : priority_(p), function_(std::forward<Handler>(handler)) 
     { 
      std::cout << "queued_handler()" << std::endl; 
     } 

     void execute() { 
      std::cout << "execute(" << priority_ << ")" << std::endl; 
      function_(); 
     } 

     friend bool operator<(
      queued_handler const& lhs, 
      queued_handler const & rhs) { 
      return lhs.priority_ < rhs.priority_; 
     } 

    private: 
     int priority_; 
     std::function<void()> function_; 
    }; 

    std::priority_queue<queued_handler> handlers_; 
    std::mutex mtx_; 
}; 

// Custom invocation hook for wrapped handlers. 
template <typename Function, typename Handler> 
void asio_handler_invoke(Function&& f, 
         handler_priority_queue::wrapped_handler<Handler>* h) { 
    std::cout << "asio_handler_invoke " << std::endl; 
    h->queue_.add(h->priority_, h->handler_); 
} 

//---------------------------------------------------------------------- 

int main() { 
    int const num_of_threads = 4; 
    int const num_of_tasks = 5; 

    boost::asio::io_service ios; 
    boost::asio::strand strand(ios); 


    handler_priority_queue pq; 

    for (int i = 0; i != num_of_tasks; ++i) { 
     ios.post(
#if ENABLE_STRAND 
      strand.wrap(
#endif 
#if ENABLE_PRIORITY 
       pq.wrap(
        i, 
#endif 
        [=] { 
         std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl; 
        } 
#if ENABLE_STRAND 
       ) 
#endif 
#if ENABLE_PRIORITY 
      ) 
#endif 
     ); 
    } 

    std::vector<std::thread> pool; 
    for (int i = 0; i != num_of_threads; ++i) { 
     pool.emplace_back([&]{ 
       std::cout << "before run_one()" << std::endl; 
       while (ios.run_one()) { 
        std::cout << "before poll_one()" << std::endl; 
        while (ios.poll_one()) 
         ; 
        std::cout << "before execute_all()" << std::endl; 
        pq.execute_all(); 
       } 
      } 
     ); 
    } 
    for (auto& t : pool) t.join(); 
} 

Und hier ist ein Ausgang:

before run_one() 
asio_handler_invoke 
add(0) 
queued_handler() 
before poll_one() 
asio_handler_invoke 
add(1) 
queued_handler() 
asio_handler_invoke 
add(2) 
queued_handler() 
asio_handler_invoke 
add(3) 
queued_handler() 
asio_handler_invoke 
add(4) 
queued_handler() 
before execute_all() 
execute(4) 
[called] 4,139903315736320 
execute(3) 
[called] 3,139903315736320 
execute(2) 
[called] 2,139903315736320 
execute(1) 
[called] 1,139903315736320 
execute(0) 
[called] 0,139903315736320 
before run_one() 
before run_one() 
before run_one()