2017-05-07 3 views
0

Ich arbeite an einem einfachen TCP-Server, der liest und schreibt seine Nachrichten in Thread-Warteschlange. Die Anwendung kann dann diese Warteschlange verwenden, um den Socket auch von verschiedenen Threads sicher zu lesen und zu schreiben.Boost asio async Lesen und Schreiben in Socket mit Warteschlange

Das Problem, mit dem ich konfrontiert bin, ist, dass ich nicht async_read. Meine Warteschlange hat die Operation pop, die das nächste zu verarbeitende Element zurückgibt, blockiert aber, wenn nichts verfügbar ist. Also, sobald ich pop rufe, wird der async_read Callback natürlich nicht mehr ausgelöst. Gibt es eine Möglichkeit, eine solche Warteschlange in boost asio zu integrieren oder muss ich das komplett neu schreiben?

Unten ist ein kurzes Beispiel, das ich gemacht habe, um das Problem zu zeigen, das ich habe. Sobald eine TCP-Verbindung hergestellt ist, erstelle ich einen neuen Thread, der die Anwendung unter dieser tcp_connection ausführt. Danach möchte ich async_read und async_write starten. Ich habe mir für ein paar Stunden den Kopf gebrochen und ich weiß wirklich nicht, wie ich das lösen soll.

class tcp_connection : public std::enable_shared_from_this<tcp_connection> 
{ 
public: 
    static std::shared_ptr<tcp_connection> create(boost::asio::io_service &io_service) { 
     return std::shared_ptr<tcp_connection>(new tcp_connection(io_service)); 
    } 

    boost::asio::ip::tcp::socket& get_socket() 
    { 
     return this->socket; 
    } 

    void app_start() 
    { 
     while(1) 
     { 
      // Pop is a blocking call. 
      auto inbound_message = this->inbound_messages.pop(); 
      std::cout << "Got message in app thread: " << inbound_message << ". Sending it back to client." << std::endl; 
      this->outbound_messages.push(inbound_message); 
     } 
    } 

    void start() { 
     this->app_thread = std::thread(&tcp_connection::app_start, shared_from_this()); 

     boost::asio::async_read_until(this->socket, this->input_stream, "\r\n", 
      strand.wrap(boost::bind(&tcp_connection::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); 

     // Start async writing here. The message to send are in the outbound_message queue. But a Pop operation blocks 
     // empty() is also available to check whether the queue is empty. 
     // So how can I async write without blocking the read. 
     // block... 
     auto message = this->outbound_messages.pop(); 
     boost::asio::async_write(this->socket, boost::asio::buffer(message), 
      strand.wrap(boost::bind(&tcp_connection::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); 
    } 

    void handle_read(const boost::system::error_code& e, size_t bytes_read) 
    { 
     std::cout << "handle_read called" << std::endl; 
     if (e) 
     { 
      std::cout << "Error handle_read: " << e.message() << std::endl; 
      return; 
     } 
     if (bytes_read != 0) 
     { 
      std::istream istream(&this->input_stream); 
      std::string message; 
      message.resize(bytes_read); 
      istream.read(&message[0], bytes_read); 
      std::cout << "Got message: " << message << std::endl; 
      this->inbound_messages.push(message); 
     } 
     boost::asio::async_read_until(this->socket, this->input_stream, "\r\n", 
      strand.wrap(boost::bind(&tcp_connection::handle_read, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); 
    } 

    void handle_write(const boost::system::error_code& e, size_t /*bytes_transferred*/) 
    { 
     if (e) 
     { 
      std::cout << "Error handle_write: " << e.message() << std::endl; 
      return; 
     } 

     // block... 
     auto message = this->outbound_messages.pop(); 
     boost::asio::async_write(this->socket, boost::asio::buffer(message), 
      strand.wrap(boost::bind(&tcp_connection::handle_write, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred))); 
    } 



private: 
    tcp_connection(boost::asio::io_service& io_service) : socket(io_service), strand(io_service) 
    { 
    } 

    boost::asio::ip::tcp::socket socket; 
    boost::asio::strand strand; 
    boost::asio::streambuf input_stream; 

    std::thread app_thread; 

    concurrent_queue<std::string> inbound_messages; 
    concurrent_queue<std::string> outbound_messages; 
}; 

class tcp_server 
{ 
public: 
    tcp_server(boost::asio::io_service& io_service) 
     : acceptor(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 9001)) 
    { 
     start_accept(); 
    } 

private: 
    void start_accept() 
    { 
     std::shared_ptr<tcp_connection> new_connection = 
      tcp_connection::create(acceptor.get_io_service()); 

     acceptor.async_accept(new_connection->get_socket(), 
      boost::bind(&tlcp_tcp_server::handle_accept, this, new_connection, boost::asio::placeholders::error)); 
    } 

    void handle_accept(std::shared_ptr<tcp_connection> new_connection, 
         const boost::system::error_code& error) 
    { 
     if (!error) 
     { 
      new_connection->start(); 
     } 

     start_accept(); 
    } 

    boost::asio::ip::tcp::acceptor acceptor; 
}; 

Antwort

2

Es scheint mir, als ob Sie eine async_pop Methode wollen, die eine Fehlermeldung Platzhalter und Callback-Handler nimmt. Wenn Sie eine Nachricht erhalten, überprüfen Sie, ob ein ausstehender Handler vorhanden ist, und wenn dies der Fall ist, löschen Sie die Nachricht, heben Sie die Registrierung des Handlers auf und rufen Sie ihn auf. Wenn bei der Registrierung von async_pop bereits eine Nachricht vorhanden ist, können Sie die Nachricht aufklappen und einen Aufruf an den Handler senden, ohne ihn zu registrieren.

Möglicherweise möchten Sie die Klasse async_pop von einer polymorphen Basis des Typs pop_operation oder ähnlichem ableiten.

+0

Danke! Es kam mir nicht in den Sinn, dass ich selbst einen solchen Handler erstellen könnte. So lange ich 'strang.wrap' benutze, sollte das gut funktionieren. Eine Frage, die ich habe, ist warum brauche ich die Fehlermeldung Platzhalter? –

+0

@JohnSmith mein Fehler. Wenn Sie einen Nachrichtenplatzhalter (std :: string &?) benötigen, sollten Sie den eventuellen Fehlercode von der letzten fehlgeschlagenen io-Leseoperation an den asynchronen Handler zurückgeben, wenn die Nachrichtenwarteschlange erschöpft ist und gelesen wird Error. –

+0

Ja, ich habe es jetzt implementiert. Im Code ist es keine Zeichenkette, sondern am einfachsten mit der Beispiel-Zeichenkette zu demonstrieren. Morgen poste ich den Code in einer anderen Antwort, aber ich behalte deine als akzeptiert. Nur für Leute, die sich dasselbe fragen. –