From ad418dc7853679f1d79c280af5993b82c43dc51a Mon Sep 17 00:00:00 2001 From: Stephen Morris Date: Wed, 16 Feb 2011 11:52:51 +0000 Subject: [PATCH] [trac554] Update function names and prepare for TCP looping Updated function names to match convention. Also added receiveComplete() to prepare for the fact that a TCP receive may require multiple reads before the complete message is read. --- src/lib/asiolink/io_fetch.cc | 15 +++++----- src/lib/asiolink/io_fetch.h | 5 ++-- src/lib/asiolink/io_socket.cc | 16 +++++++++-- src/lib/asiolink/io_socket.h | 28 +++++++++++++++++-- src/lib/asiolink/tcp_socket.h | 22 +++++++++++++-- src/lib/asiolink/tests/udp_socket_unittest.cc | 17 +++++++---- src/lib/asiolink/udp_socket.cc | 8 ++++-- src/lib/asiolink/udp_socket.h | 26 +++++++++++++++-- 8 files changed, 109 insertions(+), 28 deletions(-) diff --git a/src/lib/asiolink/io_fetch.cc b/src/lib/asiolink/io_fetch.cc index 06691140df..5ab64794ec 100644 --- a/src/lib/asiolink/io_fetch.cc +++ b/src/lib/asiolink/io_fetch.cc @@ -86,6 +86,7 @@ IOFetch::IOFetchData::IOFetchData(IOService& io_service, msgbuf(new OutputBuffer(512)), // TODO: Why this number? data(new char[IOFetch::MAX_LENGTH]), callback(cb), + rcv_amount(0), stopped(false), timer(io_service.get_io_service()), timeout(wait) @@ -127,6 +128,10 @@ IOFetch::operator()(error_code ec, size_t length) { msg.addQuestion(data_->question); MessageRenderer renderer(*data_->msgbuf); msg.toWire(renderer); + + // As this is a new fetch, clear the amount of data received + data_->rcv_amount = 0; + dlog("Sending " + msg.toText() + " to " + data_->remote->getAddress().toText()); } @@ -142,9 +147,9 @@ IOFetch::operator()(error_code ec, size_t length) { } // Open a connection to the target system. For speed, if the operation - // was a no-op (i.e. UDP operation) we bypass the yield. - bool do_yield = data_->socket->open(data->remote.get(), *this); - if (do_yield) { + // was completed synchronously (i.e. UDP operation) we bypass the yield. + bool asynch = data_->socket->open(data->remote.get(), *this); + if (asynch) { CORO_YIELD; } @@ -153,10 +158,6 @@ IOFetch::operator()(error_code ec, size_t length) { CORO_YIELD data_->socket->async_send(data_->msgbuf->getData(), data_->msgbuf->getLength(), data_->remote.get(), *this); - /// Allocate space for the response. (XXX: This should be - /// optimized by maintaining a free list of pre-allocated blocks) - data_->data.reset(new char[MAX_LENGTH]); - /// Begin an asynchronous receive, and yield. When the receive /// completes, we will resume immediately after this point. CORO_YIELD data_->socket->async_receive(data_->data.get(), diff --git a/src/lib/asiolink/io_fetch.h b/src/lib/asiolink/io_fetch.h index 82f5c1f67a..00f276c0cc 100644 --- a/src/lib/asiolink/io_fetch.h +++ b/src/lib/asiolink/io_fetch.h @@ -44,9 +44,7 @@ namespace asiolink { /// \brief Upstream Fetch Processing /// /// IOFetch is the class used to send upstream fetches and to handle responses. -/// It is a base class containing most of the logic, although the ASIO will -/// actually instantiate one of the derived classes TCPFetch or UDPFetch. -/// (These differ in the type of socket and endpoint.) +/// It is more or less transport-agnostic, although the class IOFetch : public IOCompletionCallback { public: @@ -114,6 +112,7 @@ public: isc::dns::OutputBufferPtr msgbuf; ///< ... and here boost::shared_array data; ///< Temporary array for the data Callback* callback; ///< Called on I/O Completion + size_t rcv_amount; ///< Received amount bool stopped; ///< Have we stopped running? asio::deadline_timer timer; ///< Timer to measure timeouts int timeout; ///< Timeout in ms diff --git a/src/lib/asiolink/io_socket.cc b/src/lib/asiolink/io_socket.cc index 222291990e..11b0194e1e 100644 --- a/src/lib/asiolink/io_socket.cc +++ b/src/lib/asiolink/io_socket.cc @@ -74,7 +74,7 @@ public: /// \param length Unused /// \param endpoint Unused /// \param callback Unused - virtual void async_send(const void*, size_t, const IOEndpoint*, + virtual void asyncSend(const void*, size_t, const IOEndpoint*, IOCompletionCallback&) { } @@ -84,12 +84,24 @@ public: /// /// \param data Unused /// \param length Unused + /// \param cumulative Unused /// \param endpoint Unused /// \param callback Unused - virtual void async_receive(void* data, size_t, IOEndpoint*, + virtual void asyncReceive(void* data, size_t, size_t, IOEndpoint*, IOCompletionCallback&) { } + /// \brief Checks if the data received is complete. + /// + /// \param data Unused + /// \param length Unused + /// \param cumulative Unused + /// + /// \return Always true + virtual bool receiveComplete(void*, size_t, size_t&) { + return (true); + } + /// \brief Cancel I/O On Socket /// /// Must be supplied as it is abstract in the base class. diff --git a/src/lib/asiolink/io_socket.h b/src/lib/asiolink/io_socket.h index d594fe1e64..4fb68f8af0 100644 --- a/src/lib/asiolink/io_socket.h +++ b/src/lib/asiolink/io_socket.h @@ -133,7 +133,7 @@ public: /// \param length Length of data to send /// \param endpoint Target of the send /// \param callback Callback object. - virtual void async_send(const void* data, size_t length, + virtual void asyncSend(const void* data, size_t length, const IOEndpoint* endpoint, IOCompletionCallback& callback) = 0; /// \brief Receive Asynchronously @@ -145,10 +145,32 @@ public: /// /// \param data Buffer to receive incoming message /// \param length Length of the data buffer + /// \param cumulative Amount of data that should already be in the buffer. /// \param endpoint Source of the communication /// \param callback Callback object - virtual void async_receive(void* data, size_t length, IOEndpoint* endpoint, - IOCompletionCallback& callback) = 0; + virtual void asyncReceive(void* data, size_t length, size_t cumulative, + IOEndpoint* endpoint, IOCompletionCallback& callback) = 0; + + /// \brief Checks if the data received is complete. + /// + /// This applies to TCP receives, where the data is a byte stream and a + /// receive is not guaranteed to receive the entire message. DNS messages + /// over TCP are prefixed by a two-byte count field. This method takes the + /// amount received so far and the amount received in this I/O and checks + /// if the message is complete, returning the appropriate indication. As + /// a side-effect, it also updates the amount received. + /// + /// For a UDP receive, all the data is received in one I/O, so this is + /// effectively a no-op (although it does update the amount received). + /// + /// \param data Data buffer containing data to date + /// \param length Amount of data received in last asynchronous I/O + /// \param cumulative On input, amount of data received before the last + /// I/O. On output, the total amount of data received to date. + /// + /// \return true if the receive is complete, false if another receive is + /// needed. + virtual bool receiveComplete(void* data, size_t length, size_t& cumulative) = 0; /// \brief Cancel I/O On Socket virtual void cancel() = 0; diff --git a/src/lib/asiolink/tcp_socket.h b/src/lib/asiolink/tcp_socket.h index b791c18ba9..abcc3d8c3d 100644 --- a/src/lib/asiolink/tcp_socket.h +++ b/src/lib/asiolink/tcp_socket.h @@ -75,7 +75,7 @@ public: /// \param length Length of data to send /// \param endpoint Target of the send /// \param callback Callback object. - virtual void async_send(const void*, size_t, + virtual void asyncSend(const void*, size_t, const IOEndpoint*, IOCompletionCallback&) { } @@ -88,12 +88,30 @@ public: /// /// \param data Buffer to receive incoming message /// \param length Length of the data buffer + /// \param cumulative Amount of data that should already be in the buffer. /// \param endpoint Source of the communication /// \param callback Callback object - virtual void async_receive(void* data, size_t, IOEndpoint*, + virtual void asyncReceive(void* data, size_t, size_t, IOEndpoint*, IOCompletionCallback&) { } + /// \brief Checks if the data received is complete. + /// + /// Checks that the total data received is the amount expected by the + /// two-byte header to the message. + /// + /// \param data Data buffer containing data to date + /// \param length Amount of data received in last asynchronous I/O + /// \param cumulative On input, amount of data received before the last + /// I/O. On output, the total amount of data received to date. + /// + /// \return true if the receive is complete, false if another receive is + /// needed. + virtual bool receiveComplete(void*, size_t length, size_t& cumulative) { + cumulative = length; + return (true); + } + /// \brief Cancel I/O On Socket virtual void cancel() { } diff --git a/src/lib/asiolink/tests/udp_socket_unittest.cc b/src/lib/asiolink/tests/udp_socket_unittest.cc index b24a869e0d..6950c6e865 100644 --- a/src/lib/asiolink/tests/udp_socket_unittest.cc +++ b/src/lib/asiolink/tests/udp_socket_unittest.cc @@ -64,10 +64,10 @@ using namespace std; namespace { -const char* SERVER_ADDRESS = "127.0.0.1"; +const char SERVER_ADDRESS[] = "127.0.0.1"; const unsigned short SERVER_PORT = 5301; -// FIXME Shouldn't we send something that is real message? +// TODO: Shouldn't we send something that is real message? const char OUTBOUND_DATA[] = "Data sent from client to server"; const char INBOUND_DATA[] = "Returned data from server to client"; } @@ -179,6 +179,7 @@ private: boost::shared_ptr ptr_; ///< Pointer to private data }; +// TODO: Need to add a test to check the cancel() method // Tests the operation of a UDPSocket by opening it, sending an asynchronous // message to a server, receiving an asynchronous message from the server and @@ -193,6 +194,7 @@ TEST(UDPSocket, SequenceTest) { // The client - the UDPSocket being tested UDPSocket client(service); // Socket under test UDPCallback client_cb("Client"); // Async I/O callback function + size_t client_cumulative = 0; // Cumulative data received // The server - with which the client communicates. For convenience, we // use the same io_service, and use the endpoint object created for @@ -220,7 +222,7 @@ TEST(UDPSocket, SequenceTest) { // be called until we call the io_service.run() method. client_cb.setCalled(false); client_cb.setCode(7); // Arbitrary number - client.async_send(OUTBOUND_DATA, sizeof(OUTBOUND_DATA), &endpoint, client_cb); + client.asyncSend(OUTBOUND_DATA, sizeof(OUTBOUND_DATA), &endpoint, client_cb); EXPECT_FALSE(client_cb.getCalled()); // Execute the two callbacks. @@ -243,7 +245,8 @@ TEST(UDPSocket, SequenceTest) { client_cb.setCalled(false); client_cb.setCode(32); // Arbitrary number UDPEndpoint client_remote_endpoint; // To receive address of remote system - client.async_receive(data, sizeof(data), &client_remote_endpoint, client_cb); + client.asyncReceive(data, sizeof(data), client_cumulative, + &client_remote_endpoint, client_cb); // Issue the write on the server side to the source of the data it received. server_cb.setLength(22345); // Arbitrary number @@ -252,7 +255,6 @@ TEST(UDPSocket, SequenceTest) { server.async_send_to(buffer(INBOUND_DATA, sizeof(INBOUND_DATA)), server_remote_endpoint.getASIOEndpoint(), server_cb); - // Expect two callbacks to run service.run_one(); service.run_one(); @@ -272,6 +274,11 @@ TEST(UDPSocket, SequenceTest) { EXPECT_TRUE(server_address == client_remote_endpoint.getAddress()); EXPECT_EQ(SERVER_PORT, client_remote_endpoint.getPort()); + // Finally, check that the receive received a complete buffer's worth of data. + EXPECT_TRUE(client.receiveComplete(&data[0], client_cb.getLength(), + client_cumulative)); + EXPECT_EQ(client_cb.getLength(), client_cumulative); + // Close client and server. EXPECT_NO_THROW(client.close()); EXPECT_NO_THROW(server.close()); diff --git a/src/lib/asiolink/udp_socket.cc b/src/lib/asiolink/udp_socket.cc index fb6ab9cf77..d1bd9aaa8a 100644 --- a/src/lib/asiolink/udp_socket.cc +++ b/src/lib/asiolink/udp_socket.cc @@ -84,7 +84,7 @@ UDPSocket::open(const IOEndpoint* endpoint, IOCompletionCallback&) { // Send a message. void -UDPSocket::async_send(const void* data, size_t length, +UDPSocket::asyncSend(const void* data, size_t length, const IOEndpoint* endpoint, IOCompletionCallback& callback) { // Upconverting. Not nice, but we have the problem that in the abstract @@ -99,10 +99,12 @@ UDPSocket::async_send(const void* data, size_t length, callback); } -// UDPSocket::receive_from +// Receive a message. Note that the "cumulative" argument is ignored - every UDP +// receive is put into the buffer beginning at the start - there is no concept +// receiving a subsequent part of a message. void -UDPSocket::async_receive(void* data, size_t length, IOEndpoint* endpoint, +UDPSocket::asyncReceive(void* data, size_t length, size_t, IOEndpoint* endpoint, IOCompletionCallback& callback) { // Upconvert the endpoint again. diff --git a/src/lib/asiolink/udp_socket.h b/src/lib/asiolink/udp_socket.h index 4522141be8..9b1af8786f 100644 --- a/src/lib/asiolink/udp_socket.h +++ b/src/lib/asiolink/udp_socket.h @@ -82,7 +82,7 @@ public: /// \param length Length of data to send /// \param endpoint Target of the send /// \param callback Callback object. - virtual void async_send(const void* data, size_t length, + virtual void asyncSend(const void* data, size_t length, const IOEndpoint* endpoint, IOCompletionCallback& callback); /// \brief Receive Asynchronously @@ -94,10 +94,30 @@ public: /// /// \param data Buffer to receive incoming message /// \param length Length of the data buffer + /// \param cumulative Amount of data that should already be in the buffer. + /// (This is ignored - every UPD receive fills the buffer from the start.) /// \param endpoint Source of the communication /// \param callback Callback object - virtual void async_receive(void* data, size_t length, IOEndpoint* endpoint, - IOCompletionCallback& callback); + virtual void asyncReceive(void* data, size_t length, size_t cumulative, + IOEndpoint* endpoint, IOCompletionCallback& callback); + + /// \brief Checks if the data received is complete. + /// + /// As all the data is received in one I/O, so this is, this is effectively + /// a no-op (although it does update the amount of data received). + /// + /// \param data Data buffer containing data to date. (This is ignored + /// for UDP receives.) + /// \param length Amount of data received in last asynchronous I/O + /// \param cumulative On input, amount of data received before the last + /// I/O. On output, the total amount of data received to date. + /// + /// \return true if the receive is complete, false if another receive is + /// needed. + virtual bool receiveComplete(void*, size_t length, size_t& cumulative) { + cumulative = length; + return (true); + } /// \brief Cancel I/O On Socket virtual void cancel();