From 00d6396b3b49e21061fc80744eccd03e25cb5b25 Mon Sep 17 00:00:00 2001 From: Razvan Becheriu Date: Wed, 5 Feb 2020 14:15:07 +0200 Subject: [PATCH] [#892] create pkt thread pool and handle processing using multiple threads --- src/bin/dhcp4/ctrl_dhcp4_srv.cc | 42 +++++++++++------- src/bin/dhcp4/ctrl_dhcp4_srv.h | 15 +++---- src/bin/dhcp4/dhcp4_srv.cc | 73 +++++++++++++++++++++++++++---- src/bin/dhcp4/dhcp4_srv.h | 77 +++++++++++++++++++++++---------- src/bin/dhcp4/main.cc | 35 ++++++++++++--- src/bin/dhcp6/ctrl_dhcp6_srv.cc | 53 ++++++++++++++--------- src/bin/dhcp6/ctrl_dhcp6_srv.h | 24 +++++----- src/bin/dhcp6/dhcp6_srv.cc | 60 ++++++++++++++++++++++++- src/bin/dhcp6/dhcp6_srv.h | 61 ++++++++++++++++++++++---- src/bin/dhcp6/main.cc | 49 +++++++++++++-------- src/lib/dhcpsrv/srv_config.cc | 5 ++- src/lib/dhcpsrv/srv_config.h | 38 +++++++++++++++- 12 files changed, 408 insertions(+), 124 deletions(-) diff --git a/src/bin/dhcp4/ctrl_dhcp4_srv.cc b/src/bin/dhcp4/ctrl_dhcp4_srv.cc index c5f4931330..08efdf03b9 100644 --- a/src/bin/dhcp4/ctrl_dhcp4_srv.cc +++ b/src/bin/dhcp4/ctrl_dhcp4_srv.cc @@ -5,30 +5,34 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. #include -#include + #include +#include +#include #include #include -#include -#include #include #include #include #include #include +#include +#include +#include #include #include #include -#include #include + #include using namespace isc::config; -using namespace isc::db; using namespace isc::data; +using namespace isc::db; using namespace isc::dhcp; using namespace isc::hooks; using namespace isc::stats; +using namespace isc::util; using namespace std; namespace { @@ -124,8 +128,6 @@ ControlledDhcpv4Srv::loadConfigFile(const std::string& file_name) { // configuration from a JSON file. isc::data::ConstElementPtr json; - isc::data::ConstElementPtr dhcp4; - isc::data::ConstElementPtr logger; isc::data::ConstElementPtr result; // Basic sanity check: file name must not be empty. @@ -204,7 +206,6 @@ ControlledDhcpv4Srv::commandShutdownHandler(const string&, ConstElementPtr) { ConstElementPtr ControlledDhcpv4Srv::commandLibReloadHandler(const string&, ConstElementPtr) { - /// @todo delete any stored CalloutHandles referring to the old libraries /// Get list of currently loaded libraries and reload them. HookLibsCollection loaded = HooksManager::getLibraryInfo(); @@ -223,7 +224,6 @@ ControlledDhcpv4Srv::commandLibReloadHandler(const string&, ConstElementPtr) { ConstElementPtr ControlledDhcpv4Srv::commandConfigReloadHandler(const string&, ConstElementPtr /*args*/) { - // Get configuration file name. std::string file = ControlledDhcpv4Srv::getInstance()->getConfigFile(); try { @@ -269,6 +269,7 @@ ControlledDhcpv4Srv::commandConfigWriteHandler(const string&, if (filename.empty()) { // filename parameter was not specified, so let's use whatever we remember + // from the command-line filename = getConfigFile(); } @@ -303,7 +304,7 @@ ControlledDhcpv4Srv::commandConfigWriteHandler(const string&, ConstElementPtr ControlledDhcpv4Srv::commandConfigSetHandler(const string&, ConstElementPtr args) { - const int status_code = CONTROL_RESULT_ERROR; // 1 indicates an error + const int status_code = CONTROL_RESULT_ERROR; ConstElementPtr dhcp4; string message; @@ -618,6 +619,16 @@ ControlledDhcpv4Srv::processCommand(const string& command, return (no_srv); } + if (Dhcpv4Srv::threadCount()) { + if (srv->pkt_thread_pool_.size()) { + srv->pkt_thread_pool_.stop(); + } + MultiThreadingMgr::instance().setMode(true); + srv->pkt_thread_pool_.start(Dhcpv4Srv::threadCount()); + } else { + MultiThreadingMgr::instance().setMode(false); + } + try { if (command == "shutdown") { return (srv->commandShutdownHandler(command, args)); @@ -776,6 +787,7 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) { return (isc::config::createAnswer(1, err.str())); } + // Setup config backend polling, if configured for it. auto ctl_info = CfgMgr::instance().getStagingCfg()->getConfigControlInfo(); if (ctl_info) { long fetch_time = static_cast(ctl_info->getConfigFetchWaitTime()); @@ -966,8 +978,8 @@ ControlledDhcpv4Srv::~ControlledDhcpv4Srv() { CommandMgr::instance().deregisterCommand("build-report"); CommandMgr::instance().deregisterCommand("config-backend-pull"); CommandMgr::instance().deregisterCommand("config-get"); - CommandMgr::instance().deregisterCommand("config-reload"); CommandMgr::instance().deregisterCommand("config-set"); + CommandMgr::instance().deregisterCommand("config-reload"); CommandMgr::instance().deregisterCommand("config-test"); CommandMgr::instance().deregisterCommand("config-write"); CommandMgr::instance().deregisterCommand("dhcp-disable"); @@ -995,8 +1007,8 @@ ControlledDhcpv4Srv::~ControlledDhcpv4Srv() { ; } - server_ = NULL; // forget this instance. Noone should call any handlers at - // this stage. + server_ = NULL; // forget this instance. There should be no callback anymore + // at this stage anyway. } void ControlledDhcpv4Srv::sessionReader(void) { @@ -1133,5 +1145,5 @@ ControlledDhcpv4Srv::cbFetchUpdates(const SrvConfigPtr& srv_cfg, } } -}; // end of isc::dhcp namespace -}; // end of isc namespace +} // namespace dhcp +} // namespace isc diff --git a/src/bin/dhcp4/ctrl_dhcp4_srv.h b/src/bin/dhcp4/ctrl_dhcp4_srv.h index 0111c9f7b7..3242028fcd 100644 --- a/src/bin/dhcp4/ctrl_dhcp4_srv.h +++ b/src/bin/dhcp4/ctrl_dhcp4_srv.h @@ -33,7 +33,7 @@ public: uint16_t client_port = 0); /// @brief Destructor. - ~ControlledDhcpv4Srv(); + virtual ~ControlledDhcpv4Srv(); /// @brief Initializes the server. /// @@ -43,12 +43,12 @@ public: /// This method may throw if initialization fails. void init(const std::string& config_file); - /// @brief Loads specific config file + /// @brief Loads specific configuration file /// /// This utility method is called whenever we know a filename of the config /// and need to load it. It calls config-set command once the content of /// the file has been loaded and verified to be a sane JSON configuration. - /// config-set handler will process the config file (load it as current + /// config-set handler will process the config file (apply it as current /// configuration). /// /// @param file_name name of the file to be loaded @@ -121,8 +121,8 @@ public: return (server_); } - private: + /// @brief Callback that will be called from iface_mgr when data /// is received over control socket. /// @@ -249,7 +249,6 @@ private: commandDhcpEnableHandler(const std::string& command, isc::data::ConstElementPtr args); - /// @Brief handler for processing 'version-get' command /// /// This handler processes version-get command, which returns @@ -416,7 +415,7 @@ private: /// @brief Static pointer to the sole instance of the DHCP server. /// /// This is required for config and command handlers to gain access to - /// the server + /// the server. Some of them need to be static methods. static ControlledDhcpv4Srv* server_; /// @brief IOService object, used for all ASIO operations. @@ -429,7 +428,7 @@ private: TimerMgrPtr timer_mgr_; }; -}; // namespace isc::dhcp -}; // namespace isc +} // namespace dhcp +} // namespace isc #endif diff --git a/src/bin/dhcp4/dhcp4_srv.cc b/src/bin/dhcp4/dhcp4_srv.cc index f671fe3ae1..bf1ff3a487 100644 --- a/src/bin/dhcp4/dhcp4_srv.cc +++ b/src/bin/dhcp4/dhcp4_srv.cc @@ -39,7 +39,6 @@ #include #include #include -#include #include #include #include @@ -47,7 +46,6 @@ #include #include #include -#include #include #include #include @@ -80,6 +78,7 @@ using namespace isc::dhcp_ddns; using namespace isc::hooks; using namespace isc::log; using namespace isc::stats; +using namespace isc::util; using namespace std; namespace { @@ -102,8 +101,8 @@ struct Dhcp4Hooks { hook_index_pkt4_receive_ = HooksManager::registerHook("pkt4_receive"); hook_index_subnet4_select_ = HooksManager::registerHook("subnet4_select"); hook_index_leases4_committed_ = HooksManager::registerHook("leases4_committed"); - hook_index_pkt4_send_ = HooksManager::registerHook("pkt4_send"); hook_index_lease4_release_ = HooksManager::registerHook("lease4_release"); + hook_index_pkt4_send_ = HooksManager::registerHook("pkt4_send"); hook_index_buffer4_send_ = HooksManager::registerHook("buffer4_send"); hook_index_lease4_decline_ = HooksManager::registerHook("lease4_decline"); hook_index_host4_identifier_ = HooksManager::registerHook("host4_identifier"); @@ -209,7 +208,7 @@ Dhcpv4Exchange::Dhcpv4Exchange(const AllocEnginePtr& alloc_engine, .arg(query_->getLabel()) .arg(classes.toText()); } -}; +} void Dhcpv4Exchange::initResponse() { @@ -470,14 +469,15 @@ const std::string Dhcpv4Srv::VENDOR_CLASS_PREFIX("VENDOR_CLASS_"); Dhcpv4Srv::Dhcpv4Srv(uint16_t server_port, uint16_t client_port, const bool use_bcast, const bool direct_response_desired) - : io_service_(new IOService()), shutdown_(true), alloc_engine_(), - use_bcast_(use_bcast), server_port_(server_port), - client_port_(client_port), + : io_service_(new IOService()), server_port_(server_port), + client_port_(client_port), shutdown_(true), + alloc_engine_(), use_bcast_(use_bcast), network_state_(new NetworkState(NetworkState::DHCPv4)), cb_control_(new CBControlDHCPv4()) { LOG_DEBUG(dhcp4_logger, DBG_DHCP4_START, DHCP4_OPEN_SOCKET) .arg(server_port); + try { // Port 0 is used for testing purposes where we don't open broadcast // capable sockets. So, set the packet filter handling direct traffic @@ -801,6 +801,11 @@ Dhcpv4Srv::run() { } } + // destroying the thread pool + if (Dhcpv4Srv::threadCount()) { + pkt_thread_pool_.reset(); + } + return (true); } @@ -811,6 +816,17 @@ Dhcpv4Srv::run_one() { Pkt4Ptr rsp; try { + + // Do not read more packets from socket if there are enough + // packets to be processed in the packet thread pool queue + const int max_queued_pkt_per_thread = Dhcpv4Srv::maxThreadQueueSize(); + const auto queue_full_wait = std::chrono::milliseconds(1); + size_t pkt_queue_size = pkt_thread_pool_.count(); + if (pkt_queue_size >= Dhcpv4Srv::threadCount() * + max_queued_pkt_per_thread) { + return; + } + // Set select() timeout to 1s. This value should not be modified // because it is important that the select() returns control // frequently so as the IOService can be polled for ready handlers. @@ -884,9 +900,33 @@ Dhcpv4Srv::run_one() { .arg(query->getLabel()); return; } else { - processPacket(query, rsp); + if (Dhcpv4Srv::threadCount()) { + typedef function CallBack; + boost::shared_ptr call_back = + boost::make_shared(std::bind(&Dhcpv4Srv::processPacketAndSendResponseNoThrow, + this, query, rsp)); + pkt_thread_pool_.add(call_back); + } else { + processPacketAndSendResponse(query, rsp); + } } +} +void +Dhcpv4Srv::processPacketAndSendResponseNoThrow(Pkt4Ptr& query, Pkt4Ptr& rsp) { + try { + processPacketAndSendResponse(query, rsp); + } catch (const std::exception& e) { + LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_STD_EXCEPTION) + .arg(e.what()); + } catch (...) { + LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_EXCEPTION); + } +} + +void +Dhcpv4Srv::processPacketAndSendResponse(Pkt4Ptr& query, Pkt4Ptr& rsp) { + processPacket(query, rsp); if (!rsp) { return; } @@ -3800,6 +3840,23 @@ int Dhcpv4Srv::getHookIndexLease4Decline() { return (Hooks.hook_index_lease4_decline_); } +uint32_t Dhcpv4Srv::threadCount() { + uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount(); + if (sys_threads) { + return sys_threads; + } + sys_threads = std::thread::hardware_concurrency(); + return sys_threads * 0; +} + +uint32_t Dhcpv4Srv::maxThreadQueueSize() { + uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize(); + if (max_thread_queue_size) { + return max_thread_queue_size; + } + return 4; +} + void Dhcpv4Srv::discardPackets() { // Clear any packets held by the callhout handle store and // all parked packets diff --git a/src/bin/dhcp4/dhcp4_srv.h b/src/bin/dhcp4/dhcp4_srv.h index 13265a5e29..6789837bab 100644 --- a/src/bin/dhcp4/dhcp4_srv.h +++ b/src/bin/dhcp4/dhcp4_srv.h @@ -9,23 +9,22 @@ #include #include -#include #include #include #include #include +#include #include #include +#include #include #include -#include #include #include #include #include #include - -#include +#include #include #include @@ -164,12 +163,16 @@ private: /// @brief Pointer to the allocation engine used by the server. AllocEnginePtr alloc_engine_; + /// @brief Pointer to the DHCPv4 message sent by the client. Pkt4Ptr query_; + /// @brief Pointer to the DHCPv4 message to be sent to the client. Pkt4Ptr resp_; + /// @brief Context for use with allocation engine. AllocEngine::ClientContext4Ptr context_; + /// @brief Configured option list. /// @note The configured option list is an *ordered* list of /// @c CfgOption objects used to append options to the response. @@ -234,9 +237,9 @@ public: /// @brief Destructor. Used during DHCPv4 service shutdown. virtual ~Dhcpv4Srv(); - /// @brief Checks if the server is running in a test mode. + /// @brief Checks if the server is running in unit test mode. /// - /// @return true if the server is running in the test mode, + /// @return true if the server is running in unit test mode, /// false otherwise. bool inTestMode() const { return (server_port_ == 0); @@ -265,6 +268,12 @@ public: /// redeclaration/redefinition. @ref isc::process::Daemon::getVersion() static std::string getVersion(bool extended); + /// @brief returns Kea DHCPv4 server thread count. + static uint32_t threadCount(); + + /// @brief returns Kea DHCPv4 server max thread queue size. + static uint32_t maxThreadQueueSize(); + /// @brief Main server processing loop. /// /// Main server processing loop. Call the processing step routine @@ -280,6 +289,24 @@ public: /// a response. void run_one(); + /// @brief Process a single incoming DHCPv4 packet and sends the response. + /// + /// It verifies correctness of the passed packet, call per-type processXXX + /// methods, generates appropriate answer, sends the answer to the client. + /// + /// @param query A pointer to the packet to be processed. + /// @param rsp A pointer to the response + void processPacketAndSendResponse(Pkt4Ptr& query, Pkt4Ptr& rsp); + + /// @brief Process a single incoming DHCPv4 packet and sends the response. + /// + /// It verifies correctness of the passed packet, call per-type processXXX + /// methods, generates appropriate answer, sends the answer to the client. + /// + /// @param query A pointer to the packet to be processed. + /// @param rsp A pointer to the response + void processPacketAndSendResponseNoThrow(Pkt4Ptr& query, Pkt4Ptr& rsp); + /// @brief Process a single incoming DHCPv4 packet. /// /// It verifies correctness of the passed packet, call per-type processXXX @@ -351,7 +378,9 @@ public: NameChangeSender::Result result, dhcp_ddns::NameChangeRequestPtr& ncr); - /// @brief Discard all in-progress packets + /// @brief Discards cached and parked packets + /// Clears the call_handle store and packet parking lots + /// of all packets. Called during reconfigure and shutdown. void discardPackets(); protected: @@ -879,10 +908,6 @@ protected: bool& drop, bool sanity_only = false) const; - /// indicates if shutdown is in progress. Setting it to true will - /// initiate server shutdown procedure. - volatile bool shutdown_; - /// @brief dummy wrapper around IfaceMgr::receive4 /// /// This method is useful for testing purposes, where its replacement @@ -961,11 +986,6 @@ protected: void processPacketBufferSend(hooks::CalloutHandlePtr& callout_handle, Pkt4Ptr& rsp); - /// @brief Allocation Engine. - /// Pointer to the allocation engine that we are currently using - /// It must be a pointer, because we will support changing engines - /// during normal operation (e.g. to use different allocators) - boost::shared_ptr alloc_engine_; private: @@ -984,17 +1004,27 @@ private: /// @return Option that contains netmask information static OptionPtr getNetmaskOption(const Subnet4Ptr& subnet); - /// Should broadcast be enabled on sockets (if true). - bool use_bcast_; - protected: /// UDP port number on which server listens. uint16_t server_port_; - /// UDP port number to which server sends responses. + /// UDP port number to which server sends all responses. uint16_t client_port_; + /// Indicates if shutdown is in progress. Setting it to true will + /// initiate server shutdown procedure. + volatile bool shutdown_; + + /// @brief Allocation Engine. + /// Pointer to the allocation engine that we are currently using + /// It must be a pointer, because we will support changing engines + /// during normal operation (e.g. to use different allocators) + boost::shared_ptr alloc_engine_; + + /// Should broadcast be enabled on sockets (if true). + bool use_bcast_; + /// @brief Holds information about disabled DHCP service and/or /// disabled subnet/network scopes. NetworkStatePtr network_state_; @@ -1002,6 +1032,9 @@ protected: /// @brief Controls access to the configuration backends. CBControlDHCPv4Ptr cb_control_; + /// @brief Packet processing thread pool + isc::util::ThreadPool> pkt_thread_pool_; + public: /// Class methods for DHCPv4-over-DHCPv6 handler @@ -1042,7 +1075,7 @@ public: static int getHookIndexLease4Decline(); }; -}; // namespace isc::dhcp -}; // namespace isc +} // namespace dhcp +} // namespace isc #endif // DHCP4_SRV_H diff --git a/src/bin/dhcp4/main.cc b/src/bin/dhcp4/main.cc index a6b70ef803..9c47d36af4 100644 --- a/src/bin/dhcp4/main.cc +++ b/src/bin/dhcp4/main.cc @@ -7,15 +7,16 @@ #include #include +#include #include #include #include #include -#include #include +#include #include #include -#include +#include #include @@ -31,6 +32,9 @@ using namespace std; /// instantiates ControlledDhcpv4Srv class that is responsible for establishing /// connection with msgq (receiving commands and configuration) and also /// creating Dhcpv4 server object as well. +/// +/// For detailed explanation or relations between main(), ControlledDhcpv4Srv, +/// Dhcpv4Srv and other classes, see \ref dhcpv4Session. namespace { @@ -55,9 +59,11 @@ usage() { << "(useful for testing only)" << endl; cerr << " -P number: specify non-standard client port number 1-65535 " << "(useful for testing only)" << endl; + cerr << " -N number: specify thread count 0-65535 " + << "(0 means multi-threading disabled)" << endl; exit(EXIT_FAILURE); } -} // end of anonymous namespace +} // namespace int main(int argc, char* argv[]) { @@ -66,6 +72,8 @@ main(int argc, char* argv[]) { int server_port_number = DHCP4_SERVER_PORT; // Not zero values are useful for testing only. int client_port_number = 0; + // Number of threads. 0 means multi-threading disabled + int thread_count = 0; bool verbose_mode = false; // Should server be verbose? bool check_mode = false; // Check syntax @@ -98,7 +106,7 @@ main(int argc, char* argv[]) { config_file = optarg; break; - case 'p': + case 'p': // server port number try { server_port_number = boost::lexical_cast(optarg); } catch (const boost::bad_lexical_cast &) { @@ -113,7 +121,7 @@ main(int argc, char* argv[]) { } break; - case 'P': + case 'P': // client port number try { client_port_number = boost::lexical_cast(optarg); } catch (const boost::bad_lexical_cast &) { @@ -128,6 +136,21 @@ main(int argc, char* argv[]) { } break; + case 'N': // number of threads + try { + thread_count = boost::lexical_cast(optarg); + } catch (const boost::bad_lexical_cast &) { + cerr << "Failed to parse thread count number: [" << optarg + << "], 0-65535 allowed." << endl; + usage(); + } + if (thread_count < 0 || thread_count > 65535) { + cerr << "Failed to parse thread count number: [" << optarg + << "], 0-65535 allowed." << endl; + usage(); + } + break; + default: usage(); } @@ -138,7 +161,6 @@ main(int argc, char* argv[]) { usage(); } - // Configuration file is required. if (config_file.empty()) { cerr << "Configuration file not specified." << endl; @@ -150,7 +172,6 @@ main(int argc, char* argv[]) { if (check_mode) { try { - // We need to initialize logging, in case any error messages are to be printed. // This is just a test, so we don't care about lockfile. setenv("KEA_LOCKFILE_DIR", "none", 0); diff --git a/src/bin/dhcp6/ctrl_dhcp6_srv.cc b/src/bin/dhcp6/ctrl_dhcp6_srv.cc index 0aab657707..4fde6fe2b0 100644 --- a/src/bin/dhcp6/ctrl_dhcp6_srv.cc +++ b/src/bin/dhcp6/ctrl_dhcp6_srv.cc @@ -5,30 +5,34 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. #include -#include + #include +#include +#include #include #include -#include -#include #include #include #include #include #include +#include +#include +#include #include #include #include -#include #include + #include using namespace isc::config; -using namespace isc::db; using namespace isc::data; +using namespace isc::db; using namespace isc::dhcp; using namespace isc::hooks; using namespace isc::stats; +using namespace isc::util; using namespace std; namespace { @@ -96,8 +100,6 @@ ControlledDhcpv6Srv::loadConfigFile(const std::string& file_name) { // configuration from a JSON file. isc::data::ConstElementPtr json; - isc::data::ConstElementPtr dhcp6; - isc::data::ConstElementPtr logger; isc::data::ConstElementPtr result; // Basic sanity check: file name must not be empty. @@ -160,7 +162,6 @@ ControlledDhcpv6Srv::loadConfigFile(const std::string& file_name) { return (result); } - void ControlledDhcpv6Srv::init(const std::string& file_name) { // Keep the call timestamp. @@ -168,6 +169,7 @@ ControlledDhcpv6Srv::init(const std::string& file_name) { // Configure the server using JSON file. ConstElementPtr result = loadConfigFile(file_name); + int rcode; ConstElementPtr comment = isc::config::parseAnswer(rcode, result); if (rcode != 0) { @@ -192,11 +194,10 @@ void ControlledDhcpv6Srv::cleanup() { // Nothing to do here. No need to disconnect from anything. } - ConstElementPtr ControlledDhcpv6Srv::commandShutdownHandler(const string&, ConstElementPtr) { - if (ControlledDhcpv6Srv::server_) { - ControlledDhcpv6Srv::server_->shutdown(); + if (ControlledDhcpv6Srv::getInstance()) { + ControlledDhcpv6Srv::getInstance()->shutdown(); } else { LOG_WARN(dhcp6_logger, DHCP6_NOT_RUNNING); ConstElementPtr answer = isc::config::createAnswer(1, "Shutdown failure."); @@ -378,6 +379,7 @@ ControlledDhcpv6Srv::commandConfigSetHandler(const string&, isc::config::parseAnswer(rcode, result); if (rcode == CONTROL_RESULT_SUCCESS) { CfgMgr::instance().getStagingCfg()->applyLoggingCfg(); + // Use new configuration. CfgMgr::instance().commit(); } else { @@ -618,6 +620,16 @@ ControlledDhcpv6Srv::processCommand(const std::string& command, return (no_srv); } + if (Dhcpv6Srv::threadCount()) { + if (srv->pkt_thread_pool_.size()) { + srv->pkt_thread_pool_.stop(); + } + MultiThreadingMgr::instance().setMode(true); + srv->pkt_thread_pool_.start(Dhcpv6Srv::threadCount()); + } else { + MultiThreadingMgr::instance().setMode(false); + } + try { if (command == "shutdown") { return (srv->commandShutdownHandler(command, args)); @@ -872,11 +884,11 @@ ControlledDhcpv6Srv::ControlledDhcpv6Srv(uint16_t server_port, uint16_t client_port) : Dhcpv6Srv(server_port, client_port), io_service_(), timer_mgr_(TimerMgr::instance()) { - if (server_) { + if (getInstance()) { isc_throw(InvalidOperation, "There is another Dhcpv6Srv instance already."); } - server_ = this; // remember this instance for use in callback + server_ = this; // remember this instance for later use in handlers // TimerMgr uses IO service to run asynchronous timers. TimerMgr::instance()->setIOService(getIOService()); @@ -1021,8 +1033,8 @@ ControlledDhcpv6Srv::~ControlledDhcpv6Srv() { void ControlledDhcpv6Srv::sessionReader(void) { // Process one asio event. If there are more events, iface_mgr will call // this callback more than once. - if (server_) { - server_->io_service_.run_one(); + if (getInstance()) { + getInstance()->io_service_.run_one(); } } @@ -1061,12 +1073,13 @@ ControlledDhcpv6Srv::dbReconnect(ReconnectCtlPtr db_reconnect_ctl) { if (reopened) { // Cancel the timer. if (TimerMgr::instance()->isTimerRegistered("Dhcp6DbReconnectTimer")) { - TimerMgr::instance()->cancel("Dhcp6DbReconnectTimer"); } + TimerMgr::instance()->cancel("Dhcp6DbReconnectTimer"); + } // Set network state to service enabled network_state_->enableService(); - // Toss the reconnct control, we're done with it + // Toss the reconnect control, we're done with it db_reconnect_ctl.reset(); } else { if (!db_reconnect_ctl->checkRetries()) { @@ -1084,7 +1097,7 @@ ControlledDhcpv6Srv::dbReconnect(ReconnectCtlPtr db_reconnect_ctl) { if (!TimerMgr::instance()->isTimerRegistered("Dhcp6DbReconnectTimer")) { TimerMgr::instance()->registerTimer("Dhcp6DbReconnectTimer", boost::bind(&ControlledDhcpv6Srv::dbReconnect, this, - db_reconnect_ctl), + db_reconnect_ctl), db_reconnect_ctl->retryInterval(), asiolink::IntervalTimer::ONE_SHOT); } @@ -1150,5 +1163,5 @@ ControlledDhcpv6Srv::cbFetchUpdates(const SrvConfigPtr& srv_cfg, } } -}; // end of isc::dhcp namespace -}; // end of isc namespace +} // namespace dhcp +} // namespace isc diff --git a/src/bin/dhcp6/ctrl_dhcp6_srv.h b/src/bin/dhcp6/ctrl_dhcp6_srv.h index f4085fab90..951c39a40e 100644 --- a/src/bin/dhcp6/ctrl_dhcp6_srv.h +++ b/src/bin/dhcp6/ctrl_dhcp6_srv.h @@ -65,7 +65,7 @@ public: /// @brief Initiates shutdown procedure for the whole DHCPv6 server. void shutdown(); - /// @brief command processor + /// @brief Command processor /// /// This method is uniform for all config backends. It processes received /// command (as a string + JSON arguments). Internally, it's just a @@ -75,9 +75,9 @@ public: /// Currently supported commands are: /// - config-reload /// - config-test - /// - leases-reclaim - /// - libreload /// - shutdown + /// - libreload + /// - leases-reclaim /// ... /// /// @note It never throws. @@ -89,7 +89,7 @@ public: static isc::data::ConstElementPtr processCommand(const std::string& command, isc::data::ConstElementPtr args); - /// @brief configuration processor + /// @brief Configuration processor /// /// This is a method for handling incoming configuration updates. /// This method should be called by all configuration backends when the @@ -114,7 +114,7 @@ public: isc::data::ConstElementPtr checkConfig(isc::data::ConstElementPtr new_config); - /// @brief returns pointer to the sole instance of Dhcpv6Srv + /// @brief Returns pointer to the sole instance of Dhcpv6Srv /// /// @return server instance (may return NULL, if called before server is spawned) static ControlledDhcpv6Srv* getInstance() { @@ -131,7 +131,7 @@ private: /// (that was sent from some yet unspecified sender). static void sessionReader(void); - /// @brief handler for processing 'shutdown' command + /// @brief Handler for processing 'shutdown' command /// /// This handler processes shutdown command, which initializes shutdown /// procedure. @@ -143,7 +143,7 @@ private: commandShutdownHandler(const std::string& command, isc::data::ConstElementPtr args); - /// @brief handler for processing 'libreload' command + /// @brief Handler for processing 'libreload' command /// /// This handler processes libreload command, which unloads all hook /// libraries and reloads them. @@ -156,7 +156,7 @@ private: commandLibReloadHandler(const std::string& command, isc::data::ConstElementPtr args); - /// @brief handler for processing 'config-reload' command + /// @brief Handler for processing 'config-reload' command /// /// This handler processes config-reload command, which processes /// configuration specified in args parameter. @@ -348,7 +348,6 @@ private: const bool remove_lease, const uint16_t max_unwarned_cycles); - /// @brief Deletes reclaimed leases and reschedules the timer. /// /// This is a wrapper method for @c AllocEngine::deleteExpiredReclaimed6. @@ -373,6 +372,7 @@ private: /// /// If the maximum number of retries has been exhausted an error is logged /// and the server shuts down. + /// /// @param db_reconnect_ctl pointer to the ReconnectCtl containing the /// configured reconnect parameters /// @@ -394,6 +394,8 @@ private: /// /// @param db_reconnect_ctl pointer to the ReconnectCtl containing the /// configured reconnect parameters + /// + /// @return false if reconnect is not configured, true otherwise bool dbLostCallback(db::ReconnectCtlPtr db_reconnect_ctl); /// @brief Callback invoked periodically to fetch configuration updates @@ -426,7 +428,7 @@ private: TimerMgrPtr timer_mgr_; }; -}; // namespace isc::dhcp -}; // namespace isc +} // namespace dhcp +} // namespace isc #endif diff --git a/src/bin/dhcp6/dhcp6_srv.cc b/src/bin/dhcp6/dhcp6_srv.cc index ef4c272029..a032398dd7 100644 --- a/src/bin/dhcp6/dhcp6_srv.cc +++ b/src/bin/dhcp6/dhcp6_srv.cc @@ -46,7 +46,6 @@ #include #include #include - #include #include #include @@ -472,6 +471,11 @@ bool Dhcpv6Srv::run() { } } + // destroying the thread pool + if (Dhcpv6Srv::threadCount()) { + pkt_thread_pool_.reset(); + } + return (true); } @@ -481,6 +485,17 @@ void Dhcpv6Srv::run_one() { Pkt6Ptr rsp; try { + + // Do not read more packets from socket if there are enough + // packets to be processed in the packet thread pool queue + const int max_queued_pkt_per_thread = Dhcpv6Srv::maxThreadQueueSize(); + const auto queue_full_wait = std::chrono::milliseconds(1); + size_t pkt_queue_size = pkt_thread_pool_.count(); + if (pkt_queue_size >= Dhcpv6Srv::threadCount() * + max_queued_pkt_per_thread) { + return; + } + // Set select() timeout to 1s. This value should not be modified // because it is important that the select() returns control // frequently so as the IOService can be polled for ready handlers. @@ -558,9 +573,33 @@ void Dhcpv6Srv::run_one() { .arg(query->getLabel()); return; } else { - processPacket(query, rsp); + if (Dhcpv6Srv::threadCount()) { + typedef function CallBack; + boost::shared_ptr call_back = + boost::make_shared(std::bind(&Dhcpv6Srv::processPacketAndSendResponseNoThrow, + this, query, rsp)); + pkt_thread_pool_.add(call_back); + } else { + processPacketAndSendResponse(query, rsp); + } } +} +void +Dhcpv6Srv::processPacketAndSendResponseNoThrow(Pkt6Ptr& query, Pkt6Ptr& rsp) { + try { + processPacketAndSendResponse(query, rsp); + } catch (const std::exception& e) { + LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_STD_EXCEPTION) + .arg(e.what()); + } catch (...) { + LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_EXCEPTION); + } +} + +void +Dhcpv6Srv::processPacketAndSendResponse(Pkt6Ptr& query, Pkt6Ptr& rsp) { + processPacket(query, rsp); if (!rsp) { return; } @@ -3997,6 +4036,23 @@ Dhcpv6Srv::requestedInORO(const Pkt6Ptr& query, const uint16_t code) const { return (false); } +uint32_t Dhcpv6Srv::threadCount() { + uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount(); + if (sys_threads) { + return sys_threads; + } + sys_threads = std::thread::hardware_concurrency(); + return sys_threads * 0; +} + +uint32_t Dhcpv6Srv::maxThreadQueueSize() { + uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize(); + if (max_thread_queue_size) { + return max_thread_queue_size; + } + return 4; +} + void Dhcpv6Srv::discardPackets() { // Dump all of our current packets, anything that is mid-stream isc::dhcp::Pkt6Ptr pkt6ptr_empty; diff --git a/src/bin/dhcp6/dhcp6_srv.h b/src/bin/dhcp6/dhcp6_srv.h index 80e42a47b1..17c3121f7e 100644 --- a/src/bin/dhcp6/dhcp6_srv.h +++ b/src/bin/dhcp6/dhcp6_srv.h @@ -8,13 +8,15 @@ #define DHCPV6_SRV_H #include -#include #include #include #include +#include #include #include +#include #include +#include #include #include #include @@ -25,6 +27,7 @@ #include #include #include +#include #include #include @@ -51,12 +54,16 @@ public: /// @brief DHCPv6 server service. /// -/// This class represents DHCPv6 server. It contains all +/// This singleton class represents DHCPv6 server. It contains all /// top-level methods and routines necessary for server operation. /// In particular, it instantiates IfaceMgr, loads or generates DUID /// that is going to be used as server-identifier, receives incoming /// packets, processes them, manages leases assignment and generates /// appropriate responses. +/// +/// This class does not support any controlling mechanisms directly. +/// See the derived \ref ControlledDhcpv6Srv class for support for +/// command and configuration updates over msgq. class Dhcpv6Srv : public process::Daemon { private: @@ -79,10 +86,13 @@ public: /// Instantiates necessary services, required to run DHCPv6 server. /// In particular, creates IfaceMgr that will be responsible for /// network interaction. Will instantiate lease manager, and load - /// old or create new DUID. + /// old or create new DUID. It is possible to specify alternate + /// port on which DHCPv6 server will listen on and alternate port + /// where DHCPv6 server sends all responses to. Those are mostly useful + /// for testing purposes. /// - /// @param server_port port on which all sockets will listen - /// @param client_port port to which all responses will be sent + /// @param server_port specifies port number to listen on + /// @param client_port specifies port number to send to Dhcpv6Srv(uint16_t server_port = DHCP6_SERVER_PORT, uint16_t client_port = 0); @@ -120,6 +130,12 @@ public: /// redeclaration/redefinition. @ref isc::process::Daemon::getVersion() static std::string getVersion(bool extended); + /// @brief returns Kea DHCPv6 server thread count. + static uint32_t threadCount(); + + /// @brief returns Kea DHCPv6 server max thread queue size. + static uint32_t maxThreadQueueSize(); + /// @brief Returns server-identifier option. /// /// @return server-id option @@ -140,6 +156,24 @@ public: /// a response. void run_one(); + /// @brief Process a single incoming DHCPv6 packet and sends the response. + /// + /// It verifies correctness of the passed packet, call per-type processXXX + /// methods, generates appropriate answer, sends the answer to the client. + /// + /// @param query A pointer to the packet to be processed. + /// @param rsp A pointer to the response + void processPacketAndSendResponse(Pkt6Ptr& query, Pkt6Ptr& rsp); + + /// @brief Process a single incoming DHCPv6 packet and sends the response. + /// + /// It verifies correctness of the passed packet, call per-type processXXX + /// methods, generates appropriate answer, sends the answer to the client. + /// + /// @param query A pointer to the packet to be processed. + /// @param rsp A pointer to the response + void processPacketAndSendResponseNoThrow(Pkt6Ptr& query, Pkt6Ptr& rsp); + /// @brief Process a single incoming DHCPv6 packet. /// /// It verifies correctness of the passed packet, call per-type processXXX @@ -152,15 +186,21 @@ public: /// @brief Instructs the server to shut down. void shutdown(); + /// + /// @name Public accessors returning values required to (re)open sockets. + /// + //@{ + /// /// @brief Get UDP port on which server should listen. /// - /// Typically, server listens on UDP port 547. Other ports are only - /// used for testing purposes. + /// Typically, server listens on UDP port number 547. Other ports are used + /// for testing purposes only. /// /// @return UDP port on which server should listen. uint16_t getServerPort() const { return (server_port_); } + //@} /// @brief Starts DHCP_DDNS client IO if DDNS updates are enabled. /// @@ -1057,9 +1097,12 @@ protected: /// @brief Controls access to the configuration backends. CBControlDHCPv6Ptr cb_control_; + + /// @brief Packet processing thread pool + isc::util::ThreadPool> pkt_thread_pool_; }; -}; // namespace isc::dhcp -}; // namespace isc +} // namespace dhcp +} // namespace isc #endif // DHCP6_SRV_H diff --git a/src/bin/dhcp6/main.cc b/src/bin/dhcp6/main.cc index 42946db9c6..89b4f46960 100644 --- a/src/bin/dhcp6/main.cc +++ b/src/bin/dhcp6/main.cc @@ -7,15 +7,15 @@ #include #include +#include #include #include #include #include #include +#include #include #include -#include -#include #include #include @@ -37,9 +37,8 @@ using namespace std; /// Dhcpv6Srv and other classes, see \ref dhcpv6Session. namespace { -const char* const DHCP6_NAME = "kea-dhcp6"; -const char* const DHCP6_LOGGER_NAME = "kea-dhcp6"; +const char* const DHCP6_NAME = "kea-dhcp6"; /// @brief Prints Kea Usage and exits /// @@ -60,9 +59,11 @@ usage() { << "(useful for testing only)" << endl; cerr << " -P number: specify non-standard client port number 1-65535 " << "(useful for testing only)" << endl; + cerr << " -N number: specify thread count 0-65535 " + << "(0 means multi-threading disabled)" << endl; exit(EXIT_FAILURE); } -} // end of anonymous namespace +} // namespace int main(int argc, char* argv[]) { @@ -71,6 +72,8 @@ main(int argc, char* argv[]) { int server_port_number = DHCP6_SERVER_PORT; // Not zero values are useful for testing only. int client_port_number = 0; + // Number of threads. 0 means multi-threading disabled + int thread_count = 0; bool verbose_mode = false; // Should server be verbose? bool check_mode = false; // Check syntax @@ -133,6 +136,21 @@ main(int argc, char* argv[]) { } break; + case 'N': // number of threads + try { + thread_count = boost::lexical_cast(optarg); + } catch (const boost::bad_lexical_cast &) { + cerr << "Failed to parse thread count number: [" << optarg + << "], 0-65535 allowed." << endl; + usage(); + } + if (thread_count < 0 || thread_count > 65535) { + cerr << "Failed to parse thread count number: [" << optarg + << "], 0-65535 allowed." << endl; + usage(); + } + break; + default: usage(); } @@ -193,11 +211,8 @@ main(int argc, char* argv[]) { cerr << "Error encountered: " << answer->stringValue() << endl; return (EXIT_FAILURE); } - - - return (EXIT_SUCCESS); } catch (const std::exception& ex) { - cerr << "Syntax check failed with " << ex.what() << endl; + cerr << "Syntax check failed with: " << ex.what() << endl; } return (EXIT_FAILURE); } @@ -207,11 +222,10 @@ main(int argc, char* argv[]) { // It is important that we set a default logger name because this name // will be used when the user doesn't provide the logging configuration // in the Kea configuration file. - Daemon::setDefaultLoggerName(DHCP6_LOGGER_NAME); + Daemon::setDefaultLoggerName(DHCP6_ROOT_LOGGER_NAME); // Initialize logging. If verbose, we'll use maximum verbosity. - Daemon::loggerInit(DHCP6_LOGGER_NAME, verbose_mode); - + Daemon::loggerInit(DHCP6_ROOT_LOGGER_NAME, verbose_mode); LOG_DEBUG(dhcp6_logger, DBG_DHCP6_START, DHCP6_START_INFO) .arg(getpid()) .arg(server_port_number) @@ -226,16 +240,14 @@ main(int argc, char* argv[]) { // Remember verbose-mode server.setVerbose(verbose_mode); - // Create our PID file + // Create our PID file. server.setProcName(DHCP6_NAME); server.setConfigFile(config_file); server.createPIDFile(); try { - // Initialize the server, e.g. establish control session - // Read a configuration file + // Initialize the server. server.init(config_file); - } catch (const std::exception& ex) { try { @@ -245,8 +257,8 @@ main(int argc, char* argv[]) { LOG_ERROR(dhcp6_logger, DHCP6_INIT_FAIL).arg(ex.what()); } catch (...) { // The exception thrown during the initialization could - // originate from logger subsystem. Therefore LOG_ERROR() may - // fail as well. + // originate from logger subsystem. Therefore LOG_ERROR() + // may fail as well. cerr << "Failed to initialize server: " << ex.what() << endl; } @@ -277,7 +289,6 @@ main(int argc, char* argv[]) { } ret = EXIT_FAILURE; } catch (const std::exception& ex) { - // First, we print the error on stderr (that should always work) cerr << DHCP6_NAME << "Fatal error during start up: " << ex.what() << endl; diff --git a/src/lib/dhcpsrv/srv_config.cc b/src/lib/dhcpsrv/srv_config.cc index d016bcbecf..b40c25a3ae 100644 --- a/src/lib/dhcpsrv/srv_config.cc +++ b/src/lib/dhcpsrv/srv_config.cc @@ -41,6 +41,8 @@ SrvConfig::SrvConfig() cfg_host_operations6_(CfgHostOperations::createConfig6()), class_dictionary_(new ClientClassDictionary()), decline_timer_(0), echo_v4_client_id_(true), dhcp4o6_port_(0), + server_threads_(0), + server_max_thread_queue_size_(0), d2_client_config_(new D2ClientConfig()), configured_globals_(Element::createMap()), cfg_consist_(new CfgConsistency()) { @@ -59,6 +61,8 @@ SrvConfig::SrvConfig(const uint32_t sequence) cfg_host_operations6_(CfgHostOperations::createConfig6()), class_dictionary_(new ClientClassDictionary()), decline_timer_(0), echo_v4_client_id_(true), dhcp4o6_port_(0), + server_threads_(0), + server_max_thread_queue_size_(0), d2_client_config_(new D2ClientConfig()), configured_globals_(Element::createMap()), cfg_consist_(new CfgConsistency()) { @@ -253,7 +257,6 @@ SrvConfig::mergeGlobals(SrvConfig& other) { void SrvConfig::removeStatistics() { - // Removes statistics for v4 and v6 subnets getCfgSubnets4()->removeStatistics(); diff --git a/src/lib/dhcpsrv/srv_config.h b/src/lib/dhcpsrv/srv_config.h index efbe06aebf..3107ffeb2e 100644 --- a/src/lib/dhcpsrv/srv_config.h +++ b/src/lib/dhcpsrv/srv_config.h @@ -705,6 +705,34 @@ public: return (dhcp4o6_port_); } + /// @brief Sets the server thread count. + /// + /// @param threads value of the server thread count + void setServerThreadCount(uint32_t threads) { + server_threads_ = threads; + } + + /// @brief Retrieves the server thread count. + /// + /// @return value of the server thread count + uint32_t getServerThreadCount() const { + return (server_threads_); + } + + /// @brief Sets the server max thread queue size. + /// + /// @param size max thread queue size + void setServerMaxThreadQueueSize(uint32_t size) { + server_max_thread_queue_size_ = size; + } + + /// @brief Retrieves the server max thread queue size. + /// + /// @return value of the max thread queue size + uint32_t getServerMaxThreadQueueSize() const { + return (server_max_thread_queue_size_); + } + /// @brief Returns pointer to the D2 client configuration D2ClientConfigPtr getD2ClientConfig() { return (d2_client_config_); @@ -923,6 +951,12 @@ private: /// this socket is bound and connected to this port and port + 1 uint16_t dhcp4o6_port_; + /// @brief The server thread count. + uint32_t server_threads_; + + /// @brief The server max thread queue size. + uint32_t server_max_thread_queue_size_; + /// @brief Stores D2 client configuration D2ClientConfigPtr d2_client_config_; @@ -943,7 +977,7 @@ typedef boost::shared_ptr SrvConfigPtr; typedef boost::shared_ptr ConstSrvConfigPtr; //@} -} // namespace isc::dhcp -} // namespace isc +} // namespace dhcp +} // namespace isc #endif // DHCPSRV_CONFIG_H