2017-08-10 3 views
0

Betrachten Sie eine Situation, in der Sie 256 TCP-Verbindungen mit Geräten aufrechterhalten müssen, nur um gelegentlich Befehle zu senden. Ich möchte dies parallel tun (es muss blockieren, bis es die Antwort bekommt), ich versuche, QThreadPool für diesen Zweck zu verwenden, aber ich habe einige Zweifel, wenn es möglich ist.Umgang mit mehreren Verbindungen mit QThreadPool

Ich versuchte QRunnable zu verwenden, aber ich bin nicht sicher, wie Steckdosen zwischen Threads verhalten (Buchsen sollten nur in Gewinde verwendet werden, dass sie erstellt wurden?)

Ich bin auch besorgt über die Effizienz dieser Lösung Ich würde mich freuen, wenn jemand Alternativen vorschlagen könnte, nicht unbedingt QT.

Im Folgenden poste ich einige Codeschnipsel.

class Task : public QRunnable { 

    Task(){ 
     //creating TaskSubclass instance and socket in it 
    } 

private: 
    TaskSubclass    *sub; 

    void run() override { 
     //some debug info and variable setting... 
     sub->doSomething(args); 
     return; 
    } 
}; 

class TaskSubclass { 
    Socket   *sock;   // socket instance 
    //... 
    void doSomething(args) 
    { 
     //writing to socket here 
    } 
} 

class MainProgram : public QObject{ 
    Q_OBJECT 
private: 
    QThreadPool *pool; 
    Task *tasks; 

public: 
    MainProgram(){ 
     pool = new QThreadPool(this); 
     //create tasks here 
    } 

    void run(){ 
     //decide which task to start 
     pool->start(tasks[i]); 
    } 
}; 
+1

kann ich fragen, warum Sie mehrere Threads für die Lösung verwenden möchten? Das Event-System von Qt ist in der Lage, hunderte von Tcp-Verbindungen über einen Thread ohne Probleme zu bewältigen, oder blockiert die "socket data handler" -Logik den Haupt-Thread? – Spiek

+0

Danke für den Kommentar, ich habe gerade Post bearbeitet. Es muss blockieren, bis es die Antwort erhält. – Vido

+0

Gibt es einen Grund, warum Sie nicht Signal und Slot-Mechanismus verwenden? Sie könnten einfach das ReadyRead-Signal der QTcpSocket verwenden und es mit einem Steckplatz verbinden. – Spiek

Antwort

0

Meine Lieblings Lösung für dieses Problem ist durch Multiplexen Ihre Sockets select(). Auf diese Weise müssen Sie keine zusätzlichen Threads erstellen, und es ist ein "sehr POSIX" Weg, dies zu tun.

Siehe zum Beispiel dieses Tutorial sehen:

http://www.binarytides.com/multiple-socket-connections-fdset-select-linux/

Oder eine ähnliche Frage in:

Using select(..) on client

+0

Ich möchte es auf einer höheren Abstraktionsebene halten, vorzugsweise mit Wrappern wie QTcpSockets oder Boost-Sockets, so dass ich nicht mit dem Rohspeicher arbeiten muss. – Vido

+0

Nachdem ich dachte, ich könnte rohe Deskriptoren aus diesen Wraps nur für den Moment der Verwendung von select() nehmen, aber ich möchte es parallel machen. Es ist immer noch eine gute Alternative, danke dafür. – Vido

0

Wie OMD_AT hat allready die beste Lösung wies darauf hin, ist Select() zu verwenden, und lass den Kernel die Arbeit für dich erledigen :-)

Hier hast du ein Beispiel für ein As ync-Ansatz und ein Syncron-Multi-Thread-Ansatz.

In diesem Beispiel erstellen wir 10 Verbindung zu einem Google Webservice und machen eine einfache Anfrage an den Server, wir messen, wie lange alle Verbindungen in jedem Ansatz benötigt, um die Antwort vom Google Server zu erhalten.

Beachten Sie, dass Sie einen schnelleren Webserver verwenden sollten, um einen echten Test wie den localhost durchzuführen, da die Netzwerklatenz einen großen Einfluss auf das Ergebnis hat.

#include <QCoreApplication> 
#include <QTcpSocket> 
#include <QtConcurrent/QtConcurrentRun> 
#include <QElapsedTimer> 
#include <QAtomicInt> 

class Task : public QRunnable 
{ 
    public: 
     Task() : QRunnable() {} 
     static QAtomicInt counter; 
     static QElapsedTimer timer; 
     virtual void run() override 
     { 
      QTcpSocket* socket = new QTcpSocket(); 
      socket->connectToHost("www.google.com", 80); 
      socket->write("GET/HTTP/1.1\r\nHost: www.google.com\r\n\r\n"); 
      socket->waitForReadyRead(); 
      if(!--counter) { 
       qDebug("Multiple Threads elapsed: %lld nanoseconds", timer.nsecsElapsed()); 
      } 
     } 
}; 

QAtomicInt Task::counter; 
QElapsedTimer Task::timer; 

int main(int argc, char *argv[]) 
{ 
    QCoreApplication app(argc, argv); 

    // init 
    int connections = 10; 
    Task::counter = connections; 
    QElapsedTimer timer; 

    /// Async via One Thread (Select) 

    // handle the data 
    auto dataHandler = [&timer,&connections](QByteArray data) { 
     Q_UNUSED(data); 
     if(!--connections) qDebug(" Single Threads elapsed: %lld nanoseconds", timer.nsecsElapsed()); 
    }; 

    // create 10 connection to google.com and send an http get request 
    timer.start(); 
    for(int i = 0; i < connections; i++) { 
     QTcpSocket* socket = new QTcpSocket(); 
     socket->connectToHost("www.google.com", 80); 
     socket->write("GET/HTTP/1.1\r\nHost: www.google.com\r\n\r\n"); 
     QObject::connect(socket, &QTcpSocket::readyRead, [dataHandler,socket]() { 
      dataHandler(socket->readAll()); 
     }); 
    } 


    /// Async via Multiple Threads 

    Task::timer.start(); 
    for(int i = 0; i < connections; i++) { 
     QThreadPool::globalInstance()->start(new Task()); 
    } 

    return app.exec(); 
} 

Drucke:

Multiple Threads elapsed: 62324598 nanoseconds 
    Single Threads elapsed: 63613967 nanoseconds 
+0

Wie ich oben in den Kommentaren erwähnt habe, möchte ich wirklich viele Kerne im Computer nutzen, die für diese Software bestimmt sind. – Vido

+0

Ich habe eine Unterstützung für die Unterstützung mehrerer Threads hinzugefügt – Spiek

+0

Ist es sicher, Socket im Haupt-Thread zu erstellen und einige Operationen in einem anderen Thread auszuführen? Auch aus Gründen der Effizienz können Sie die Leistung dieser Lösung und den select() -Ansatz vergleichen? (Gibt es einen großen Unterschied) – Vido

0

Obwohl, ist die Antwort bereits akzeptiert, ich möchte meine)

Was ich aus Ihrer Frage verstanden teilen: 256 Mit derzeit aktiv Von Zeit zu Zeit senden Sie eine Anfrage ("Befehl", wie Sie es genannt haben) an eine von ihnen und warten auf die Antwort. In der Zwischenzeit möchten Sie diesen Prozess Multithread machen, und obwohl Sie sagten "Es muss blockieren, bis es die Antwort erhält", nehme ich an, Sie implizierte blockieren einen Thread, der Anfrage-Antwort Prozess behandelt, aber nicht den Hauptthread.

Wenn ich in der Tat die Frage richtig verstehen, ist hier, wie ich vorschlagen, es mit Qt zu tun:

#include <functional> 

#include <QObject>   // need to add "QT += core" in .pro 
#include <QTcpSocket>  // QT += network 
#include <QtConcurrent>  // QT += concurrent 
#include <QFuture>   
#include <QFutureWatcher> 

class CommandSender : public QObject 
{ 
public: 
    // Sends a command via connection and blocks 
    // until the response arrives or timeout occurs 
    // then passes the response to a handler 
    // when the handler is done - unblocks 
    void SendCommand(
     QTcpSocket* connection, 
     const Command& command, 
     void(*responseHandler)(Response&&)) 
    { 
     const int timeout = 1000; // milliseconds, set it to -1 if you want no timeouts 

     // Sending a command (blocking) 
     connection.write(command.ToByteArray()); // Look QByteArray for more details 
     if (connection.waitForBytesWritten(timeout) { 
      qDebug() << connection.errorString() << endl; 
      emit error(connection); 
      return; 
     } 

     // Waiting for a response (blocking) 
     QDataStream in{ connection, QIODevice::ReadOnly }; 
     QString message; 
     do { 
      if (!connection.waitForReadyRead(timeout)) { 
       qDebug() << connection.errorString() << endl; 
       emit error(connection); 
       return; 
      } 
      in.startTransaction(); 
      in >> message; 
     } while (!in.commitTransaction()); 

     responseHandler(Response{ message }); // Translate message to a response and handle it 
    } 

    // Non-blocking version of SendCommand 
    void SendCommandAsync(
     QTcpSocket* connection, 
     const Command& command, 
     void(*responseHandler) (Response&&)) 
    { 
     QFutureWatcher<void>* watcher = new QFutureWatcher<void>{ this }; 
     connect(watcher, &QFutureWatcher<void>::finished, [connection, watcher]() 
     { 
      emit done(connection); 
      watcher->deleteLater(); 
     }); 

     // Does not block, 
     // emits "done" when finished 
     QFuture<void> future 
      = QtConcurrent::run(this, &CommandSender::SendCommand, connection, command, responseHandler); 
     watcher->setFuture(future); 
    } 

signals: 
    void done(QTcpSocket* connection); 
    void error(QTcpSocket* connection); 
} 

Jetzt können Sie einen Befehl an eine Steckdose mit einem separaten Thread von einem Thread-Pool genommen senden: unter Die Haube QtConcurrent::run() verwendet die von Qt für Sie bereitgestellte globale Instanz QThreadPool. Dieser Thread blockiert, bis er eine Antwort zurückbekommt, und behandelt sie dann mit responseHandler. Währenddessen bleibt Ihr Hauptthread, der alle Ihre Befehle und Sockets verwaltet, nicht blockiert. Fangen Sie einfach done() Signal, das sagt, dass die Antwort empfangen und erfolgreich behandelt wurde.

Eine Sache zu beachten: asynchrone Version sendet Anfrage nur, wenn es einen freien Thread im Thread-Pool gibt und darauf wartet, andernfalls. Natürlich ist dies das Verhalten für jeden Thread-Pool (das ist genau der Punkt eines solchen Musters), aber vergessen Sie das nicht.

Auch ich schrieb Code ohne Qt in handliche, so kann einige Fehler enthalten.

Edit: Wie sich herausstellte, ist dies nicht Thread-sicher, da Sockets nicht in Qt reentrant sind.

Sie können einen Mutex mit einem Socket verknüpfen und ihn jedes Mal sperren, wenn Sie seine Funktion ausführen. Dies kann leicht geschehen, indem ein Wrapper für die QTcpSocket-Klasse erstellt wird. Bitte korrigieren Sie mich, wenn ich falsch liege.

+0

Danke, es sieht gut aus, aber sollte es nicht ein Problem mit Thread-Sicherheit geben, wenn wir den Zeiger auf Socket in QtConcureent.run() übergeben? Ich habe gehört, dass Sockets nur in Threads verwendbar sind, in denen sie erstellt werden, und sie zwischen verschiedenen Threads zu übergeben, ist nicht sicher (ich weiß nicht, wo Sockets erstellt werden sollten, da der Main Thread nicht sicher wäre?). Bitte korrigieren Sie mich, wenn ich falsch liege oder wenn Sie eine Erklärung haben, dass dies sicher ist. – Vido

+0

Sie haben Recht, es ist nicht threadsicher. Der Grund dafür ist - Sockets in Qt sind nicht reentrant (ich vergaß, sorry). Ich bin also auf die Situation gestoßen, in der ich versuche, Sockets vom Haupt-Thread zu verbinden/trennen und einen anderen Worker-Thread zu lesen/schreiben. Mein Fehler. ** Aber ** führt zu der logischen Schlussfolgerung, dass ** alle ** Operationen auf einem Socket in demselben Thread ausgeführt werden müssen, egal was (zumindest in Qt). – WindyFields

Verwandte Themen