2
0
mirror of https://gitlab.isc.org/isc-projects/kea synced 2025-08-30 13:37:55 +00:00

[#892] create pkt thread pool and handle processing using multiple threads

This commit is contained in:
Razvan Becheriu 2020-02-05 14:15:07 +02:00
parent 694e1af0df
commit 00d6396b3b
12 changed files with 408 additions and 124 deletions

View File

@ -5,30 +5,34 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include <config.h>
#include <cc/data.h>
#include <cc/command_interpreter.h>
#include <cc/data.h>
#include <cfgrpt/config_report.h>
#include <config/command_mgr.h>
#include <dhcp/libdhcp++.h>
#include <dhcpsrv/cfgmgr.h>
#include <dhcpsrv/cfg_db_access.h>
#include <dhcp4/ctrl_dhcp4_srv.h>
#include <dhcp4/dhcp4_log.h>
#include <dhcp4/dhcp4to6_ipc.h>
#include <dhcp4/json_config_parser.h>
#include <dhcp4/parser_context.h>
#include <dhcpsrv/cfg_db_access.h>
#include <dhcpsrv/cfgmgr.h>
#include <dhcpsrv/db_type.h>
#include <hooks/hooks.h>
#include <hooks/hooks_manager.h>
#include <stats/stats_mgr.h>
#include <cfgrpt/config_report.h>
#include <signal.h>
#include <sstream>
using namespace isc::config;
using namespace isc::db;
using namespace isc::data;
using namespace isc::db;
using namespace isc::dhcp;
using namespace isc::hooks;
using namespace isc::stats;
using namespace isc::util;
using namespace std;
namespace {
@ -124,8 +128,6 @@ ControlledDhcpv4Srv::loadConfigFile(const std::string& file_name) {
// configuration from a JSON file.
isc::data::ConstElementPtr json;
isc::data::ConstElementPtr dhcp4;
isc::data::ConstElementPtr logger;
isc::data::ConstElementPtr result;
// Basic sanity check: file name must not be empty.
@ -204,7 +206,6 @@ ControlledDhcpv4Srv::commandShutdownHandler(const string&, ConstElementPtr) {
ConstElementPtr
ControlledDhcpv4Srv::commandLibReloadHandler(const string&, ConstElementPtr) {
/// @todo delete any stored CalloutHandles referring to the old libraries
/// Get list of currently loaded libraries and reload them.
HookLibsCollection loaded = HooksManager::getLibraryInfo();
@ -223,7 +224,6 @@ ControlledDhcpv4Srv::commandLibReloadHandler(const string&, ConstElementPtr) {
ConstElementPtr
ControlledDhcpv4Srv::commandConfigReloadHandler(const string&,
ConstElementPtr /*args*/) {
// Get configuration file name.
std::string file = ControlledDhcpv4Srv::getInstance()->getConfigFile();
try {
@ -269,6 +269,7 @@ ControlledDhcpv4Srv::commandConfigWriteHandler(const string&,
if (filename.empty()) {
// filename parameter was not specified, so let's use whatever we remember
// from the command-line
filename = getConfigFile();
}
@ -303,7 +304,7 @@ ControlledDhcpv4Srv::commandConfigWriteHandler(const string&,
ConstElementPtr
ControlledDhcpv4Srv::commandConfigSetHandler(const string&,
ConstElementPtr args) {
const int status_code = CONTROL_RESULT_ERROR; // 1 indicates an error
const int status_code = CONTROL_RESULT_ERROR;
ConstElementPtr dhcp4;
string message;
@ -618,6 +619,16 @@ ControlledDhcpv4Srv::processCommand(const string& command,
return (no_srv);
}
if (Dhcpv4Srv::threadCount()) {
if (srv->pkt_thread_pool_.size()) {
srv->pkt_thread_pool_.stop();
}
MultiThreadingMgr::instance().setMode(true);
srv->pkt_thread_pool_.start(Dhcpv4Srv::threadCount());
} else {
MultiThreadingMgr::instance().setMode(false);
}
try {
if (command == "shutdown") {
return (srv->commandShutdownHandler(command, args));
@ -776,6 +787,7 @@ ControlledDhcpv4Srv::processConfig(isc::data::ConstElementPtr config) {
return (isc::config::createAnswer(1, err.str()));
}
// Setup config backend polling, if configured for it.
auto ctl_info = CfgMgr::instance().getStagingCfg()->getConfigControlInfo();
if (ctl_info) {
long fetch_time = static_cast<long>(ctl_info->getConfigFetchWaitTime());
@ -966,8 +978,8 @@ ControlledDhcpv4Srv::~ControlledDhcpv4Srv() {
CommandMgr::instance().deregisterCommand("build-report");
CommandMgr::instance().deregisterCommand("config-backend-pull");
CommandMgr::instance().deregisterCommand("config-get");
CommandMgr::instance().deregisterCommand("config-reload");
CommandMgr::instance().deregisterCommand("config-set");
CommandMgr::instance().deregisterCommand("config-reload");
CommandMgr::instance().deregisterCommand("config-test");
CommandMgr::instance().deregisterCommand("config-write");
CommandMgr::instance().deregisterCommand("dhcp-disable");
@ -995,8 +1007,8 @@ ControlledDhcpv4Srv::~ControlledDhcpv4Srv() {
;
}
server_ = NULL; // forget this instance. Noone should call any handlers at
// this stage.
server_ = NULL; // forget this instance. There should be no callback anymore
// at this stage anyway.
}
void ControlledDhcpv4Srv::sessionReader(void) {
@ -1133,5 +1145,5 @@ ControlledDhcpv4Srv::cbFetchUpdates(const SrvConfigPtr& srv_cfg,
}
}
}; // end of isc::dhcp namespace
}; // end of isc namespace
} // namespace dhcp
} // namespace isc

View File

@ -33,7 +33,7 @@ public:
uint16_t client_port = 0);
/// @brief Destructor.
~ControlledDhcpv4Srv();
virtual ~ControlledDhcpv4Srv();
/// @brief Initializes the server.
///
@ -43,12 +43,12 @@ public:
/// This method may throw if initialization fails.
void init(const std::string& config_file);
/// @brief Loads specific config file
/// @brief Loads specific configuration file
///
/// This utility method is called whenever we know a filename of the config
/// and need to load it. It calls config-set command once the content of
/// the file has been loaded and verified to be a sane JSON configuration.
/// config-set handler will process the config file (load it as current
/// config-set handler will process the config file (apply it as current
/// configuration).
///
/// @param file_name name of the file to be loaded
@ -121,8 +121,8 @@ public:
return (server_);
}
private:
/// @brief Callback that will be called from iface_mgr when data
/// is received over control socket.
///
@ -249,7 +249,6 @@ private:
commandDhcpEnableHandler(const std::string& command,
isc::data::ConstElementPtr args);
/// @Brief handler for processing 'version-get' command
///
/// This handler processes version-get command, which returns
@ -416,7 +415,7 @@ private:
/// @brief Static pointer to the sole instance of the DHCP server.
///
/// This is required for config and command handlers to gain access to
/// the server
/// the server. Some of them need to be static methods.
static ControlledDhcpv4Srv* server_;
/// @brief IOService object, used for all ASIO operations.
@ -429,7 +428,7 @@ private:
TimerMgrPtr timer_mgr_;
};
}; // namespace isc::dhcp
}; // namespace isc
} // namespace dhcp
} // namespace isc
#endif

View File

@ -39,7 +39,6 @@
#include <dhcpsrv/subnet.h>
#include <dhcpsrv/subnet_selector.h>
#include <dhcpsrv/utils.h>
#include <dhcpsrv/utils.h>
#include <eval/evaluate.h>
#include <eval/eval_messages.h>
#include <hooks/callout_handle.h>
@ -47,7 +46,6 @@
#include <hooks/hooks_manager.h>
#include <stats/stats_mgr.h>
#include <util/strutil.h>
#include <stats/stats_mgr.h>
#include <log/logger.h>
#include <cryptolink/cryptolink.h>
#include <cfgrpt/config_report.h>
@ -80,6 +78,7 @@ using namespace isc::dhcp_ddns;
using namespace isc::hooks;
using namespace isc::log;
using namespace isc::stats;
using namespace isc::util;
using namespace std;
namespace {
@ -102,8 +101,8 @@ struct Dhcp4Hooks {
hook_index_pkt4_receive_ = HooksManager::registerHook("pkt4_receive");
hook_index_subnet4_select_ = HooksManager::registerHook("subnet4_select");
hook_index_leases4_committed_ = HooksManager::registerHook("leases4_committed");
hook_index_pkt4_send_ = HooksManager::registerHook("pkt4_send");
hook_index_lease4_release_ = HooksManager::registerHook("lease4_release");
hook_index_pkt4_send_ = HooksManager::registerHook("pkt4_send");
hook_index_buffer4_send_ = HooksManager::registerHook("buffer4_send");
hook_index_lease4_decline_ = HooksManager::registerHook("lease4_decline");
hook_index_host4_identifier_ = HooksManager::registerHook("host4_identifier");
@ -209,7 +208,7 @@ Dhcpv4Exchange::Dhcpv4Exchange(const AllocEnginePtr& alloc_engine,
.arg(query_->getLabel())
.arg(classes.toText());
}
};
}
void
Dhcpv4Exchange::initResponse() {
@ -470,14 +469,15 @@ const std::string Dhcpv4Srv::VENDOR_CLASS_PREFIX("VENDOR_CLASS_");
Dhcpv4Srv::Dhcpv4Srv(uint16_t server_port, uint16_t client_port,
const bool use_bcast, const bool direct_response_desired)
: io_service_(new IOService()), shutdown_(true), alloc_engine_(),
use_bcast_(use_bcast), server_port_(server_port),
client_port_(client_port),
: io_service_(new IOService()), server_port_(server_port),
client_port_(client_port), shutdown_(true),
alloc_engine_(), use_bcast_(use_bcast),
network_state_(new NetworkState(NetworkState::DHCPv4)),
cb_control_(new CBControlDHCPv4()) {
LOG_DEBUG(dhcp4_logger, DBG_DHCP4_START, DHCP4_OPEN_SOCKET)
.arg(server_port);
try {
// Port 0 is used for testing purposes where we don't open broadcast
// capable sockets. So, set the packet filter handling direct traffic
@ -801,6 +801,11 @@ Dhcpv4Srv::run() {
}
}
// destroying the thread pool
if (Dhcpv4Srv::threadCount()) {
pkt_thread_pool_.reset();
}
return (true);
}
@ -811,6 +816,17 @@ Dhcpv4Srv::run_one() {
Pkt4Ptr rsp;
try {
// Do not read more packets from socket if there are enough
// packets to be processed in the packet thread pool queue
const int max_queued_pkt_per_thread = Dhcpv4Srv::maxThreadQueueSize();
const auto queue_full_wait = std::chrono::milliseconds(1);
size_t pkt_queue_size = pkt_thread_pool_.count();
if (pkt_queue_size >= Dhcpv4Srv::threadCount() *
max_queued_pkt_per_thread) {
return;
}
// Set select() timeout to 1s. This value should not be modified
// because it is important that the select() returns control
// frequently so as the IOService can be polled for ready handlers.
@ -884,9 +900,33 @@ Dhcpv4Srv::run_one() {
.arg(query->getLabel());
return;
} else {
processPacket(query, rsp);
if (Dhcpv4Srv::threadCount()) {
typedef function<void()> CallBack;
boost::shared_ptr<CallBack> call_back =
boost::make_shared<CallBack>(std::bind(&Dhcpv4Srv::processPacketAndSendResponseNoThrow,
this, query, rsp));
pkt_thread_pool_.add(call_back);
} else {
processPacketAndSendResponse(query, rsp);
}
}
}
void
Dhcpv4Srv::processPacketAndSendResponseNoThrow(Pkt4Ptr& query, Pkt4Ptr& rsp) {
try {
processPacketAndSendResponse(query, rsp);
} catch (const std::exception& e) {
LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_STD_EXCEPTION)
.arg(e.what());
} catch (...) {
LOG_ERROR(packet4_logger, DHCP4_PACKET_PROCESS_EXCEPTION);
}
}
void
Dhcpv4Srv::processPacketAndSendResponse(Pkt4Ptr& query, Pkt4Ptr& rsp) {
processPacket(query, rsp);
if (!rsp) {
return;
}
@ -3800,6 +3840,23 @@ int Dhcpv4Srv::getHookIndexLease4Decline() {
return (Hooks.hook_index_lease4_decline_);
}
uint32_t Dhcpv4Srv::threadCount() {
uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount();
if (sys_threads) {
return sys_threads;
}
sys_threads = std::thread::hardware_concurrency();
return sys_threads * 0;
}
uint32_t Dhcpv4Srv::maxThreadQueueSize() {
uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize();
if (max_thread_queue_size) {
return max_thread_queue_size;
}
return 4;
}
void Dhcpv4Srv::discardPackets() {
// Clear any packets held by the callhout handle store and
// all parked packets

View File

@ -9,23 +9,22 @@
#include <asiolink/io_service.h>
#include <dhcp/dhcp4.h>
#include <dhcp/pkt4.h>
#include <dhcp/option.h>
#include <dhcp/option_string.h>
#include <dhcp/option4_client_fqdn.h>
#include <dhcp/option_custom.h>
#include <dhcp/pkt4.h>
#include <dhcp_ddns/ncr_msg.h>
#include <dhcpsrv/alloc_engine.h>
#include <dhcpsrv/callout_handle_store.h>
#include <dhcpsrv/cb_ctl_dhcp4.h>
#include <dhcpsrv/cfg_option.h>
#include <dhcpsrv/callout_handle_store.h>
#include <dhcpsrv/d2_client_mgr.h>
#include <dhcpsrv/network_state.h>
#include <dhcpsrv/subnet.h>
#include <hooks/callout_handle.h>
#include <process/daemon.h>
#include <boost/noncopyable.hpp>
#include <util/thread_pool.h>
#include <functional>
#include <iostream>
@ -164,12 +163,16 @@ private:
/// @brief Pointer to the allocation engine used by the server.
AllocEnginePtr alloc_engine_;
/// @brief Pointer to the DHCPv4 message sent by the client.
Pkt4Ptr query_;
/// @brief Pointer to the DHCPv4 message to be sent to the client.
Pkt4Ptr resp_;
/// @brief Context for use with allocation engine.
AllocEngine::ClientContext4Ptr context_;
/// @brief Configured option list.
/// @note The configured option list is an *ordered* list of
/// @c CfgOption objects used to append options to the response.
@ -234,9 +237,9 @@ public:
/// @brief Destructor. Used during DHCPv4 service shutdown.
virtual ~Dhcpv4Srv();
/// @brief Checks if the server is running in a test mode.
/// @brief Checks if the server is running in unit test mode.
///
/// @return true if the server is running in the test mode,
/// @return true if the server is running in unit test mode,
/// false otherwise.
bool inTestMode() const {
return (server_port_ == 0);
@ -265,6 +268,12 @@ public:
/// redeclaration/redefinition. @ref isc::process::Daemon::getVersion()
static std::string getVersion(bool extended);
/// @brief returns Kea DHCPv4 server thread count.
static uint32_t threadCount();
/// @brief returns Kea DHCPv4 server max thread queue size.
static uint32_t maxThreadQueueSize();
/// @brief Main server processing loop.
///
/// Main server processing loop. Call the processing step routine
@ -280,6 +289,24 @@ public:
/// a response.
void run_one();
/// @brief Process a single incoming DHCPv4 packet and sends the response.
///
/// It verifies correctness of the passed packet, call per-type processXXX
/// methods, generates appropriate answer, sends the answer to the client.
///
/// @param query A pointer to the packet to be processed.
/// @param rsp A pointer to the response
void processPacketAndSendResponse(Pkt4Ptr& query, Pkt4Ptr& rsp);
/// @brief Process a single incoming DHCPv4 packet and sends the response.
///
/// It verifies correctness of the passed packet, call per-type processXXX
/// methods, generates appropriate answer, sends the answer to the client.
///
/// @param query A pointer to the packet to be processed.
/// @param rsp A pointer to the response
void processPacketAndSendResponseNoThrow(Pkt4Ptr& query, Pkt4Ptr& rsp);
/// @brief Process a single incoming DHCPv4 packet.
///
/// It verifies correctness of the passed packet, call per-type processXXX
@ -351,7 +378,9 @@ public:
NameChangeSender::Result result,
dhcp_ddns::NameChangeRequestPtr& ncr);
/// @brief Discard all in-progress packets
/// @brief Discards cached and parked packets
/// Clears the call_handle store and packet parking lots
/// of all packets. Called during reconfigure and shutdown.
void discardPackets();
protected:
@ -879,10 +908,6 @@ protected:
bool& drop,
bool sanity_only = false) const;
/// indicates if shutdown is in progress. Setting it to true will
/// initiate server shutdown procedure.
volatile bool shutdown_;
/// @brief dummy wrapper around IfaceMgr::receive4
///
/// This method is useful for testing purposes, where its replacement
@ -961,11 +986,6 @@ protected:
void processPacketBufferSend(hooks::CalloutHandlePtr& callout_handle,
Pkt4Ptr& rsp);
/// @brief Allocation Engine.
/// Pointer to the allocation engine that we are currently using
/// It must be a pointer, because we will support changing engines
/// during normal operation (e.g. to use different allocators)
boost::shared_ptr<AllocEngine> alloc_engine_;
private:
@ -984,17 +1004,27 @@ private:
/// @return Option that contains netmask information
static OptionPtr getNetmaskOption(const Subnet4Ptr& subnet);
/// Should broadcast be enabled on sockets (if true).
bool use_bcast_;
protected:
/// UDP port number on which server listens.
uint16_t server_port_;
/// UDP port number to which server sends responses.
/// UDP port number to which server sends all responses.
uint16_t client_port_;
/// Indicates if shutdown is in progress. Setting it to true will
/// initiate server shutdown procedure.
volatile bool shutdown_;
/// @brief Allocation Engine.
/// Pointer to the allocation engine that we are currently using
/// It must be a pointer, because we will support changing engines
/// during normal operation (e.g. to use different allocators)
boost::shared_ptr<AllocEngine> alloc_engine_;
/// Should broadcast be enabled on sockets (if true).
bool use_bcast_;
/// @brief Holds information about disabled DHCP service and/or
/// disabled subnet/network scopes.
NetworkStatePtr network_state_;
@ -1002,6 +1032,9 @@ protected:
/// @brief Controls access to the configuration backends.
CBControlDHCPv4Ptr cb_control_;
/// @brief Packet processing thread pool
isc::util::ThreadPool<std::function<void()>> pkt_thread_pool_;
public:
/// Class methods for DHCPv4-over-DHCPv6 handler
@ -1042,7 +1075,7 @@ public:
static int getHookIndexLease4Decline();
};
}; // namespace isc::dhcp
}; // namespace isc
} // namespace dhcp
} // namespace isc
#endif // DHCP4_SRV_H

View File

@ -7,15 +7,16 @@
#include <config.h>
#include <kea_version.h>
#include <cfgrpt/config_report.h>
#include <dhcp4/ctrl_dhcp4_srv.h>
#include <dhcp4/dhcp4_log.h>
#include <dhcp4/parser_context.h>
#include <dhcp4/json_config_parser.h>
#include <cc/command_interpreter.h>
#include <dhcpsrv/cfgmgr.h>
#include <exceptions/exceptions.h>
#include <log/logger_support.h>
#include <log/logger_manager.h>
#include <cfgrpt/config_report.h>
#include <process/daemon.h>
#include <boost/lexical_cast.hpp>
@ -31,6 +32,9 @@ using namespace std;
/// instantiates ControlledDhcpv4Srv class that is responsible for establishing
/// connection with msgq (receiving commands and configuration) and also
/// creating Dhcpv4 server object as well.
///
/// For detailed explanation or relations between main(), ControlledDhcpv4Srv,
/// Dhcpv4Srv and other classes, see \ref dhcpv4Session.
namespace {
@ -55,9 +59,11 @@ usage() {
<< "(useful for testing only)" << endl;
cerr << " -P number: specify non-standard client port number 1-65535 "
<< "(useful for testing only)" << endl;
cerr << " -N number: specify thread count 0-65535 "
<< "(0 means multi-threading disabled)" << endl;
exit(EXIT_FAILURE);
}
} // end of anonymous namespace
} // namespace
int
main(int argc, char* argv[]) {
@ -66,6 +72,8 @@ main(int argc, char* argv[]) {
int server_port_number = DHCP4_SERVER_PORT;
// Not zero values are useful for testing only.
int client_port_number = 0;
// Number of threads. 0 means multi-threading disabled
int thread_count = 0;
bool verbose_mode = false; // Should server be verbose?
bool check_mode = false; // Check syntax
@ -98,7 +106,7 @@ main(int argc, char* argv[]) {
config_file = optarg;
break;
case 'p':
case 'p': // server port number
try {
server_port_number = boost::lexical_cast<int>(optarg);
} catch (const boost::bad_lexical_cast &) {
@ -113,7 +121,7 @@ main(int argc, char* argv[]) {
}
break;
case 'P':
case 'P': // client port number
try {
client_port_number = boost::lexical_cast<int>(optarg);
} catch (const boost::bad_lexical_cast &) {
@ -128,6 +136,21 @@ main(int argc, char* argv[]) {
}
break;
case 'N': // number of threads
try {
thread_count = boost::lexical_cast<int>(optarg);
} catch (const boost::bad_lexical_cast &) {
cerr << "Failed to parse thread count number: [" << optarg
<< "], 0-65535 allowed." << endl;
usage();
}
if (thread_count < 0 || thread_count > 65535) {
cerr << "Failed to parse thread count number: [" << optarg
<< "], 0-65535 allowed." << endl;
usage();
}
break;
default:
usage();
}
@ -138,7 +161,6 @@ main(int argc, char* argv[]) {
usage();
}
// Configuration file is required.
if (config_file.empty()) {
cerr << "Configuration file not specified." << endl;
@ -150,7 +172,6 @@ main(int argc, char* argv[]) {
if (check_mode) {
try {
// We need to initialize logging, in case any error messages are to be printed.
// This is just a test, so we don't care about lockfile.
setenv("KEA_LOCKFILE_DIR", "none", 0);

View File

@ -5,30 +5,34 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#include <config.h>
#include <cc/data.h>
#include <cc/command_interpreter.h>
#include <cc/data.h>
#include <cfgrpt/config_report.h>
#include <config/command_mgr.h>
#include <dhcp/libdhcp++.h>
#include <dhcpsrv/cfgmgr.h>
#include <dhcpsrv/cfg_db_access.h>
#include <dhcp6/ctrl_dhcp6_srv.h>
#include <dhcp6/dhcp6_log.h>
#include <dhcp6/dhcp6to4_ipc.h>
#include <dhcp6/json_config_parser.h>
#include <dhcp6/parser_context.h>
#include <dhcpsrv/cfg_db_access.h>
#include <dhcpsrv/cfgmgr.h>
#include <dhcpsrv/db_type.h>
#include <hooks/hooks.h>
#include <hooks/hooks_manager.h>
#include <stats/stats_mgr.h>
#include <cfgrpt/config_report.h>
#include <signal.h>
#include <sstream>
using namespace isc::config;
using namespace isc::db;
using namespace isc::data;
using namespace isc::db;
using namespace isc::dhcp;
using namespace isc::hooks;
using namespace isc::stats;
using namespace isc::util;
using namespace std;
namespace {
@ -96,8 +100,6 @@ ControlledDhcpv6Srv::loadConfigFile(const std::string& file_name) {
// configuration from a JSON file.
isc::data::ConstElementPtr json;
isc::data::ConstElementPtr dhcp6;
isc::data::ConstElementPtr logger;
isc::data::ConstElementPtr result;
// Basic sanity check: file name must not be empty.
@ -160,7 +162,6 @@ ControlledDhcpv6Srv::loadConfigFile(const std::string& file_name) {
return (result);
}
void
ControlledDhcpv6Srv::init(const std::string& file_name) {
// Keep the call timestamp.
@ -168,6 +169,7 @@ ControlledDhcpv6Srv::init(const std::string& file_name) {
// Configure the server using JSON file.
ConstElementPtr result = loadConfigFile(file_name);
int rcode;
ConstElementPtr comment = isc::config::parseAnswer(rcode, result);
if (rcode != 0) {
@ -192,11 +194,10 @@ void ControlledDhcpv6Srv::cleanup() {
// Nothing to do here. No need to disconnect from anything.
}
ConstElementPtr
ControlledDhcpv6Srv::commandShutdownHandler(const string&, ConstElementPtr) {
if (ControlledDhcpv6Srv::server_) {
ControlledDhcpv6Srv::server_->shutdown();
if (ControlledDhcpv6Srv::getInstance()) {
ControlledDhcpv6Srv::getInstance()->shutdown();
} else {
LOG_WARN(dhcp6_logger, DHCP6_NOT_RUNNING);
ConstElementPtr answer = isc::config::createAnswer(1, "Shutdown failure.");
@ -378,6 +379,7 @@ ControlledDhcpv6Srv::commandConfigSetHandler(const string&,
isc::config::parseAnswer(rcode, result);
if (rcode == CONTROL_RESULT_SUCCESS) {
CfgMgr::instance().getStagingCfg()->applyLoggingCfg();
// Use new configuration.
CfgMgr::instance().commit();
} else {
@ -618,6 +620,16 @@ ControlledDhcpv6Srv::processCommand(const std::string& command,
return (no_srv);
}
if (Dhcpv6Srv::threadCount()) {
if (srv->pkt_thread_pool_.size()) {
srv->pkt_thread_pool_.stop();
}
MultiThreadingMgr::instance().setMode(true);
srv->pkt_thread_pool_.start(Dhcpv6Srv::threadCount());
} else {
MultiThreadingMgr::instance().setMode(false);
}
try {
if (command == "shutdown") {
return (srv->commandShutdownHandler(command, args));
@ -872,11 +884,11 @@ ControlledDhcpv6Srv::ControlledDhcpv6Srv(uint16_t server_port,
uint16_t client_port)
: Dhcpv6Srv(server_port, client_port), io_service_(),
timer_mgr_(TimerMgr::instance()) {
if (server_) {
if (getInstance()) {
isc_throw(InvalidOperation,
"There is another Dhcpv6Srv instance already.");
}
server_ = this; // remember this instance for use in callback
server_ = this; // remember this instance for later use in handlers
// TimerMgr uses IO service to run asynchronous timers.
TimerMgr::instance()->setIOService(getIOService());
@ -1021,8 +1033,8 @@ ControlledDhcpv6Srv::~ControlledDhcpv6Srv() {
void ControlledDhcpv6Srv::sessionReader(void) {
// Process one asio event. If there are more events, iface_mgr will call
// this callback more than once.
if (server_) {
server_->io_service_.run_one();
if (getInstance()) {
getInstance()->io_service_.run_one();
}
}
@ -1061,12 +1073,13 @@ ControlledDhcpv6Srv::dbReconnect(ReconnectCtlPtr db_reconnect_ctl) {
if (reopened) {
// Cancel the timer.
if (TimerMgr::instance()->isTimerRegistered("Dhcp6DbReconnectTimer")) {
TimerMgr::instance()->cancel("Dhcp6DbReconnectTimer"); }
TimerMgr::instance()->cancel("Dhcp6DbReconnectTimer");
}
// Set network state to service enabled
network_state_->enableService();
// Toss the reconnct control, we're done with it
// Toss the reconnect control, we're done with it
db_reconnect_ctl.reset();
} else {
if (!db_reconnect_ctl->checkRetries()) {
@ -1150,5 +1163,5 @@ ControlledDhcpv6Srv::cbFetchUpdates(const SrvConfigPtr& srv_cfg,
}
}
}; // end of isc::dhcp namespace
}; // end of isc namespace
} // namespace dhcp
} // namespace isc

View File

@ -65,7 +65,7 @@ public:
/// @brief Initiates shutdown procedure for the whole DHCPv6 server.
void shutdown();
/// @brief command processor
/// @brief Command processor
///
/// This method is uniform for all config backends. It processes received
/// command (as a string + JSON arguments). Internally, it's just a
@ -75,9 +75,9 @@ public:
/// Currently supported commands are:
/// - config-reload
/// - config-test
/// - leases-reclaim
/// - libreload
/// - shutdown
/// - libreload
/// - leases-reclaim
/// ...
///
/// @note It never throws.
@ -89,7 +89,7 @@ public:
static isc::data::ConstElementPtr
processCommand(const std::string& command, isc::data::ConstElementPtr args);
/// @brief configuration processor
/// @brief Configuration processor
///
/// This is a method for handling incoming configuration updates.
/// This method should be called by all configuration backends when the
@ -114,7 +114,7 @@ public:
isc::data::ConstElementPtr
checkConfig(isc::data::ConstElementPtr new_config);
/// @brief returns pointer to the sole instance of Dhcpv6Srv
/// @brief Returns pointer to the sole instance of Dhcpv6Srv
///
/// @return server instance (may return NULL, if called before server is spawned)
static ControlledDhcpv6Srv* getInstance() {
@ -131,7 +131,7 @@ private:
/// (that was sent from some yet unspecified sender).
static void sessionReader(void);
/// @brief handler for processing 'shutdown' command
/// @brief Handler for processing 'shutdown' command
///
/// This handler processes shutdown command, which initializes shutdown
/// procedure.
@ -143,7 +143,7 @@ private:
commandShutdownHandler(const std::string& command,
isc::data::ConstElementPtr args);
/// @brief handler for processing 'libreload' command
/// @brief Handler for processing 'libreload' command
///
/// This handler processes libreload command, which unloads all hook
/// libraries and reloads them.
@ -156,7 +156,7 @@ private:
commandLibReloadHandler(const std::string& command,
isc::data::ConstElementPtr args);
/// @brief handler for processing 'config-reload' command
/// @brief Handler for processing 'config-reload' command
///
/// This handler processes config-reload command, which processes
/// configuration specified in args parameter.
@ -348,7 +348,6 @@ private:
const bool remove_lease,
const uint16_t max_unwarned_cycles);
/// @brief Deletes reclaimed leases and reschedules the timer.
///
/// This is a wrapper method for @c AllocEngine::deleteExpiredReclaimed6.
@ -373,6 +372,7 @@ private:
///
/// If the maximum number of retries has been exhausted an error is logged
/// and the server shuts down.
///
/// @param db_reconnect_ctl pointer to the ReconnectCtl containing the
/// configured reconnect parameters
///
@ -394,6 +394,8 @@ private:
///
/// @param db_reconnect_ctl pointer to the ReconnectCtl containing the
/// configured reconnect parameters
///
/// @return false if reconnect is not configured, true otherwise
bool dbLostCallback(db::ReconnectCtlPtr db_reconnect_ctl);
/// @brief Callback invoked periodically to fetch configuration updates
@ -426,7 +428,7 @@ private:
TimerMgrPtr timer_mgr_;
};
}; // namespace isc::dhcp
}; // namespace isc
} // namespace dhcp
} // namespace isc
#endif

View File

@ -46,7 +46,6 @@
#include <hooks/hooks_log.h>
#include <hooks/hooks_manager.h>
#include <stats/stats_mgr.h>
#include <util/encode/hex.h>
#include <util/io_utilities.h>
#include <util/pointer_util.h>
@ -472,6 +471,11 @@ bool Dhcpv6Srv::run() {
}
}
// destroying the thread pool
if (Dhcpv6Srv::threadCount()) {
pkt_thread_pool_.reset();
}
return (true);
}
@ -481,6 +485,17 @@ void Dhcpv6Srv::run_one() {
Pkt6Ptr rsp;
try {
// Do not read more packets from socket if there are enough
// packets to be processed in the packet thread pool queue
const int max_queued_pkt_per_thread = Dhcpv6Srv::maxThreadQueueSize();
const auto queue_full_wait = std::chrono::milliseconds(1);
size_t pkt_queue_size = pkt_thread_pool_.count();
if (pkt_queue_size >= Dhcpv6Srv::threadCount() *
max_queued_pkt_per_thread) {
return;
}
// Set select() timeout to 1s. This value should not be modified
// because it is important that the select() returns control
// frequently so as the IOService can be polled for ready handlers.
@ -558,9 +573,33 @@ void Dhcpv6Srv::run_one() {
.arg(query->getLabel());
return;
} else {
processPacket(query, rsp);
if (Dhcpv6Srv::threadCount()) {
typedef function<void()> CallBack;
boost::shared_ptr<CallBack> call_back =
boost::make_shared<CallBack>(std::bind(&Dhcpv6Srv::processPacketAndSendResponseNoThrow,
this, query, rsp));
pkt_thread_pool_.add(call_back);
} else {
processPacketAndSendResponse(query, rsp);
}
}
}
void
Dhcpv6Srv::processPacketAndSendResponseNoThrow(Pkt6Ptr& query, Pkt6Ptr& rsp) {
try {
processPacketAndSendResponse(query, rsp);
} catch (const std::exception& e) {
LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_STD_EXCEPTION)
.arg(e.what());
} catch (...) {
LOG_ERROR(packet6_logger, DHCP6_PACKET_PROCESS_EXCEPTION);
}
}
void
Dhcpv6Srv::processPacketAndSendResponse(Pkt6Ptr& query, Pkt6Ptr& rsp) {
processPacket(query, rsp);
if (!rsp) {
return;
}
@ -3997,6 +4036,23 @@ Dhcpv6Srv::requestedInORO(const Pkt6Ptr& query, const uint16_t code) const {
return (false);
}
uint32_t Dhcpv6Srv::threadCount() {
uint32_t sys_threads = CfgMgr::instance().getCurrentCfg()->getServerThreadCount();
if (sys_threads) {
return sys_threads;
}
sys_threads = std::thread::hardware_concurrency();
return sys_threads * 0;
}
uint32_t Dhcpv6Srv::maxThreadQueueSize() {
uint32_t max_thread_queue_size = CfgMgr::instance().getCurrentCfg()->getServerMaxThreadQueueSize();
if (max_thread_queue_size) {
return max_thread_queue_size;
}
return 4;
}
void Dhcpv6Srv::discardPackets() {
// Dump all of our current packets, anything that is mid-stream
isc::dhcp::Pkt6Ptr pkt6ptr_empty;

View File

@ -8,13 +8,15 @@
#define DHCPV6_SRV_H
#include <asiolink/io_service.h>
#include <dhcp_ddns/ncr_msg.h>
#include <dhcp/dhcp6.h>
#include <dhcp/duid.h>
#include <dhcp/option.h>
#include <dhcp/option_string.h>
#include <dhcp/option6_client_fqdn.h>
#include <dhcp/option6_ia.h>
#include <dhcp/option_custom.h>
#include <dhcp/option_definition.h>
#include <dhcp_ddns/ncr_msg.h>
#include <dhcp/pkt6.h>
#include <dhcpsrv/alloc_engine.h>
#include <dhcpsrv/callout_handle_store.h>
@ -25,6 +27,7 @@
#include <dhcpsrv/subnet.h>
#include <hooks/callout_handle.h>
#include <process/daemon.h>
#include <util/thread_pool.h>
#include <functional>
#include <iostream>
@ -51,12 +54,16 @@ public:
/// @brief DHCPv6 server service.
///
/// This class represents DHCPv6 server. It contains all
/// This singleton class represents DHCPv6 server. It contains all
/// top-level methods and routines necessary for server operation.
/// In particular, it instantiates IfaceMgr, loads or generates DUID
/// that is going to be used as server-identifier, receives incoming
/// packets, processes them, manages leases assignment and generates
/// appropriate responses.
///
/// This class does not support any controlling mechanisms directly.
/// See the derived \ref ControlledDhcpv6Srv class for support for
/// command and configuration updates over msgq.
class Dhcpv6Srv : public process::Daemon {
private:
@ -79,10 +86,13 @@ public:
/// Instantiates necessary services, required to run DHCPv6 server.
/// In particular, creates IfaceMgr that will be responsible for
/// network interaction. Will instantiate lease manager, and load
/// old or create new DUID.
/// old or create new DUID. It is possible to specify alternate
/// port on which DHCPv6 server will listen on and alternate port
/// where DHCPv6 server sends all responses to. Those are mostly useful
/// for testing purposes.
///
/// @param server_port port on which all sockets will listen
/// @param client_port port to which all responses will be sent
/// @param server_port specifies port number to listen on
/// @param client_port specifies port number to send to
Dhcpv6Srv(uint16_t server_port = DHCP6_SERVER_PORT,
uint16_t client_port = 0);
@ -120,6 +130,12 @@ public:
/// redeclaration/redefinition. @ref isc::process::Daemon::getVersion()
static std::string getVersion(bool extended);
/// @brief returns Kea DHCPv6 server thread count.
static uint32_t threadCount();
/// @brief returns Kea DHCPv6 server max thread queue size.
static uint32_t maxThreadQueueSize();
/// @brief Returns server-identifier option.
///
/// @return server-id option
@ -140,6 +156,24 @@ public:
/// a response.
void run_one();
/// @brief Process a single incoming DHCPv6 packet and sends the response.
///
/// It verifies correctness of the passed packet, call per-type processXXX
/// methods, generates appropriate answer, sends the answer to the client.
///
/// @param query A pointer to the packet to be processed.
/// @param rsp A pointer to the response
void processPacketAndSendResponse(Pkt6Ptr& query, Pkt6Ptr& rsp);
/// @brief Process a single incoming DHCPv6 packet and sends the response.
///
/// It verifies correctness of the passed packet, call per-type processXXX
/// methods, generates appropriate answer, sends the answer to the client.
///
/// @param query A pointer to the packet to be processed.
/// @param rsp A pointer to the response
void processPacketAndSendResponseNoThrow(Pkt6Ptr& query, Pkt6Ptr& rsp);
/// @brief Process a single incoming DHCPv6 packet.
///
/// It verifies correctness of the passed packet, call per-type processXXX
@ -152,15 +186,21 @@ public:
/// @brief Instructs the server to shut down.
void shutdown();
///
/// @name Public accessors returning values required to (re)open sockets.
///
//@{
///
/// @brief Get UDP port on which server should listen.
///
/// Typically, server listens on UDP port 547. Other ports are only
/// used for testing purposes.
/// Typically, server listens on UDP port number 547. Other ports are used
/// for testing purposes only.
///
/// @return UDP port on which server should listen.
uint16_t getServerPort() const {
return (server_port_);
}
//@}
/// @brief Starts DHCP_DDNS client IO if DDNS updates are enabled.
///
@ -1057,9 +1097,12 @@ protected:
/// @brief Controls access to the configuration backends.
CBControlDHCPv6Ptr cb_control_;
/// @brief Packet processing thread pool
isc::util::ThreadPool<std::function<void()>> pkt_thread_pool_;
};
}; // namespace isc::dhcp
}; // namespace isc
} // namespace dhcp
} // namespace isc
#endif // DHCP6_SRV_H

View File

@ -7,15 +7,15 @@
#include <config.h>
#include <kea_version.h>
#include <cfgrpt/config_report.h>
#include <dhcp6/ctrl_dhcp6_srv.h>
#include <dhcp6/dhcp6_log.h>
#include <dhcp6/parser_context.h>
#include <dhcp6/json_config_parser.h>
#include <dhcpsrv/cfgmgr.h>
#include <exceptions/exceptions.h>
#include <log/logger_support.h>
#include <log/logger_manager.h>
#include <exceptions/exceptions.h>
#include <cfgrpt/config_report.h>
#include <process/daemon.h>
#include <boost/lexical_cast.hpp>
@ -37,9 +37,8 @@ using namespace std;
/// Dhcpv6Srv and other classes, see \ref dhcpv6Session.
namespace {
const char* const DHCP6_NAME = "kea-dhcp6";
const char* const DHCP6_LOGGER_NAME = "kea-dhcp6";
const char* const DHCP6_NAME = "kea-dhcp6";
/// @brief Prints Kea Usage and exits
///
@ -60,9 +59,11 @@ usage() {
<< "(useful for testing only)" << endl;
cerr << " -P number: specify non-standard client port number 1-65535 "
<< "(useful for testing only)" << endl;
cerr << " -N number: specify thread count 0-65535 "
<< "(0 means multi-threading disabled)" << endl;
exit(EXIT_FAILURE);
}
} // end of anonymous namespace
} // namespace
int
main(int argc, char* argv[]) {
@ -71,6 +72,8 @@ main(int argc, char* argv[]) {
int server_port_number = DHCP6_SERVER_PORT;
// Not zero values are useful for testing only.
int client_port_number = 0;
// Number of threads. 0 means multi-threading disabled
int thread_count = 0;
bool verbose_mode = false; // Should server be verbose?
bool check_mode = false; // Check syntax
@ -133,6 +136,21 @@ main(int argc, char* argv[]) {
}
break;
case 'N': // number of threads
try {
thread_count = boost::lexical_cast<int>(optarg);
} catch (const boost::bad_lexical_cast &) {
cerr << "Failed to parse thread count number: [" << optarg
<< "], 0-65535 allowed." << endl;
usage();
}
if (thread_count < 0 || thread_count > 65535) {
cerr << "Failed to parse thread count number: [" << optarg
<< "], 0-65535 allowed." << endl;
usage();
}
break;
default:
usage();
}
@ -193,11 +211,8 @@ main(int argc, char* argv[]) {
cerr << "Error encountered: " << answer->stringValue() << endl;
return (EXIT_FAILURE);
}
return (EXIT_SUCCESS);
} catch (const std::exception& ex) {
cerr << "Syntax check failed with " << ex.what() << endl;
cerr << "Syntax check failed with: " << ex.what() << endl;
}
return (EXIT_FAILURE);
}
@ -207,11 +222,10 @@ main(int argc, char* argv[]) {
// It is important that we set a default logger name because this name
// will be used when the user doesn't provide the logging configuration
// in the Kea configuration file.
Daemon::setDefaultLoggerName(DHCP6_LOGGER_NAME);
Daemon::setDefaultLoggerName(DHCP6_ROOT_LOGGER_NAME);
// Initialize logging. If verbose, we'll use maximum verbosity.
Daemon::loggerInit(DHCP6_LOGGER_NAME, verbose_mode);
Daemon::loggerInit(DHCP6_ROOT_LOGGER_NAME, verbose_mode);
LOG_DEBUG(dhcp6_logger, DBG_DHCP6_START, DHCP6_START_INFO)
.arg(getpid())
.arg(server_port_number)
@ -226,16 +240,14 @@ main(int argc, char* argv[]) {
// Remember verbose-mode
server.setVerbose(verbose_mode);
// Create our PID file
// Create our PID file.
server.setProcName(DHCP6_NAME);
server.setConfigFile(config_file);
server.createPIDFile();
try {
// Initialize the server, e.g. establish control session
// Read a configuration file
// Initialize the server.
server.init(config_file);
} catch (const std::exception& ex) {
try {
@ -245,8 +257,8 @@ main(int argc, char* argv[]) {
LOG_ERROR(dhcp6_logger, DHCP6_INIT_FAIL).arg(ex.what());
} catch (...) {
// The exception thrown during the initialization could
// originate from logger subsystem. Therefore LOG_ERROR() may
// fail as well.
// originate from logger subsystem. Therefore LOG_ERROR()
// may fail as well.
cerr << "Failed to initialize server: " << ex.what() << endl;
}
@ -277,7 +289,6 @@ main(int argc, char* argv[]) {
}
ret = EXIT_FAILURE;
} catch (const std::exception& ex) {
// First, we print the error on stderr (that should always work)
cerr << DHCP6_NAME << "Fatal error during start up: " << ex.what()
<< endl;

View File

@ -41,6 +41,8 @@ SrvConfig::SrvConfig()
cfg_host_operations6_(CfgHostOperations::createConfig6()),
class_dictionary_(new ClientClassDictionary()),
decline_timer_(0), echo_v4_client_id_(true), dhcp4o6_port_(0),
server_threads_(0),
server_max_thread_queue_size_(0),
d2_client_config_(new D2ClientConfig()),
configured_globals_(Element::createMap()),
cfg_consist_(new CfgConsistency()) {
@ -59,6 +61,8 @@ SrvConfig::SrvConfig(const uint32_t sequence)
cfg_host_operations6_(CfgHostOperations::createConfig6()),
class_dictionary_(new ClientClassDictionary()),
decline_timer_(0), echo_v4_client_id_(true), dhcp4o6_port_(0),
server_threads_(0),
server_max_thread_queue_size_(0),
d2_client_config_(new D2ClientConfig()),
configured_globals_(Element::createMap()),
cfg_consist_(new CfgConsistency()) {
@ -253,7 +257,6 @@ SrvConfig::mergeGlobals(SrvConfig& other) {
void
SrvConfig::removeStatistics() {
// Removes statistics for v4 and v6 subnets
getCfgSubnets4()->removeStatistics();

View File

@ -705,6 +705,34 @@ public:
return (dhcp4o6_port_);
}
/// @brief Sets the server thread count.
///
/// @param threads value of the server thread count
void setServerThreadCount(uint32_t threads) {
server_threads_ = threads;
}
/// @brief Retrieves the server thread count.
///
/// @return value of the server thread count
uint32_t getServerThreadCount() const {
return (server_threads_);
}
/// @brief Sets the server max thread queue size.
///
/// @param size max thread queue size
void setServerMaxThreadQueueSize(uint32_t size) {
server_max_thread_queue_size_ = size;
}
/// @brief Retrieves the server max thread queue size.
///
/// @return value of the max thread queue size
uint32_t getServerMaxThreadQueueSize() const {
return (server_max_thread_queue_size_);
}
/// @brief Returns pointer to the D2 client configuration
D2ClientConfigPtr getD2ClientConfig() {
return (d2_client_config_);
@ -923,6 +951,12 @@ private:
/// this socket is bound and connected to this port and port + 1
uint16_t dhcp4o6_port_;
/// @brief The server thread count.
uint32_t server_threads_;
/// @brief The server max thread queue size.
uint32_t server_max_thread_queue_size_;
/// @brief Stores D2 client configuration
D2ClientConfigPtr d2_client_config_;
@ -943,7 +977,7 @@ typedef boost::shared_ptr<SrvConfig> SrvConfigPtr;
typedef boost::shared_ptr<const SrvConfig> ConstSrvConfigPtr;
//@}
} // namespace isc::dhcp
} // namespace dhcp
} // namespace isc
#endif // DHCPSRV_CONFIG_H