2013-05-01 19 views
9

Ich habe einige Modul-basierte Active Object Design Pattern implementiert. Es ist eine sehr einfache Implementierung. Ich habe Scheduler, ActivationList, Requests und Futures, um eine Antwort zu bekommen. Meine Anforderungen waren wie folgt aus:boost :: asio und Active Object

  • Zugriff auf aktives Objekt wird durch die Ausführung seiner Methoden in seinem eigenen Thread (Haupt req und Annahme von Active Object Entwurfsmustern) serialisiert wird
  • Anrufer muss in der Lage sein, zu spezifizieren die Priorität der Ausführung von Anforderungen. Dies bedeutet, dass, wenn mehr als null Anforderungen auf die Ausführung warten, diese nach der Priorität sortiert werden sollen, die jeder Anforderung zugewiesen wurde. Anfragen mit höherer Priorität sollen zuerst ausgeführt werden, wenn also immer einige Anfragen auf der Aktivierungsliste stehen und diese eine höhere Priorität haben als eine gegebene Anfrage, wird diese Anfrage niemals ausgeführt - es ist OK für mich
  • Es soll möglich sein geben Sie die maximale Anzahl der Anfragen auf der Liste anhängig (Begrenzung der Speichernutzung)
  • Es muss möglich sein, alle anhängigen Anträge werden
  • Anträge der Lage sein, zu entkräften Werte zurück (Blockierung der Anrufer) OR wird nur ohne Wert ausgeführt werden Rückkehr, aber der Anrufer muss blockiert werden, bis die Anfrage bearbeitet wird ODER der Anrufer nicht blockiert werden soll und es nicht wichtig ist, ob die Anfrage bearbeitet wurde oder nicht g
  • Kurz vor der Ausführung der Anforderung muss eine Schutzmethode ausgeführt werden, um zu prüfen, ob die gegebene Anforderung ausgeführt werden soll oder nicht. Wenn nicht - es wird etwas nicht definierten Wert zu Aufrufer (in meiner aktuellen Implementierung ist es boost :: none, weil jede Anforderung Rückgabetyp ist boost :: optional)

OK jetzt Frage: Ist es möglich, zu verwenden, boost :: asio und erfülle alle meine Anforderungen? Meine Implementierung funktioniert, aber ich würde gerne etwas verwenden, was wahrscheinlich viel besser implementiert ist als das, was ich getan habe. Auch ich würde es gerne für die Zukunft wissen und nicht "das Rad neu erfinden".

+0

Boost Asio nicht blockiert. Der letzte Teil Ihrer Vorletzten wird von der letzten Aussage abgedeckt. alles andere ist komplett in regulärem C++ ohne boost möglich, allerdings zugegebenermaßen einfacher. Vielleicht möchten Sie auch die Boost-Serialisierung überprüfen, wenn Sie sie nicht bereits verwenden. – johnathon

+0

Ich habe es bereits mit Plain C++ implementiert. Eigentlich mit einer großen Hilfe von Boost-Thread und Boost-Multi-Index-Conatiner. Aber das Ziel ist, meine ipmplementation nicht zu verwenden und stattdessen boost :: asio zu verwenden. – user2301299

Antwort

28

Boost.Asio kann verwendet werden, um die Absicht von Active Object zu umfassen: Entkoppeln der Methodenausführung vom Methodenaufruf. Zusätzliche Anforderungen müssen auf einer höheren Ebene gehandhabt werden, aber es ist nicht übermäßig komplex, wenn Boost.Asio in Verbindung mit anderen Boost-Bibliotheken verwendet wird.

Scheduler könnte verwenden:

  • boost::thread für Gewinde Abstraktion.
  • boost::thread_group, um die Lebensdauer von Threads zu verwalten.
  • boost::asio::io_service einen Threadpool bereitstellen. Will wahrscheinlich boost::asio::io_service::work verwenden, um Threads am Leben zu halten, wenn keine Arbeit ansteht.

    ActivationList könnte so implementiert werden. Bei einer angedeuteten Position insert() bleibt der Anzeigenauftrag für die Anforderung mit der gleichen Priorität erhalten.

  • std::multiset oder std::multimap kann verwendet werden. In C++ 03 ist jedoch die Reihenfolge der Anfrage mit dem gleichen Schlüssel (Priorität) nicht spezifiziert.
  • Wenn Request keine Schutzmethode benötigt, kann std::priority_queue verwendet werden.

Request könnte eine nicht spezifizierte Art sein:

  • boost::function und boost::bind verwendet werden könnte, einen Typ-Löschung zu schaffen, während ohne die Einführung einer Request Hierarchie aufrufbare Typen binden.
  • Futures könnte Boost.Thread Futures Unterstützung verwenden.

    • future.valid() gibt true zurück, wenn Request-ActivationList hinzugefügt wurde.
    • future.wait() blockiert die Wartezeit bis ein Ergebnis verfügbar wird.
    • future.get() blockiert das Warten auf das Ergebnis.
    • Wenn Anrufer nichts mit der future tut, wird der Anrufer nicht blockiert.
    • Ein weiterer Vorteil der Verwendung von Boost.Thread Futures ist, dass Ausnahmen, die von Request stammen, an die Future weitergegeben werden.

    Hier ist ein komplettes Beispiel verschiedene Boost-Bibliotheken nutzen und die Anforderungen erfüllen sollte:

    // Standard includes 
    #include <algorithm> // std::find_if 
    #include <iostream> 
    #include <string> 
    
    // 3rd party includes 
    #include <boost/asio.hpp> 
    #include <boost/bind.hpp> 
    #include <boost/function.hpp> 
    #include <boost/make_shared.hpp> 
    #include <boost/multi_index_container.hpp> 
    #include <boost/multi_index/ordered_index.hpp> 
    #include <boost/multi_index/member.hpp> 
    #include <boost/shared_ptr.hpp> 
    #include <boost/thread.hpp> 
    #include <boost/utility/result_of.hpp> 
    
    /// @brief scheduler that provides limits with prioritized jobs. 
    template <typename Priority, 
          typename Compare = std::less<Priority> > 
    class scheduler 
    { 
    public: 
        typedef Priority priority_type; 
    private: 
    
        /// @brief method_request is used to couple the guard and call 
        ///  functions for a given method. 
        struct method_request 
        { 
        typedef boost::function<bool()> ready_func_type; 
        typedef boost::function<void()> run_func_type; 
    
        template <typename ReadyFunctor, 
           typename RunFunctor> 
        method_request(ReadyFunctor ready, 
            RunFunctor run) 
         : ready(ready), 
         run(run) 
        {} 
    
        ready_func_type ready; 
        run_func_type run; 
        }; 
    
        /// @brief Pair type used to associate a request with its priority. 
        typedef std::pair<priority_type, 
            boost::shared_ptr<method_request> > pair_type; 
    
        static bool is_method_ready(const pair_type& pair) 
        { 
        return pair.second->ready(); 
        } 
    
    public: 
    
        /// @brief Construct scheduler. 
        /// 
        /// @param max_threads Maximum amount of concurrent task. 
        /// @param max_request Maximum amount of request. 
        scheduler(std::size_t max_threads, 
          std::size_t max_request) 
        : work_(io_service_), 
         max_request_(max_request), 
         request_count_(0) 
        { 
        // Spawn threads, dedicating them to the io_service. 
        for (std::size_t i = 0; i < max_threads; ++i) 
         threads_.create_thread(
         boost::bind(&boost::asio::io_service::run, &io_service_)); 
        } 
    
        /// @brief Destructor. 
        ~scheduler() 
        { 
        // Release threads from the io_service. 
        io_service_.stop(); 
        // Cleanup. 
        threads_.join_all(); 
        } 
    
        /// @brief Insert a method request into the scheduler. 
        /// 
        /// @param priority Priority of job. 
        /// @param ready_func Invoked to check if method is ready to run. 
        /// @param run_func Invoked when ready to run. 
        /// 
        /// @return future associated with the method. 
        template <typename ReadyFunctor, 
          typename RunFunctor> 
        boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
        insert(priority_type priority, 
         const ReadyFunctor& ready_func, 
         const RunFunctor& run_func) 
        { 
        typedef typename boost::result_of<RunFunctor()>::type result_type; 
        typedef boost::unique_future<result_type> future_type; 
    
        boost::unique_lock<mutex_type> lock(mutex_); 
    
        // If max request has been reached, then return an invalid future. 
        if (max_request_ && 
         (request_count_ == max_request_)) 
         return future_type(); 
    
        ++request_count_; 
    
        // Use a packaged task to handle populating promise and future. 
        typedef boost::packaged_task<result_type> task_type; 
    
        // Bind does not work with rvalue, and packaged_task is only moveable, 
        // so allocate a shared pointer. 
        boost::shared_ptr<task_type> task = 
         boost::make_shared<task_type>(run_func); 
    
        // Create method request. 
        boost::shared_ptr<method_request> request = 
         boost::make_shared<method_request>(
         ready_func, 
         boost::bind(&task_type::operator(), task)); 
    
        // Insert into priority. Hint to inserting as close to the end as 
        // possible to preserve insertion order for request with same priority. 
        activation_list_.insert(activation_list_.end(), 
              pair_type(priority, request)); 
    
        // There is now an outstanding request, so post to dispatch. 
        io_service_.post(boost::bind(&scheduler::dispatch, this)); 
    
        return task->get_future(); 
        } 
    
        /// @brief Insert a method request into the scheduler. 
        /// 
        /// @param ready_func Invoked to check if method is ready to run. 
        /// @param run_func Invoked when ready to run. 
        /// 
        /// @return future associated with the method. 
        template <typename ReadyFunctor, 
          typename RunFunctor> 
        boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
        insert(const ReadyFunctor& ready_func, 
         const RunFunctor& run_func) 
        { 
        return insert(priority_type(), ready_func, run_func); 
        } 
    
        /// @brief Insert a method request into the scheduler. 
        /// 
        /// @param priority Priority of job. 
        /// @param run_func Invoked when ready to run. 
        /// 
        /// @return future associated with the method. 
        template <typename RunFunctor> 
        boost::unique_future<typename boost::result_of<RunFunctor()>::type> 
        insert(priority_type priority, 
         const RunFunctor& run_func) 
        { 
        return insert(priority, &always_ready, run_func); 
        } 
    
        /// @brief Insert a method request with default priority into the 
        ///  scheduler. 
        /// 
        /// @param run_func Invoked when ready to run. 
        /// 
        /// @param functor Job to run. 
        /// 
        /// @return future associated with the job. 
        template <typename RunFunc> 
        boost::unique_future<typename boost::result_of<RunFunc()>::type> 
        insert(const RunFunc& run_func) 
        { 
        return insert(&always_ready, run_func); 
        } 
    
        /// @brief Cancel all outstanding request. 
        void cancel() 
        { 
        boost::unique_lock<mutex_type> lock(mutex_); 
        activation_list_.clear(); 
        request_count_ = 0; 
        } 
    
    private: 
    
        /// @brief Dispatch a request. 
        void dispatch() 
        { 
        // Get the current highest priority request ready to run from the queue. 
        boost::unique_lock<mutex_type> lock(mutex_); 
        if (activation_list_.empty()) return; 
    
        // Find the highest priority method ready to run. 
        typedef typename activation_list_type::iterator iterator; 
        iterator end = activation_list_.end(); 
        iterator result = std::find_if(
         activation_list_.begin(), end, &is_method_ready); 
    
        // If no methods are ready, then post into dispatch, as the 
        // method may have become ready. 
        if (end == result) 
        { 
         io_service_.post(boost::bind(&scheduler::dispatch, this)); 
         return; 
        } 
    
        // Take ownership of request. 
        boost::shared_ptr<method_request> method = result->second; 
        activation_list_.erase(result); 
    
        // Run method without mutex. 
        lock.unlock(); 
        method->run();  
        lock.lock(); 
    
        // Perform bookkeeping. 
        --request_count_; 
        } 
    
        static bool always_ready() { return true; } 
    
    private: 
    
        /// @brief List of outstanding request. 
        typedef boost::multi_index_container< 
        pair_type, 
        boost::multi_index::indexed_by< 
         boost::multi_index::ordered_non_unique< 
         boost::multi_index::member<pair_type, 
                typename pair_type::first_type, 
                &pair_type::first>, 
         Compare 
         > 
        > 
        > activation_list_type; 
        activation_list_type activation_list_; 
    
        /// @brief Thread group managing threads servicing pool. 
        boost::thread_group threads_; 
    
        /// @brief io_service used to function as a thread pool. 
        boost::asio::io_service io_service_; 
    
        /// @brief Work is used to keep threads servicing io_service. 
        boost::asio::io_service::work work_; 
    
        /// @brief Maximum amount of request. 
        const std::size_t max_request_; 
    
        /// @brief Count of outstanding request. 
        std::size_t request_count_; 
    
        /// @brief Synchronize access to the activation list. 
        typedef boost::mutex mutex_type; 
        mutex_type mutex_; 
    }; 
    
    typedef scheduler<unsigned int, 
            std::greater<unsigned int> > high_priority_scheduler; 
    
    /// @brief adder is a simple proxy that will delegate work to 
    ///  the scheduler. 
    class adder 
    { 
    public: 
        adder(high_priority_scheduler& scheduler) 
        : scheduler_(scheduler) 
        {} 
    
        /// @brief Add a and b with a priority. 
        /// 
        /// @return Return future result. 
        template <typename T> 
        boost::unique_future<T> add(
        high_priority_scheduler::priority_type priority, 
        const T& a, const T& b) 
        { 
        // Insert method request 
        return scheduler_.insert(
         priority, 
         boost::bind(&adder::do_add<T>, a, b)); 
        } 
    
        /// @brief Add a and b. 
        /// 
        /// @return Return future result. 
        template <typename T> 
        boost::unique_future<T> add(const T& a, const T& b) 
        { 
        return add(high_priority_scheduler::priority_type(), a, b); 
        } 
    
    private: 
    
        /// @brief Actual add a and b. 
        template <typename T> 
        static T do_add(const T& a, const T& b) 
        { 
        std::cout << "Starting addition of '" << a 
           << "' and '" << b << "'" << std::endl; 
        // Mimic busy work. 
        boost::this_thread::sleep_for(boost::chrono::seconds(2)); 
        std::cout << "Finished addition" << std::endl; 
        return a + b; 
        } 
    
    private: 
        high_priority_scheduler& scheduler_; 
    }; 
    
    bool get(bool& value) { return value; } 
    void guarded_call() 
    { 
        std::cout << "guarded_call" << std::endl; 
    } 
    
    int main() 
    { 
        const unsigned int max_threads = 1; 
        const unsigned int max_request = 4; 
    
        // Sscheduler 
        high_priority_scheduler scheduler(max_threads, max_request); 
    
        // Proxy 
        adder adder(scheduler); 
    
        // Client 
    
        // Add guarded method to scheduler. 
        bool ready = false; 
        std::cout << "Add guarded method." << std::endl; 
        boost::unique_future<void> future1 = scheduler.insert(
        boost::bind(&get, boost::ref(ready)), 
        &guarded_call); 
    
        // Add 1 + 100 with default priority. 
        boost::unique_future<int> future2 = adder.add(1, 100); 
    
        // Force sleep to try to get scheduler to run request 2 first. 
        boost::this_thread::sleep_for(boost::chrono::seconds(1)); 
    
        // Add: 
        // 2 + 200 with low priority (5) 
        // "test" + "this" with high priority (99) 
        boost::unique_future<int> future3 = adder.add(5, 2, 200); 
        boost::unique_future<std::string> future4 = adder.add(99, 
        std::string("test"), std::string("this")); 
    
        // Max request should have been reached, so add another. 
        boost::unique_future<int> future5 = adder.add(3, 300); 
    
        // Check if request was added. 
        std::cout << "future1 is valid: " << future1.valid() 
          << "\nfuture2 is valid: " << future2.valid() 
          << "\nfuture3 is valid: " << future3.valid() 
          << "\nfuture4 is valid: " << future4.valid() 
          << "\nfuture5 is valid: " << future5.valid() 
          << std::endl; 
    
        // Get results for future2 and future3. Do nothing with future4's results. 
        std::cout << "future2 result: " << future2.get() 
          << "\nfuture3 result: " << future3.get() 
          << std::endl; 
    
        std::cout << "Unguarding method." << std::endl; 
        ready = true; 
        future1.wait(); 
    } 
    

    Die Ausführung verwendet Threadpool von 1 mit einem Maximum von 4 Anfrage.

  • request1 wird bis zum Ende des Programms geschützt und sollte als letztes ausgeführt werden.
  • request2 (1 + 100) wird mit der Standardpriorität eingefügt und sollte zuerst ausgeführt werden.
  • request3 (2 + 200) wird mit niedriger Priorität eingefügt und sollte nach request4 ausgeführt werden.
  • request4 ('test' + 'this') wird mit hoher Priorität eingefügt und sollte vor request3 ausgeführt werden.
  • request5 sollte aufgrund von max-Anforderung nicht eingefügt werden und sollte nicht gültig sein.

Die Ausgabe ist wie folgt:

Add guarded method. 
Starting addition of '1' and '100' 
future1 is valid: 1 
future2 is valid: 1 
future3 is valid: 1 
future4 is valid: 1 
future5 is valid: 0 
Finished addition 
Starting addition of 'test' and 'this' 
Finished addition 
Starting addition of '2' and '200' 
Finished addition 
future2 result: 101 
future3 result: 202 
Unguarding method. 
guarded_call
+1

Danke für diese Antwort, ich wünschte, ich könnte dir mehr als 1 Upvote geben. – MrEvil

+0

Sehr hilfreiche Post, was Boost fehlt ist Use Cases, die ich nicht github/so für – arynaq