2017-10-06 5 views
1

Ich verwende Boost, um einen TCP-Client und -Server zu implementieren. Auf der Client-Seite muss ich mehrere Dateien nacheinander senden. Ich verwende einen separaten Mechanismus, um den Server über eine Dateiübertragung zu benachrichtigen. Wenn der Server bereit ist, die Datei zu empfangen, antwortet er dem Client und die Übertragung wird eingeleitet.So stellen Sie sicher, dass TCP-Dateiübertragung erfolgt ist (C++)

Ich definierte asynchrone Handler zum Schreiben der Daten und dann lassen Sie das OS kümmern, indem Sie io_service.run() aufrufen. Meines Wissens blockiert io_service.run() solange, bis keine weiteren Handler mehr zu versenden sind, was aber nicht bedeutet, dass die Daten tatsächlich auf der Gegenseite korrekt empfangen wurden? Das Problem ist, dass nach io_service.run() kehrt ich die nächste Übertragung initiieren, aber der Server ist nicht fertig mit dem Empfang der ersten.

Muss ich einen externen Mechanismus auf der Remote-Seite implementieren, um den Client zu benachrichtigen, dass die Daten empfangen wurden, oder mache ich etwas falsch?

Client-Implementierung:

#include "StdAfx.h" 
#include <boost/bind.hpp> 
#include <boost/thread.hpp> 
#include <boost/enable_shared_from_this.hpp> 
#include <boost/thread.hpp> 
#include "AsyncTCPClient.h" 


AsyncTCPClient::AsyncTCPClient(boost::asio::io_service& iIoService, const std::string& iServerIP, const std::string& iPath) 
    : mResolver(iIoService), mSocket(iIoService) 
{ 
    size_t wPos = iServerIP.find(':'); 
    if(wPos==std::string::npos) 
    { 
     return; 
    } 
    std::string wPortStr = iServerIP.substr(wPos + 1); 
    std::string wServerIP = iServerIP.substr(0, wPos); 

    mSourceFile.open(iPath, std::ios_base::binary | std::ios_base::ate); 
    if(!mSourceFile) 
    { 
     LOG(LOGERROR) << "Failed to open file: " << iPath; 
     return; 
    } 
    size_t wFileSize = mSourceFile.tellg(); 
    mSourceFile.seekg(0); 
    std::ostream wRequestStream(&mRequest); 
    wRequestStream << iPath << "\n" << wFileSize << "\n\n"; 

    LOG(LOGINFO) << "File to transfer: " << iPath; 
    LOG(LOGINFO) << "Filesize: " << wFileSize << " bytes"; 

    tcp::resolver::query wQuery(wServerIP, wPortStr); 
    mResolver.async_resolve(wQuery, boost::bind(&AsyncTCPClient::HandleResolve, this, boost::asio::placeholders::error, boost::asio::placeholders::iterator)); 

} 

AsyncTCPClient::~AsyncTCPClient() 
{ 
} 

void AsyncTCPClient::HandleResolve(const boost::system::error_code & iErr, tcp::resolver::iterator iEndpointIterator) 
{ 
    if(!iErr) 
    { 
     tcp::endpoint wEndpoint = *iEndpointIterator; 
     mSocket.async_connect(wEndpoint, boost::bind(&AsyncTCPClient::HandleConnect, this, boost::asio::placeholders::error, ++iEndpointIterator)); 
    } 
    else 
    { 
     LOG(LOGERROR) << "Error: " << iErr.message(); 
    } 
} 

void AsyncTCPClient::HandleConnect(const boost::system::error_code &iErr, tcp::resolver::iterator iEndpointIterator) 
{ 
    if(!iErr) 
    { 
     boost::asio::async_write(mSocket, mRequest, boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error)); 
    } 
    else if(iEndpointIterator != tcp::resolver::iterator()) 
    { 
     mSocket.close(); 
     tcp::endpoint wEndpoint = *iEndpointIterator; 
     mSocket.async_connect(wEndpoint, boost::bind(&AsyncTCPClient::HandleConnect, this, boost::asio::placeholders::error, ++iEndpointIterator)); 
    } 
    else 
    { 
     LOG(LOGERROR) << "Error: " << iErr.message(); 
    } 
} 

void AsyncTCPClient::HandleWriteFile(const boost::system::error_code& iErr) 
{ 
    if(!iErr) 
    { 
     if(mSourceFile) 
     { 
      mSourceFile.read(mBuffer.c_array(), (std::streamsize)mBuffer.size()); 

      // EOF reached 
      if(mSourceFile.gcount() <= 0) 
      { 
       LOG(LOGINFO) << "File transfer done"; 
       return; 
      } 

      //LOG(LOGTRACE) << "Send " << mSourceFile.gcount() << "bytes, total: " << mSourceFile.tellg() << " bytes.\n"; 
      boost::asio::async_write(mSocket, boost::asio::buffer(mBuffer.c_array(), mSourceFile.gcount()), boost::bind(&AsyncTCPClient::HandleWriteFile, this, boost::asio::placeholders::error)); 
     } 
     else 
     { 
      LOG(LOGINFO) << "File transfer done"; 
      return; 
     } 
    } 
    else 
    { 
     LOG(LOGERROR) << "Error value: " << iErr.value(); 
     LOG(LOGERROR) << "Error message: " << iErr.message(); 
     throw std::exception(); 
    } 
} 

Server-Implementierung:

#include "StdAfx.h" 
#include <boost/array.hpp> 
#include <boost/bind.hpp> 
#include <boost/thread.hpp> 
#include <iostream> 
#include <fstream> 
#include <boost/enable_shared_from_this.hpp> 
#include "AsyncTCPClient.h" 
#include "AsyncTCPServer.h" 
#include "Debug.h" 


AsyncTCPServer::AsyncTCPServer(unsigned short iPort, const std::string iFilePath) 
    :mAcceptor(mIoService, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), iPort), true) 
{ 
    mAsyncTCPConnectionPtr wNewConnection(new AsyncTCPConnection(mIoService, iFilePath)); 
    mAcceptor.async_accept(wNewConnection->Socket(), boost::bind(&AsyncTCPServer::HandleAccept, this, wNewConnection, boost::asio::placeholders::error)); 
    mIoService.run(); 
} 

AsyncTCPServer::~AsyncTCPServer() 
{ 
    mIoService.stop(); 
} 

void AsyncTCPServer::HandleAccept(mAsyncTCPConnectionPtr iCurConnection, const boost::system::error_code& iErr) 
{ 
    if (!iErr) 
    { 
     iCurConnection->Start(); 
    } 
    else 
    { 
     BIOLOG(BioSans::LOGERROR) << " " << iErr << ", " << iErr.message(); 
    } 
} 

Anschluss Implementierung:

#include "StdAfx.h" 
#include <boost/bind.hpp> 
#include <boost/thread.hpp> 
#include <iostream> 
#include <fstream> 
#include "Debug.h" 
#include "AsyncTCPConnection.h" 

AsyncTCPConnection::AsyncTCPConnection(boost::asio::io_service& iIoService, const std::string iFilePath) 
    : mSocket(iIoService), mFileSize(0), mFilePath(iFilePath) 
{ 
} 

AsyncTCPConnection::~AsyncTCPConnection() 
{ 
} 

void AsyncTCPConnection::Start() 
{ 
    LOG(LOGINFO) << "Start"; 
    async_read_until(mSocket, mRequestBuffer, "\n\n", boost::bind(&AsyncTCPConnection::HandleReadRequest, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
} 

void AsyncTCPConnection::HandleReadRequest(const boost::system::error_code& iErr, std::size_t iBytesTransferred) 
{ 
    if(iErr) 
    { 
     return HandleError(__FUNCTION__, iErr); 
    } 
    LOG(LOGTRACE) << "(" << iBytesTransferred << ")" << ", in_avail = " << mRequestBuffer.in_avail() << ", size = " << mRequestBuffer.size() << ", max_size = " << mRequestBuffer.max_size(); 

    std::istream wRequestStream(&mRequestBuffer); 
    std::string wFilePath; 
    wRequestStream >> wFilePath; 
    wRequestStream >> mFileSize; 
    wRequestStream.read(mBuffer.c_array(), 2); 

    mOutputFile.open(mFilePath, std::ios_base::binary); 

    if(!mOutputFile) 
    { 
     LOG(LOGERROR) << "Failed to open: " << wFilePath; 
     return; 
    } 
    do 
    { 
     wRequestStream.read(mBuffer.c_array(), (std::streamsize)mBuffer.size()); 
     LOG(LOGTRACE) << "Write " << wRequestStream.gcount() << " bytes"; 
     mOutputFile.write(mBuffer.c_array(), wRequestStream.gcount()); 
    } 
    while(wRequestStream.gcount() > 0); 
    async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()),boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
} 

void AsyncTCPConnection::HandleReadFileContent(const boost::system::error_code& iErr, std::size_t iBytesTransferred) 
{ 
    if(iBytesTransferred>0) 
    { 
     mOutputFile.write(mBuffer.c_array(), (std::streamsize)iBytesTransferred); 
     LOG(LOGTRACE) << "Received " << mOutputFile.tellp() << " bytes"; 
     if (mOutputFile.tellp()>=(std::streamsize)mFileSize) 
     { 
      return; 
     } 
    } 
    if(iErr) 
    { 
     return HandleError(__FUNCTION__, iErr); 
    } 
    async_read(mSocket, boost::asio::buffer(mBuffer.c_array(), mBuffer.size()), boost::bind(&AsyncTCPConnection::HandleReadFileContent, shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); 
} 

void AsyncTCPConnection::HandleError(const std::string& function_name, const boost::system::error_code& err) 
{ 
    LOG(LOGERROR) << " in " << function_name <<" due to " << err <<" " << err.message(); 
} 

-Code senden Datei:

void SendFile(std::string iFilePath, std::string iServerIP) 
{ 
    static int wRetries = 0; 
    try 
    { 
     boost::asio::io_service wIoService; 
     LOG(LOGINFO) << "Sending data to: " << iServerIP; 
     LOG(LOGINFO) << "Filename is: " << iFilePath; 

     AsyncTCPClient client(wIoService, iServerIP, iFilePath); 
     wIoService.run(); 
     // here I want to make sure that the data got to the remote host 
     // it looks like wIoService.run() returns once bytes are written to the socket 

    } 
    catch(std::exception) 
    { 
     // retry 3 times in case something goes wrong 
     if(wRetries < 3) 
     { 
      wRetries++; 
      LOG(LOGWARNING) << "Problem sending file : " << iFilePath << " to address: " << iServerIP; 
      LOG(LOGWARNING) << "Retry #" << wRetries; 
      SendFile(iFilePath, iServerIP); 
     } 
     else 
     { 
      LOG(LOGERROR) << "Unable to send file: " << iFilePath << " to address: " << iServerIP; 
      wRetries = 0; 
      return; 
     } 
    } 
    wRetries = 0; 
} 
+0

Wenn Sie wissen müssen, ob/wann die Gegenseite die Daten empfangen und/oder verarbeitet hat, muss Ihnen die Fernbedienung dies mitteilen. –

Antwort

0

Sie könnten 'boost :: asio :: io_service :: work' verwenden, um Ihren IO-Service-Thread am Leben zu erhalten, bis Sie Ihren Prozess beenden möchten. Andernfalls wird io_service :: run zurückgegeben, wenn alle veröffentlichten Aufgaben abgeschlossen sind.

http://www.boost.org/doc/libs/1_65_1/doc/html/boost_asio/reference/io_service__work.html

Ich würde denken, dass Sie beenden wollen würden und Fäden immer und immer für jeden Transfer neu erstellen.

Sie können eine Zustandsvariable verwenden, um zu signalisieren, wenn Sie den io_service-Thread herunterfahren und dann das Arbeitsobjekt zerstören oder einfach nur das Arbeitsobjekt zerstören möchten.

Zu wissen, wann der Server alles erhalten hat, was Sie gesendet haben. Sie könnten etwas in Ihrem Protokoll entwerfen oder sich einfach auf die garantierten Aspekte von TCP verlassen. Ich empfehle das Lesen von TCP und IO-Completion im Allgemeinen.

+0

Danke, basierend auf meinem Verständnis muss ich sicherstellen, dass die Daten ordnungsgemäß auf Anwendungsebene empfangen wurden. Ich werde einen Bestätigungsmechanismus implementieren. – ajora

Verwandte Themen