2016-06-15 7 views
2

Ich versuche ein C++ 11/14-Programm zu schreiben, in dem eine feste Anzahl von Threads (sagen wir 4) kontinuierlich eine Thread-Warteschlange bis dahin ausreizt ist keine Arbeit mehr in der Warteschlange.C++ Multithreading mit einer festen Anzahl von Threads und einer threadsafe-Warteschlange

Thread Warteschlange Umsetzung:

template<typename T> 
class threadsafe_queue 
{ 
private: 
    mutable std::mutex mut; 
    std::queue<T> data_queue; 
    std::condition_variable data_cond; 
public: 
    threadsafe_queue() {} 
    threadsafe_queue(threadsafe_queue const &other) 
    { 
    std::lock_guard<std::mutex> lk(other.mut); 
    data_queue = other.data_queue; 
    } 

    void push(T new_value) 
    { 
    std::lock_guard<std::mutex> lk(mut); 
    data_queue.push(new_value); 
    data_cond.notify_one(); 
    } 

    void wait_and_pop(T &value) 
    { 
    std::unique_lock<std::mutex> lk(mut); 
    data_cond.wait(lk, [this]{return !data_queue.empty();}); 
    value = data_queue.front(); 
    data_queue.pop(); 
    } 

    std::shared_ptr<T> wait_and_pop() 
    { 
    std::unique_lock<std::mutex> lk(mut); 
    data_cond.wait(lk, [this]{return !data_queue.empty();}); 
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); 
    data_queue.pop(); 
    return res; 
    } 

    bool try_pop(T &value) 
    { 
    std::lock_guard<std::mutex> lk(mut); 
    if (data_queue.empty()) 
     return false; 
    value = data_queue.front(); 
    data_queue.pop(); 
    return true; 
    } 

    std::shared_ptr<T> try_pop() 
    { 
    std::lock_guard<std::mutex> lk(mut); 
    if (data_queue.empty()) 
     return std::shared_ptr<T>(); 
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front())); 
    data_queue.pop(); 
    return res; 
    } 

    bool empty() const 
    { 
    std::lock_guard<std::mutex> lk(mut); 
    return data_queue.empty(); 
    } 
}; 

Funktion jeder Thread läuft:

void insertintobidask(std::string connstring, std::string ziparchivename, OFStreamWriter &errlog) { /.../ } 

Die Haupt in dem die Fäden sollen eine Arbeit aus dem Workqueue nehmen, bis es keine Arbeit in der linken Seite ist Warteschlange:

int main() 
{ 
    std::ofstream errlog 
    errlog.open("/home/vorlket/Desktop/Project/Code/Test/errlog.txt", std::ofstream::out); 
    OFStreamWriter ofsw(&errlog); 

    threadsafe_queue<std::string> wqueue; 
    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data"); 
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip"); 
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter) 
    { 
    std::string name = iter->path().filename().string(); 
    if (std::regex_match(name, pattern_fx)) 
    { 
     wqueue.push(name); 
    } 
    } 

    /* Each thread below would run once, how do I modify it to make it continuously take a work off the queue and run until there is no work left in the queue? 
    std::thread consumer1 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 
    std::thread consumer2 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 
    std::thread consumer3 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 
    std::thread consumer4 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw); 

    consumer1.join(); 
    consumer2.join(); 
    consumer3.join(); 
    consumer4.join(); 
    */ 

    errlog.close(); 
    return 0; 
} 

Ich versuchte einen anderen Ansatz basierend auf Nims Antwort unten und Es klappt.

/* g++ -std=gnu++11 fxetl.cxx -o fxetl -lboost_system -lboost_filesystem -lzip -lpqxx -lpq -pthread */ 

#include <boost/filesystem.hpp> 
#include <regex> 
#include <iostream> 
#include <fstream> 
#include <string> 
#include <pqxx/pqxx> 
#include <zip.h> 
#include <thread> 
#include <boost/asio.hpp> 
#include "threadsafe_oerrlog.h" 

void insertintobidask(pqxx::nontransaction &txn, std::string ziparchivename, OFStreamWriter &errlog) 
{ 
    std::string fileyearmonth = ziparchivename.substr(27, 6); 
    std::string ziparchivepath = "/home/vorlket/Desktop/Project/Code/Test/Data/HISTDATA_COM_ASCII_AUDUSD_T" + fileyearmonth + ".zip"; 
    std::string zipfilepath = "DAT_ASCII_AUDUSD_T_" + fileyearmonth + ".csv"; 
    int err, r; 
    char buffer[39]; // each line takes up 39 bytes 

    struct zip *ziparchive = zip_open(ziparchivepath.c_str(), 0, &err); 
    if (ziparchive) 
    { 
    struct zip_file *zipfile = zip_fopen(ziparchive, zipfilepath.c_str(), 0); 
    if (zipfile) 
    { 
     while ((r = zip_fread(zipfile, buffer, sizeof(buffer))) > 0) 
     { 
     std::string str(buffer); 
     txn.exec("INSERT INTO fx.bidask VALUES('AUDUSD', to_timestamp(" +txn.quote(str.substr(0, 18)) + ", 'YYYYMMDD HH24MISSMS'), " + txn.quote(str.substr(19, 8)) + ", " + txn.quote(str.substr(28, 8)) + ")"); 
     } 
     zip_fclose(zipfile); 
     std::cout << fileyearmonth << std::endl; 
    } 
    else 
    { 
     errlog << zipfilepath; 
    } 
    } 
    else 
    { 
    errlog << ziparchivepath; 
    } 

    zip_close(ziparchive); 
} 


int main() 
{ 
    pqxx::connection conn1("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J"); 
    pqxx::nontransaction txn1(conn1); 
    pqxx::connection conn2("hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J"); 
    pqxx::nontransaction txn2(conn2); 
    pqxx::connection conn3("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J"); 
    pqxx::nontransaction txn3(conn3); 
    pqxx::connection conn4("hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J"); 
    pqxx::nontransaction txn4(conn4); 

    std::ofstream errlog("/home/vorlket/Desktop/Project/Code/Test/errlog.txt"); 
    OFStreamWriter ofsw(&errlog); 

    boost::asio::io_service service1; // queue 
    boost::asio::io_service service2; 
    boost::asio::io_service service3; 
    boost::asio::io_service service4; 

    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data"); 
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip"); 
    int serviceid = 0; 
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter) 
    { 
    std::string name = iter->path().filename().string(); 
    if (std::regex_match(name, pattern_fx)) 
    { 
     serviceid %= 3; 
     switch (serviceid) 
     { 
     case 0 : 
      service1.post([&txn1, name, &ofsw]() { insertintobidask(txn1, name, ofsw); }); 
      break; 
     case 1 : 
      service2.post([&txn2, name, &ofsw]() { insertintobidask(txn2, name, ofsw); }); 
      break; 
     case 2 : 
      service3.post([&txn3, name, &ofsw]() { insertintobidask(txn3, name, ofsw); }); 
      break; 
     case 3 : 
      service4.post([&txn4, name, &ofsw]() { insertintobidask(txn4, name, ofsw); }); 
      break; 
     } 
     ++serviceid; 
    } 
    } 

    std::thread t1([&service1]() { service1.run(); }); 
    std::thread t2([&service2]() { service2.run(); }); 
    std::thread t3([&service3]() { service3.run(); }); 
    std::thread t4([&service4]() { service4.run(); }); 

    t1.join(); 
    t2.join(); 
    t3.join(); 
    t4.join(); 

} 

Nicht sicher, welcher Ansatz schneller ist, aber ich denke, es hängt von Arbeitslast und Plattform ab, an der man arbeitet. Einen Versuch wert zu sehen, was schneller ist. Irgendwelche Kommentare, welcher Ansatz wäre schneller und wer wird geschätzt.

+0

Fragen wie diese sind hier nicht willkommen und führen oft zu schweren downvoting. Teilen Sie uns mit, was Sie bis jetzt implementiert oder versucht haben und stellen Sie spezifische Fragen dazu, was in Ihrem Code nicht funktioniert. Sagte, willkommen in SO! – Arunmu

+0

Sehen Sie sich die [thread support library] (http://en.cppreference.com/w/cpp/thread) an. Sie finden dort die meisten Sachen, die Sie brauchen. – Aconcagua

+0

Editiert die Frage zu teilen, was ich versucht habe. – vorlket

Antwort

2

Wenn dies nicht zum Lernen/etwas ist, wo es nicht schnell genug ist, würde ich diese Crud-Operationen an einen bestehenden Mechanismus delegieren. Und ich ziehe boost::asio::io_service für diese genaue Art der Sache zu nutzen ..

-Code wäre:

// Additional header 
#include <boost/asio.hpp> 

int main() 
{ 
    std::ofstream errlog 
    errlog.open("/home/vorlket/Desktop/Project/Code/Test/errlog.txt", std::ofstream::out); 
    OFStreamWriter ofsw(&errlog); 

    boost::asio::io_service service; // queue 
    boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data"); 
    std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip"); 
    for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter) 
    { 
    std::string name = iter->path().filename().string(); 
    if (std::regex_match(name, pattern_fx)) 
    { 
     service.post([name]() { 
     // Do something with this file 
     }); 
    } 
    } 
    // Now start-up n-threads to dispatch on the io_service 
    std::thread t1([&service]() { service.run(); }); // this will dispatch on queue until there is nothing left to do... 
    std::thread t2([&service]() { service.run(); }); 
    std::thread t3([&service]() { service.run(); }); 
    std::thread t4([&service]() { service.run(); }); 
    : 

    // Wait for them to complete 
    t1.join(); 
    t2.join(); 
    t3.join(); 
    t4.join(); 
} 
+0

Nim, ich habe einen Funktionsaufruf in das Lambda eingefügt, aber die Threads scheinen keine Arbeit zu tun. Der Code, den ich ausprobiert habe, befindet sich in der obigen Frage. – vorlket

+0

@vorlket, die Änderungen sehen gut aus, ich kann nichts speziell falsch damit sehen - könnte etwas Protokollierung helfen, zu sehen, wo die Dinge sind? – Nim

+0

Es gab einen Fehler mit dem relativen/absoluten Pfad in der Funktion inserintobidask. Es klappt. Danke für das Teilen. – vorlket

Verwandte Themen