diff --git a/src/lib/dhcp_ddns/ncr_io.cc b/src/lib/dhcp_ddns/ncr_io.cc index 9068f13289..7349362879 100644 --- a/src/lib/dhcp_ddns/ncr_io.cc +++ b/src/lib/dhcp_ddns/ncr_io.cc @@ -8,12 +8,18 @@ #include #include #include +#include #include +#include + namespace isc { namespace dhcp_ddns { +using namespace isc::util; +using namespace std; + NameChangeProtocol stringToNcrProtocol(const std::string& protocol_str) { if (boost::iequals(protocol_str, "UDP")) { return (NCR_UDP); @@ -154,7 +160,7 @@ NameChangeListener::invokeRecvHandler(const Result result, NameChangeSender::NameChangeSender(RequestSendHandler& send_handler, size_t send_queue_max) : sending_(false), send_handler_(send_handler), - send_queue_max_(send_queue_max), io_service_(NULL) { + send_queue_max_(send_queue_max), io_service_(NULL), mutex_(new mutex) { // Queue size must be big enough to hold at least 1 entry. setQueueMaxSize(send_queue_max); @@ -167,18 +173,28 @@ NameChangeSender::startSending(isc::asiolink::IOService& io_service) { isc_throw(NcrSenderError, "NameChangeSender is already sending"); } - // Clear send marker. - ncr_to_send_.reset(); - // Call implementation dependent open. try { - // Remember io service we're given. - io_service_ = &io_service; - open(io_service); + if (MultiThreadingMgr::instance().getMode()) { + lock_guard lock(*mutex_); + startSendingInternal(io_service); + } else { + startSendingInternal(io_service); + } } catch (const isc::Exception& ex) { stopSending(); isc_throw(NcrSenderOpenError, "Open failed: " << ex.what()); } +} + +void +NameChangeSender::startSendingInternal(isc::asiolink::IOService& io_service) { + // Clear send marker. + ncr_to_send_.reset(); + + // Remember io service we're given. + io_service_ = &io_service; + open(io_service); // Set our status to sending. setSending(true); @@ -229,10 +245,20 @@ NameChangeSender::sendRequest(NameChangeRequestPtr& ncr) { isc_throw(NcrSenderError, "request to send is empty"); } + if (MultiThreadingMgr::instance().getMode()) { + lock_guard lock(*mutex_); + sendRequestInternal(ncr); + } else { + sendRequestInternal(ncr); + } +} + +void +NameChangeSender::sendRequestInternal(NameChangeRequestPtr& ncr) { if (send_queue_.size() >= send_queue_max_) { isc_throw(NcrSenderQueueFull, "send queue has reached maximum capacity: " - << send_queue_max_ ); + << send_queue_max_); } // Put it on the queue. @@ -267,6 +293,16 @@ NameChangeSender::sendNext() { void NameChangeSender::invokeSendHandler(const NameChangeSender::Result result) { + if (MultiThreadingMgr::instance().getMode()) { + lock_guard lock(*mutex_); + invokeSendHandlerInternal(result); + } else { + invokeSendHandlerInternal(result); + } +} + +void +NameChangeSender::invokeSendHandlerInternal(const NameChangeSender::Result result) { // @todo reset defense timer if (result == SUCCESS) { // It shipped so pull it off the queue. @@ -318,6 +354,16 @@ NameChangeSender::invokeSendHandler(const NameChangeSender::Result result) { void NameChangeSender::skipNext() { + if (MultiThreadingMgr::instance().getMode()) { + lock_guard lock(*mutex_); + skipNextInternal(); + } else { + skipNextInternal(); + } +} + +void +NameChangeSender::skipNextInternal() { if (!send_queue_.empty()) { // Discards the request at the front of the queue. send_queue_.pop_front(); @@ -330,7 +376,12 @@ NameChangeSender::clearSendQueue() { isc_throw(NcrSenderError, "Cannot clear queue while sending"); } - send_queue_.clear(); + if (MultiThreadingMgr::instance().getMode()) { + lock_guard lock(*mutex_); + send_queue_.clear(); + } else { + send_queue_.clear(); + } } void @@ -341,19 +392,54 @@ NameChangeSender::setQueueMaxSize(const size_t new_max) { } send_queue_max_ = new_max; - } + +size_t +NameChangeSender::getQueueSize() const { + if (MultiThreadingMgr::instance().getMode()) { + lock_guard lock(*mutex_); + return (getQueueSizeInternal()); + } else { + return (getQueueSizeInternal()); + } +} + +size_t +NameChangeSender::getQueueSizeInternal() const { + return (send_queue_.size()); +} + const NameChangeRequestPtr& NameChangeSender::peekAt(const size_t index) const { - if (index >= getQueueSize()) { + if (MultiThreadingMgr::instance().getMode()) { + lock_guard lock(*mutex_); + return (peekAtInternal(index)); + } else { + return (peekAtInternal(index)); + } +} + +const NameChangeRequestPtr& +NameChangeSender::peekAtInternal(const size_t index) const { + auto size = getQueueSizeInternal(); + if (index >= size) { isc_throw(NcrSenderError, "NameChangeSender::peekAt peek beyond end of queue attempted" - << " index: " << index << " queue size: " << getQueueSize()); + << " index: " << index << " queue size: " << size); } return (send_queue_.at(index)); } +bool +NameChangeSender::isSendInProgress() const { + if (MultiThreadingMgr::instance().getMode()) { + lock_guard lock(*mutex_); + return ((ncr_to_send_) ? true : false); + } else { + return ((ncr_to_send_) ? true : false); + } +} void NameChangeSender::assumeQueue(NameChangeSender& source_sender) { @@ -372,6 +458,16 @@ NameChangeSender::assumeQueue(NameChangeSender& source_sender) { " source queue count exceeds target queue max"); } + if (MultiThreadingMgr::instance().getMode()) { + lock_guard lock(*mutex_); + assumeQueueInternal(source_sender); + } else { + assumeQueueInternal(source_sender); + } +} + +void +NameChangeSender::assumeQueueInternal(NameChangeSender& source_sender) { if (!send_queue_.empty()) { isc_throw(NcrSenderError, "Cannot assume queue:" " target queue is not empty"); @@ -399,6 +495,5 @@ NameChangeSender::runReadyIO() { io_service_->get_io_service().poll_one(); } - -} // namespace isc::dhcp_ddns -} // namespace isc +} // namespace dhcp_ddns +} // namespace isc diff --git a/src/lib/dhcp_ddns/ncr_io.h b/src/lib/dhcp_ddns/ncr_io.h index dce72a5407..c9618b8b92 100644 --- a/src/lib/dhcp_ddns/ncr_io.h +++ b/src/lib/dhcp_ddns/ncr_io.h @@ -46,14 +46,16 @@ /// communications that is independent of the IO layer mechanisms. While the /// type and details of the IO mechanism are not relevant to either class, it /// is presumed to use isc::asiolink library for asynchronous event processing. -/// #include #include #include #include +#include + #include +#include namespace isc { namespace dhcp_ddns { @@ -68,7 +70,7 @@ enum NameChangeProtocol { NCR_TCP }; -/// @brief Function which converts labels to NameChangeProtocol enum values. +/// @brief Function which converts text labels to @ref NameChangeProtocol enums. /// /// @param protocol_str text to convert to an enum. /// Valid string values: "UDP", "TCP" @@ -79,7 +81,7 @@ enum NameChangeProtocol { /// enum value. extern NameChangeProtocol stringToNcrProtocol(const std::string& protocol_str); -/// @brief Function which converts NameChangeProtocol enums to text labels. +/// @brief Function which converts @ref NameChangeProtocol enums to text labels. /// /// @param protocol enum value to convert to label /// @@ -167,10 +169,10 @@ public: /// @brief Defines the outcome of an asynchronous NCR receive enum Result { - SUCCESS, - TIME_OUT, - STOPPED, - ERROR + SUCCESS, + TIME_OUT, + STOPPED, + ERROR }; /// @brief Abstract class for defining application layer receive callbacks. @@ -179,7 +181,8 @@ public: /// derivation of this class to the listener constructor in order to /// receive NameChangeRequests. class RequestReceiveHandler { - public: + public: + /// @brief Function operator implementing a NCR receive callback. /// /// This method allows the application to receive the inbound @@ -190,6 +193,7 @@ public: /// @param ncr is a pointer to the newly received NameChangeRequest if /// result is NameChangeListener::SUCCESS. It is indeterminate other /// wise. + /// /// @throw This method MUST NOT throw. virtual void operator ()(const Result result, NameChangeRequestPtr& ncr) = 0; @@ -296,12 +300,15 @@ protected: virtual void doReceive() = 0; public: + /// @brief Returns true if the listener is listening, false otherwise. /// /// A true value indicates that the IO source has been opened successfully, /// and that receive loop logic is active. This implies that closing the /// IO source will interrupt that operation, resulting in a callback /// invocation. + /// + /// @return The listening mode. bool amListening() const { return (listening_); } @@ -315,6 +322,8 @@ public: /// deleted while there is an IO call pending. This can result in the /// IO service attempting to invoke methods on objects that are no longer /// valid. + /// + /// @return The pending flag. bool isIoPending() const { return (io_pending_); } @@ -477,7 +486,8 @@ public: /// derivation of this class to the sender constructor in order to /// receive send outcome notifications. class RequestSendHandler { - public: + public: + /// @brief Function operator implementing a NCR send callback. /// /// This method allows the application to receive the outcome of @@ -504,7 +514,7 @@ public: /// send queue. Once the maximum number is reached, all calls to /// sendRequest will fail with an exception. NameChangeSender(RequestSendHandler& send_handler, - size_t send_queue_max = MAX_QUEUE_DEFAULT); + size_t send_queue_max = MAX_QUEUE_DEFAULT); /// @brief Destructor virtual ~NameChangeSender() { @@ -573,13 +583,61 @@ public: /// @return true if the sender has at IO ready, false otherwise. virtual bool ioReady() = 0; +private: + + /// @brief Prepares the IO for transmission in a thread safe context. + /// + /// @param io_service is the IOService that will handle IO event processing. + void startSendingInternal(isc::asiolink::IOService & io_service); + + /// @brief Queues the given request to be sent in a thread safe context. + /// + /// @param ncr is the NameChangeRequest to send. + /// + /// @throw NcrSenderQueueFull if the send queue has reached capacity. + void sendRequestInternal(NameChangeRequestPtr& ncr); + + /// @brief Move all queued requests from a given sender into the send queue + /// in a thread safe context. + /// + /// @param source_sender from whom the queued messages will be taken + /// + /// @throw NcrSenderError if this sender's queue is not empty. + void assumeQueueInternal(NameChangeSender& source_sender); + + /// @brief Calls the NCR send completion handler registered with the + /// sender in a thread safe context. + /// + /// @param result contains that send outcome status. + void invokeSendHandlerInternal(const NameChangeSender::Result result); + + /// @brief Removes the request at the front of the send queue in a thread + /// safe context. + void skipNextInternal(); + + /// @brief Returns the number of entries currently in the send queue in a + /// thread safe context. + /// + /// @return the queue size. + size_t getQueueSizeInternal() const; + + /// @brief Returns the entry at a given position in the queue in a thread + /// safe context. + /// + /// @return Pointer reference to the queue entry. + /// + /// @throw NcrSenderError if the given index is beyond the + /// end of the queue. + const NameChangeRequestPtr& peekAtInternal(const size_t index) const; + protected: - /// @brief Dequeues and sends the next request on the send queue. + + /// @brief Dequeues and sends the next request on the send queue in a thread + /// safe context. /// /// If there is already a send in progress just return. If there is not /// a send in progress and the send queue is not empty the grab the next /// message on the front of the queue and call doSend(). - /// void sendNext(); /// @brief Calls the NCR send completion handler registered with the @@ -587,8 +645,8 @@ protected: /// /// This is the hook by which the sender's caller's NCR send completion /// handler is called. This method MUST be invoked by the derivation's - /// implementation of doSend. Note that if the send was a success, - /// the entry at the front of the queue is removed from the queue. + /// implementation of doSend. Note that if the send was a success, the + /// entry at the front of the queue is removed from the queue. /// If not we leave it there so we can retry it. After we invoke the /// handler we clear the pending ncr value and queue up the next send. /// @@ -643,6 +701,7 @@ protected: virtual void doSend(NameChangeRequestPtr& ncr) = 0; public: + /// @brief Removes the request at the front of the send queue /// /// This method can be used to avoid further retries of a failed @@ -653,7 +712,7 @@ public: /// It is presumed that sends will only fail due to some sort of /// communications issue. In the unlikely event that a request is /// somehow tainted and causes an send failure based on its content, - /// this method provides a means to remove th message. + /// this method provides a means to remove the message. void skipNext(); /// @brief Flushes all entries in the send queue @@ -661,6 +720,7 @@ public: /// This method can be used to discard all of the NCRs currently in the /// the send queue. Note it may not be called while the sender is in /// the sending state. + /// /// @throw NcrSenderError if called and sender is in sending state. void clearSendQueue(); @@ -668,6 +728,8 @@ public: /// /// A true value indicates that the IO sink has been opened successfully, /// and that send loop logic is active. + /// + /// @return The send mode. bool amSending() const { return (sending_); } @@ -676,11 +738,13 @@ public: /// /// A true value indicates that a request is actively in the process of /// being delivered. - bool isSendInProgress() const { - return ((ncr_to_send_) ? true : false); - } + /// + /// @return The send in progress flag. + bool isSendInProgress() const; /// @brief Returns the maximum number of entries allowed in the send queue. + /// + /// @return The queue maximum size. size_t getQueueMaxSize() const { return (send_queue_max_); } @@ -696,13 +760,14 @@ public: void setQueueMaxSize(const size_t new_max); /// @brief Returns the number of entries currently in the send queue. - size_t getQueueSize() const { - return (send_queue_.size()); - } + /// + /// @return The queue size. + size_t getQueueSize() const; /// @brief Returns the entry at a given position in the queue. /// /// Note that the entry is not removed from the queue. + /// /// @param index the index of the entry in the queue to fetch. /// Valid values are 0 (front of the queue) to (queue size - 1). /// @@ -731,16 +796,19 @@ public: /// By running only one handler at time, we ensure that NCR IO activity /// doesn't starve other processing. It is unclear how much of a real /// threat this poses but for now it is best to err on the side of caution. - /// virtual void runReadyIO(); protected: + /// @brief Returns a reference to the send queue. + /// + /// @return The send queue. SendQueue& getSendQueue() { return (send_queue_); } private: + /// @brief Sets the sending indicator to the given value. /// /// Note, this method is private as it is used the base class is solely @@ -748,7 +816,7 @@ private: /// /// @param value is the new value to assign to the indicator. void setSending(bool value) { - sending_ = value; + sending_ = value; } /// @brief Boolean indicator which tracks sending status. @@ -771,12 +839,15 @@ private: /// reference. Use a raw pointer to store it. This value should never be /// exposed and is only valid while in send mode. asiolink::IOService* io_service_; + + /// @brief The mutex used to protect internal state. + const boost::scoped_ptr mutex_; }; /// @brief Defines a smart pointer to an instance of a sender. typedef boost::shared_ptr NameChangeSenderPtr; -} // namespace isc::dhcp_ddns -} // namespace isc +} // namespace dhcp_ddns +} // namespace isc #endif diff --git a/src/lib/dhcp_ddns/tests/ncr_udp_unittests.cc b/src/lib/dhcp_ddns/tests/ncr_udp_unittests.cc index 7d492cbfc6..84ab0113a7 100644 --- a/src/lib/dhcp_ddns/tests/ncr_udp_unittests.cc +++ b/src/lib/dhcp_ddns/tests/ncr_udp_unittests.cc @@ -6,22 +6,26 @@ #include +#include + #include #include #include +#include #include #include #include #include #include -#include + #include #include using namespace std; using namespace isc; +using namespace isc::util; using namespace isc::dhcp_ddns; namespace { @@ -231,7 +235,7 @@ public: /// This test verifies that a listener can enter listening mode and /// receive NCRs in wire format on its UDP socket; reconstruct the /// NCRs and delivery them to the "application" layer. -TEST_F(NameChangeUDPListenerTest, basicReceivetest) { +TEST_F(NameChangeUDPListenerTest, basicReceiveTests) { // Verify we can enter listening mode. ASSERT_FALSE(listener_->amListening()); ASSERT_NO_THROW(listener_->startListening(io_service_)); @@ -283,13 +287,64 @@ public: int error_count_; }; +/// @brief Text fixture for testing NameChangeUDPListener +class NameChangeUDPSenderBasicTest : public virtual ::testing::Test { +public: + NameChangeUDPSenderBasicTest() { + // Disable multi-threading + MultiThreadingMgr::instance().setMode(false); + } + + ~NameChangeUDPSenderBasicTest() { + // Disable multi-threading + MultiThreadingMgr::instance().setMode(false); + } +}; + /// @brief Tests the NameChangeUDPSender constructors. /// This test verifies that: /// 1. Constructing with a max queue size of 0 is not allowed /// 2. Given valid parameters, the sender constructor works /// 3. Default construction provides default max queue size /// 4. Construction with a custom max queue size works -TEST(NameChangeUDPSenderBasicTest, constructionTests) { +TEST_F(NameChangeUDPSenderBasicTest, constructionTests) { + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); + uint32_t port = SENDER_PORT; + isc::asiolink::IOService io_service; + SimpleSendHandler ncr_handler; + + // Verify that constructing with an queue size of zero is not allowed. + EXPECT_THROW(NameChangeUDPSender(ip_address, port, + ip_address, port, FMT_JSON, ncr_handler, 0), NcrSenderError); + + NameChangeSenderPtr sender; + // Verify that valid constructor works. + EXPECT_NO_THROW(sender.reset( + new NameChangeUDPSender(ip_address, port, ip_address, port, + FMT_JSON, ncr_handler))); + + // Verify that send queue default max is correct. + size_t expected = NameChangeSender::MAX_QUEUE_DEFAULT; + EXPECT_EQ(expected, sender->getQueueMaxSize()); + + // Verify that constructor with a valid custom queue size works. + EXPECT_NO_THROW(sender.reset( + new NameChangeUDPSender(ip_address, port, ip_address, port, + FMT_JSON, ncr_handler, 100))); + + EXPECT_EQ(100, sender->getQueueMaxSize()); +} + +/// @brief Tests the NameChangeUDPSender constructors. +/// This test verifies that: +/// 1. Constructing with a max queue size of 0 is not allowed +/// 2. Given valid parameters, the sender constructor works +/// 3. Default construction provides default max queue size +/// 4. Construction with a custom max queue size works +TEST_F(NameChangeUDPSenderBasicTest, constructionTestsMultiThreading) { + // Enable multi-threading + MultiThreadingMgr::instance().setMode(true); + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); uint32_t port = SENDER_PORT; isc::asiolink::IOService io_service; @@ -318,8 +373,135 @@ TEST(NameChangeUDPSenderBasicTest, constructionTests) { } /// @brief Tests NameChangeUDPSender basic send functionality -/// This test verifies that: -TEST(NameChangeUDPSenderBasicTest, basicSendTests) { +TEST_F(NameChangeUDPSenderBasicTest, basicSendTests) { + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); + isc::asiolink::IOService io_service; + SimpleSendHandler ncr_handler; + + // Tests are based on a list of messages, get the count now. + int num_msgs = sizeof(valid_msgs)/sizeof(char*); + + // Create the sender, setting the queue max equal to the number of + // messages we will have in the list. + NameChangeUDPSender sender(ip_address, SENDER_PORT, ip_address, + LISTENER_PORT, FMT_JSON, ncr_handler, + num_msgs, true); + + // Verify that we can start sending. + EXPECT_NO_THROW(sender.startSending(io_service)); + EXPECT_TRUE(sender.amSending()); + + // Verify that attempting to send when we already are is an error. + EXPECT_THROW(sender.startSending(io_service), NcrSenderError); + + // Verify that we can stop sending. + EXPECT_NO_THROW(sender.stopSending()); + EXPECT_FALSE(sender.amSending()); + + // Verify that attempting to stop sending when we are not is ok. + EXPECT_NO_THROW(sender.stopSending()); + + // Verify that we can re-enter sending after stopping. + EXPECT_NO_THROW(sender.startSending(io_service)); + EXPECT_TRUE(sender.amSending()); + + // Fetch the sender's select-fd. + int select_fd = sender.getSelectFd(); + + // Verify select_fd is valid and currently shows no ready to read. + ASSERT_NE(util::WatchSocket::SOCKET_NOT_VALID, select_fd); + + // Make sure select_fd does evaluates to not ready via select and + // that ioReady() method agrees. + ASSERT_EQ(0, selectCheck(select_fd)); + ASSERT_FALSE(sender.ioReady()); + + // Iterate over a series of messages, sending each one. Since we + // do not invoke IOService::run, then the messages should accumulate + // in the queue. + NameChangeRequestPtr ncr; + NameChangeRequestPtr ncr2; + for (int i = 0; i < num_msgs; i++) { + ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i])); + EXPECT_NO_THROW(sender.sendRequest(ncr)); + // Verify that the queue count increments in step with each send. + EXPECT_EQ(i+1, sender.getQueueSize()); + + // Verify that peekAt(i) returns the NCR we just added. + ASSERT_NO_THROW(ncr2 = sender.peekAt(i)); + ASSERT_TRUE(ncr2); + EXPECT_TRUE(*ncr == *ncr2); + } + + // Verify that attempting to peek beyond the end of the queue, throws. + ASSERT_THROW(sender.peekAt(sender.getQueueSize()+1), NcrSenderError); + + // Verify that attempting to send an additional message results in a + // queue full exception. + EXPECT_THROW(sender.sendRequest(ncr), NcrSenderQueueFull); + + // Loop for the number of valid messages. So long as there is at least + // on NCR in the queue, select-fd indicate ready to read. Invoke + // IOService::run_one. This should complete the send of exactly one + // message and the queue count should decrement accordingly. + for (int i = num_msgs; i > 0; i--) { + // Make sure select_fd does evaluates to ready via select and + // that ioReady() method agrees. + ASSERT_TRUE(selectCheck(select_fd) > 0); + ASSERT_TRUE(sender.ioReady()); + + // Execute at one ready handler. + ASSERT_NO_THROW(sender.runReadyIO()); + + // Verify that the queue count decrements in step with each run. + EXPECT_EQ(i-1, sender.getQueueSize()); + } + + // Make sure select_fd does evaluates to not ready via select and + // that ioReady() method agrees. + ASSERT_EQ(0, selectCheck(select_fd)); + ASSERT_FALSE(sender.ioReady()); + + // Verify that the queue is empty. + EXPECT_EQ(0, sender.getQueueSize()); + + // Verify that we can add back to the queue + EXPECT_NO_THROW(sender.sendRequest(ncr)); + EXPECT_EQ(1, sender.getQueueSize()); + + // Verify that we can remove the current entry at the front of the queue. + EXPECT_NO_THROW(sender.skipNext()); + EXPECT_EQ(0, sender.getQueueSize()); + + // Verify that flushing the queue is not allowed in sending state. + EXPECT_THROW(sender.clearSendQueue(), NcrSenderError); + + // Put num_msgs messages on the queue. + for (int i = 0; i < num_msgs; i++) { + ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i])); + EXPECT_NO_THROW(sender.sendRequest(ncr)); + } + + // Make sure we have number of messages expected. + EXPECT_EQ(num_msgs, sender.getQueueSize()); + + // Verify that we can gracefully stop sending. + EXPECT_NO_THROW(sender.stopSending()); + EXPECT_FALSE(sender.amSending()); + + // Verify that the queue is preserved after leaving sending state. + EXPECT_EQ(num_msgs - 1, sender.getQueueSize()); + + // Verify that flushing the queue works when not sending. + EXPECT_NO_THROW(sender.clearSendQueue()); + EXPECT_EQ(0, sender.getQueueSize()); +} + +/// @brief Tests NameChangeUDPSender basic send functionality +TEST_F(NameChangeUDPSenderBasicTest, basicSendTestsMultiThreading) { + // Enable multi-threading + MultiThreadingMgr::instance().setMode(true); + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); isc::asiolink::IOService io_service; SimpleSendHandler ncr_handler; @@ -445,7 +627,62 @@ TEST(NameChangeUDPSenderBasicTest, basicSendTests) { /// @brief Tests that sending gets kick-started if the queue isn't empty /// when startSending is called. -TEST(NameChangeUDPSenderBasicTest, autoStart) { +TEST_F(NameChangeUDPSenderBasicTest, autoStart) { + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); + isc::asiolink::IOService io_service; + SimpleSendHandler ncr_handler; + + // Tests are based on a list of messages, get the count now. + int num_msgs = sizeof(valid_msgs)/sizeof(char*); + + // Create the sender, setting the queue max equal to the number of + // messages we will have in the list. + NameChangeUDPSender sender(ip_address, SENDER_PORT, ip_address, + LISTENER_PORT, FMT_JSON, ncr_handler, + num_msgs, true); + + // Verify that we can start sending. + EXPECT_NO_THROW(sender.startSending(io_service)); + EXPECT_TRUE(sender.amSending()); + + // Queue up messages. + NameChangeRequestPtr ncr; + for (int i = 0; i < num_msgs; i++) { + ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i])); + EXPECT_NO_THROW(sender.sendRequest(ncr)); + } + // Make sure queue count is what we expect. + EXPECT_EQ(num_msgs, sender.getQueueSize()); + + // Stop sending. + ASSERT_NO_THROW(sender.stopSending()); + ASSERT_FALSE(sender.amSending()); + + // We should have completed the first message only. + EXPECT_EQ(--num_msgs, sender.getQueueSize()); + + // Restart sending. + EXPECT_NO_THROW(sender.startSending(io_service)); + + // We should be able to loop through remaining messages and send them. + for (int i = num_msgs; i > 0; i--) { + // ioReady() should evaluate to true. + ASSERT_TRUE(sender.ioReady()); + + // Execute at one ready handler. + ASSERT_NO_THROW(sender.runReadyIO()); + } + + // Verify that the queue is empty. + EXPECT_EQ(0, sender.getQueueSize()); +} + +/// @brief Tests that sending gets kick-started if the queue isn't empty +/// when startSending is called. +TEST_F(NameChangeUDPSenderBasicTest, autoStartMultiThreading) { + // Enable multi-threading + MultiThreadingMgr::instance().setMode(true); + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); isc::asiolink::IOService io_service; SimpleSendHandler ncr_handler; @@ -496,7 +733,45 @@ TEST(NameChangeUDPSenderBasicTest, autoStart) { } /// @brief Tests NameChangeUDPSender basic send with INADDR_ANY and port 0. -TEST(NameChangeUDPSenderBasicTest, anyAddressSend) { +TEST_F(NameChangeUDPSenderBasicTest, anyAddressSend) { + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); + isc::asiolink::IOAddress any_address("0.0.0.0"); + isc::asiolink::IOService io_service; + SimpleSendHandler ncr_handler; + + // Tests are based on a list of messages, get the count now. + int num_msgs = sizeof(valid_msgs)/sizeof(char*); + + // Create the sender, setting the queue max equal to the number of + // messages we will have in the list. + NameChangeUDPSender sender(any_address, 0, ip_address, LISTENER_PORT, + FMT_JSON, ncr_handler, num_msgs); + + // Enter send mode. + ASSERT_NO_THROW(sender.startSending(io_service)); + EXPECT_TRUE(sender.amSending()); + + // Create and queue up a message. + NameChangeRequestPtr ncr; + ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[0])); + EXPECT_NO_THROW(sender.sendRequest(ncr)); + EXPECT_EQ(1, sender.getQueueSize()); + + // Verify we have a ready IO, then execute at one ready handler. + ASSERT_TRUE(sender.ioReady()); + ASSERT_NO_THROW(sender.runReadyIO()); + + // Verify that sender shows no IO ready. + // and that the queue is empty. + ASSERT_FALSE(sender.ioReady()); + EXPECT_EQ(0, sender.getQueueSize()); +} + +/// @brief Tests NameChangeUDPSender basic send with INADDR_ANY and port 0. +TEST_F(NameChangeUDPSenderBasicTest, anyAddressSendMultiThreading) { + // Enable multi-threading + MultiThreadingMgr::instance().setMode(true); + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); isc::asiolink::IOAddress any_address("0.0.0.0"); isc::asiolink::IOService io_service; @@ -531,7 +806,79 @@ TEST(NameChangeUDPSenderBasicTest, anyAddressSend) { } /// @brief Test the NameChangeSender::assumeQueue method. -TEST(NameChangeSender, assumeQueue) { +TEST_F(NameChangeUDPSenderBasicTest, assumeQueue) { + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); + uint32_t port = SENDER_PORT; + isc::asiolink::IOService io_service; + SimpleSendHandler ncr_handler; + NameChangeRequestPtr ncr; + + // Tests are based on a list of messages, get the count now. + int num_msgs = sizeof(valid_msgs)/sizeof(char*); + + // Create two senders with queue max equal to the number of + // messages we will have in the list. + NameChangeUDPSender sender1(ip_address, port, ip_address, port, + FMT_JSON, ncr_handler, num_msgs); + + NameChangeUDPSender sender2(ip_address, port+1, ip_address, port, + FMT_JSON, ncr_handler, num_msgs); + + // Place sender1 into send mode and queue up messages. + ASSERT_NO_THROW(sender1.startSending(io_service)); + for (int i = 0; i < num_msgs; i++) { + ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i])); + ASSERT_NO_THROW(sender1.sendRequest(ncr)); + } + + // Make sure sender1's queue count is as expected. + ASSERT_EQ(num_msgs, sender1.getQueueSize()); + + // Verify sender1 is sending, sender2 is not. + ASSERT_TRUE(sender1.amSending()); + ASSERT_FALSE(sender2.amSending()); + + // Transfer from sender1 to sender2 should fail because + // sender1 is in send mode. + ASSERT_THROW(sender2.assumeQueue(sender1), NcrSenderError); + + // Take sender1 out of send mode. + ASSERT_NO_THROW(sender1.stopSending()); + ASSERT_FALSE(sender1.amSending()); + // Stopping should have completed the first message. + --num_msgs; + EXPECT_EQ(num_msgs, sender1.getQueueSize()); + + // Transfer should succeed. Verify sender1 has none, + // and sender2 has num_msgs queued. + EXPECT_NO_THROW(sender2.assumeQueue(sender1)); + EXPECT_EQ(0, sender1.getQueueSize()); + EXPECT_EQ(num_msgs, sender2.getQueueSize()); + + // Reduce sender1's max queue size. + ASSERT_NO_THROW(sender1.setQueueMaxSize(num_msgs - 1)); + + // Transfer should fail as sender1's queue is not large enough. + ASSERT_THROW(sender1.assumeQueue(sender2), NcrSenderError); + + // Place sender1 into send mode and queue up a message. + ASSERT_NO_THROW(sender1.startSending(io_service)); + ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[0])); + ASSERT_NO_THROW(sender1.sendRequest(ncr)); + + // Take sender1 out of send mode. + ASSERT_NO_THROW(sender1.stopSending()); + + // Try to transfer from sender1 to sender2. This should fail + // as sender2's queue is not empty. + ASSERT_THROW(sender2.assumeQueue(sender1), NcrSenderError); +} + +/// @brief Test the NameChangeSender::assumeQueue method. +TEST_F(NameChangeUDPSenderBasicTest, assumeQueueMultiThreading) { + // Enable multi-threading + MultiThreadingMgr::instance().setMode(true); + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); uint32_t port = SENDER_PORT; isc::asiolink::IOService io_service; @@ -634,6 +981,13 @@ public: test_timer_.setup(boost::bind(&NameChangeUDPTest::testTimeoutHandler, this), TEST_TIMEOUT); + // Disble multi-threading + MultiThreadingMgr::instance().setMode(false); + } + + ~NameChangeUDPTest() { + // Disble multi-threading + MultiThreadingMgr::instance().setMode(false); } void reset_results() { @@ -670,7 +1024,63 @@ public: /// Conducts a "round-trip" test using a sender to transmit a set of valid /// NCRs to a listener. The test verifies that what was sent matches what /// was received both in quantity and in content. -TEST_F (NameChangeUDPTest, roundTripTest) { +TEST_F(NameChangeUDPTest, roundTripTest) { + // Place the listener into listening state. + ASSERT_NO_THROW(listener_->startListening(io_service_)); + EXPECT_TRUE(listener_->amListening()); + + // Get the number of messages in the list of test messages. + int num_msgs = sizeof(valid_msgs)/sizeof(char*); + + // Place the sender into sending state. + ASSERT_NO_THROW(sender_->startSending(io_service_)); + EXPECT_TRUE(sender_->amSending()); + + for (int i = 0; i < num_msgs; i++) { + NameChangeRequestPtr ncr; + ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i])); + sender_->sendRequest(ncr); + EXPECT_EQ(i+1, sender_->getQueueSize()); + } + + // Execute callbacks until we have sent and received all of messages. + while (sender_->getQueueSize() > 0 || (received_ncrs_.size() < num_msgs)) { + EXPECT_NO_THROW(io_service_.run_one()); + } + + // Send queue should be empty. + EXPECT_EQ(0, sender_->getQueueSize()); + + // We should have the same number of sends and receives as we do messages. + ASSERT_EQ(num_msgs, sent_ncrs_.size()); + ASSERT_EQ(num_msgs, received_ncrs_.size()); + + // Verify that what we sent matches what we received. + for (int i = 0; i < num_msgs; i++) { + EXPECT_TRUE (checkSendVsReceived(sent_ncrs_[i], received_ncrs_[i])); + } + + // Verify that we can gracefully stop listening. + EXPECT_NO_THROW(listener_->stopListening()); + EXPECT_FALSE(listener_->amListening()); + + // Verify that IO pending is false, after cancel event occurs. + EXPECT_NO_THROW(io_service_.run_one()); + EXPECT_FALSE(listener_->isIoPending()); + + // Verify that we can gracefully stop sending. + EXPECT_NO_THROW(sender_->stopSending()); + EXPECT_FALSE(sender_->amSending()); +} + +/// @brief Uses a sender and listener to test UDP-based NCR delivery +/// Conducts a "round-trip" test using a sender to transmit a set of valid +/// NCRs to a listener. The test verifies that what was sent matches what +/// was received both in quantity and in content. +TEST_F(NameChangeUDPTest, roundTripTestMultiThreading) { + // Enable multi-threading + MultiThreadingMgr::instance().setMode(true); + // Place the listener into listening state. ASSERT_NO_THROW(listener_->startListening(io_service_)); EXPECT_TRUE(listener_->amListening()); @@ -721,7 +1131,42 @@ TEST_F (NameChangeUDPTest, roundTripTest) { // Tests error handling of a failure to mark the watch socket ready, when // sendRequest() is called. -TEST(NameChangeUDPSenderBasicTest, watchClosedBeforeSendRequest) { +TEST_F(NameChangeUDPSenderBasicTest, watchClosedBeforeSendRequest) { + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); + isc::asiolink::IOService io_service; + SimpleSendHandler ncr_handler; + + // Create the sender and put into send mode. + NameChangeUDPSender sender(ip_address, 0, ip_address, LISTENER_PORT, + FMT_JSON, ncr_handler, 100, true); + ASSERT_NO_THROW(sender.startSending(io_service)); + ASSERT_TRUE(sender.amSending()); + + // Create an NCR. + NameChangeRequestPtr ncr; + ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[0])); + + // Tamper with the watch socket by closing the select-fd. + close(sender.getSelectFd()); + + // Send should fail as we interfered by closing the select-fd. + ASSERT_THROW(sender.sendRequest(ncr), util::WatchSocketError); + + // Verify we didn't invoke the handler. + EXPECT_EQ(0, ncr_handler.pass_count_); + EXPECT_EQ(0, ncr_handler.error_count_); + + // Request remains in the queue. Technically it was sent but its + // completion handler won't get called. + EXPECT_EQ(1, sender.getQueueSize()); +} + +// Tests error handling of a failure to mark the watch socket ready, when +// sendRequest() is called. +TEST_F(NameChangeUDPSenderBasicTest, watchClosedBeforeSendRequestMultiThreading) { + // Enable multi-threading + MultiThreadingMgr::instance().setMode(true); + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); isc::asiolink::IOService io_service; SimpleSendHandler ncr_handler; @@ -753,7 +1198,7 @@ TEST(NameChangeUDPSenderBasicTest, watchClosedBeforeSendRequest) { // Tests error handling of a failure to mark the watch socket ready, when // sendNext() is called during completion handling. -TEST(NameChangeUDPSenderBasicTest, watchClosedAfterSendRequest) { +TEST_F(NameChangeUDPSenderBasicTest, watchClosedAfterSendRequest) { isc::asiolink::IOAddress ip_address(TEST_ADDRESS); isc::asiolink::IOService io_service; SimpleSendHandler ncr_handler; @@ -790,9 +1235,103 @@ TEST(NameChangeUDPSenderBasicTest, watchClosedAfterSendRequest) { EXPECT_EQ(1, sender.getQueueSize()); } +// Tests error handling of a failure to mark the watch socket ready, when +// sendNext() is called during completion handling. +TEST_F(NameChangeUDPSenderBasicTest, watchClosedAfterSendRequestMultiThreading) { + // Enable multi-threading + MultiThreadingMgr::instance().setMode(true); + + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); + isc::asiolink::IOService io_service; + SimpleSendHandler ncr_handler; + + // Create the sender and put into send mode. + NameChangeUDPSender sender(ip_address, 0, ip_address, LISTENER_PORT, + FMT_JSON, ncr_handler, 100, true); + ASSERT_NO_THROW(sender.startSending(io_service)); + ASSERT_TRUE(sender.amSending()); + + // Build and queue up 2 messages. No handlers will get called yet. + for (int i = 0; i < 2; i++) { + NameChangeRequestPtr ncr; + ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i])); + sender.sendRequest(ncr); + EXPECT_EQ(i+1, sender.getQueueSize()); + } + + // Tamper with the watch socket by closing the select-fd. + close (sender.getSelectFd()); + + // Run one handler. This should execute the send completion handler + // after sending the first message. Doing completion handling, we will + // attempt to queue the second message which should fail. + ASSERT_NO_THROW(sender.runReadyIO()); + + // Verify handler got called twice. First request should have be sent + // without error, second call should have failed to send due to watch + // socket markReady failure. + EXPECT_EQ(1, ncr_handler.pass_count_); + EXPECT_EQ(1, ncr_handler.error_count_); + + // The second request should still be in the queue. + EXPECT_EQ(1, sender.getQueueSize()); +} + + // Tests error handling of a failure to clear the watch socket during // completion handling. -TEST(NameChangeUDPSenderBasicTest, watchSocketBadRead) { +TEST_F(NameChangeUDPSenderBasicTest, watchSocketBadRead) { + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); + isc::asiolink::IOService io_service; + SimpleSendHandler ncr_handler; + + // Create the sender and put into send mode. + NameChangeUDPSender sender(ip_address, 0, ip_address, LISTENER_PORT, + FMT_JSON, ncr_handler, 100, true); + ASSERT_NO_THROW(sender.startSending(io_service)); + ASSERT_TRUE(sender.amSending()); + + // Build and queue up 2 messages. No handlers will get called yet. + for (int i = 0; i < 2; i++) { + NameChangeRequestPtr ncr; + ASSERT_NO_THROW(ncr = NameChangeRequest::fromJSON(valid_msgs[i])); + sender.sendRequest(ncr); + EXPECT_EQ(i+1, sender.getQueueSize()); + } + + // Fetch the sender's select-fd. + int select_fd = sender.getSelectFd(); + + // Verify that select_fd appears ready. + ASSERT_TRUE(selectCheck(select_fd) > 0); + + // Interfere by reading part of the marker from the select-fd. + uint32_t buf = 0; + ASSERT_EQ((read (select_fd, &buf, 1)), 1); + ASSERT_NE(util::WatchSocket::MARKER, buf); + + // Run one handler. This should execute the send completion handler + // after sending the message. Doing completion handling clearing the + // watch socket should fail, which will close the socket, but not + // result in a throw. + ASSERT_NO_THROW(sender.runReadyIO()); + + // Verify handler got called twice. First request should have be sent + // without error, second call should have failed to send due to watch + // socket markReady failure. + EXPECT_EQ(1, ncr_handler.pass_count_); + EXPECT_EQ(1, ncr_handler.error_count_); + + // The second request should still be in the queue. + EXPECT_EQ(1, sender.getQueueSize()); +} + +// Tests error handling of a failure to clear the watch socket during +// completion handling. +TEST_F(NameChangeUDPSenderBasicTest, watchSocketBadReadMultiThreading) { + // Enable multi-threading + MultiThreadingMgr::instance().setMode(true); + isc::asiolink::IOAddress ip_address(TEST_ADDRESS); isc::asiolink::IOService io_service; SimpleSendHandler ncr_handler;