Ich versuche ein C++ 11/14-Programm zu schreiben, in dem eine feste Anzahl von Threads (sagen wir 4) kontinuierlich eine Thread-Warteschlange bis dahin ausreizt ist keine Arbeit mehr in der Warteschlange.C++ Multithreading mit einer festen Anzahl von Threads und einer threadsafe-Warteschlange
Thread Warteschlange Umsetzung:
template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue() {}
threadsafe_queue(threadsafe_queue const &other)
{
std::lock_guard<std::mutex> lk(other.mut);
data_queue = other.data_queue;
}
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T &value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
value = data_queue.front();
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool try_pop(T &value)
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = data_queue.front();
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
Funktion jeder Thread läuft:
void insertintobidask(std::string connstring, std::string ziparchivename, OFStreamWriter &errlog) { /.../ }
Die Haupt in dem die Fäden sollen eine Arbeit aus dem Workqueue nehmen, bis es keine Arbeit in der linken Seite ist Warteschlange:
int main()
{
std::ofstream errlog
errlog.open("/home/vorlket/Desktop/Project/Code/Test/errlog.txt", std::ofstream::out);
OFStreamWriter ofsw(&errlog);
threadsafe_queue<std::string> wqueue;
boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data");
std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip");
for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter)
{
std::string name = iter->path().filename().string();
if (std::regex_match(name, pattern_fx))
{
wqueue.push(name);
}
}
/* Each thread below would run once, how do I modify it to make it continuously take a work off the queue and run until there is no work left in the queue?
std::thread consumer1 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
std::thread consumer2 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
std::thread consumer3 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
std::thread consumer4 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
consumer1.join();
consumer2.join();
consumer3.join();
consumer4.join();
*/
errlog.close();
return 0;
}
Ich versuchte einen anderen Ansatz basierend auf Nims Antwort unten und Es klappt.
/* g++ -std=gnu++11 fxetl.cxx -o fxetl -lboost_system -lboost_filesystem -lzip -lpqxx -lpq -pthread */
#include <boost/filesystem.hpp>
#include <regex>
#include <iostream>
#include <fstream>
#include <string>
#include <pqxx/pqxx>
#include <zip.h>
#include <thread>
#include <boost/asio.hpp>
#include "threadsafe_oerrlog.h"
void insertintobidask(pqxx::nontransaction &txn, std::string ziparchivename, OFStreamWriter &errlog)
{
std::string fileyearmonth = ziparchivename.substr(27, 6);
std::string ziparchivepath = "/home/vorlket/Desktop/Project/Code/Test/Data/HISTDATA_COM_ASCII_AUDUSD_T" + fileyearmonth + ".zip";
std::string zipfilepath = "DAT_ASCII_AUDUSD_T_" + fileyearmonth + ".csv";
int err, r;
char buffer[39]; // each line takes up 39 bytes
struct zip *ziparchive = zip_open(ziparchivepath.c_str(), 0, &err);
if (ziparchive)
{
struct zip_file *zipfile = zip_fopen(ziparchive, zipfilepath.c_str(), 0);
if (zipfile)
{
while ((r = zip_fread(zipfile, buffer, sizeof(buffer))) > 0)
{
std::string str(buffer);
txn.exec("INSERT INTO fx.bidask VALUES('AUDUSD', to_timestamp(" +txn.quote(str.substr(0, 18)) + ", 'YYYYMMDD HH24MISSMS'), " + txn.quote(str.substr(19, 8)) + ", " + txn.quote(str.substr(28, 8)) + ")");
}
zip_fclose(zipfile);
std::cout << fileyearmonth << std::endl;
}
else
{
errlog << zipfilepath;
}
}
else
{
errlog << ziparchivepath;
}
zip_close(ziparchive);
}
int main()
{
pqxx::connection conn1("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
pqxx::nontransaction txn1(conn1);
pqxx::connection conn2("hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
pqxx::nontransaction txn2(conn2);
pqxx::connection conn3("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
pqxx::nontransaction txn3(conn3);
pqxx::connection conn4("hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
pqxx::nontransaction txn4(conn4);
std::ofstream errlog("/home/vorlket/Desktop/Project/Code/Test/errlog.txt");
OFStreamWriter ofsw(&errlog);
boost::asio::io_service service1; // queue
boost::asio::io_service service2;
boost::asio::io_service service3;
boost::asio::io_service service4;
boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data");
std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip");
int serviceid = 0;
for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter)
{
std::string name = iter->path().filename().string();
if (std::regex_match(name, pattern_fx))
{
serviceid %= 3;
switch (serviceid)
{
case 0 :
service1.post([&txn1, name, &ofsw]() { insertintobidask(txn1, name, ofsw); });
break;
case 1 :
service2.post([&txn2, name, &ofsw]() { insertintobidask(txn2, name, ofsw); });
break;
case 2 :
service3.post([&txn3, name, &ofsw]() { insertintobidask(txn3, name, ofsw); });
break;
case 3 :
service4.post([&txn4, name, &ofsw]() { insertintobidask(txn4, name, ofsw); });
break;
}
++serviceid;
}
}
std::thread t1([&service1]() { service1.run(); });
std::thread t2([&service2]() { service2.run(); });
std::thread t3([&service3]() { service3.run(); });
std::thread t4([&service4]() { service4.run(); });
t1.join();
t2.join();
t3.join();
t4.join();
}
Nicht sicher, welcher Ansatz schneller ist, aber ich denke, es hängt von Arbeitslast und Plattform ab, an der man arbeitet. Einen Versuch wert zu sehen, was schneller ist. Irgendwelche Kommentare, welcher Ansatz wäre schneller und wer wird geschätzt.
Fragen wie diese sind hier nicht willkommen und führen oft zu schweren downvoting. Teilen Sie uns mit, was Sie bis jetzt implementiert oder versucht haben und stellen Sie spezifische Fragen dazu, was in Ihrem Code nicht funktioniert. Sagte, willkommen in SO! – Arunmu
Sehen Sie sich die [thread support library] (http://en.cppreference.com/w/cpp/thread) an. Sie finden dort die meisten Sachen, die Sie brauchen. – Aconcagua
Editiert die Frage zu teilen, was ich versucht habe. – vorlket