diff --git a/src/lib/dhcp_ddns/ncr_io.cc b/src/lib/dhcp_ddns/ncr_io.cc index 7e7174ac00..22fa7ac12b 100644 --- a/src/lib/dhcp_ddns/ncr_io.cc +++ b/src/lib/dhcp_ddns/ncr_io.cc @@ -23,7 +23,7 @@ namespace dhcp_ddns { NameChangeProtocol stringToNcrProtocol(const std::string& protocol_str) { if (boost::iequals(protocol_str, "UDP")) { return (NCR_UDP); - } + } if (boost::iequals(protocol_str, "TCP")) { return (NCR_TCP); @@ -162,10 +162,7 @@ NameChangeSender::NameChangeSender(RequestSendHandler& send_handler, send_queue_max_(send_queue_max) { // Queue size must be big enough to hold at least 1 entry. - if (send_queue_max == 0) { - isc_throw(NcrSenderError, "NameChangeSender constructor" - " queue size must be greater than zero"); - } + setQueueMaxSize(send_queue_max); } void @@ -318,5 +315,57 @@ NameChangeSender::clearSendQueue() { send_queue_.clear(); } +void +NameChangeSender::setQueueMaxSize(const size_t new_max) { + if (new_max == 0) { + isc_throw(NcrSenderError, "NameChangeSender:" + " queue size must be greater than zero"); + } + + send_queue_max_ = new_max; + +} +const NameChangeRequestPtr& +NameChangeSender::peekAt(const size_t index) const { + if (index >= getQueueSize()) { + isc_throw(NcrSenderError, + "NameChangeSender::peekAt peek beyond end of queue attempted" + << " index: " << index << " queue size: " << getQueueSize()); + } + + return (send_queue_.at(index)); +} + + +void +NameChangeSender::assumeQueue(NameChangeSender& sourceSender) { + if (sourceSender.amSending()) { + isc_throw(NcrSenderError, "Cannot assume queue:" + " source sender is actively sending"); + } + + if (amSending()) { + isc_throw(NcrSenderError, "Cannot assume queue:" + " target sender is actively sending"); + } + + if (getQueueMaxSize() < sourceSender.getQueueSize()) { + isc_throw(NcrSenderError, "Cannot assume queue:" + " source queue count exceeds target queue max"); + } + + if (send_queue_.size() != 0) { + isc_throw(NcrSenderError, "Cannot assume queue:" + " target queue is not empty"); + } + + send_queue_.swap(sourceSender.getSendQueue()); +} + +int +NameChangeSender::getSelectFd() { + isc_throw(NotImplemented, "NameChangeSender::getSelectFd is not supported"); +} + } // namespace isc::dhcp_ddns } // namespace isc diff --git a/src/lib/dhcp_ddns/ncr_io.h b/src/lib/dhcp_ddns/ncr_io.h index a5e513a31e..10d39d0a38 100644 --- a/src/lib/dhcp_ddns/ncr_io.h +++ b/src/lib/dhcp_ddns/ncr_io.h @@ -547,6 +547,36 @@ public: /// capacity. void sendRequest(NameChangeRequestPtr& ncr); + /// @brief Move all queued requests from a given sender into the send queue + /// + /// Moves all of the entries in the given sender's queue and places them + /// into send queue. This provides a mechanism of reassigning queued + /// messages from one sender to another. This is useful for dealing with + /// dynamic configuration changes. + /// + /// @param Sender from whom the queued messages will be taken + /// + /// @throw NcrSenderError if either sender is in send mode, if the number of + /// messages in the source sender's queue is larger than this sender's + /// maxium queue size, or if this sender's queue is not empty. + void assumeQueue(NameChangeSender& fromSender); + + /// @brief Returns a file description suitable for use with select + /// + /// The value returned is an open file descriptor which can be used with + /// select() system call to monitor the sender for IO events. This allows + /// NameChangeSenders to be used in applications which use select, rather + /// than IOService to wait for IO events to occur. + /// + /// @note Attempting other use of this value may lead to unpredictable + /// behavior in the sender. + /// + /// @return Returns an "open" file descriptor + /// + /// @throw NcrSenderError if the sender is not in send mode, + /// NotImplemented if the implementation does not support such an fd. + virtual int getSelectFd(); + protected: /// @brief Dequeues and sends the next request on the send queue. /// @@ -659,11 +689,39 @@ public: return (send_queue_max_); } + /// @brief Sets the maxium queue size to the given value. + /// + /// Sets the maximum number of entries allowed in the queue to the + /// the given value. + /// + /// @param new_max the new value to use as the maximum + /// + /// @throw NcrSenderError if the value is less than one. + void setQueueMaxSize(const size_t new_max); + /// @brief Returns the number of entries currently in the send queue. size_t getQueueSize() const { return (send_queue_.size()); } + /// @brief Returns the entry at a given position in the queue. + /// + /// Note that the entry is not removed from the queue. + /// @param index the index of the entry in the queue to fetch. + /// Valid values are 0 (front of the queue) to (queue size - 1). + /// + /// @return Pointer reference to the queue entry. + /// + /// @throw NcrSenderError if the given index is beyond the + /// end of the queue. + const NameChangeRequestPtr& peekAt(const size_t index) const; + +protected: + /// @brief Returns a reference to the send queue. + SendQueue& getSendQueue() { + return (send_queue_); + } + private: /// @brief Sets the sending indicator to the given value. /// diff --git a/src/lib/dhcp_ddns/ncr_udp.cc b/src/lib/dhcp_ddns/ncr_udp.cc index 9e83f7ccb2..29d6e9458e 100644 --- a/src/lib/dhcp_ddns/ncr_udp.cc +++ b/src/lib/dhcp_ddns/ncr_udp.cc @@ -256,6 +256,8 @@ NameChangeUDPSender::open(isc::asiolink::IOService& io_service) { server_port_)); send_callback_->setDataSource(server_endpoint_); + + watch_socket_.reset(new WatchSocket()); } void @@ -282,6 +284,8 @@ NameChangeUDPSender::close() { } socket_.reset(); + + watch_socket_.reset(); } void @@ -298,11 +302,17 @@ NameChangeUDPSender::doSend(NameChangeRequestPtr& ncr) { // Call the socket's asychronous send, passing our callback socket_->asyncSend(send_callback_->getData(), send_callback_->getPutLen(), send_callback_->getDataSource().get(), *send_callback_); + + // Set IO ready marker so sender activity is visible to select() or poll(). + watch_socket_->markReady(); } void NameChangeUDPSender::sendCompletionHandler(const bool successful, const UDPCallback *send_callback) { + // Clear the IO ready marker. + watch_socket_->clearReady(); + Result result; if (successful) { result = SUCCESS; @@ -324,5 +334,17 @@ NameChangeUDPSender::sendCompletionHandler(const bool successful, // Call the application's registered request send handler. invokeSendHandler(result); } + +int +NameChangeUDPSender::getSelectFd() { + if (!amSending()) { + isc_throw(NotImplemented, "NameChangeUDPSender::getSelectFd" + " not in send mode"); + } + + return(watch_socket_->getSelectFd()); +} + + }; // end of isc::dhcp_ddns namespace }; // end of isc namespace diff --git a/src/lib/dhcp_ddns/ncr_udp.h b/src/lib/dhcp_ddns/ncr_udp.h index 7648a614e4..03ac5bf33f 100644 --- a/src/lib/dhcp_ddns/ncr_udp.h +++ b/src/lib/dhcp_ddns/ncr_udp.h @@ -112,10 +112,12 @@ #include #include #include +#include #include #include + /// responsibility of the completion handler to perform the steps necessary /// to interpret the raw data provided by the service outcome. The /// UDPCallback operator implementation is mostly a pass through. @@ -524,6 +526,21 @@ public: void sendCompletionHandler(const bool successful, const UDPCallback* send_callback); + /// @brief Returns a file description suitable for use with select + /// + /// The value returned is an open file descriptor which can be used with + /// select() system call to monitor the sender for IO events. This allows + /// NameChangeUDPSenders to be used in applications which use select, + /// rather than IOService to wait for IO events to occur. + /// + /// @note Attempting other use of this value may lead to unpredictable + /// behavior in the sender. + /// + /// @return Returns an "open" file descriptor + /// + /// @throw NcrSenderError if the sender is not in send mode, + virtual int getSelectFd(); + private: /// @brief IP address from which to send. isc::asiolink::IOAddress ip_address_; @@ -554,6 +571,8 @@ private: /// @brief Flag which enables the reuse address socket option if true. bool reuse_address_; + + WatchSocketPtr watch_socket_; }; } // namespace isc::dhcp_ddns diff --git a/src/lib/dhcp_ddns/tests/Makefile.am b/src/lib/dhcp_ddns/tests/Makefile.am index 5bfd8c1ef9..75bdcb12a4 100644 --- a/src/lib/dhcp_ddns/tests/Makefile.am +++ b/src/lib/dhcp_ddns/tests/Makefile.am @@ -29,6 +29,7 @@ TESTS += libdhcp_ddns_unittests libdhcp_ddns_unittests_SOURCES = run_unittests.cc libdhcp_ddns_unittests_SOURCES += ncr_unittests.cc libdhcp_ddns_unittests_SOURCES += ncr_udp_unittests.cc +libdhcp_ddns_unittests_SOURCES += test_utils.cc test_utils.h libdhcp_ddns_unittests_SOURCES += watch_socket_unittests.cc libdhcp_ddns_unittests_CPPFLAGS = $(AM_CPPFLAGS) $(GTEST_INCLUDES) $(LOG4CPLUS_INCLUDES) diff --git a/src/lib/dhcp_ddns/tests/ncr_udp_unittests.cc b/src/lib/dhcp_ddns/tests/ncr_udp_unittests.cc index 997ad4f65e..5f438c3f75 100644 --- a/src/lib/dhcp_ddns/tests/ncr_udp_unittests.cc +++ b/src/lib/dhcp_ddns/tests/ncr_udp_unittests.cc @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -23,6 +24,8 @@ #include #include +#include + using namespace std; using namespace isc; using namespace isc::dhcp_ddns; @@ -68,6 +71,7 @@ const char *valid_msgs[] = }; const char* TEST_ADDRESS = "127.0.0.1"; +//const char* TEST_ADDRESS = "192.0.2.10"; const uint32_t LISTENER_PORT = 5301; const uint32_t SENDER_PORT = LISTENER_PORT+1; const long TEST_TIMEOUT = 5 * 1000; @@ -113,6 +117,7 @@ TEST(NameChangeUDPListenerBasicTest, basicListenTests) { // Verify that we can start listening. EXPECT_NO_THROW(listener->startListening(io_service)); + // Verify that we are in listening mode. EXPECT_TRUE(listener->amListening()); // Verify that a read is in progress. @@ -310,8 +315,8 @@ TEST(NameChangeUDPSenderBasicTest, constructionTests) { /// @brief Tests NameChangeUDPSender basic send functionality /// This test verifies that: TEST(NameChangeUDPSenderBasicTest, basicSendTests) { - isc::asiolink::IOAddress ip_address(TEST_ADDRESS); - uint32_t port = SENDER_PORT; + isc::asiolink::IOAddress ip_address("127.0.0.1"); + uint32_t port = 5301; isc::asiolink::IOService io_service; SimpleSendHandler ncr_handler; @@ -320,7 +325,8 @@ TEST(NameChangeUDPSenderBasicTest, basicSendTests) { // Create the sender, setting the queue max equal to the number of // messages we will have in the list. - NameChangeUDPSender sender(ip_address, port, ip_address, port, + isc::asiolink::IOAddress any("0.0.0.0"); + NameChangeUDPSender sender(any, 0, ip_address, port, FMT_JSON, ncr_handler, num_msgs); // Verify that we can start sending. @@ -341,30 +347,55 @@ TEST(NameChangeUDPSenderBasicTest, basicSendTests) { EXPECT_NO_THROW(sender.startSending(io_service)); EXPECT_TRUE(sender.amSending()); + // Fetch the sender's select-fd. + int select_fd = sender.getSelectFd(); + + // Verify select_fd is valid and currently shows no ready to read. + ASSERT_NE(dhcp_ddns::WatchSocket::INVALID_SOCKET, select_fd); + ASSERT_EQ(0, selectCheck(select_fd)); + // Iterate over a series of messages, sending each one. Since we // do not invoke IOService::run, then the messages should accumulate // in the queue. NameChangeRequestPtr ncr; + NameChangeRequestPtr ncr2; for (int i = 0; i < num_msgs; i++) { ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i])); EXPECT_NO_THROW(sender.sendRequest(ncr)); // Verify that the queue count increments in step with each send. EXPECT_EQ(i+1, sender.getQueueSize()); + + // Verify that peekAt(i) returns the NCR we just added. + ASSERT_NO_THROW(ncr2 = sender.peekAt(i)); + ASSERT_TRUE(ncr2); + EXPECT_TRUE(*ncr == *ncr2); } + // Verify that attempting to peek beyond the end of the queue, throws. + ASSERT_THROW(sender.peekAt(sender.getQueueSize()+1), NcrSenderError); + // Verify that attempting to send an additional message results in a // queue full exception. EXPECT_THROW(sender.sendRequest(ncr), NcrSenderQueueFull); - // Loop for the number of valid messages and invoke IOService::run_one. - // This should send exactly one message and the queue count should - // decrement accordingly. + // Loop for the number of valid messages. So long as there is at least + // on NCR in the queue, select-fd indicate ready to read. Invoke + // IOService::run_one. This should complete the send of exactly one + // message and the queue count should decrement accordingly. for (int i = num_msgs; i > 0; i--) { + // Verify that sender shows IO ready. + ASSERT_TRUE(selectCheck(select_fd) > 0); + + // Execute at one ready handler. io_service.run_one(); + // Verify that the queue count decrements in step with each run. EXPECT_EQ(i-1, sender.getQueueSize()); } + // Verify that sender shows no IO ready. + EXPECT_EQ(0, selectCheck(select_fd)); + // Verify that the queue is empty. EXPECT_EQ(0, sender.getQueueSize()); @@ -395,6 +426,72 @@ TEST(NameChangeUDPSenderBasicTest, basicSendTests) { EXPECT_EQ(0, sender.getQueueSize()); } +/// @brief Test the NameChangeSender::assumeQueue method. +TEST(NameChangeSender, assumeQueue) { + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); + uint32_t port = SENDER_PORT; + isc::asiolink::IOService io_service; + SimpleSendHandler ncr_handler; + NameChangeRequestPtr ncr; + + // Tests are based on a list of messages, get the count now. + int num_msgs = sizeof(valid_msgs)/sizeof(char*); + + // Create two senders with queue max equal to the number of + // messages we will have in the list. + NameChangeUDPSender sender1(ip_address, port, ip_address, port, + FMT_JSON, ncr_handler, num_msgs); + + NameChangeUDPSender sender2(ip_address, port+1, ip_address, port, + FMT_JSON, ncr_handler, num_msgs); + + // Place sender1 into send mode and queue up messages. + ASSERT_NO_THROW(sender1.startSending(io_service)); + for (int i = 0; i < num_msgs; i++) { + ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i])); + ASSERT_NO_THROW(sender1.sendRequest(ncr)); + } + + // Make sure sender1's queue count is as expected. + ASSERT_EQ(num_msgs, sender1.getQueueSize()); + + // Verify sender1 is sending, sender2 is not. + ASSERT_TRUE(sender1.amSending()); + ASSERT_FALSE(sender2.amSending()); + + // Transfer from sender1 to sender2 should fail because + // sender1 is in send mode. + ASSERT_THROW(sender2.assumeQueue(sender1), NcrSenderError); + + // Take sender1 out of send mode. + ASSERT_NO_THROW(sender1.stopSending()); + ASSERT_FALSE(sender1.amSending()); + + // Transfer should succeed. Verify sender1 has none, + // and sender2 has num_msgs queued. + EXPECT_NO_THROW(sender2.assumeQueue(sender1)); + EXPECT_EQ(0, sender1.getQueueSize()); + EXPECT_EQ(num_msgs, sender2.getQueueSize()); + + // Reduce sender1's max queue size. + ASSERT_NO_THROW(sender1.setQueueMaxSize(num_msgs - 1)); + + // Transfer should fail as sender1's queue is not large enough. + ASSERT_THROW(sender1.assumeQueue(sender2), NcrSenderError); + + // Place sender1 into send mode and queue up a message. + ASSERT_NO_THROW(sender1.startSending(io_service)); + ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[0])); + ASSERT_NO_THROW(sender1.sendRequest(ncr)); + + // Take sender1 out of send mode. + ASSERT_NO_THROW(sender1.stopSending()); + + // Try to transfer from sender1 to sender2. This should fail + // as sender2's queue is not empty. + ASSERT_THROW(sender2.assumeQueue(sender1), NcrSenderError); +} + /// @brief Text fixture that allows testing a listener and sender together /// It derives from both the receive and send handler classes and contains /// and instance of UDP listener and UDP sender. diff --git a/src/lib/dhcp_ddns/tests/test_utils.cc b/src/lib/dhcp_ddns/tests/test_utils.cc new file mode 100644 index 0000000000..34d669e553 --- /dev/null +++ b/src/lib/dhcp_ddns/tests/test_utils.cc @@ -0,0 +1,43 @@ +// Copyright (C) 2014 Internet Systems Consortium, Inc. ("ISC") +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE +// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. + +#include + +#include +#include + +using namespace std; + +namespace isc { +namespace dhcp_ddns { + +int selectCheck(int fd_to_check) { + fd_set read_fds; + int maxfd = 0; + + FD_ZERO(&read_fds); + + // Add this socket to listening set + FD_SET(fd_to_check, &read_fds); + maxfd = fd_to_check; + + struct timeval select_timeout; + select_timeout.tv_sec = 0; + select_timeout.tv_usec = 0; + + return (select(maxfd + 1, &read_fds, NULL, NULL, &select_timeout)); +} + +}; // namespace isc::d2 +}; // namespace isc diff --git a/src/lib/dhcp_ddns/tests/test_utils.h b/src/lib/dhcp_ddns/tests/test_utils.h new file mode 100644 index 0000000000..71e23e945d --- /dev/null +++ b/src/lib/dhcp_ddns/tests/test_utils.h @@ -0,0 +1,37 @@ +// Copyright (C) 2014 Internet Systems Consortium, Inc. ("ISC") +// +// Permission to use, copy, modify, and/or distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH +// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +// AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, +// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM +// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE +// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +// PERFORMANCE OF THIS SOFTWARE. + +#ifndef TEST_UTILS_H +#define TEST_UTILS_H + +/// @file test_utils.h Common dhcp_ddns testing elements + +#include + + +namespace isc { +namespace dhcp_ddns { + +/// @brief Returns the result of select() given an fd to check for read status. +/// +/// @param fd_to_check The file descriptor to test +/// +/// @return Returns less than one on an error, 0 if the fd is not ready to +/// read, > 0 if it is ready to read. +int selectCheck(int fd_to_check); + +}; // namespace isc::dhcp_ddns; +}; // namespace isc; + +#endif diff --git a/src/lib/dhcp_ddns/tests/watch_socket_unittests.cc b/src/lib/dhcp_ddns/tests/watch_socket_unittests.cc index 7e8f1c8c55..c54ec99749 100644 --- a/src/lib/dhcp_ddns/tests/watch_socket_unittests.cc +++ b/src/lib/dhcp_ddns/tests/watch_socket_unittests.cc @@ -13,6 +13,7 @@ // PERFORMANCE OF THIS SOFTWARE. #include +#include #include @@ -25,31 +26,6 @@ using namespace isc::dhcp_ddns; namespace { -/// @brief Returns the result of select() given an fd to check for read status. -/// -/// @param fd_to_check The file descriptor to test -/// @return Returns less than one on an error, 0 if the fd is not ready to -/// read, > 0 if it is ready to read. -int selectCheck(int fd_to_check). - -int selectCheck(int fd_to_check) { - fd_set read_fds; - int maxfd = 0; - - FD_ZERO(&read_fds); - - // Add this socket to listening set - FD_SET(fd_to_check, &read_fds); - maxfd = fd_to_check; - - struct timeval select_timeout; - select_timeout.tv_sec = 0; - select_timeout.tv_usec = 0; - - return (select(maxfd + 1, &read_fds, NULL, NULL, &select_timeout)); -} - - /// @brief Tests the basic functionality of WatchSocket. TEST(WatchSocketTest, basics) { WatchSocketPtr watch;