Monday, February 28, 2022

boost::asio tcp socket read with timeout

 You want to do a simple socket read in C++, using boost::asio. You're already in a thread dedicated to handling this connection, so it can be synchronous. Great.

size_t receivedLength = boost::asio::read(*socket, boost::asio::buffer(receiveBuffer, sizeof(receiveBuffer)));

But then you realize you'll need a timeout in case something goes wrong on the other end, or it's taking too long or outside forces want you to abort. Should be simple, just add a timeout parameter, right? Wrong. Doesn't exist.

OK, well, you probably know that the POSIX socket interface has optional timeouts. Just set the socket option SO_RCVTIMEO and you should be good, right? WRONG! At least on Linux (and possibly elsewhere), boost::asio TCP read operations will not return at the timeout; they keep trying to read (see the aside here). Setting the option is a moot point, since it won't work anyway, and it's also finicky because the call to setsockopt is platform dependent. For posterity, this does the job:

// Set socket send & receive timeouts, after reading the default receive timeout
// (needs <sys/socket.h> and <sys/time.h> on POSIX, <winsock2.h> on Windows)
#define RW_TIMEOUT_SECS 5

char timeoutBuf[16];
socklen_t sockoptlen = sizeof(timeoutBuf);
::getsockopt(socket->native_handle(), SOL_SOCKET, SO_RCVTIMEO, timeoutBuf, &sockoptlen);
if (sockoptlen == sizeof(struct timeval)) { // POSIX reports a struct timeval
    std::cout << "Default receive timeout (s): " << ((struct timeval *) timeoutBuf)->tv_sec
              << " (us): " << ((struct timeval *) timeoutBuf)->tv_usec << std::endl;
} else { // Windows: sockoptlen == 4, and the value is in milliseconds
    std::cout << "Default receive timeout (ms): " << *((uint32_t *) timeoutBuf) << std::endl;
}

#ifdef WIN32
const uint32_t timeout = RW_TIMEOUT_SECS * 1000; // milliseconds
#else
const struct timeval timeout = {RW_TIMEOUT_SECS, 0};
#endif
::setsockopt(socket->native_handle(), SOL_SOCKET, SO_RCVTIMEO, (const char *)&timeout, sizeof(timeout));
::setsockopt(socket->native_handle(), SOL_SOCKET, SO_SNDTIMEO, (const char *)&timeout, sizeof(timeout));

(There are asio methods to do this, but they are platform dependent, and on Linux/Mac you have to implement a sizeable custom socket-option class to use them.)
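For the curious, here is a minimal sketch (mine, not from the asio docs) of what such a class might look like on POSIX, satisfying asio's SettableSocketOption requirements with a struct timeval payload. Windows would instead need a DWORD of milliseconds, which is exactly why this isn't portable. And again, even once set, it won't make boost::asio::read() return at the timeout.

#include <cstddef>
#include <sys/socket.h>
#include <sys/time.h>

class ReceiveTimeoutOption {
public:
    explicit ReceiveTimeoutOption(long seconds) : tv_{seconds, 0} {}

    // The interface boost::asio expects from a settable socket option.
    template <typename Protocol> int level(const Protocol&) const { return SOL_SOCKET; }
    template <typename Protocol> int name(const Protocol&) const { return SO_RCVTIMEO; }
    template <typename Protocol> const void* data(const Protocol&) const { return &tv_; }
    template <typename Protocol> std::size_t size(const Protocol&) const { return sizeof(tv_); }

private:
    struct timeval tv_;
};

// Usage: socket->set_option(ReceiveTimeoutOption(RW_TIMEOUT_SECS));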


One option for adding a timeout would be to make the socket non-blocking, so that reads return immediately whether or not data is available. Then you poll, enforcing the timeout yourself. So, you could do:

#include <array>
#include <ctime>
#define RD_TIMEOUT_SECS 5

socket->non_blocking(true);
size_t receivedLength = 0;
std::array<char, 1024> receiveBuffer;
boost::system::error_code error = boost::asio::error::would_block;
clock_t start = clock();

while (error == boost::asio::error::would_block
       && clock() < start + CLOCKS_PER_SEC * RD_TIMEOUT_SECS) {
    receivedLength = socket->receive(boost::asio::buffer(receiveBuffer), 0, error);
}

if (receivedLength > 0) { std::cout.write(receiveBuffer.data(), receivedLength); }

This is pretty clear and compact, but it busy-waits (wasting CPU time) and kind of defeats the whole idea of asio. Plus, you now have to do more work yourself to accumulate the number of bytes you expect or need. And finally, you probably also want the connect call to have a timeout, which this approach doesn't help with. There are three other, better options, all of which work, each with slightly different drawbacks.


The first option is the traditional, preferred asio method. It uses a deadline/steady timer and works with boost 1.44 and later. An example is here. In essence, you need to start an asynchronous read, create a timer with a timeout callback that cancels the read if it fires, run the io_service (io_context in later versions) in a loop, and then check whether there was an error (as there will be in the case of a timeout).

const int RD_TIMEOUT_SECS = 5;
char receiveBuffer[1024];
size_t receivedLength = 0;
boost::system::error_code error = boost::asio::error::would_block;

boost::asio::async_read(*socket, boost::asio::buffer(receiveBuffer, sizeof(receiveBuffer)),
                        [&](const boost::system::error_code& result_error, std::size_t result_n)
                        {
                            error = result_error;
                            receivedLength = result_n;
                        });

try {
    // Set a deadline for the read operation.
    deadline_->expires_from_now(boost::posix_time::seconds(RD_TIMEOUT_SECS));

    // Start the deadline actor, which aborts the read if it has not completed in time.
    deadline_->async_wait(boost::bind(&MyClass::checkConnectTimeout, this, _1, socket));

    // Block until the asynchronous operation has completed.
    do {
        ioService_->run_one();
    } while (error == boost::asio::error::would_block);
} catch (std::runtime_error &e) {
    error = boost::asio::error::fault;
}

if (error) {
    std::cout << "Timeout reading socket." << std::endl;
}

You'll also need to define your callback function, which cancels the pending read. Both the callback and the boost::asio::deadline_timer deadline_ are class members.

void MyClass::checkConnectTimeout(const boost::system::error_code& ec, boost::asio::ip::tcp::socket *socket) {
    if (ec == boost::asio::error::operation_aborted) {
        // Timer was aborted.
        return;
    }

    // Check whether the deadline has passed. We compare the deadline against
    // the current time since a new asynchronous operation may have moved the
    // deadline before this actor had a chance to run.
    if (deadline_->expires_at() <= boost::asio::deadline_timer::traits_type::now()) {
        // The deadline has passed. Cancel any outstanding asynchronous
        // operations on the socket so they complete with an error code.
        socket->cancel();
    }
}

A more compact application of this same approach can be seen here (to my knowledge it was first elaborated by Christopher Kohlhoff on the Boost mailing lists).
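In that spirit, here is a rough sketch of the deadline-timer technique wrapped up as a free function. The packaging and names (like read_with_timeout) are mine, not from that post:

#include <boost/asio.hpp>

size_t read_with_timeout(boost::asio::io_service& io,
                         boost::asio::ip::tcp::socket& sock,
                         char* buf, size_t len, int timeoutSecs,
                         boost::system::error_code& error)
{
    size_t received = 0;
    error = boost::asio::error::would_block;

    // Arm a deadline timer that cancels the pending read if it fires first.
    boost::asio::deadline_timer timer(io);
    timer.expires_from_now(boost::posix_time::seconds(timeoutSecs));
    timer.async_wait([&](const boost::system::error_code& ec) {
        if (ec != boost::asio::error::operation_aborted) {
            sock.cancel();
        }
    });

    // Start the read; the handler records the result and disarms the timer.
    boost::asio::async_read(sock, boost::asio::buffer(buf, len),
        [&](const boost::system::error_code& ec, std::size_t n) {
            error = ec;
            received = n;
            timer.cancel();
        });

    // Pump handlers until the read completes or is cancelled by the timer.
    io.reset();
    do {
        io.run_one();
    } while (error == boost::asio::error::would_block);
    io.run(); // drain the (now cancelled) timer handler

    return received;
}

// Usage: read_with_timeout(*ioService_, *socket, receiveBuffer,
//                          sizeof(receiveBuffer), RD_TIMEOUT_SECS, error);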


A simpler approach is possible from boost 1.68+, since you can run the io_context for a fixed amount of time using run_for(). This is shown in an example here. Basically, you need to create a new function to run the io_context and cancel the async read operation if it's not yet done when the time is up.

  void run(boost::asio::chrono::steady_clock::duration timeout)
  {
    // Restart the io_context, as it may have been left in the "stopped" state
    // by a previous operation.
    io_context_.restart();

    // Block until the asynchronous operation has completed, or timed out. 
    io_context_.run_for(timeout);

    // If the asynchronous operation completed successfully then the io_context
    // would have been stopped due to running out of work. If it was not
    // stopped, then the io_context::run_for call must have timed out.
    if (!io_context_.stopped())
    {
      // Close the socket to cancel the outstanding asynchronous operation.
      socket_.close();

      // Run the io_context again until the close operation completes.
      io_context_.run();
    }
  }

Then, you simply call it after the async read above, and before checking for an error.

const int RD_TIMEOUT_SECS = 5;
size_t receivedLength = 0;
boost::system::error_code error;
boost::asio::async_read(*socket, boost::asio::buffer(receiveBuffer, sizeof(receiveBuffer)),
                        [&](const boost::system::error_code& result_error,
                            std::size_t result_n)
                        {
                            error = result_error;
                            receivedLength = result_n;
                        });

run(boost::asio::chrono::seconds(RD_TIMEOUT_SECS));

if (error) {
    std::cout << "Timeout reading socket." << std::endl;
}

Similar techniques can be used for writing or connecting to the socket.
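For instance, here is a sketch of the run_for() trick applied to connect, assuming the same run() helper and the io_context_/socket_ members from the example above; the host and port are placeholders:

const int CONNECT_TIMEOUT_SECS = 5;
boost::system::error_code connectError;

// Resolve a host/port (placeholders) and start an asynchronous connect.
boost::asio::ip::tcp::resolver resolver(io_context_);
auto endpoints = resolver.resolve("example.com", "80");

boost::asio::async_connect(socket_, endpoints,
                           [&](const boost::system::error_code& result_error,
                               const boost::asio::ip::tcp::endpoint&)
                           {
                               connectError = result_error;
                           });

// Run the io_context with a deadline; run() closes the socket on timeout.
run(boost::asio::chrono::seconds(CONNECT_TIMEOUT_SECS));

if (connectError) {
    std::cout << "Timeout or error connecting socket." << std::endl;
}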


Finally, it is possible to use C++11's futures, as documented here.

#include <future>

const int RD_TIMEOUT_SECS = 5;
std::future<size_t> futureReceivedLength =
    boost::asio::async_read(*socket,
                            boost::asio::buffer(receiveBuffer, sizeof(receiveBuffer)),
                            boost::asio::use_future);

if (futureReceivedLength.wait_for(std::chrono::seconds(RD_TIMEOUT_SECS)) 
    != std::future_status::ready) {
    std::cout << "Timeout reading socket." << std::endl;
    socket->cancel(); // cancel the operation.
    return; // do something appropriate; don't continue.
}

// this won't block because we know the future is ready. 
size_t receivedLength = futureReceivedLength.get();
// We can check the size and process the buffer

This is clean and standard, and you don't need any auxiliary functions. But you need to have the io_context running in its own thread, which may be wasteful depending upon your architecture.
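If you go this route, a minimal sketch (assuming boost 1.66+ for make_work_guard) of keeping the io_context alive on its own thread might look like this:

#include <boost/asio.hpp>
#include <thread>

boost::asio::io_context ioContext;
// The work guard keeps run() from returning while there is momentarily no work.
auto workGuard = boost::asio::make_work_guard(ioContext);
std::thread ioThread([&ioContext] { ioContext.run(); });

// ... create the socket on ioContext and use the future-based reads shown above ...

// On shutdown: release the guard, stop the context, and join the thread.
workGuard.reset();
ioContext.stop();
ioThread.join();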

Hope this helps clear things up and save you some of the time I wasted figuring this all out.