2017-04-03 8 views
0

Ja. Ich weiß, dass es um diese time_out in boost::asio einige Fragen gab. Mein Problem könnte zu einfach für die asio Jungs hier lösen.boost: asio :: lesen oder boosten: asio :: async_read mit timeout

Ich verwende boost::asio auf TCP-Protokoll, um Daten über ein Netzwerk kontinuierlich in einer Schleife so schnell wie möglich zu lesen.

Folgende Funktion ReadData() wird kontinuierlich von einem Arbeiter std::thread in einer While-Schleife aufgerufen.

std::size_t ReadData(std::vector<unsigned char> & buffer, unsigned int size_to_read) { 

boost::system::error_code error_code; 
buffer.resize(size_to_read); 

// Receive body 
std::size_t bytes_read = boost::asio::read(*m_socket, boost::asio::buffer(buffer), error_code); 

if (bytes_read == 0) { 
    // log error 
    return; 
} 

return bytes_read; 
} 

Es funktioniert gut. Gibt die Daten zurück. Alles ist gut.

All Ich möchte, ist zu eine für die boost::asio::readtime_out verwenden. Ich habe gelernt, dass ich boost::asio::async_read mit boost::asio::async_wait für die time_out-Technik verwenden muss, um zu arbeiten.

Eine boost example schlägt vor, boost::asio::async_read_until zu verwenden?

Sollte ich boost::asio::async_read oder boost::asio::async_read_until verwenden?

Es spielt keine Rolle, ob ich boost::asio::async_read oder boost::asio::async_read_until oder boost::asio::read verwende. Aber ich möchte den asio::read Anruf ausgelöst werden & innerhalb des Aufrufs an meine Methode ReadData getan, so dass der Client-Code nicht betroffen wird.

Wie kann ich das erreichen? Bitte legen nahe,

+0

Sie wissen, dass Sie verwenden können, 'Buchse :: abbrechen() 'um eine asynchrone Operation abzubrechen, richtig? –

+0

ja. Ich weiß, dass ich den Socket abbrechen sollte, wenn das time_out erreicht ist. Aber wie verwende ich ein time_out in der Async-Lese an erster Stelle? –

+0

@TheQuantumPhysiker. Funktioniert 'socket :: cancel()' bei einer synchronen Leseoperation? –

Antwort

1

OK, so etwas wie dies Ihr Zweck passen sollten:

Begründung:

Sie scheinen verwenden zu wollen Operationen zu blockieren. Da dies der Fall ist, besteht eine gute Chance, dass Sie keinen Thread ausführen, um die Io-Schleife zu pumpen.

So treten wir gleichzeitig zwei Asynchron-Aufgaben auf der io-Schleife der Buchse und dann Laichen einen Thread aus:

a) zurückgesetzt (tatsächlich neu starten) die Schleife für den Fall, es ist schon

b) Lauf erschöpft die Schleife bis zur Erschöpfung (könnten wir gescheiter hier sein und es nur laufen, bis der Handler, dass eine bestimmte Bedingung erfüllt ist angegeben hat, aber das ist eine Lektion für einen anderen Tag)

#include <type_traits> 

template<class Stream, class ConstBufferSequence, class Handler> 
auto async_read_with_timeout(Stream& stream, ConstBufferSequence&& sequence, std::size_t millis, Handler&& handler) 
{ 
    using handler_type = std::decay_t<Handler>; 
    using buffer_sequence_type = std::decay_t<ConstBufferSequence>; 
    using stream_type = Stream; 

    struct state_machine : std::enable_shared_from_this<state_machine> 
    { 
     state_machine(stream_type& stream, buffer_sequence_type sequence, handler_type handler) 
       : stream_(stream) 
       , sequence_(std::move(sequence)) 
       , handler_(std::move(handler)) 
     {} 
     void start(std::size_t millis) 
     { 
      timer_.expires_from_now(boost::posix_time::milliseconds(millis)); 
      timer_.async_wait(strand_.wrap([self = this->shared_from_this()](auto&& ec) { 
       self->handle_timeout(ec); 
      })); 
      boost::asio::async_read(stream_, sequence_, 
            strand_.wrap([self = this->shared_from_this()](auto&& ec, auto size){ 
       self->handle_read(ec, size); 
      })); 
     } 

     void handle_timeout(boost::system::error_code const& ec) 
     { 
      if (not ec and not completed_) 
      { 
       boost::system::error_code sink; 
       stream_.cancel(sink); 
      } 
     } 

     void handle_read(boost::system::error_code const& ec, std::size_t size) 
     { 
      assert(not completed_); 
      boost::system::error_code sink; 
      timer_.cancel(sink); 
      completed_ = true; 
      handler_(ec, size); 
     } 

     stream_type& stream_; 
     buffer_sequence_type sequence_; 
     handler_type handler_; 
     boost::asio::io_service::strand strand_ { stream_.get_io_service() }; 
     boost::asio::deadline_timer timer_ { stream_.get_io_service() }; 
     bool completed_ = false; 
    }; 

    auto psm = std::make_shared<state_machine>(stream, 
               std::forward<ConstBufferSequence>(sequence), 
               std::forward<Handler>(handler)); 
    psm->start(millis); 
} 

std::size_t ReadData(boost::asio::ip::tcp::socket& socket, 
        std::vector<unsigned char> & buffer, 
        unsigned int size_to_read, 
        boost::system::error_code& ec) { 

    buffer.resize(size_to_read); 

    ec.clear(); 
    std::size_t bytes_read = 0; 
    auto& executor = socket.get_io_service(); 
    async_read_with_timeout(socket, boost::asio::buffer(buffer), 
          2000, // 2 seconds for example 
          [&](auto&& err, auto size){ 
     ec = err; 
     bytes_read = size; 
    }); 

    // todo: use a more scalable executor than spawning threads 
    auto future = std::async(std::launch::async, [&] { 
     if (executor.stopped()) { 
      executor.reset(); 
     } 
     executor.run(); 
    }); 
    future.wait(); 

    return bytes_read; 
} 
+0

das sieht sehr ähnlich aus, was ich brauche. Ja du hast Recht. Meine Lese-Methode läuft in einer Schleife in einem Worker-Thread (das ist "std :: thread"). Ich kenne mich jedoch nicht mit Strängen aus. Ich muss das lesen. –

+0

Was ist 'stream_type' hier? error sagt Member Referenz Basistyp 'stream_type' (aka 'int (int, int, int)') ist keine Struktur oder Union boost :: asio :: io_service :: strang strand_ {stream_.get_io_service()}; –

+0

können Sie nur einmal überprüfen? vielleicht gibt es einen kleinen fehler in der antwort? kann ich 'cancel' in' stream' wie diese 'stream_.cancel (sink)' aufrufen? Abbrechen sollte am Socket aufgerufen werden. Recht ? –

Verwandte Themen