diff --git a/src/lib/asiolink/dummy_io_cb.h b/src/lib/asiolink/dummy_io_cb.h index bde656c348..0006b95cfc 100644 --- a/src/lib/asiolink/dummy_io_cb.h +++ b/src/lib/asiolink/dummy_io_cb.h @@ -36,6 +36,14 @@ namespace asiolink { class DummyIOCallback { public: + /// \brief Asynchronous I/O callback method + /// + /// \param error Unused + void operator()(asio::error_code) + { + // TODO: log an error if this method ever gets called. + } + /// \brief Asynchronous I/O callback method /// /// \param error Unused diff --git a/src/lib/asiolink/io_asio_socket.h b/src/lib/asiolink/io_asio_socket.h index d485610bc2..2e165f4015 100644 --- a/src/lib/asiolink/io_asio_socket.h +++ b/src/lib/asiolink/io_asio_socket.h @@ -50,6 +50,16 @@ public: IOError(file, line, what) {} }; +/// \brief Buffer Overflow +/// +/// Thrown if an attempt is made to receive into an area beyond the end of +/// the receive data buffer. +class BufferOverflow : public IOError { +public: + BufferOverflow(const char* file, size_t line, const char* what) : + IOError(file, line, what) {} +}; + /// Forward declaration of an IOEndpoint class IOEndpoint; @@ -129,32 +139,47 @@ public: /// \return IPPROTO_TCP for TCP sockets virtual int getProtocol() const = 0; - /// \brief Open AsioSocket + /// \brief Is Open() synchronous? /// - /// Opens the socket for asynchronous I/O. On a UDP socket, this is merely - /// an "open()" on the underlying socket (so completes immediately), but on - /// a TCP socket it also connects to the remote end (which is done as an + /// On a UDP socket, an "open" operation is merely a call to "open()" on + /// the underlying socket (so completes immediately), but on a TCP socket it + /// also includings connecting to the remote end (which is done as an /// asynchronous operation). /// /// For TCP, signalling of the completion of the operation is done by /// by calling the callback function in the normal way. This could be done /// for UDP (by posting en event on the event queue); however, that will - /// incur additional overhead in the most common case. Instead, the return - /// value indicates whether the operation was asynchronous or not. If yes, - /// (i.e. TCP) the callback has been posted to the event queue: if no (UDP), - /// no callback has been posted (in which case it is up to the caller as to - /// whether they want to manually post the callback themself.) + /// incur additional overhead in the most common case. So we give the + /// caller the choice for calling this open() method synchronously or + /// asynchronously. + /// + /// Owing to the way that the stackless coroutines are implemented, we need + /// to know _before_ executing the operation whether or not the open is + /// asynchronous. So this method simply provides that information. + /// + /// (The reason there is a need to know is because the call to open() passes + /// in the state of the coroutine at the time the call is made. On an + /// asynchronous I/O, we need to set the state to point to the statement + /// after the call to open() before we pass the corotuine to the open() + /// call. Unfortunately, the macros that do this also yield control - which + /// we don't want to do if the open is synchronous. Hence we need to know + /// before we make the call to open() whether that call will complete + /// asynchronously.) + virtual bool isOpenSynchronous() const = 0; + + /// \brief Open AsioSocket + /// + /// Opens the socket for asynchronous I/O. The open will complete + /// synchronously on UCP or asynchronously on TCP (in which case a callback + /// will be queued): what will happen can be found by calling the method + /// isOpenSynchronous(). /// /// \param endpoint Pointer to the endpoint object. This is ignored for /// a UDP socket (the target is specified in the send call), but should /// be of type TCPEndpoint for a TCP connection. /// \param callback I/O Completion callback, called when the operation has /// completed, but only if the operation was asynchronous. - /// - /// \return true if an asynchronous operation was started and the caller - /// should yield and wait for completion, false if the operation was - /// completed synchronously and no callback was queued. - virtual bool open(const IOEndpoint* endpoint, C& callback) = 0; + virtual void open(const IOEndpoint* endpoint, C& callback) = 0; /// \brief Send Asynchronously /// @@ -167,7 +192,7 @@ public: /// \param endpoint Target of the send /// \param callback Callback object. virtual void asyncSend(const void* data, size_t length, - const IOEndpoint* endpoint, C& callback) = 0; + const IOEndpoint* endpoint, C& callback) = 0; /// \brief Receive Asynchronously /// @@ -178,11 +203,11 @@ public: /// /// \param data Buffer to receive incoming message /// \param length Length of the data buffer - /// \param cumulative Amount of data that should already be in the buffer. + /// \param offset Offset into buffer where data is to be put /// \param endpoint Source of the communication /// \param callback Callback object - virtual void asyncReceive(void* data, size_t length, size_t cumulative, - IOEndpoint* endpoint, C& callback) = 0; + virtual void asyncReceive(void* data, size_t length, size_t offset, + IOEndpoint* endpoint, C& callback) = 0; /// \brief Checks if the data received is complete. /// @@ -204,7 +229,7 @@ public: /// \return true if the receive is complete, false if another receive is /// needed. virtual bool receiveComplete(void* data, size_t length, - size_t& cumulative) = 0; + size_t& cumulative) = 0; /// \brief Cancel I/O On AsioSocket virtual void cancel() = 0; @@ -251,6 +276,13 @@ public: virtual int getProtocol() const { return (protocol_); } + /// \brief Is socket opening synchronous? + /// + /// \return true - it is for this class. + bool isOpenSynchronous() const { + return true; + } + /// \brief Open AsioSocket /// /// A call that is a no-op on UDP sockets, this opens a connection to the @@ -280,7 +312,7 @@ public: /// /// \param data Unused /// \param length Unused - /// \param cumulative Unused + /// \param offset Unused /// \param endpoint Unused /// \param callback Unused virtual void asyncReceive(void* data, size_t, size_t, IOEndpoint*, C&) { diff --git a/src/lib/asiolink/io_endpoint.cc b/src/lib/asiolink/io_endpoint.cc index bf79f61868..97e9c9139c 100644 --- a/src/lib/asiolink/io_endpoint.cc +++ b/src/lib/asiolink/io_endpoint.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include diff --git a/src/lib/asiolink/io_fetch.cc b/src/lib/asiolink/io_fetch.cc index 7fce6074ed..d890a52f49 100644 --- a/src/lib/asiolink/io_fetch.cc +++ b/src/lib/asiolink/io_fetch.cc @@ -19,6 +19,9 @@ #include #include +#include +#include +#include #include #include @@ -28,10 +31,18 @@ #include #include +#include + #include #include +#include #include #include +#include +#include +#include +#include +#include using namespace asio; using namespace isc::dns; @@ -44,13 +55,87 @@ namespace asiolink { isc::log::Logger logger("asio"); +/// \brief IOFetch Data +/// +/// The data for IOFetch is held in a separate struct pointed to by a +/// shared_ptr object. This is because the IOFetch object will be copied +/// often (it is used as a coroutine and passed as callback to many +/// async_*() functions) and we want keep the same data). Organising the +/// data in this way keeps copying to a minimum. +struct IOFetchData { + + // The first two members are shared pointers to a base class because what is + // actually instantiated depends on whether the fetch is over UDP or TCP, + // which is not known until construction of the IOFetch. Use of a shared + //pointer here is merely to ensure deletion when the data object is deleted. + boost::shared_ptr > socket; + ///< Socket to use for I/O + boost::shared_ptr remote; ///< Where the fetch was sent + isc::dns::Question question; ///< Question to be asked + isc::dns::OutputBufferPtr msgbuf; ///< Wire buffer for question + isc::dns::OutputBufferPtr buffer; ///< Received data held here + boost::shared_array data; ///< Temporary array for data + IOFetch::Callback* callback; ///< Called on I/O Completion + size_t cumulative; ///< Cumulative received amount + bool stopped; ///< Have we stopped running? + asio::deadline_timer timer; ///< Timer to measure timeouts + int timeout; ///< Timeout in ms + IOFetch::Origin origin; ///< Origin of last asynchronous I/O + + /// \brief Constructor + /// + /// Just fills in the data members of the IOFetchData structure + /// + /// \param protocol Either IOFetch::TCP or IOFetch::UDP + /// \param service I/O Service object to handle the asynchronous + /// operations. + /// \param query DNS question to send to the upstream server. + /// \param address IP address of upstream server + /// \param port Port to use for the query + /// \param buff Output buffer into which the response (in wire format) + /// is written (if a response is received). + /// \param cb Callback object containing the callback to be called + /// when we terminate. The caller is responsible for managing this + /// object and deleting it if necessary. + /// \param wait Timeout for the fetch (in ms). + /// + /// TODO: May need to alter constructor (see comment 4 in Trac ticket #554) + IOFetchData(IOFetch::Protocol protocol, IOService& service, + const isc::dns::Question& query, const IOAddress& address, + uint16_t port, isc::dns::OutputBufferPtr& buff, IOFetch::Callback* cb, + int wait) + : + socket((protocol == IOFetch::UDP) ? + static_cast*>( + new UDPSocket(service)) : + static_cast*>( + new TCPSocket(service)) + ), + remote((protocol == IOFetch::UDP) ? + static_cast(new UDPEndpoint(address, port)) : + static_cast(new TCPEndpoint(address, port)) + ), + question(query), + msgbuf(new isc::dns::OutputBuffer(512)), + buffer(buff), + data(new char[IOFetch::MIN_LENGTH]), + callback(cb), + cumulative(0), + stopped(false), + timer(service.get_io_service()), + timeout(wait), + origin(IOFetch::NONE) + {} +}; + + /// IOFetch Constructor - just initialize the private data IOFetch::IOFetch(Protocol protocol, IOService& service, const isc::dns::Question& question, const IOAddress& address, uint16_t port, - isc::dns::OutputBufferPtr& buff, Callback* cb, int wait) + OutputBufferPtr& buff, Callback* cb, int wait) : - data_(new IOFetch::IOFetchData(protocol, service, question, address, + data_(new IOFetchData(protocol, service, question, address, port, buff, cb, wait)) { } @@ -59,7 +144,9 @@ IOFetch::IOFetch(Protocol protocol, IOService& service, /// pattern; see internal/coroutine.h for details. void -IOFetch::operator()(error_code ec, size_t length) { +IOFetch::operator()(asio::error_code ec, size_t length) { + std::cerr << "IOFetch::operator() [" << this << "], origin = " << + data_->origin << ", coroutine = " << get_value() << "\n"; if (data_->stopped) { return; } else if (ec) { @@ -91,7 +178,6 @@ IOFetch::operator()(error_code ec, size_t length) { data_->remote->getAddress().toText()); } - // If we timeout, we stop, which will shutdown everything and // cancel all other attempts to run inside the coroutine if (data_->timeout != -1) { @@ -103,17 +189,26 @@ IOFetch::operator()(error_code ec, size_t length) { // Open a connection to the target system. For speed, if the operation // was completed synchronously (i.e. UDP operation) we bypass the yield. - if (data_->socket->open(data_->remote.get(), *this)) { - data_->origin = OPEN; - CORO_YIELD; + + data_->origin = OPEN; + if (data_->socket->isOpenSynchronous()) { + std::cerr << "IOFetch: Opening socket synchronously\n"; + data_->socket->open(data_->remote.get(), *this); + } else { + std::cerr << "IOFetch: Opening socket asynchronously and yeilding\n"; + CORO_YIELD data_->socket->open(data_->remote.get(), *this); + std::cerr << "IOFetch: Resuming after Opening socket asynchronously\n"; } // Begin an asynchronous send, and then yield. When the send completes // send completes, we will resume immediately after this point. + // Note: A TCP message may not be sent in one piece (depends on the + // implementation in TCP socket). Therefore there may be data_->origin = SEND; + std::cerr << "IOFetch: asynchronous send\n"; CORO_YIELD data_->socket->asyncSend(data_->msgbuf->getData(), data_->msgbuf->getLength(), data_->remote.get(), *this); - + std::cerr << "IOFetch: resuming after asynchronous send\n"; // Now receive the response. Since TCP may not receive the entire // message in one operation, we need to loop until we have received // it. (This can't be done within the asyncReceive() method because @@ -123,9 +218,11 @@ IOFetch::operator()(error_code ec, size_t length) { // we check if the operation is complete and if not, loop to read again. data_->origin = RECEIVE; do { + std::cerr << "IOFetch: asynchronous receive\n"; CORO_YIELD data_->socket->asyncReceive(data_->data.get(), - static_cast(MAX_LENGTH), data_->cumulative, + static_cast(MIN_LENGTH), data_->cumulative, data_->remote.get(), *this); + std::cerr << "IOFetch: resuming after asynchronous receive\n"; } while (!data_->socket->receiveComplete(data_->data.get(), length, data_->cumulative)); @@ -141,6 +238,7 @@ IOFetch::operator()(error_code ec, size_t length) { // Finished with this socket, so close it. data_->origin = CLOSE; + std::cerr << "IOFetch: close\n"; data_->socket->close(); /// We are done @@ -230,7 +328,7 @@ IOFetch::stop(Result result) { // Log an error - called on I/O failure -void IOFetch::logIOFailure(asio::error_code& ec) { +void IOFetch::logIOFailure(asio::error_code ec) { // Get information that will be in all messages static const char* PROTOCOL[2] = {"TCP", "UDP"}; diff --git a/src/lib/asiolink/io_fetch.h b/src/lib/asiolink/io_fetch.h index 369e057156..479c54c8f4 100644 --- a/src/lib/asiolink/io_fetch.h +++ b/src/lib/asiolink/io_fetch.h @@ -17,31 +17,24 @@ #include -#include -#include -#include // for some IPC/network system calls #include #include #include -#include + +#include #include #include #include -#include -#include -#include -#include -#include -#include -#include - - namespace asiolink { +// Forward declarations +class IOAddress; +class IOFetchData; +class IOService; /// \brief Upstream Fetch Processing /// @@ -76,9 +69,9 @@ public: /// even if the contents of the packet indicate that some error occurred. enum Result { SUCCESS = 0, ///< Success, fetch completed - TIME_OUT, ///< Failure, fetch timed out - STOPPED, ///< Control code, fetch has been stopped - NOTSET ///< For testing, indicates value not set + TIME_OUT = 1, ///< Failure, fetch timed out + STOPPED = 2, ///< Control code, fetch has been stopped + NOTSET = 3 ///< For testing, indicates value not set }; // The next enum is a "trick" to allow constants to be defined in a class @@ -86,7 +79,7 @@ public: /// \brief Integer Constants enum { - MAX_LENGTH = 4096 ///< Maximum size of receive buffer + MIN_LENGTH = 4096 ///< Minimum size of receive buffer }; /// \brief I/O Fetch Callback @@ -112,91 +105,14 @@ public: virtual ~Callback() {} - /// \brief Callback method called when the fetch completes /// \brief Origin of Asynchronous I/O Call - /// - - // The next enum is a "trick" to allow constants to be defined in a class - // declaration. - + /// \brief Callback method /// - /// \brief result Result of the fetch + /// This is the method called when the fecth completes. + /// + /// \param result Result of the fetch virtual void operator()(Result result) = 0; }; - /// \brief IOFetch Data - /// - /// The data for IOFetch is held in a separate struct pointed to by a - /// shared_ptr object. This is because the IOFetch object will be copied - /// often (it is used as a coroutine and passed as callback to many - /// async_*() functions) and we want keep the same data). Organising the - /// data in this way keeps copying to a minimum. - struct IOFetchData { - - // The next two members are shared pointers to a base class because what - // is actually instantiated depends on whether the fetch is over UDP or - // TCP, which is not known until construction of the IOFetch. Use of - // a shared pointer here is merely to ensure deletion when the data - // object is deleted. - boost::shared_ptr > socket; - ///< Socket to use for I/O - boost::shared_ptr remote; ///< Where the fetch was sent - isc::dns::Question question; ///< Question to be asked - isc::dns::OutputBufferPtr msgbuf; ///< Wire buffer for question - isc::dns::OutputBufferPtr buffer; ///< Received data held here - boost::shared_array data; ///< Temporary array for data - IOFetch::Callback* callback; ///< Called on I/O Completion - size_t cumulative; ///< Cumulative received amount - bool stopped; ///< Have we stopped running? - asio::deadline_timer timer; ///< Timer to measure timeouts - int timeout; ///< Timeout in ms - Origin origin; ///< Origin of last asynchronous I/O - - /// \brief Constructor - /// - /// Just fills in the data members of the IOFetchData structure - /// - /// \param proto Protocol: either IOFetch::TCP or IOFetch::UDP - /// \param service I/O Service object to handle the asynchronous - /// operations. - /// \param query DNS question to send to the upstream server. - /// \param address IP address of upstream server - /// \param port Port to use for the query - /// \param buff Output buffer into which the response (in wire format) - /// is written (if a response is received). - /// \param cb Callback object containing the callback to be called - /// when we terminate. The caller is responsible for managing this - /// object and deleting it if necessary. - /// \param wait Timeout for the fetch (in ms). - /// - /// TODO: May need to alter constructor (see comment 4 in Trac ticket #554) - IOFetchData(Protocol proto, IOService& service, - const isc::dns::Question& query, const IOAddress& address, - uint16_t port, isc::dns::OutputBufferPtr& buff, Callback* cb, - int wait) - : - socket((proto == UDP) ? - static_cast*>( - new UDPSocket(service)) : - static_cast*>( - new TCPSocket(service)) - ), - remote((proto == UDP) ? - static_cast(new UDPEndpoint(address, port)) : - static_cast(new TCPEndpoint(address, port)) - ), - question(query), - msgbuf(new isc::dns::OutputBuffer(512)), - buffer(buff), - data(new char[IOFetch::MAX_LENGTH]), - callback(cb), - cumulative(0), - stopped(false), - timer(service.get_io_service()), - timeout(wait), - origin(NONE) - {} - }; - /// \brief Constructor. /// /// Creates the object that will handle the upstream fetch. @@ -229,8 +145,16 @@ public: /// /// \param ec Error code, the result of the last asynchronous I/O operation. /// \param length Amount of data received on the last asynchronous read - void operator()(asio::error_code ec = asio::error_code(), - size_t length = 0); + void operator()(asio::error_code ec, size_t length); + + void operator()(asio::error_code ec) { + operator()(ec, 0); + } + + void operator()() { + asio::error_code ec; + operator()(ec); + } /// \brief Terminate query /// @@ -246,7 +170,7 @@ private: /// Records an I/O failure to the log file /// /// \param ec ASIO error code - void logIOFailure(asio::error_code& ec); + void logIOFailure(asio::error_code ec); boost::shared_ptr data_; ///< Private data diff --git a/src/lib/asiolink/tcp_endpoint.h b/src/lib/asiolink/tcp_endpoint.h index 8f6270f3b3..158ca4a97e 100644 --- a/src/lib/asiolink/tcp_endpoint.h +++ b/src/lib/asiolink/tcp_endpoint.h @@ -24,32 +24,33 @@ namespace asiolink { /// \brief The \c TCPEndpoint class is a concrete derived class of -/// \c IOEndpoint that represents an endpoint of a TCP connection. +/// \c IOEndpoint that represents an endpoint of a TCP packet. /// -/// In the current implementation, an object of this class is always -/// instantiated within the wrapper routines. Applications are expected to -/// get access to the object via the abstract base class, \c IOEndpoint. -/// This design may be changed when we generalize the wrapper interface. -/// -/// Note: this implementation is optimized for the case where this object -/// is created from an ASIO endpoint object in a receiving code path -/// by avoiding to make a copy of the base endpoint. For TCP it may not be -/// a big deal, but when we receive UDP packets at a high rate, the copy -/// overhead might be significant. +/// Other notes about \c TCPEndpoint applies to this class, too. class TCPEndpoint : public IOEndpoint { public: /// - /// \name Constructors and Destructor + /// \name Constructors and Destructor. /// //@{ + + /// \brief Default Constructor + /// + /// Creates an internal endpoint. This is expected to be set by some + /// external call. + TCPEndpoint() : + asio_endpoint_placeholder_(new asio::ip::tcp::endpoint()), + asio_endpoint_(*asio_endpoint_placeholder_) + {} + /// \brief Constructor from a pair of address and port. /// /// \param address The IP address of the endpoint. /// \param port The TCP port number of the endpoint. TCPEndpoint(const IOAddress& address, const unsigned short port) : asio_endpoint_placeholder_( - new asio::ip::tcp::endpoint( - asio::ip::address::from_string(address.toText()), port)), + new asio::ip::tcp::endpoint(asio::ip::address::from_string(address.toText()), + port)), asio_endpoint_(*asio_endpoint_placeholder_) {} @@ -59,39 +60,53 @@ public: /// corresponding ASIO class, \c tcp::endpoint. /// /// \param asio_endpoint The ASIO representation of the TCP endpoint. - TCPEndpoint(const asio::ip::tcp::endpoint& asio_endpoint) : + TCPEndpoint(asio::ip::tcp::endpoint& asio_endpoint) : asio_endpoint_placeholder_(NULL), asio_endpoint_(asio_endpoint) {} + /// \brief Constructor from an ASIO TCP endpoint. + /// + /// This constructor is designed to be an efficient wrapper for the + /// corresponding ASIO class, \c tcp::endpoint. + /// + /// \param asio_endpoint The ASIO representation of the TCP endpoint. + TCPEndpoint(const asio::ip::tcp::endpoint& asio_endpoint) : + asio_endpoint_placeholder_(new asio::ip::tcp::endpoint(asio_endpoint)), + asio_endpoint_(*asio_endpoint_placeholder_) + {} + /// \brief The destructor. - ~TCPEndpoint() { delete asio_endpoint_placeholder_; } + virtual ~TCPEndpoint() { delete asio_endpoint_placeholder_; } //@} - IOAddress getAddress() const { + virtual IOAddress getAddress() const { return (asio_endpoint_.address()); } - uint16_t getPort() const { + virtual uint16_t getPort() const { return (asio_endpoint_.port()); } - short getProtocol() const { + virtual short getProtocol() const { return (asio_endpoint_.protocol().protocol()); } - short getFamily() const { + virtual short getFamily() const { return (asio_endpoint_.protocol().family()); } // This is not part of the exosed IOEndpoint API but allows // direct access to the ASIO implementation of the endpoint - const asio::ip::tcp::endpoint& getASIOEndpoint() const { + inline const asio::ip::tcp::endpoint& getASIOEndpoint() const { + return (asio_endpoint_); + } + inline asio::ip::tcp::endpoint& getASIOEndpoint() { return (asio_endpoint_); } private: - const asio::ip::tcp::endpoint* asio_endpoint_placeholder_; - const asio::ip::tcp::endpoint& asio_endpoint_; + asio::ip::tcp::endpoint* asio_endpoint_placeholder_; + asio::ip::tcp::endpoint& asio_endpoint_; }; } // namespace asiolink diff --git a/src/lib/asiolink/tcp_socket.h b/src/lib/asiolink/tcp_socket.h index 5a85aaa633..a7cc8e97da 100644 --- a/src/lib/asiolink/tcp_socket.h +++ b/src/lib/asiolink/tcp_socket.h @@ -27,8 +27,13 @@ #include #include +#include +#include + #include +#include + #include #include #include @@ -36,6 +41,15 @@ namespace asiolink { +/// \brief Buffer Too Large +/// +/// Thrown on an attempt to send a buffer > 64k +class BufferTooLarge : public IOError { +public: + BufferTooLarge(const char* file, size_t line, const char* what) : + IOError(file, line, what) {} +}; + /// \brief The \c TCPSocket class is a concrete derived class of \c IOAsioSocket /// that represents a TCP socket. /// @@ -67,27 +81,37 @@ public: /// \brief Destructor virtual ~TCPSocket(); - virtual int getNative() const { return (socket_.native()); } - virtual int getProtocol() const { return (IPPROTO_TCP); } + /// \brief Return file descriptor of underlying socket + virtual int getNative() const { + return (socket_.native()); + } + + /// \brief Return protocol of socket + virtual int getProtocol() const { + return (IPPROTO_TCP); + } + + /// \brief Is "open()" synchronous? + /// + /// Indicates that the opening of a TCP socket is asynchronous. + virtual bool isOpenSynchronous() const { + return (false); + } /// \brief Open Socket /// - /// Opens the TCP socket. In the model for transport-layer agnostic I/O, - /// an "open" operation includes a connection to the remote end (which - /// may take time). This does not happen for TCP, so the method returns - /// "false" to indicate that the operation completed synchronously. + /// Opens the UDP socket. This is an asynchronous operation, completion of + /// which will be signalled via a call to the callback function. /// /// \param endpoint Endpoint to which the socket will connect to. - /// \param callback Unused. - /// - /// \return false to indicate that the "operation" completed synchronously. - virtual bool open(const IOEndpoint* endpoint, C&); + /// \param callback Callback object. + virtual void open(const IOEndpoint* endpoint, C& callback); /// \brief Send Asynchronously /// - /// This corresponds to async_send_to() for TCP sockets and async_send() - /// for TCP. In both cases an endpoint argument is supplied indicating the - /// target of the send - this is ignored for TCP. + /// Calls the underlying socket's async_send() method to send a packet of + /// data asynchronously to the remote endpoint. The callback will be called + /// on completion. /// /// \param data Data to send /// \param length Length of data to send @@ -98,19 +122,17 @@ public: /// \brief Receive Asynchronously /// - /// This correstponds to async_receive_from() for TCP sockets and - /// async_receive() for TCP. In both cases, an endpoint argument is - /// supplied to receive the source of the communication. For TCP it will - /// be filled in with details of the connection. + /// Calls the underlying socket's async_receive() method to read a packet + /// of data from a remote endpoint. Arrival of the data is signalled via a + /// call to the callback function. /// /// \param data Buffer to receive incoming message /// \param length Length of the data buffer - /// \param cumulative Amount of data that should already be in the buffer. - /// (This is ignored - every UPD receive fills the buffer from the start.) + /// \param offset Offset into buffer where data is to be put /// \param endpoint Source of the communication /// \param callback Callback object - virtual void asyncReceive(void* data, size_t length, size_t cumulative, - IOEndpoint* endpoint, C& callback); + virtual void asyncReceive(void* data, size_t length, size_t offset, + IOEndpoint* endpoint, C& callback); /// \brief Checks if the data received is complete. /// @@ -144,13 +166,24 @@ private: asio::ip::tcp::socket* socket_ptr_; ///< Pointer to own socket asio::ip::tcp::socket& socket_; ///< Socket bool isopen_; ///< true when socket is open + + // TODO: Remove temporary buffer + // The current implementation copies the buffer passed to asyncSend() into + // a temporary buffer and precedes it with a two-byte count field. As + // ASIO should really be just about sendiong and receiving data, the TCP + // code should not do this. If the protocol using this requires a two-byte + // count, it should add it before calling this code. (This may be best + // achieved by altering isc::dns::buffer to have pairs of methods: + // getLength()/getTCPLength(), getData()/getTCPData(), with the getTCPXxx() + // methods taking into account a two-byte count field.) + isc::dns::OutputBufferPtr send_buffer_; ///< Send buffer }; // Constructor - caller manages socket template TCPSocket::TCPSocket(asio::ip::tcp::socket& socket) : - socket_ptr_(NULL), socket_(socket), isopen_(true) + socket_ptr_(NULL), socket_(socket), isopen_(true), send_buffer_() { } @@ -171,16 +204,16 @@ TCPSocket::~TCPSocket() delete socket_ptr_; } -// Open the socket. Throws an error on failure -// TODO: Make the open more resilient +// Open the socket. -template bool -TCPSocket::open(const IOEndpoint* endpoint, C&) { +template void +TCPSocket::open(const IOEndpoint* endpoint, C& callback) { // Ignore opens on already-open socket. Don't throw a failure because // of uncertainties as to what precedes whan when using asynchronous I/O. // At also allows us a treat a passed-in socket as a self-managed socket. + std::cerr << "TCPSocket::open(): open_ flags is " << isopen_ << "\n"; if (!isopen_) { if (endpoint->getFamily() == AF_INET) { socket_.open(asio::ip::tcp::v4()); @@ -190,35 +223,57 @@ TCPSocket::open(const IOEndpoint* endpoint, C&) { } isopen_ = true; - // TODO: Complete TCPSocket::open() + // Set options on the socket: + // Reuse address - allow the socket to bind to a port even if the port + // is in the TIMED_WAIT state. + socket_.set_option(asio::socket_base::reuse_address(true)); } - return (false); + + // Upconvert to a TCPEndpoint. We need to do this because although + // IOEndpoint is the base class of UDPEndpoint and TCPEndpoint, it does not + // contain a method for getting at the underlying endpoint type - that is in + /// the derived class and the two classes differ on return type. + assert(endpoint->getProtocol() == IPPROTO_TCP); + const TCPEndpoint* tcp_endpoint = + static_cast(endpoint); + + // Connect to the remote endpoint. On success, the handler will be + // called (with one argument - the length argument will default to + // zero). + socket_.async_connect(tcp_endpoint->getASIOEndpoint(), callback); } // Send a message. Should never do this if the socket is not open, so throw // an exception if this is the case. template void -TCPSocket::asyncSend(const void* data, size_t length, - const IOEndpoint* endpoint, C& callback) +TCPSocket::asyncSend(const void* data, size_t length, const IOEndpoint*, + C& callback) { if (isopen_) { - // Upconvert to a TCPEndpoint. We need to do this because although - // IOEndpoint is the base class of TCPEndpoint and TCPEndpoint, it - // doing cont contain a method for getting at the underlying endpoint - // type - those are in the derived class and the two classes differ on - // return type. + // Need to copy the data into a temporary buffer and precede it with + // a two-byte count field. + // TODO: arrange for the buffer passed to be preceded by the count + try { + // Ensure it fits into 16 bits + uint16_t count = boost::numeric_cast(length); - assert(endpoint->getProtocol() == IPPROTO_TCP); - const TCPEndpoint* tcp_endpoint = - static_cast(endpoint); - std::cerr << "TCPSocket::asyncSend(): sending to " << - tcp_endpoint->getAddress().toText() << - ", port " << tcp_endpoint->getPort() << "\n"; + // Copy data into a buffer preceded by the count field. + send_buffer_.reset(new isc::dns::OutputBuffer(length + 2)); + send_buffer_->writeUint16(count); + send_buffer_->writeData(data, length); - // TODO: Complete TCPSocket::asyncSend() + // ... and send it + std::cerr << "TCPSocket::asyncSend(): sending " << count << " data bytes\n"; + + socket_.async_send(asio::buffer(send_buffer_->getData(), + send_buffer_->getLength()), callback); + } catch (boost::numeric::bad_numeric_cast& e) { + isc_throw(BufferTooLarge, + "attempt to send buffer larger than 64kB"); + } } else { isc_throw(SocketNotOpen, diff --git a/src/lib/asiolink/tests/Makefile.am b/src/lib/asiolink/tests/Makefile.am index ff4a745ffe..ded145c17c 100644 --- a/src/lib/asiolink/tests/Makefile.am +++ b/src/lib/asiolink/tests/Makefile.am @@ -25,6 +25,8 @@ run_unittests_SOURCES += io_socket_unittest.cc run_unittests_SOURCES += io_service_unittest.cc run_unittests_SOURCES += interval_timer_unittest.cc run_unittests_SOURCES += recursive_query_unittest.cc +run_unittests_SOURCES += tcp_endpoint_unittest.cc +run_unittests_SOURCES += tcp_socket_unittest.cc run_unittests_SOURCES += udp_endpoint_unittest.cc run_unittests_SOURCES += udp_socket_unittest.cc diff --git a/src/lib/asiolink/tests/io_fetch_unittest.cc b/src/lib/asiolink/tests/io_fetch_unittest.cc index d21f03f1ae..a265d6e5ed 100644 --- a/src/lib/asiolink/tests/io_fetch_unittest.cc +++ b/src/lib/asiolink/tests/io_fetch_unittest.cc @@ -16,6 +16,7 @@ #include #include #include +#include #include @@ -29,12 +30,14 @@ #include #include +#include +#include #include #include using namespace asio; using namespace isc::dns; -using asio::ip::udp; +using namespace asio::ip; namespace asiolink { @@ -51,13 +54,16 @@ public: IOFetch::Result expected_; ///< Expected result of the callback bool run_; ///< Did the callback run already? Question question_; ///< What to ask - OutputBufferPtr buff_; ///< Buffer to hold result + OutputBufferPtr result_buff_; ///< Buffer to hold result of fetch + OutputBufferPtr msgbuf_; ///< Buffer corresponding to known question IOFetch udp_fetch_; ///< For UDP query test - //IOFetch tcp_fetch_; ///< For TCP query test + IOFetch tcp_fetch_; ///< For TCP query test + IOFetch::Protocol protocol_; ///< Protocol being tested - // The next member is the buffer iin which the "server" (implemented by the - // response handler method) receives the question sent by the fetch object. - char server_buff_[512]; ///< Server buffer + // The next member is the buffer in which the "server" (implemented by the + // response handler methods in this class) receives the question sent by the + // fetch object. + uint8_t server_buff_[512]; ///< Server buffer /// \brief Constructor IOFetchTest() : @@ -65,106 +71,293 @@ public: expected_(IOFetch::NOTSET), run_(false), question_(Name("example.net"), RRClass::IN(), RRType::A()), - buff_(new OutputBuffer(512)), + result_buff_(new OutputBuffer(512)), + msgbuf_(new OutputBuffer(512)), udp_fetch_(IOFetch::UDP, service_, question_, IOAddress(TEST_HOST), - TEST_PORT, buff_, this, 100) - // tcp_fetch_(service_, question_, IOAddress(TEST_HOST), TEST_PORT, - // buff_, this, 100, IPPROTO_UDP) - { } - - /// \brief Fetch completion callback - /// - /// This is the callback's operator() method which is called when the fetch - /// is complete. Check that the data received is the wire format of the - /// question, then send back an arbitrary response. - void operator()(IOFetch::Result result) { - EXPECT_EQ(expected_, result); // Check correct result returned - EXPECT_FALSE(run_); // Check it is run only once - run_ = true; // Note success - service_.stop(); // ... and exit run loop - } - - /// \brief Response handler, pretending to be remote DNS server - /// - /// This checks that the data sent is what we expected to receive, and - /// sends back a test answer. - void respond(udp::endpoint* remote, udp::socket* socket, - asio::error_code ec = asio::error_code(), size_t length = 0) { - + TEST_PORT, result_buff_, this, 100), + tcp_fetch_(IOFetch::TCP, service_, question_, IOAddress(TEST_HOST), + TEST_PORT, result_buff_, this, 1000), + protocol_(IOFetch::TCP) // for initialization - will be changed + { // Construct the data buffer for question we expect to receive. - OutputBuffer msgbuf(512); Message msg(Message::RENDER); msg.setQid(0); msg.setOpcode(Opcode::QUERY()); msg.setRcode(Rcode::NOERROR()); msg.setHeaderFlag(Message::HEADERFLAG_RD); msg.addQuestion(question_); - MessageRenderer renderer(msgbuf); + MessageRenderer renderer(*msgbuf_); msg.toWire(renderer); + } + + /// \brief Read uint16_t from network-byte-order buffer + /// + /// Adapted from isc::dns::InputBuffer::readUint16(). + /// + /// \param data Pointer to at least two bytes of data which are in network + /// byte order. + /// + /// \return uint16_t value in host byte order. + uint16_t readUint16(const void* data) { + const uint8_t* cp = static_cast(data); + + uint16_t value = ((unsigned int)(cp[0])) << 8; + value |= ((unsigned int)(cp[1])); + + return (value); + } + + /// \brief Write uint16_t to network-byte-order buffer + /// + /// Adapted from isc::dns::OutputBuffer::writeUint16(). + /// + /// \param value The 16-bit integer to be written into the buffer. + /// \param data Pointer to buffer at least two bytes long + void writeUint16(uint16_t value, uint8_t* data) { + data[0] = static_cast((value & 0xff00U) >> 8); + data[1] = static_cast(value & 0x00ffU); + } + + /// \brief UDP Response handler (the "remote UDP DNS server") + /// + /// When IOFetch is sending data, this response handler emulates the remote + /// DNS server. It checks that the data sent by the IOFetch object is what + /// was expected to have been sent, then sends back a known buffer of data. + /// + /// \param remote Endpoint to which to send the answer + /// \param socket Socket to use to send the answer + /// \param ec ASIO error code, completion code of asynchronous I/O issued + /// by the "server" to receive data. + /// \param length Amount of data received. + void udpReceiveHandler(udp::endpoint* remote, udp::socket* socket, + error_code ec = error_code(), size_t length = 0) { // The QID in the incoming data is random so set it to 0 for the - // data comparison check. (It was set to 0 when the buffer containing - // the expected data was constructed above.) + // data comparison check. (It is set to 0 in the buffer containing + // the expected data.) server_buff_[0] = server_buff_[1] = 0; - // Check that lengths are identical. - EXPECT_EQ(msgbuf.getLength(), length); - EXPECT_TRUE(memcmp(msgbuf.getData(), server_buff_, length) == 0); + // Check that length of the received data and the expected data are + // identical, then check that the data is identical as well. + EXPECT_EQ(msgbuf_->getLength(), length); + EXPECT_TRUE(memcmp(msgbuf_->getData(), server_buff_, length) == 0); - // ... and return a message back. + // Return a message back to the IOFetch object. socket->send_to(asio::buffer(TEST_DATA, sizeof TEST_DATA), *remote); } + + /// \brief Completion Handler for accepting TCP data + /// + /// Called when the remote system connects to the "server". It issues + /// an asynchronous read on the socket to read data. + /// + /// \param socket Socket on which data will be received + /// \param ec Boost error code, value should be zero. + void tcpAcceptHandler(tcp::socket* socket, error_code ec = error_code()) + { + std::cerr << "TCP Accept Handler\n"; + EXPECT_EQ(0, ec.value()); // Expect no error + + // Initiate a read on the socket + socket->async_receive(asio::buffer(server_buff_, sizeof(server_buff_)), + boost::bind(&IOFetchTest::tcpReceiveHandler, this, socket, _1, _2)); + } + + /// \brief Completion handler for receiving TCP data + /// + /// When IOFetch is sending data, this response handler emulates the remote + /// DNS server. It checks that the data sent by the IOFetch object is what + /// was expected to have been sent, then sends back a known buffer of data. + /// + /// \param socket Socket to use to send the answer + /// \param ec ASIO error code, completion code of asynchronous I/O issued + /// by the "server" to receive data. + /// \param length Amount of data received. + void tcpReceiveHandler(tcp::socket* socket, error_code ec = error_code(), + size_t length = 0) + { + std::cerr << "TCP Receive Handler\n"; + // TODO - need to loop until all the data is received. + + // Interpret the received data. The first two bytes, when converted + // to host byte order, are the count of the length of the message. + EXPECT_GE(2, length); + uint16_t dns_length = readUint16(server_buff_); + EXPECT_EQ(length, dns_length + 2); + + // Check that length of the DNS message received is that expected. + EXPECT_EQ(msgbuf_->getLength(), dns_length); + + // Compare buffers, zeroing the QID in the received buffer to match + // that set in our expected question. Note that due to the length + // field the QID in the received buffer is in the thrid and fourth + // bytes. + server_buff_[2] = server_buff_[3] = 0; + EXPECT_TRUE(memcmp(msgbuf_->getData(), server_buff_ + 2, dns_length) == 0); + + // ... and return a message back. This has to be preceded by a two-byte + // count field. It's simpler to do this as two writes - it shouldn't + // make any difference to the IOFetch object. + uint8_t count[2]; + writeUint16(sizeof(TEST_DATA), count); + socket->async_send(asio::buffer(count, 2), + boost::bind(&IOFetchTest::tcpSendHandler, this, + sizeof(count), _1, _2)); + socket->async_send(asio::buffer(TEST_DATA, sizeof(TEST_DATA)), + boost::bind(&IOFetchTest::tcpSendHandler, this, + sizeof(count), _1, _2)); + } + + /// \brief Completion Handler for Sending TCP data + /// + /// Called when the asynchronous send of data back to the IOFetch object + /// by the TCP "server" in this class has completed. (This send has to + /// be asynchronous because control needs to return to the caller in order + /// for the IOService "run()" method to be called to run the handlers.) + /// + /// \param expected Number of bytes that were expected to have been sent. + /// \param ec Boost error code, value should be zero. + /// \param length Number of bytes sent. + void tcpSendHandler(size_t expected = 0, error_code ec = error_code(), + size_t length = 0) + { + std::cerr << "TCP Send Handler\n"; + EXPECT_EQ(0, ec.value()); // Expect no error + EXPECT_EQ(expected, length); // And that amount sent is as expected + } + + /// \brief Fetch completion callback + /// + /// This is the callback's operator() method which is called when the fetch + /// is complete. It checks that the data received is the wire format of the + /// data sent back by the server. + /// + /// \param result Result indicated by the callback + void operator()(IOFetch::Result result) { + std::cerr << "Fetch completion\n"; + EXPECT_EQ(expected_, result); // Check correct result returned + EXPECT_FALSE(run_); // Check it is run only once + run_ = true; // Note success + + // If the expected result for SUCCESS, then this should have been called + // when one of the "servers" in this class has sent back the TEST_DATA. + // Check the data is as expected/ + if (expected_ == IOFetch::SUCCESS) { + size_t offset = 0; // Offset into start of buffer of data + if (protocol_ == IOFetch::UDP) { + + // Check the length of data received against the amount expected. + EXPECT_EQ(sizeof(TEST_DATA), result_buff_->getLength()); + + } else { + + // Check the length of data received against the amount expected + EXPECT_EQ(sizeof(TEST_DATA) + 2, result_buff_->getLength()); + + // Check the count field. This should be equal to the total + // length of the packet less 2 (the count field is equal to + // the total length of the message less the count field itself - + // RFC 1035, section 4.2.2). + uint16_t count = readUint16(result_buff_->getData()); + EXPECT_EQ(result_buff_->getLength(), count + 2); + + // Update offset and count for the content check. + offset += 2; + } + const void* start = static_cast( + static_cast(result_buff_->getData()) + offset); + EXPECT_TRUE(memcmp(TEST_DATA, start, sizeof(TEST_DATA)) == 0); + } + + // ... and cause the run loop to exit. + service_.stop(); + } + + // The next set of methods are the tests themselves. A number of the TCP + // and UDP tests are very similar. + + /// \brief Check for stop() + /// + /// Test that when we run the query and stop it after it was run, it returns + /// "stopped" correctly. (That is why stop() is posted to the service_ as + /// well instead of calling it.) + /// + /// \param protocol Test protocol + /// \param fetch Fetch object being tested + void stopTest(IOFetch::Protocol protocol, IOFetch& fetch) { + protocol_ = protocol; + expected_ = IOFetch::STOPPED; + + // Post the query + service_.get_io_service().post(fetch); + + // Post query_.stop() (yes, the boost::bind thing is just + // query_.stop()). + service_.get_io_service().post( + boost::bind(&IOFetch::stop, fetch, IOFetch::STOPPED)); + + // Run both of them. run() returns when everything in the I/O service + // queue has completed. + service_.run(); + EXPECT_TRUE(run_); + } + + /// \brief Premature stop test + /// + /// Test that when we queue the query to service_ and call stop() before it + /// gets executed, it acts sanely as well (eg. has the same result as + /// running stop() after - calls the callback). + /// + /// \param protocol Test protocol + /// \param fetch Fetch object being tested + void prematureStopTest(IOFetch::Protocol protocol, IOFetch& fetch) { + protocol_ = protocol; + expected_ = IOFetch::STOPPED; + + // Stop before it is started + fetch.stop(); + service_.get_io_service().post(fetch); + + service_.run(); + EXPECT_TRUE(run_); + } + + /// \brief Timeout test + /// + /// Test that fetch times out when no answer arrives. + /// + /// \param protocol Test protocol + /// \param fetch Fetch object being tested + void timeoutTest(IOFetch::Protocol protocol, IOFetch& fetch) { + protocol_ = protocol; + expected_ = IOFetch::TIME_OUT; + + service_.get_io_service().post(fetch); + service_.run(); + EXPECT_TRUE(run_); + } }; -/// Test that when we run the query and stop it after it was run, -/// it returns "stopped" correctly. -/// -/// That is why stop() is posted to the service_ as well instead -/// of calling it. +/// UDP Stop test - see IOFetchTest::stopTest() header. TEST_F(IOFetchTest, UdpStop) { - expected_ = IOFetch::STOPPED; - - // Post the query - service_.get_io_service().post(udp_fetch_); - - // Post query_.stop() (yes, the boost::bind thing is just - // query_.stop()). - service_.get_io_service().post( - boost::bind(&IOFetch::stop, udp_fetch_, IOFetch::STOPPED)); - - // Run both of them. run() returns when everything in the I/O service - // queue has completed. - service_.run(); - EXPECT_TRUE(run_); + stopTest(IOFetch::UDP, udp_fetch_); } -// Test that when we queue the query to service_ and call stop() before it gets -// executed, it acts sanely as well (eg. has the same result as running stop() -// after - calls the callback). +/// UDP premature stop test - see IOFetchTest::prematureStopTest() header. TEST_F(IOFetchTest, UdpPrematureStop) { - expected_ = IOFetch::STOPPED; - - // Stop before it is started - udp_fetch_.stop(); - service_.get_io_service().post(udp_fetch_); - - service_.run(); - EXPECT_TRUE(run_); + prematureStopTest(IOFetch::UDP, udp_fetch_); } -// Test that it will timeout when no answer arrives. +/// UDP premature stop test - see IOFetchTest::timeoutTest() header. TEST_F(IOFetchTest, UdpTimeout) { - expected_ = IOFetch::TIME_OUT; - - service_.get_io_service().post(udp_fetch_); - service_.run(); - EXPECT_TRUE(run_); + timeoutTest(IOFetch::UDP, udp_fetch_); } -// Test that it will succeed when we fake an answer and stores the same data we -// send. This is done through a real socket on the loopback address. -TEST_F(IOFetchTest, UdpReceive) { +// UDP SendReceive test. Set up a UDP server then ports a UDP fetch object. +// This will send question_ to the server and receive the answer back from it. +TEST_F(IOFetchTest, UdpSendReceive) { + protocol_ = IOFetch::UDP; expected_ = IOFetch::SUCCESS; udp::socket socket(service_.get_io_service(), udp::v4()); @@ -174,15 +367,56 @@ TEST_F(IOFetchTest, UdpReceive) { udp::endpoint remote; socket.async_receive_from(asio::buffer(server_buff_, sizeof(server_buff_)), remote, - boost::bind(&IOFetchTest::respond, this, &remote, &socket, _1, _2)); + boost::bind(&IOFetchTest::udpReceiveHandler, this, &remote, &socket, + _1, _2)); service_.get_io_service().post(udp_fetch_); service_.run(); socket.close(); - EXPECT_TRUE(run_); - ASSERT_EQ(sizeof TEST_DATA, buff_->getLength()); - EXPECT_EQ(0, memcmp(TEST_DATA, buff_->getData(), sizeof TEST_DATA)); + EXPECT_TRUE(run_);; +} + +// Do the same tests for TCP transport + +TEST_F(IOFetchTest, TcpStop) { + stopTest(IOFetch::TCP, tcp_fetch_); +} + +TEST_F(IOFetchTest, TcpPrematureStop) { + prematureStopTest(IOFetch::TCP, tcp_fetch_); +} + +TEST_F(IOFetchTest, TcpTimeout) { + timeoutTest(IOFetch::TCP, tcp_fetch_); +} + +TEST_F(IOFetchTest, TcpSendReceive) { + protocol_ = IOFetch::TCP; + expected_ = IOFetch::SUCCESS; + + std::cerr << "Creating socket\n"; + // Socket into which the connection will be accepted + tcp::socket socket(service_.get_io_service()); + + std::cerr << "Creating acceptor\n"; + // Acceptor object - called when the connection is made, the handler will + // initiate a read on the socket. + tcp::acceptor acceptor(service_.get_io_service(), + tcp::endpoint(tcp::v4(), TEST_PORT)); + std::cerr << "Issuing async accept call\n"; + acceptor.async_accept(socket, + boost::bind(&IOFetchTest::tcpAcceptHandler, this, &socket, _1)); + + // Post the TCP fetch object to send the query and receive the response. + std::cerr << "Posting TCP fetch\n"; + service_.get_io_service().post(tcp_fetch_); + + // ... and execute all the callbacks. This exits when the fetch completes. + service_.run(); + EXPECT_TRUE(run_); // Make sure the callback did execute + + socket.close(); } } // namespace asiolink diff --git a/src/lib/asiolink/tests/tcp_endpoint_unittest.cc b/src/lib/asiolink/tests/tcp_endpoint_unittest.cc new file mode 100644 index 0000000000..3787e1c152 --- /dev/null +++ b/src/lib/asiolink/tests/tcp_endpoint_unittest.cc @@ -0,0 +1,55 @@ +// Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC") +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE +// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. + +#include + +#include + +#include + +#include +#include +#include + +using namespace asiolink; +using namespace std; + +// This test checks that the endpoint can manage its own internal +// asio::ip::tcp::endpoint object. + +TEST(TCPEndpointTest, v4Address) { + const string test_address("192.0.2.1"); + const unsigned short test_port = 5301; + + IOAddress address(test_address); + TCPEndpoint endpoint(address, test_port); + + EXPECT_TRUE(address == endpoint.getAddress()); + EXPECT_EQ(test_port, endpoint.getPort()); + EXPECT_EQ(IPPROTO_TCP, endpoint.getProtocol()); + EXPECT_EQ(AF_INET, endpoint.getFamily()); +} + +TEST(TCPEndpointTest, v6Address) { + const string test_address("2001:db8::1235"); + const unsigned short test_port = 5302; + + IOAddress address(test_address); + TCPEndpoint endpoint(address, test_port); + + EXPECT_TRUE(address == endpoint.getAddress()); + EXPECT_EQ(test_port, endpoint.getPort()); + EXPECT_EQ(IPPROTO_TCP, endpoint.getProtocol()); + EXPECT_EQ(AF_INET6, endpoint.getFamily()); +} diff --git a/src/lib/asiolink/tests/tcp_socket_unittest.cc b/src/lib/asiolink/tests/tcp_socket_unittest.cc new file mode 100644 index 0000000000..d37f23677d --- /dev/null +++ b/src/lib/asiolink/tests/tcp_socket_unittest.cc @@ -0,0 +1,349 @@ +// Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC") +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE +// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. + +/// \brief Test of TCPSocket +/// +/// Tests the fuctionality of a TCPSocket by working through an open-send- +/// receive-close sequence and checking that the asynchronous notifications +/// work. + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include +#include + +#include + +#include +#include +#include + +using namespace asio; +using namespace asio::ip; +using namespace asiolink; +using namespace std; + +namespace { + +const char SERVER_ADDRESS[] = "127.0.0.1"; +const unsigned short SERVER_PORT = 5303; + +// TODO: Shouldn't we send something that is real message? +const char OUTBOUND_DATA[] = "Data sent from client to server"; +const char INBOUND_DATA[] = "Returned data from server to client"; +} + +/// +/// An instance of this object is passed to the asynchronous I/O functions +/// and the operator() method is called when when an asynchronous I/O +/// completes. The arguments to the completion callback are stored for later +/// retrieval. +class TCPCallback { +public: + /// \brief Operations the server is doing + enum Operation { + ACCEPT = 0, ///< accept() was issued + OPEN = 1, /// Client connected to server + READ = 2, ///< Asynchronous read completed + WRITE = 3, ///< Asynchronous write completed + NONE = 4 ///< "Not set" state + }; + + /// \brief Minimim size of buffers + enum { + MIN_SIZE = 4096 + }; + + struct PrivateData { + PrivateData() : + error_code_(), length_(0), name_(""), queued_(NONE), called_(NONE) + {} + + asio::error_code error_code_; ///< Completion error code + size_t length_; ///< Number of bytes transferred + std::string name_; ///< Which of the objects this is + Operation queued_; ///< Queued operation + Operation called_; ///< Which callback called + }; + + /// \brief Constructor + /// + /// Constructs the object. It also creates the data member pointed to by + /// a shared pointer. When used as a callback object, this is copied as it + /// is passed into the asynchronous function. This means that there are two + /// objects and inspecting the one we passed in does not tell us anything. + /// + /// Therefore we use a boost::shared_ptr. When the object is copied, the + /// shared pointer is copied, which leaves both objects pointing to the same + /// data. + /// + /// \param which Which of the two callback objects this is + TCPCallback(std::string which) : ptr_(new PrivateData()) + { + setName(which); + } + + /// \brief Destructor + /// + /// No code needed, destroying the shared pointer destroys the private data. + virtual ~TCPCallback() + {} + + /// \brief Client Callback Function + /// + /// Called when an asynchronous connect is completed by the client, this + /// stores the origin of the operation in the client_called_ data member. + /// + /// \param ec I/O completion error code passed to callback function. + /// \param length Number of bytes transferred + void operator()(asio::error_code ec = asio::error_code(), + size_t length = 0) + { + setCode(ec.value()); + setCalled(getQueued()); + setLength(length); + } + + /// \brief Get I/O completion error code + int getCode() { + return (ptr_->error_code_.value()); + } + + /// \brief Set I/O completion code + /// + /// \param code New value of completion code + void setCode(int code) { + ptr_->error_code_ = asio::error_code(code, asio::error_code().category()); + } + + /// \brief Get number of bytes transferred in I/O + size_t getLength() { + return (ptr_->length_); + } + + /// \brief Set number of bytes transferred in I/O + /// + /// \param length New value of length parameter + void setLength(size_t length) { + ptr_->length_ = length; + } + + /// \brief Get flag to say what was queued + Operation getQueued() { + return (ptr_->queued_); + } + + /// \brief Set flag to say what is being queued + /// + /// \param called New value of queued parameter + void setQueued(Operation queued) { + ptr_->queued_ = queued; + } + + /// \brief Get flag to say when callback was called + Operation getCalled() { + return (ptr_->called_); + } + + /// \brief Set flag to say when callback was called + /// + /// \param called New value of called parameter + void setCalled(Operation called) { + ptr_->called_ = called; + } + + /// \brief Return instance of callback name + std::string getName() { + return (ptr_->name_); + } + + /// \brief Set callback name + /// + /// \param name New value of the callback name + void setName(const std::string& name) { + ptr_->name_ = name; + } + +private: + boost::shared_ptr ptr_; ///< Pointer to private data +}; + +// TODO: Need to add a test to check the cancel() method + +// Tests the operation of a TCPSocket by opening it, sending an asynchronous +// message to a server, receiving an asynchronous message from the server and +// closing. +TEST(TCPSocket, SequenceTest) { + + // Common objects. + IOService service; // Service object for async control + + // Server + IOAddress server_address(SERVER_ADDRESS); + // Address of target server + TCPCallback server_cb("Server"); // Server callback + TCPEndpoint server_endpoint(server_address, SERVER_PORT); + // Endpoint describing server + TCPEndpoint server_remote_endpoint; // Address where server received message from + tcp::socket server_socket(service.get_io_service()); + // Socket used for server + char server_data[TCPCallback::MIN_SIZE]; + // Data received by server + ASSERT_GT(sizeof(server_data), sizeof(OUTBOUND_DATA)); + // Make sure it's large enough + + // The client - the TCPSocket being tested + TCPSocket client(service);// Socket under test + TCPCallback client_cb("Client"); // Async I/O callback function + TCPEndpoint client_remote_endpoint; // Where client receives message from + char client_data[TCPCallback::MIN_SIZE]; + // Data received by client + ASSERT_GT(sizeof(client_data), sizeof(OUTBOUND_DATA)); + // Make sure it's large enough + //size_t client_cumulative = 0; // Cumulative data received + + // The server - with which the client communicates. For convenience, we + // use the same io_service, and use the endpoint object created for + // the client to send to as the endpoint object in the constructor. + + std::cerr << "Setting up acceptor\n"; + // Set up the server to accept incoming connections. + server_cb.setQueued(TCPCallback::ACCEPT); + server_cb.setCalled(TCPCallback::NONE); + server_cb.setCode(42); // Some error + tcp::acceptor acceptor(service.get_io_service(), + tcp::endpoint(tcp::v4(), SERVER_PORT)); + acceptor.set_option(tcp::acceptor::reuse_address(true)); + acceptor.async_accept(server_socket, server_cb); + + std::cerr << "Setting up client\n"; + // Open the client socket - the operation should be asynchronous + client_cb.setQueued(TCPCallback::OPEN); + client_cb.setCalled(TCPCallback::NONE); + client_cb.setCode(43); // Some error + EXPECT_FALSE(client.isOpenSynchronous()); + client.open(&server_endpoint, client_cb); + + // Run the open and the accept callback and check that they ran. + service.run_one(); + service.run_one(); + + EXPECT_EQ(TCPCallback::ACCEPT, server_cb.getCalled()); + EXPECT_EQ(0, server_cb.getCode()); + EXPECT_EQ(TCPCallback::OPEN, client_cb.getCalled()); + EXPECT_EQ(0, client_cb.getCode()); + + // Write something to the server using the client and read it on ther server. + server_cb.setCalled(TCPCallback::NONE); + server_cb.setQueued(TCPCallback::READ); + server_cb.setCode(142); // Arbitrary number + server_cb.setLength(0); + server_socket.async_receive(buffer(server_data, sizeof(server_data)), server_cb); + + client_cb.setCalled(TCPCallback::NONE); + client_cb.setQueued(TCPCallback::WRITE); + client_cb.setCode(143); // Arbitrary number + client_cb.setLength(0); + client.asyncSend(OUTBOUND_DATA, sizeof(OUTBOUND_DATA), &server_endpoint, client_cb); + + // Run the write and read callback and check they ran + service.run_one(); + service.run_one(); + + // Check lengths. As the client wrote the buffer, currently two bytes + // will be prepended by the client containing the length. + EXPECT_EQ(TCPCallback::READ, server_cb.getCalled()); + EXPECT_EQ(0, server_cb.getCode()); + EXPECT_EQ(sizeof(OUTBOUND_DATA) + 2, server_cb.getLength()); + + EXPECT_EQ(TCPCallback::WRITE, client_cb.getCalled()); + EXPECT_EQ(0, client_cb.getCode()); + EXPECT_EQ(sizeof(OUTBOUND_DATA) + 2, client_cb.getLength()); + + // Check that the first two bytes of the buffer are in fact the remaining + // length of the buffer (code copied from isc::dns::buffer.h) + uint16_t count = ((unsigned int)(server_data[0])) << 8; + count |= ((unsigned int)(server_data[1])); + EXPECT_EQ(sizeof(OUTBOUND_DATA), count); + + // ... and check data received by server is what we expect + EXPECT_TRUE(equal(&server_data[2], &server_data[server_cb.getLength() - 1], + OUTBOUND_DATA)); + + // TODO: Complete this server test + // TODO: Add in loop for server to read data - read 2 bytes, then as much as needed + + /* + // Now return data from the server to the client. Issue the read on the + // client. + client_cb.setCalled(TCPCallback::NONE); + client_cb.setQueued(TCPCallback::READ); + client_cb.setCode(143); // Arbitrary number + client_cb.setLength(0); + client.asyncReceive(OUTBOUND_DATA, sizeof(OUTBOUND_DATA), &server_endpoint, client_cb); + + client_cb.setLength(12345); // Arbitrary number + client_cb.setCalled(false); + client_cb.setCode(32); // Arbitrary number + client.asyncReceive(data, sizeof(data), client_cumulative, + &client_remote_endpoint, client_cb); + + // Issue the write on the server side to the source of the data it received. + server_cb.setLength(22345); // Arbitrary number + server_cb.setCalled(false); + server_cb.setCode(232); // Arbitrary number + server.async_send_to(buffer(INBOUND_DATA, sizeof(INBOUND_DATA)), + server_remote_endpoint.getASIOEndpoint(), server_cb); + + // Expect two callbacks to run + service.get_io_service().poll(); + //service.run_one(); + + EXPECT_TRUE(client_cb.getCalled()); + EXPECT_EQ(0, client_cb.getCode()); + EXPECT_EQ(sizeof(INBOUND_DATA), client_cb.getLength()); + + EXPECT_TRUE(server_cb.getCalled()); + EXPECT_EQ(0, server_cb.getCode()); + EXPECT_EQ(sizeof(INBOUND_DATA), server_cb.getLength()); + + EXPECT_TRUE(equal(&data[0], &data[server_cb.getLength() - 1], INBOUND_DATA)); + + // Check that the address/port received by the client corresponds to the + // address and port the server is listening on. + EXPECT_TRUE(server_address == client_remote_endpoint.getAddress()); + EXPECT_EQ(SERVER_PORT, client_remote_endpoint.getPort()); + + // Finally, check that the receive received a complete buffer's worth of data. + EXPECT_TRUE(client.receiveComplete(&data[0], client_cb.getLength(), + client_cumulative)); + EXPECT_EQ(client_cb.getLength(), client_cumulative); + + // Close client and server. + EXPECT_NO_THROW(client.close()); + EXPECT_NO_THROW(server.close()); + * */ +} diff --git a/src/lib/asiolink/tests/udp_socket_unittest.cc b/src/lib/asiolink/tests/udp_socket_unittest.cc index 7332d29ffe..7b81a6205a 100644 --- a/src/lib/asiolink/tests/udp_socket_unittest.cc +++ b/src/lib/asiolink/tests/udp_socket_unittest.cc @@ -12,21 +12,6 @@ // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR // PERFORMANCE OF THIS SOFTWARE. -// Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC") -// -// Permission to use, copy, modify, and/or distribute this software for any -// purpose with or without fee is hereby granted, provided that the above -// copyright notice and this permission notice appear in all copies. -// -// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH -// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY -// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, -// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM -// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE -// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR -// PERFORMANCE OF THIS SOFTWARE. - - /// \brief Test of UDPSocket /// /// Tests the fuctionality of a UDPSocket by working through an open-send- @@ -208,11 +193,12 @@ TEST(UDPSocket, SequenceTest) { server.set_option(socket_base::reuse_address(true)); // Assertion to ensure that the server buffer is large enough - char data[UDPSocket::MAX_SIZE]; + char data[UDPSocket::MIN_SIZE]; ASSERT_GT(sizeof(data), sizeof(OUTBOUND_DATA)); // Open the client socket - the operation should be synchronous - EXPECT_FALSE(client.open(&server_endpoint, client_cb)); + EXPECT_TRUE(client.isOpenSynchronous()); + client.open(&server_endpoint, client_cb); // Issue read on the server. Completion callback should not have run. server_cb.setCalled(false); diff --git a/src/lib/asiolink/udp_endpoint.h b/src/lib/asiolink/udp_endpoint.h index 0958af6e4d..99dc27ffee 100644 --- a/src/lib/asiolink/udp_endpoint.h +++ b/src/lib/asiolink/udp_endpoint.h @@ -64,6 +64,17 @@ public: asio_endpoint_placeholder_(NULL), asio_endpoint_(asio_endpoint) {} + /// \brief Constructor from an ASIO UDP endpoint. + /// + /// This constructor is designed to be an efficient wrapper for the + /// corresponding ASIO class, \c udp::endpoint. + /// + /// \param asio_endpoint The ASIO representation of the TCP endpoint. + UDPEndpoint(const asio::ip::udp::endpoint& asio_endpoint) : + asio_endpoint_placeholder_(new asio::ip::udp::endpoint(asio_endpoint)), + asio_endpoint_(*asio_endpoint_placeholder_) + {} + /// \brief The destructor. virtual ~UDPEndpoint() { delete asio_endpoint_placeholder_; } //@} diff --git a/src/lib/asiolink/udp_socket.h b/src/lib/asiolink/udp_socket.h index 56a9bb0972..0df6fba175 100644 --- a/src/lib/asiolink/udp_socket.h +++ b/src/lib/asiolink/udp_socket.h @@ -48,7 +48,7 @@ private: public: enum { - MAX_SIZE = 4096 // Send and receive size + MIN_SIZE = 4096 // Minimum send and receive size }; /// \brief Constructor from an ASIO UDP socket. @@ -79,24 +79,26 @@ public: return (IPPROTO_UDP); } + /// \brief Is "open()" synchronous? + /// + /// Indicates that the opening of a UDP socket is synchronous. + virtual bool isOpenSynchronous() const { + return true; + } + /// \brief Open Socket /// - /// Opens the UDP socket. In the model for transport-layer agnostic I/O, - /// an "open" operation includes a connection to the remote end (which - /// may take time). This does not happen for UDP, so the method returns - /// "false" to indicate that the operation completed synchronously. + /// Opens the UDP socket. This is a synchronous operation. /// /// \param endpoint Endpoint to which the socket will connect to. /// \param callback Unused. - /// - /// \return false to indicate that the "operation" completed synchronously. - virtual bool open(const IOEndpoint* endpoint, C&); + virtual void open(const IOEndpoint* endpoint, C&); /// \brief Send Asynchronously /// - /// This corresponds to async_send_to() for UDP sockets and async_send() - /// for TCP. In both cases an endpoint argument is supplied indicating the - /// target of the send - this is ignored for TCP. + /// Calls the underlying socket's async_send_to() method to send a packet of + /// data asynchronously to the remote endpoint. The callback will be called + /// on completion. /// /// \param data Data to send /// \param length Length of data to send @@ -107,19 +109,17 @@ public: /// \brief Receive Asynchronously /// - /// This correstponds to async_receive_from() for UDP sockets and - /// async_receive() for TCP. In both cases, an endpoint argument is - /// supplied to receive the source of the communication. For TCP it will - /// be filled in with details of the connection. + /// Calls the underlying socket's async_receive_from() method to read a + /// packet of data from a remote endpoint. Arrival of the data is + /// signalled via a call to the callback function. /// /// \param data Buffer to receive incoming message /// \param length Length of the data buffer - /// \param cumulative Amount of data that should already be in the buffer. - /// (This is ignored - every UPD receive fills the buffer from the start.) + /// \param offset Offset into buffer where data is to be put /// \param endpoint Source of the communication /// \param callback Callback object - virtual void asyncReceive(void* data, size_t length, size_t cumulative, - IOEndpoint* endpoint, C& callback); + virtual void asyncReceive(void* data, size_t length, size_t offset, + IOEndpoint* endpoint, C& callback); /// \brief Checks if the data received is complete. /// @@ -133,7 +133,7 @@ public: /// I/O. On output, the total amount of data received to date. /// /// \return true if the receive is complete, false if another receive is - /// needed. + /// needed. Always true for a UDP socket. virtual bool receiveComplete(void*, size_t length, size_t& cumulative) { cumulative = length; return (true); @@ -180,10 +180,9 @@ UDPSocket::~UDPSocket() delete socket_ptr_; } -// Open the socket. Throws an error on failure -// TODO: Make the open more resilient +// Open the socket. -template bool +template void UDPSocket::open(const IOEndpoint* endpoint, C&) { // Ignore opens on already-open socket. Don't throw a failure because @@ -203,21 +202,18 @@ UDPSocket::open(const IOEndpoint* endpoint, C&) { asio::ip::udp::socket::send_buffer_size snd_size; socket_.get_option(snd_size); - if (snd_size.value() < MAX_SIZE) { - snd_size = MAX_SIZE; + if (snd_size.value() < MIN_SIZE) { + snd_size = MIN_SIZE; socket_.set_option(snd_size); } asio::ip::udp::socket::receive_buffer_size rcv_size; socket_.get_option(rcv_size); - if (rcv_size.value() < MAX_SIZE) { - rcv_size = MAX_SIZE; + if (rcv_size.value() < MIN_SIZE) { + rcv_size = MIN_SIZE; socket_.set_option(rcv_size); } } - - // Nothing was done asynchronously, so tell the caller that. - return (false); } // Send a message. Should never do this if the socket is not open, so throw @@ -225,7 +221,7 @@ UDPSocket::open(const IOEndpoint* endpoint, C&) { template void UDPSocket::asyncSend(const void* data, size_t length, - const IOEndpoint* endpoint, C& callback) + const IOEndpoint* endpoint, C& callback) { if (isopen_) { @@ -252,8 +248,8 @@ UDPSocket::asyncSend(const void* data, size_t length, // the need for the socket to be open. template void -UDPSocket::asyncReceive(void* data, size_t length, size_t, - IOEndpoint* endpoint, C& callback) +UDPSocket::asyncReceive(void* data, size_t length, size_t offset, + IOEndpoint* endpoint, C& callback) { if (isopen_) { @@ -261,7 +257,15 @@ UDPSocket::asyncReceive(void* data, size_t length, size_t, assert(endpoint->getProtocol() == IPPROTO_UDP); UDPEndpoint* udp_endpoint = static_cast(endpoint); - socket_.async_receive_from(asio::buffer(data, length), + // Ensure we can write into the buffer + if (offset >= length) { + isc_throw(BufferOverflow, "attempt to read into area beyond end of " + "UDP receive buffer"); + } + void* buffer_start = static_cast(static_cast(data) + offset); + + // Issue the read + socket_.async_receive_from(asio::buffer(buffer_start, length - offset), udp_endpoint->getASIOEndpoint(), callback); } else { isc_throw(SocketNotOpen,