2017-12-20 7 views
2

Gemäß der Dokumentation gewährleisten:boost :: asio :: async_write - nur ein hervorragender Ruf

„Das Programm muss dafür sorgen, dass der Strom keine andere Schreiboperationen durchführt (wie async_write, die async_write_some Funktion Stream oder eine beliebigen andere zusammengesetzte Operationen, die Schreibvorgänge ausführen), bis diese Operation abgeschlossen ist. "

Heißt das, ich kann boost :: asio :: async_write nicht ein zweites Mal aufrufen, bis der Handler für den ersten aufgerufen wird? Wie erreicht man das und ist trotzdem asynchron?

Wenn ich eine Methode Senden:

//-------------------------------------------------------------------- 
void Connection::Send(const std::vector<char> & data) 
{ 
    auto callback = boost::bind(&Connection::OnSend, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred); 
    boost::asio::async_write(m_socket, boost::asio::buffer(data), callback); 
} 

Muss ich es zu etwas ändern müssen wie:

//-------------------------------------------------------------------- 
void Connection::Send(const std::vector<char> & data) 
{ 
    // Issue a send 
    std::lock_guard<std::mutex> lock(m_numPostedSocketIOMutex); 
    ++m_numPostedSocketIO; 

    m_numPostedSocketIOConditionVariable.wait(lock, [this]() {return m_numPostedSocketIO == 0; }); 

    auto callback = boost::bind(&Connection::OnSend, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred); 
    boost::asio::async_write(m_socket, boost::asio::buffer(data), callback); 
} 

und wenn ja, dann nicht, dass ich nach dem ersten Anrufblockierung wieder ?

+0

_ "Der übliche Ansatz zur Befestigung dieser eine Warteschlange von abgehenden Puffer haben, ist, anstelle eines einzelnen, und sie hintereinander zu senden, z.B. [Boost Asio Async \ _write: wie nicht async \ _write Anrufe verschachteln] (https://stackoverflow.com/questions/7754695/boost-asio-async-write-how-to-not-interleaving-async-write-calls/7756894 # 7756894)? "(Zitiert aus [dieser zuvor gelöschten Antwort] (https://stackoverflow.com/a/46983984/85371)) – sehe

Antwort

0

Hier ist ein komplettes, kompilierbares und getestetes Beispiel, das ich recherchiert habe und durch Versuch und Irrtum arbeiten musste, nachdem ich die Antwort und nachfolgende Änderungen von RustyX gelesen hatte.

Verbindung.h

#pragma once 

#include <boost/asio.hpp> 

#include <atomic> 
#include <condition_variable> 
#include <memory> 
#include <mutex> 

//-------------------------------------------------------------------- 
class ConnectionManager; 

//-------------------------------------------------------------------- 
class Connection : public std::enable_shared_from_this<Connection> 
{ 
public: 

    typedef std::shared_ptr<Connection> SharedPtr; 

    // Ensure all instances are created as shared_ptr in order to fulfill requirements for shared_from_this 
    static Connection::SharedPtr Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket); 

    // 
    static std::string ErrorCodeToString(const boost::system::error_code & errorCode); 

    Connection(const Connection &) = delete; 
    Connection(Connection &&) = delete; 
    Connection & operator = (const Connection &) = delete; 
    Connection & operator = (Connection &&) = delete; 
    ~Connection(); 

    // We have to defer the start until we are fully constructed because we share_from_this() 
    void Start(); 
    void Stop(); 

    void Send(const std::vector<char> & data); 

private: 

    static size_t           m_nextClientId; 

    size_t             m_clientId; 
    ConnectionManager *          m_owner; 
    boost::asio::ip::tcp::socket       m_socket; 
    std::atomic<bool>          m_stopped; 
    boost::asio::streambuf         m_receiveBuffer; 
    mutable std::mutex          m_sendMutex; 
    std::vector<char>          m_sendBuffers[2];   // Double buffer 
    int              m_activeSendBufferIndex; 
    bool             m_sending; 

    std::vector<char>          m_allReadData;   // Strictly for test purposes 

    Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket); 

    void DoReceive(); 
    void DoSend(); 
}; 

//-------------------------------------------------------------------- 

Connection.cpp

#include "Connection.h" 
#include "ConnectionManager.h" 

#include <boost/bind.hpp> 

#include <algorithm> 
#include <cstdlib> 

//-------------------------------------------------------------------- 
size_t Connection::m_nextClientId(0); 

//-------------------------------------------------------------------- 
Connection::SharedPtr Connection::Create(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket & socket) 
{ 
    return Connection::SharedPtr(new Connection(connectionManager, std::move(socket))); 
} 

//-------------------------------------------------------------------------------------------------- 
std::string Connection::ErrorCodeToString(const boost::system::error_code & errorCode) 
{ 
    std::ostringstream debugMsg; 
    debugMsg << " Error Category: " << errorCode.category().name() << ". " 
      << " Error Message: " << errorCode.message() << ". "; 

    // IMPORTANT - These comparisons only work if you dynamically link boost libraries 
    //    Because boost chose to implement boost::system::error_category::operator == by comparing addresses 
    //    The addresses are different in one library and the other when statically linking. 
    // 
    // We use make_error_code macro to make the correct category as well as error code value. 
    // Error code value is not unique and can be duplicated in more than one category. 
    if (errorCode == boost::asio::error::make_error_code(boost::asio::error::connection_refused)) 
    { 
     debugMsg << " (Connection Refused)"; 
    } 
    else if (errorCode == boost::asio::error::make_error_code(boost::asio::error::eof)) 
    { 
     debugMsg << " (Remote host has disconnected)"; 
    } 
    else 
    { 
     debugMsg << " (boost::system::error_code has not been mapped to a meaningful message)"; 
    } 

    return debugMsg.str(); 
} 

//-------------------------------------------------------------------- 
Connection::Connection(ConnectionManager * connectionManager, boost::asio::ip::tcp::socket socket) 
    : 
    m_clientId       (m_nextClientId++) 
    , m_owner        (connectionManager) 
    , m_socket       (std::move(socket)) 
    , m_stopped       (false) 
    , m_receiveBuffer     () 
    , m_sendMutex      () 
    , m_sendBuffers      () 
    , m_activeSendBufferIndex    (0) 
    , m_sending       (false) 
    , m_allReadData      () 
{ 
    printf("Client connection with id %zd has been created.", m_clientId); 
} 

//-------------------------------------------------------------------- 
Connection::~Connection() 
{ 
    // Boost uses RAII, so we don't have anything to do. Let thier destructors take care of business 
    printf("Client connection with id %zd has been destroyed.", m_clientId); 
} 

//-------------------------------------------------------------------- 
void Connection::Start() 
{ 
    DoReceive(); 
} 

//-------------------------------------------------------------------- 
void Connection::Stop() 
{ 
    // The entire connection class is only kept alive, because it is a shared pointer and always has a ref count 
    // as a consequence of the outstanding async receive call that gets posted every time we receive. 
    // Once we stop posting another receive in the receive handler and once our owner release any references to 
    // us, we will get destroyed. 
    m_stopped = true; 
    m_owner->OnConnectionClosed(shared_from_this()); 
} 

//-------------------------------------------------------------------- 
void Connection::Send(const std::vector<char> & data) 
{ 
    std::lock_guard<std::mutex> lock(m_sendMutex); 

    // Append to the inactive buffer 
    std::vector<char> & inactiveBuffer = m_sendBuffers[m_activeSendBufferIndex^1]; 
    inactiveBuffer.insert(inactiveBuffer.end(), data.begin(), data.end()); 

    // 
    DoSend(); 
} 

//-------------------------------------------------------------------- 
void Connection::DoSend() 
{ 
    // Check if there is an async send in progress 
    // An empty active buffer indicates there is no outstanding send 
    if (m_sendBuffers[m_activeSendBufferIndex].empty()) 
    { 
     m_activeSendBufferIndex ^= 1; 

     std::vector<char> & activeBuffer = m_sendBuffers[m_activeSendBufferIndex]; 
     auto self(shared_from_this()); 

     boost::asio::async_write(m_socket, boost::asio::buffer(activeBuffer), 
      [self](const boost::system::error_code & errorCode, size_t bytesTransferred) 
      { 
       std::lock_guard<std::mutex> lock(self->m_sendMutex); 

       self->m_sendBuffers[self->m_activeSendBufferIndex].clear(); 

       if (errorCode) 
       { 
        printf("An error occured while attemping to send data to client id %zd. %s", self->m_clientId, ErrorCodeToString(errorCode).c_str()); 

        // An error occurred 
        // We do not stop or close on sends, but instead let the receive error out and then close 
        return; 
       } 

       // Check if there is more to send that has been queued up on the inactive buffer, 
       // while we were sending what was on the active buffer 
       if (!self->m_sendBuffers[self->m_activeSendBufferIndex^1].empty()) 
       { 
        self->DoSend(); 
       } 
      }); 
    } 
} 

//-------------------------------------------------------------------- 
void Connection::DoReceive() 
{ 
    auto self(shared_from_this()); 

    boost::asio::async_read_until(m_socket, m_receiveBuffer, '#', 
     [self](const boost::system::error_code & errorCode, size_t bytesRead) 
     { 
      if (errorCode) 
      { 
       // Check if the other side hung up 
       if (errorCode == boost::asio::error::make_error_code(boost::asio::error::eof)) 
       { 
        // This is not really an error. The client is free to hang up whenever they like 
        printf("Client %zd has disconnected.", self->m_clientId); 
       } 
       else 
       { 
        printf("An error occured while attemping to receive data from client id %zd. Error Code: %s", self->m_clientId, ErrorCodeToString(errorCode).c_str()); 
       } 

       // Notify our masters that we are ready to be destroyed 
       self->m_owner->OnConnectionClosed(self); 

       // An error occured 
       return; 
      } 

      // Grab the read data 
      std::istream stream(&self->m_receiveBuffer); 
      std::string data; 
      std::getline(stream, data, '#'); 
      data += "#"; 

      printf("Received data from client %zd: %s", self->m_clientId, data.c_str()); 

      // Issue the next receive 
      if (!self->m_stopped) 
      { 
       self->DoReceive(); 
      } 
     }); 
} 

//-------------------------------------------------------------------- 

ConnectionManager.h

#pragma once 

#include "Connection.h" 

// Boost Includes 
#include <boost/asio.hpp> 

// Standard Includes 
#include <thread> 
#include <vector> 

//-------------------------------------------------------------------- 
class ConnectionManager 
{ 
public: 

    ConnectionManager(unsigned port, size_t numThreads); 
    ConnectionManager(const ConnectionManager &) = delete; 
    ConnectionManager(ConnectionManager &&) = delete; 
    ConnectionManager & operator = (const ConnectionManager &) = delete; 
    ConnectionManager & operator = (ConnectionManager &&) = delete; 
    ~ConnectionManager(); 

    void Start(); 
    void Stop(); 

    void OnConnectionClosed(Connection::SharedPtr connection); 

protected: 

    boost::asio::io_service   m_io_service; 
    boost::asio::ip::tcp::acceptor  m_acceptor; 
    boost::asio::ip::tcp::socket  m_listenSocket; 
    std::vector<std::thread>   m_threads; 

    mutable std::mutex     m_connectionsMutex; 
    std::vector<Connection::SharedPtr> m_connections; 

    boost::asio::deadline_timer  m_timer; 

    void IoServiceThreadProc(); 

    void DoAccept(); 
    void DoTimer(); 
}; 

//-------------------------------------------------------------------- 

ConnectionManager.cpp

#include "ConnectionManager.h" 

#include <boost/bind.hpp> 
#include <boost/date_time/posix_time/posix_time.hpp> 

#include <system_error> 
#include <cstdio> 

//------------------------------------------------------------------------------ 
ConnectionManager::ConnectionManager(unsigned port, size_t numThreads) 
    : 
    m_io_service () 
    , m_acceptor (m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) 
    , m_listenSocket(m_io_service) 
    , m_threads  (numThreads) 
    , m_timer  (m_io_service) 
{ 
} 

//------------------------------------------------------------------------------ 
ConnectionManager::~ConnectionManager() 
{ 
    Stop(); 
} 

//------------------------------------------------------------------------------ 
void ConnectionManager::Start() 
{ 
    if (m_io_service.stopped()) 
    { 
     m_io_service.reset(); 
    } 

    DoAccept(); 

    for (auto & thread : m_threads) 
    { 
     if (!thread.joinable()) 
     { 
      thread.swap(std::thread(&ConnectionManager::IoServiceThreadProc, this)); 
     } 
    } 

    DoTimer(); 
} 

//------------------------------------------------------------------------------ 
void ConnectionManager::Stop() 
{ 
    { 
     std::lock_guard<std::mutex> lock(m_connectionsMutex); 
     m_connections.clear(); 
    } 

    // TODO - Will the stopping of the io_service be enough to kill all the connections and ultimately have them get destroyed? 
    //  Because remember they have outstanding ref count to thier shared_ptr in the async handlers 
    m_io_service.stop(); 

    for (auto & thread : m_threads) 
    { 
     if (thread.joinable()) 
     { 
      thread.join(); 
     } 
    } 
} 

//------------------------------------------------------------------------------ 
void ConnectionManager::IoServiceThreadProc() 
{ 
    try 
    { 
     // Log that we are starting the io_service thread 
     { 
      printf("io_service socket thread starting."); 
     } 

     // Run the asynchronous callbacks from the socket on this thread 
     // Until the io_service is stopped from another thread 
     m_io_service.run(); 
    } 
    catch (std::system_error & e) 
    { 
     printf("System error caught in io_service socket thread. Error Code: %d", e.code().value()); 
    } 
    catch (std::exception & e) 
    { 
     printf("Standard exception caught in io_service socket thread. Exception: %s", e.what()); 
    } 
    catch (...) 
    { 
     printf("Unhandled exception caught in io_service socket thread."); 
    } 

    { 
     printf("io_service socket thread exiting."); 
    } 
} 

//------------------------------------------------------------------------------ 
void ConnectionManager::DoAccept() 
{ 
    m_acceptor.async_accept(m_listenSocket, 
     [this](const boost::system::error_code errorCode) 
     { 
      if (errorCode) 
      { 
       printf("An error occured while attemping to accept connections. Error Code: %s", Connection::ErrorCodeToString(errorCode).c_str()); 
       return; 
      } 

      // Create the connection from the connected socket 
      std::lock_guard<std::mutex> lock(m_connectionsMutex); 
      Connection::SharedPtr connection = Connection::Create(this, m_listenSocket); 
      m_connections.push_back(connection); 
      connection->Start(); 

      DoAccept(); 
     }); 
} 

//------------------------------------------------------------------------------ 
void ConnectionManager::OnConnectionClosed(Connection::SharedPtr connection) 
{ 
    std::lock_guard<std::mutex> lock(m_connectionsMutex); 

    auto itConnection = std::find(m_connections.begin(), m_connections.end(), connection); 
    if (itConnection != m_connections.end()) 
    { 
     m_connections.erase(itConnection); 
    } 
} 

//------------------------------------------------------------------------------ 
void ConnectionManager::DoTimer() 
{ 
    if (!m_io_service.stopped()) 
    { 
     // Send messages every second 
     m_timer.expires_from_now(boost::posix_time::seconds(30)); 
     m_timer.async_wait(
      [this](const boost::system::error_code & errorCode) 
      { 
       std::lock_guard<std::mutex> lock(m_connectionsMutex); 
       for (auto connection : m_connections) 
       { 
        connection->Send(std::vector<char>{'b', 'e', 'e', 'p', '#'}); 
       } 

       DoTimer(); 
      }); 
    } 
} 

main.cpp

#include "ConnectionManager.h" 

#include <cstring> 
#include <iostream> 
#include <string> 

int main() 
{ 
    // Start up the server 
    ConnectionManager connectionManager(5000, 2); 
    connectionManager.Start(); 

    // Pretend we are doing other things or just waiting for shutdown 
    std::this_thread::sleep_for(std::chrono::minutes(5)); 

    // Stop the server 
    connectionManager.Stop(); 

    return 0; 
} 
2

Ja, Sie müssen auf den Beendigungshandler warten, bevor Sie erneut async_write aufrufen. Bist du sicher, dass du blockiert wirst? Natürlich hängt es davon ab, wie schnell Sie Ihre Daten generieren, aber selbst wenn, dann gibt es keine Möglichkeit, es schneller zu senden, als Ihr Netzwerk damit umgehen kann. Wenn es wirklich ein Problem ist, sollten Sie größere Stücke schicken.

0

Die async in async_write() bezieht sich auf die Tatsache, dass die Funktion sofort zurückgibt, während das Schreiben im Hintergrund geschieht. Es sollte immer noch nur eine ausstehende Schreiboperation geben.

Sie müssen einen buffer verwenden, wenn Sie einen asynchronen Producer haben, der den neuen Datenblock beiseite legt, bis der gerade aktive Schreibvorgang abgeschlossen ist, und dann einen neuen async_write im Completion-Handler ausgeben.

Das heißt, Connection::Send darf nur async_write einmal aufrufen, den Prozess zu beginnen, in späteren Aufrufen es sollte stattdessen seine Datenpuffer, die in dem Abschluss-Handler des aktuell ausgeführten async_write werden bei Ihnen abgeholt.

Aus Performance-Gründen sollten Sie das Kopieren der Daten in den Puffer zu vermeiden und stattdessen die neue Brocken auf eine Liste von Puffern anhängen und verwenden Sie die scatter-gather overload von async_write, die eine ConstBufferSequence akzeptiert. Es ist auch möglich, einen großen streambuf als Puffer zu verwenden und direkt in ihn einzufügen.

Natürlich muss der Puffer synchronisiert werden, es sei denn, Connection::Send und io_service laufen im selben Thread. Ein leerer Puffer kann als Hinweis darauf verwendet werden, dass keine async_write läuft.

Hier einige Code zu veranschaulichen, was ich meine:

struct Connection 
{ 
    void Connection::Send(std::vector<char>&& data) 
    { 
     std::lock_guard<std::mutex> lock(buffer_mtx); 
     buffers[active_buffer^1].push_back(std::move(data)); // move input data to the inactive buffer 
     doWrite(); 
    } 

private: 

    void Connection::doWrite() 
    { 
     if (buffer_seq.empty()) { // empty buffer sequence == no writing in progress 
      active_buffer ^= 1; // switch buffers 
      for (const auto& data : buffers[active_buffer]) { 
       buffer_seq.push_back(boost::asio::buffer(data)); 
      } 
      boost::asio::async_write(m_socket, buffer_seq, [this] (const boost::system::error_code& ec, size_t bytes_transferred) { 
       std::lock_guard<std::mutex> lock(buffer_mtx); 
       buffers[active_buffer].clear(); 
       buffer_seq.clear(); 
       if (!ec) { 
        if (!buffers[active_buffer^1].empty()) { // have more work 
         doWrite(); 
        } 
       } 
      }); 
     } 
    } 

    std::mutex buffer_mtx; 
    std::vector<std::vector<char>> buffers[2]; // a double buffer 
    std::vector<boost::asio::const_buffer> buffer_seq; 
    int active_buffer = 0; 
    . . . 
}; 

Die komplette Arbeitsquelle kann in this answer finden.

+0

Das ist eine Menge reden über ausgefallene Puffer und Sperrmechanismen um sie herum :) Was? sieht es so aus? Ich habe keine Ahnung, wie ich asio :: streambuf benutzen soll, oder ich darf es modifizieren, nachdem ich einen Anruf getätigt habe, oder wenn ich einen anderen eigenen Puffer mit Locks brauche und dann Dinge ergreife von ihm in den streambuf, wenn ich rufe senden Sie noch einmal von meinem Handler. Ich habe auch keine Ahnung, was mit einer Eingabefolge und einer Ausgabesequenz gemeint ist.Es enthält eine zusammenhängende Reihe von Zeichen, nein? Was ist das? in vs die out? Sicher wäre sauber, wenn t hey hatte etwas davon in ihren Beispielen! –

+0

In dem Code, den Sie nach dem Editieren gepostet haben, ordnen wir, wenn ich richtig folge, jedes Mal neue Puffer zu, wenn send aufgerufen wird, nachdem ein Async-Send gepostet wurde. Wir sammeln jedoch auf dem letzten Puffer, der zugewiesen wurde, bis der Sendevorgang abgeschlossen ist. Es ist am wahrscheinlichsten, auch internen Speicherplatz zuzuweisen, wenn der emplace_back aufgerufen wird (weil das Argumente an den Konstruktor von const_buffer weiterleitet). Ich denke, ich sehe, wie das funktioniert, aber ist das nicht teuer, weise? –

+0

Fast alles in meinem aktuellen Demo-Projekt zum Testen bereit. Ich erhalte jedoch einen Fehler in der Zeile 'buffers-> emplace_back (data);' das sagt, "'boost :: asio :: const_buffer :: const_buffer (boost :: asio :: const_buffer &&)': kann Argument 1 nicht konvertieren von 'const std :: vector >' zu ' const boost :: asio :: mutable_buffer & '" –