2017-09-12 1 views
3

So können Sie eine std::future erstellen, die keine Arbeit tut, bis .get() genannt wird:Kann ich ausführen, um mein `std :: future` zu ​​erhalten und darauf zu warten?

auto f_deferred = std::async(std::launch::deferred, []{ std::cout << "I ran\n"; }); 

Sie können auch eine std::future schreiben, wartbar ist, und kann in jedem Thread an einer beliebigen Stelle von Code bereit gestellt werden:

std::packaged_task<void()> p([](std::cout << "I also ran\n"; }); 
auto f_waitable = p.get_future(); 

Wenn Sie f_deferred.wait_for(1ms) anrufen, wird es nicht warten warten. Wenn Sie f_deferred.get() nennen, eine Lambda Ihrer Wahl (in diesem Fall eine, die "I ran\n" ausführt druckt.

Wenn Sie f_waitable.get() nennen, gibt es keine Möglichkeit für Code, um die Verwaltung von Aufgaben bewusst sein, dass jemand auf die Zukunft wartet. aber wenn Sie f_deferred.wait(1ms); nennen, erhalten Sie einfach future_status::deferred sofort.

gibt es eine Möglichkeit, diese beiden?

ein konkreter Anwendungsfall ist ein Thread-Pool kombinieren Rückkehr Futures, wenn die Menschen Aufgaben Warteschlange. wenn eine unqueued Zukunft .get() Ich möchte den Thread verwenden, der blockiert ist, um die Aufgabe statt havin auszuführen g es im Leerlauf. Auf der anderen Seite möchte ich, dass Leute mit den zurückgegebenen Futures feststellen können, ob die Aufgabe beendet ist, und sogar eine begrenzte Zeit warten, bis die Aufgabe beendet ist. (in dem Fall, wo Sie warten, bin ich in Ordnung mit Ihrem Thread im Leerlauf während Ihres Wartens)

Scheitern, gibt es Lösungen in kommenden Vorschlägen, die dieses Problem besser als meine Thread-Pool-Rückkehr eine Zukunft mit lösen würde all seine Grenzen? Ich habe gehört, dass Futures keine Zukunft haben und bessere Lösungen für die Lösung von Problem-Futures existieren.

+0

Sie sagen, dass Konsumenten-Threads Lage sein sollten, arbeiten vom Testamentsvollstrecker zu stehlen, wenn das bedeutet, dass sie schneller abgeschlossen werden kann? Ich denke, du musst dein eigenes schreiben. Der Standard würde niemals etwas so Nützliches bieten :) –

+0

@richard Ja; Von Bedeutung ist auch nicht das Deadlocking, da alle Threads in meinem Pool auf Threads warten, die auf Slots warten, um ausgeführt zu werden. – Yakk

+0

Ja, es ist nicht einfach. –

Antwort

0

Ich bin nicht sicher, ob dies genau das ist, was Sie brauchen, aber es dient dem Zweck, zu illustrieren, was ich in dem Kommentar vorgeschlagen habe. Zumindest hoffe ich, dass es Ihnen einige Ideen gibt, um zu implementieren, was Sie brauchen, wenn es nicht alle Ihre Bedürfnisse deckt.

Haftungsausschluss: Dies ist sehr grob. Viele Dinge könnten sicherlich eleganter und effizienter gemacht werden.

#include <iostream> 
#include <thread> 
#include <future> 
#include <memory> 
#include <functional> 
#include <queue> 
#include <random> 
#include <chrono> 
#include <mutex> 

typedef std::packaged_task<void()> task; 
typedef std::shared_ptr<task> task_ptr; 
typedef std::lock_guard<std::mutex> glock; 
typedef std::unique_lock<std::mutex> ulock; 
typedef unsigned int uint; 
typedef unsigned long ulong; 

// For sync'd std::cout 
std::mutex cout_mtx; 

// For task scheduling 
std::mutex task_mtx; 
std::condition_variable task_cv; 

// Prevents main() from exiting 
// before the last worker exits 
std::condition_variable kill_switch; 

// RNG engine 
std::mt19937_64 engine; 

// Random sleep (in ms) 
std::uniform_int_distribution<int> sleep(100, 10000); 

// Task queue 
std::queue<task_ptr> task_queue; 

static uint tasks = 0; 
static std::thread::id main_thread_id; 
static uint workers = 0; 

template<typename T> 
class Task 
{ 
    // Not sure if this needs 
    // to be std::atomic. 
    // A simple bool might suffice. 
    std::atomic<bool> working; 
    task_ptr tp; 

public: 

    Task(task_ptr _tp) 
     : 
      working(false), 
      tp(_tp) 
    {} 

    inline T get() 
    { 
     working.store(true); 
     (*tp)(); 
     return tp->get_future().get(); 
    } 

    inline bool is_working() 
    { 
     return working.load(); 
    } 
}; 

auto task_factory() 
{ 
    return std::make_shared<task>([&] 
    { 
     uint task_id(0); 
     { 
      glock lk(cout_mtx); 
      task_id = ++tasks; 
      if (std::this_thread::get_id() == main_thread_id) 
      { 
       std::cout << "Executing task " << task_id << " in main thread.\n"; 
      } 
      else 
      { 
       std::cout << "Executing task " << task_id << " in worker " << std::this_thread::get_id() << ".\n"; 
      } 
     } 
     std::this_thread::sleep_for(std::chrono::milliseconds(sleep(engine))); 
     { 
      glock lk(cout_mtx); 
      std::cout << "\tTask " << task_id << " completed.\n"; 
     } 
    }); 
} 

auto func_factory() 
{ 
    return [&] 
    { 

     while(true) 
     { 
      ulock lk(task_mtx); 
      task_cv.wait(lk, [&]{ return !task_queue.empty(); }); 
      Task<void> task(task_queue.front()); 
      task_queue.pop(); 

      // Check if the task has been assigned 
      if (!task.is_working()) 
      { 
       // Sleep for a while and check again. 
       // If it is still not assigned after 1 s, 
       // start working on it. 
       // You can also place these checks 
       // directly in Task::get() 
       { 
        glock lk(cout_mtx); 
        std::cout << "\tTask not started, waiting 1 s...\n"; 
       } 
       lk.unlock(); 
       std::this_thread::sleep_for(std::chrono::milliseconds(1000)); 
       lk.lock(); 
       if (!task.is_working()) 
       { 
        { 
         glock lk(cout_mtx); 
         std::cout << "\tTask not started after 1 s, commencing work...\n"; 
        } 
        lk.unlock(); 
        task.get(); 
        lk.lock(); 
       } 

       if (task_queue.empty()) 
       { 
        break; 
       } 
      } 
     } 
    }; 
} 

int main() 
{ 
    engine.seed(std::chrono::high_resolution_clock::now().time_since_epoch().count()); 

    std::cout << "Main thread: " << std::this_thread::get_id() << "\n"; 
    main_thread_id = std::this_thread::get_id(); 

    for (int i = 0; i < 50; ++i) 
    { 
     task_queue.push(task_factory()); 
    } 

    std::cout << "Tasks enqueued: " << task_queue.size() << "\n"; 

    // Spawn 5 workers 
    for (int i = 0; i < 5; ++i) 
    { 
     std::thread([&] 
     { 
      { 
       ulock lk(task_mtx); 
       ++workers; 
       task_cv.wait(lk); 
       { 
        glock lk(cout_mtx); 
        std::cout << "\tWorker started\n"; 
       } 
      } 

      auto fn(func_factory()); 
      fn(); 

      ulock lk(task_mtx); 
      --workers; 
      if (workers == 0) 
      { 
       kill_switch.notify_all(); 
      } 

     }).detach(); 
    } 

    // Notify all workers to start processing the queue 
    task_cv.notify_all(); 

    // This is the important bit: 
    // Tasks can be executed by the main thread 
    // as well as by the workers. 
    // In fact, any thread can grab a task from the queue, 
    // check if it is running and start working 
    // on it if it is not. 
    auto fn(func_factory()); 
    fn(); 

    ulock lk(task_mtx); 
    if (workers > 0) 
    { 
     kill_switch.wait(lk); 
    } 

    return 0; 
} 

Das ist mein CMakeLists.txt

cmake_minimum_required(VERSION 3.2) 

project(tp_wait) 

set(CMAKE_CXX_COMPILER "clang++") 
set(CMAKE_CXX_STANDARD 14) 
set(CMAKE_CXX_STANDARD_REQUIRED ON) 

set(CMAKE_BUILD_TYPE "Debug" CACHE STRING "Build type" FORCE) 

find_package(Threads REQUIRED) 

add_executable(${PROJECT_NAME} "main.cpp") 
target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT}) 
Verwandte Themen