mirror of
https://gitlab.isc.org/isc-projects/kea
synced 2025-09-03 07:25:18 +00:00
[#1219] Made pending_requests_ thread safe
This commit is contained in:
@@ -19,6 +19,7 @@
|
|||||||
#include <http/date_time.h>
|
#include <http/date_time.h>
|
||||||
#include <http/response_json.h>
|
#include <http/response_json.h>
|
||||||
#include <http/post_request_json.h>
|
#include <http/post_request_json.h>
|
||||||
|
#include <util/multi_threading_mgr.h>
|
||||||
#include <util/stopwatch.h>
|
#include <util/stopwatch.h>
|
||||||
#include <boost/pointer_cast.hpp>
|
#include <boost/pointer_cast.hpp>
|
||||||
#include <boost/bind.hpp>
|
#include <boost/bind.hpp>
|
||||||
@@ -51,7 +52,7 @@ HAService::HAService(const IOServicePtr& io_service, const NetworkStatePtr& netw
|
|||||||
const HAConfigPtr& config, const HAServerType& server_type)
|
const HAConfigPtr& config, const HAServerType& server_type)
|
||||||
: io_service_(io_service), network_state_(network_state), config_(config),
|
: io_service_(io_service), network_state_(network_state), config_(config),
|
||||||
server_type_(server_type), client_(*io_service), communication_state_(),
|
server_type_(server_type), client_(*io_service), communication_state_(),
|
||||||
query_filter_(config), pending_requests_() {
|
query_filter_(config), pending_requests_mutex_(), pending_requests_() {
|
||||||
|
|
||||||
if (server_type == HAServerType::DHCPv4) {
|
if (server_type == HAServerType::DHCPv4) {
|
||||||
communication_state_.reset(new CommunicationState4(io_service_, config));
|
communication_state_.reset(new CommunicationState4(io_service_, config));
|
||||||
@@ -913,6 +914,61 @@ HAService::asyncSendLeaseUpdates(const dhcp::Pkt6Ptr& query,
|
|||||||
return (sent_num);
|
return (sent_num);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template<typename QueryPtrType>
|
||||||
|
bool
|
||||||
|
HAService::leaseUpdateComplete(QueryPtrType& query,
|
||||||
|
const ParkingLotHandlePtr& parking_lot) {
|
||||||
|
if (MultiThreadingMgr::instance().getMode()) {
|
||||||
|
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
|
||||||
|
return (leaseUpdateCompleteInternal(query, parking_lot));
|
||||||
|
} else {
|
||||||
|
return (leaseUpdateCompleteInternal(query, parking_lot));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename QueryPtrType>
|
||||||
|
bool
|
||||||
|
HAService::leaseUpdateCompleteInternal(QueryPtrType& query,
|
||||||
|
const ParkingLotHandlePtr& parking_lot) {
|
||||||
|
auto it = pending_requests_.find(query);
|
||||||
|
|
||||||
|
// If there are no more pending requests for this query, let's unpark
|
||||||
|
// the DHCP packet.
|
||||||
|
if (it == pending_requests_.end() || (--pending_requests_[query] <= 0)) {
|
||||||
|
parking_lot->unpark(query);
|
||||||
|
|
||||||
|
// If we have unparked the packet we can clear pending requests for
|
||||||
|
// this query.
|
||||||
|
if (it != pending_requests_.end()) {
|
||||||
|
pending_requests_.erase(it);
|
||||||
|
}
|
||||||
|
return (true);
|
||||||
|
}
|
||||||
|
return (false);
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename QueryPtrType>
|
||||||
|
void
|
||||||
|
HAService::updatePendingRequest(QueryPtrType& query) {
|
||||||
|
if (MultiThreadingMgr::instance().getMode()) {
|
||||||
|
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
|
||||||
|
updatePendingRequestInternal(query);
|
||||||
|
} else {
|
||||||
|
updatePendingRequestInternal(query);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename QueryPtrType>
|
||||||
|
void
|
||||||
|
HAService::updatePendingRequestInternal(QueryPtrType& query) {
|
||||||
|
if (pending_requests_.count(query) == 0) {
|
||||||
|
pending_requests_[query] = 1;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
++pending_requests_[query];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
template<typename QueryPtrType>
|
template<typename QueryPtrType>
|
||||||
void
|
void
|
||||||
HAService::asyncSendLeaseUpdate(const QueryPtrType& query,
|
HAService::asyncSendLeaseUpdate(const QueryPtrType& query,
|
||||||
@@ -1020,19 +1076,7 @@ HAService::asyncSendLeaseUpdate(const QueryPtrType& query,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto it = pending_requests_.find(query);
|
if (leaseUpdateComplete(query, parking_lot)) {
|
||||||
|
|
||||||
// If there are no more pending requests for this query, let's unpark
|
|
||||||
// the DHCP packet.
|
|
||||||
if (it == pending_requests_.end() || (--pending_requests_[query] <= 0)) {
|
|
||||||
parking_lot->unpark(query);
|
|
||||||
|
|
||||||
// If we have unparked the packet we can clear pending requests for
|
|
||||||
// this query.
|
|
||||||
if (it != pending_requests_.end()) {
|
|
||||||
pending_requests_.erase(it);
|
|
||||||
}
|
|
||||||
|
|
||||||
// If we have finished sending the lease updates we need to run the
|
// If we have finished sending the lease updates we need to run the
|
||||||
// state machine until the state machine finds that additional events
|
// state machine until the state machine finds that additional events
|
||||||
// are required, such as next heartbeat or a lease update. The runModel()
|
// are required, such as next heartbeat or a lease update. The runModel()
|
||||||
@@ -1052,12 +1096,7 @@ HAService::asyncSendLeaseUpdate(const QueryPtrType& query,
|
|||||||
// a backup increase the number of pending requests.
|
// a backup increase the number of pending requests.
|
||||||
if (config_->amWaitingBackupAck() || (config->getRole() != HAConfig::PeerConfig::BACKUP)) {
|
if (config_->amWaitingBackupAck() || (config->getRole() != HAConfig::PeerConfig::BACKUP)) {
|
||||||
// Request scheduled, so update the request counters for the query.
|
// Request scheduled, so update the request counters for the query.
|
||||||
if (pending_requests_.count(query) == 0) {
|
updatePendingRequest(query);
|
||||||
pending_requests_[query] = 1;
|
|
||||||
|
|
||||||
} else {
|
|
||||||
++pending_requests_[query];
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2200,5 +2239,15 @@ HAService::clientCloseHandler(int tcp_native_fd) {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
size_t
|
||||||
|
HAService::pendingRequestSize() {
|
||||||
|
if (MultiThreadingMgr::instance().getMode()) {
|
||||||
|
std::lock_guard<std::mutex> lock(pending_requests_mutex_);
|
||||||
|
return (pending_requests_.size());
|
||||||
|
} else {
|
||||||
|
return (pending_requests_.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // end of namespace isc::ha
|
} // end of namespace isc::ha
|
||||||
} // end of namespace isc
|
} // end of namespace isc
|
||||||
|
@@ -25,6 +25,7 @@
|
|||||||
#include <boost/shared_ptr.hpp>
|
#include <boost/shared_ptr.hpp>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <map>
|
#include <map>
|
||||||
|
#include <mutex>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
namespace isc {
|
namespace isc {
|
||||||
@@ -376,7 +377,7 @@ private:
|
|||||||
/// This is a generic implementation of the public @c inScope method
|
/// This is a generic implementation of the public @c inScope method
|
||||||
/// variants.
|
/// variants.
|
||||||
///
|
///
|
||||||
/// @tparam type of the pointer to the DHCP query.
|
/// @tparam QueryPtrType type of the pointer to the DHCP query.
|
||||||
/// @param [out] query6 pointer to the DHCP query received. A client class
|
/// @param [out] query6 pointer to the DHCP query received. A client class
|
||||||
/// will be appended to this query instance, appropriate for the server to
|
/// will be appended to this query instance, appropriate for the server to
|
||||||
/// process this query, e.g. "HA_server1" if the "server1" should process
|
/// process this query, e.g. "HA_server1" if the "server1" should process
|
||||||
@@ -917,6 +918,72 @@ protected:
|
|||||||
/// @brief Selects queries to be processed/dropped.
|
/// @brief Selects queries to be processed/dropped.
|
||||||
QueryFilter query_filter_;
|
QueryFilter query_filter_;
|
||||||
|
|
||||||
|
/// @brief Handle last pending request for this query.
|
||||||
|
///
|
||||||
|
/// Search if there are pending requests for this query:
|
||||||
|
/// - if there are decrement the count
|
||||||
|
/// - if there were at least two return false
|
||||||
|
/// - if there was none or one unpark the query
|
||||||
|
/// - if there was one remove the query from the map
|
||||||
|
/// - return true
|
||||||
|
///
|
||||||
|
/// @tparam QueryPtrType Type of the pointer to the DHCP client's message,
|
||||||
|
/// i.e. Pkt4Ptr or Pkt6Ptr.
|
||||||
|
/// @param query Pointer to the DHCP client's query.
|
||||||
|
/// @param [out] parking_lot Parking lot where the query is parked.
|
||||||
|
/// This method uses this handle to unpark the packet when all asynchronous
|
||||||
|
/// requests have been completed.
|
||||||
|
/// @return True when all lease updates are complete, false otherwise.
|
||||||
|
template<typename QueryPtrType>
|
||||||
|
bool leaseUpdateComplete(QueryPtrType& query,
|
||||||
|
const hooks::ParkingLotHandlePtr& parking_lot);
|
||||||
|
|
||||||
|
/// @brief Update pending request counter for this query.
|
||||||
|
///
|
||||||
|
/// @tparam QueryPtrType Type of the pointer to the DHCP client's message,
|
||||||
|
/// i.e. Pkt4Ptr or Pkt6Ptr.
|
||||||
|
/// @param query Pointer to the DHCP client's query.
|
||||||
|
template<typename QueryPtrType>
|
||||||
|
void updatePendingRequest(QueryPtrType& query);
|
||||||
|
|
||||||
|
protected:
|
||||||
|
/// @brief Get the number of entries in the pending request map.
|
||||||
|
/// @note Currently for testing purposes only.
|
||||||
|
/// @return Number of entries in the pending request map.
|
||||||
|
size_t pendingRequestSize();
|
||||||
|
|
||||||
|
private:
|
||||||
|
/// @brief Handle last pending request for this query.
|
||||||
|
///
|
||||||
|
/// Search if there are pending requests for this query:
|
||||||
|
/// - if there are decrement the count
|
||||||
|
/// - if there were at least two return false
|
||||||
|
/// - if there was none or one unpark the query
|
||||||
|
/// - if there was one remove the query from the map
|
||||||
|
/// - return true
|
||||||
|
///
|
||||||
|
/// @tparam QueryPtrType Type of the pointer to the DHCP client's message,
|
||||||
|
/// i.e. Pkt4Ptr or Pkt6Ptr.
|
||||||
|
/// @param query Pointer to the DHCP client's query.
|
||||||
|
/// @param [out] parking_lot Parking lot where the query is parked.
|
||||||
|
/// This method uses this handle to unpark the packet when all asynchronous
|
||||||
|
/// requests have been completed.
|
||||||
|
/// @return True when all lease updates are complete, false otherwise.
|
||||||
|
template<typename QueryPtrType>
|
||||||
|
bool leaseUpdateCompleteInternal(QueryPtrType& query,
|
||||||
|
const hooks::ParkingLotHandlePtr& parking_lot);
|
||||||
|
|
||||||
|
/// @brief Update pending request counter for this query.
|
||||||
|
///
|
||||||
|
/// @tparam QueryPtrType Type of the pointer to the DHCP client's message,
|
||||||
|
/// i.e. Pkt4Ptr or Pkt6Ptr.
|
||||||
|
/// @param query Pointer to the DHCP client's query.
|
||||||
|
template<typename QueryPtrType>
|
||||||
|
void updatePendingRequestInternal(QueryPtrType& query);
|
||||||
|
|
||||||
|
/// @brief Mutex to protect the pending_requests_ map.
|
||||||
|
std::mutex pending_requests_mutex_;
|
||||||
|
|
||||||
/// @brief Map holding a number of scheduled requests for a given packet.
|
/// @brief Map holding a number of scheduled requests for a given packet.
|
||||||
///
|
///
|
||||||
/// A single callout may send multiple requests at the same time, e.g.
|
/// A single callout may send multiple requests at the same time, e.g.
|
||||||
|
@@ -189,11 +189,11 @@ public:
|
|||||||
using HAService::transition;
|
using HAService::transition;
|
||||||
using HAService::verboseTransition;
|
using HAService::verboseTransition;
|
||||||
using HAService::shouldSendLeaseUpdates;
|
using HAService::shouldSendLeaseUpdates;
|
||||||
|
using HAService::pendingRequestSize;
|
||||||
using HAService::network_state_;
|
using HAService::network_state_;
|
||||||
using HAService::config_;
|
using HAService::config_;
|
||||||
using HAService::communication_state_;
|
using HAService::communication_state_;
|
||||||
using HAService::query_filter_;
|
using HAService::query_filter_;
|
||||||
using HAService::pending_requests_;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/// @brief Pointer to the @c TestHAService.
|
/// @brief Pointer to the @c TestHAService.
|
||||||
@@ -738,7 +738,7 @@ public:
|
|||||||
// Actually perform the lease updates.
|
// Actually perform the lease updates.
|
||||||
ASSERT_NO_THROW(runIOService(TEST_TIMEOUT, [&service]() {
|
ASSERT_NO_THROW(runIOService(TEST_TIMEOUT, [&service]() {
|
||||||
// Finish running IO service when there are no more pending requests.
|
// Finish running IO service when there are no more pending requests.
|
||||||
return (service.pending_requests_.empty());
|
return (service.pendingRequestSize() == 0);
|
||||||
}));
|
}));
|
||||||
|
|
||||||
// Only if we wait for lease updates to complete it makes senst to test
|
// Only if we wait for lease updates to complete it makes senst to test
|
||||||
@@ -842,7 +842,7 @@ public:
|
|||||||
// Actually perform the lease updates.
|
// Actually perform the lease updates.
|
||||||
ASSERT_NO_THROW(runIOService(TEST_TIMEOUT, [&service]() {
|
ASSERT_NO_THROW(runIOService(TEST_TIMEOUT, [&service]() {
|
||||||
// Finish running IO service when there are no more pending requests.
|
// Finish running IO service when there are no more pending requests.
|
||||||
return (service.pending_requests_.empty());
|
return (service.pendingRequestSize() == 0);
|
||||||
}));
|
}));
|
||||||
|
|
||||||
// Only if we wait for lease updates to complete it makes senst to test
|
// Only if we wait for lease updates to complete it makes senst to test
|
||||||
|
Reference in New Issue
Block a user