2
0
mirror of https://gitlab.isc.org/isc-projects/kea synced 2025-08-31 22:15:23 +00:00

[#1148] make NameChangeSender thread safe

This commit is contained in:
Razvan Becheriu
2020-03-17 22:50:04 +02:00
parent 02d802a50d
commit 6f00cb77c1
3 changed files with 757 additions and 52 deletions

View File

@@ -8,12 +8,18 @@
#include <asiolink/asio_wrapper.h>
#include <dhcp_ddns/dhcp_ddns_log.h>
#include <dhcp_ddns/ncr_io.h>
#include <util/multi_threading_mgr.h>
#include <boost/algorithm/string/predicate.hpp>
#include <mutex>
namespace isc {
namespace dhcp_ddns {
using namespace isc::util;
using namespace std;
NameChangeProtocol stringToNcrProtocol(const std::string& protocol_str) {
if (boost::iequals(protocol_str, "UDP")) {
return (NCR_UDP);
@@ -154,7 +160,7 @@ NameChangeListener::invokeRecvHandler(const Result result,
NameChangeSender::NameChangeSender(RequestSendHandler& send_handler,
size_t send_queue_max)
: sending_(false), send_handler_(send_handler),
send_queue_max_(send_queue_max), io_service_(NULL) {
send_queue_max_(send_queue_max), io_service_(NULL), mutex_(new mutex) {
// Queue size must be big enough to hold at least 1 entry.
setQueueMaxSize(send_queue_max);
@@ -167,18 +173,28 @@ NameChangeSender::startSending(isc::asiolink::IOService& io_service) {
isc_throw(NcrSenderError, "NameChangeSender is already sending");
}
// Clear send marker.
ncr_to_send_.reset();
// Call implementation dependent open.
try {
// Remember io service we're given.
io_service_ = &io_service;
open(io_service);
if (MultiThreadingMgr::instance().getMode()) {
lock_guard<mutex> lock(*mutex_);
startSendingInternal(io_service);
} else {
startSendingInternal(io_service);
}
} catch (const isc::Exception& ex) {
stopSending();
isc_throw(NcrSenderOpenError, "Open failed: " << ex.what());
}
}
void
NameChangeSender::startSendingInternal(isc::asiolink::IOService& io_service) {
// Clear send marker.
ncr_to_send_.reset();
// Remember io service we're given.
io_service_ = &io_service;
open(io_service);
// Set our status to sending.
setSending(true);
@@ -229,10 +245,20 @@ NameChangeSender::sendRequest(NameChangeRequestPtr& ncr) {
isc_throw(NcrSenderError, "request to send is empty");
}
if (MultiThreadingMgr::instance().getMode()) {
lock_guard<mutex> lock(*mutex_);
sendRequestInternal(ncr);
} else {
sendRequestInternal(ncr);
}
}
void
NameChangeSender::sendRequestInternal(NameChangeRequestPtr& ncr) {
if (send_queue_.size() >= send_queue_max_) {
isc_throw(NcrSenderQueueFull,
"send queue has reached maximum capacity: "
<< send_queue_max_ );
<< send_queue_max_);
}
// Put it on the queue.
@@ -267,6 +293,16 @@ NameChangeSender::sendNext() {
void
NameChangeSender::invokeSendHandler(const NameChangeSender::Result result) {
if (MultiThreadingMgr::instance().getMode()) {
lock_guard<mutex> lock(*mutex_);
invokeSendHandlerInternal(result);
} else {
invokeSendHandlerInternal(result);
}
}
void
NameChangeSender::invokeSendHandlerInternal(const NameChangeSender::Result result) {
// @todo reset defense timer
if (result == SUCCESS) {
// It shipped so pull it off the queue.
@@ -318,6 +354,16 @@ NameChangeSender::invokeSendHandler(const NameChangeSender::Result result) {
void
NameChangeSender::skipNext() {
if (MultiThreadingMgr::instance().getMode()) {
lock_guard<mutex> lock(*mutex_);
skipNextInternal();
} else {
skipNextInternal();
}
}
void
NameChangeSender::skipNextInternal() {
if (!send_queue_.empty()) {
// Discards the request at the front of the queue.
send_queue_.pop_front();
@@ -330,7 +376,12 @@ NameChangeSender::clearSendQueue() {
isc_throw(NcrSenderError, "Cannot clear queue while sending");
}
send_queue_.clear();
if (MultiThreadingMgr::instance().getMode()) {
lock_guard<mutex> lock(*mutex_);
send_queue_.clear();
} else {
send_queue_.clear();
}
}
void
@@ -341,19 +392,54 @@ NameChangeSender::setQueueMaxSize(const size_t new_max) {
}
send_queue_max_ = new_max;
}
size_t
NameChangeSender::getQueueSize() const {
if (MultiThreadingMgr::instance().getMode()) {
lock_guard<mutex> lock(*mutex_);
return (getQueueSizeInternal());
} else {
return (getQueueSizeInternal());
}
}
size_t
NameChangeSender::getQueueSizeInternal() const {
return (send_queue_.size());
}
const NameChangeRequestPtr&
NameChangeSender::peekAt(const size_t index) const {
if (index >= getQueueSize()) {
if (MultiThreadingMgr::instance().getMode()) {
lock_guard<mutex> lock(*mutex_);
return (peekAtInternal(index));
} else {
return (peekAtInternal(index));
}
}
const NameChangeRequestPtr&
NameChangeSender::peekAtInternal(const size_t index) const {
auto size = getQueueSizeInternal();
if (index >= size) {
isc_throw(NcrSenderError,
"NameChangeSender::peekAt peek beyond end of queue attempted"
<< " index: " << index << " queue size: " << getQueueSize());
<< " index: " << index << " queue size: " << size);
}
return (send_queue_.at(index));
}
bool
NameChangeSender::isSendInProgress() const {
if (MultiThreadingMgr::instance().getMode()) {
lock_guard<mutex> lock(*mutex_);
return ((ncr_to_send_) ? true : false);
} else {
return ((ncr_to_send_) ? true : false);
}
}
void
NameChangeSender::assumeQueue(NameChangeSender& source_sender) {
@@ -372,6 +458,16 @@ NameChangeSender::assumeQueue(NameChangeSender& source_sender) {
" source queue count exceeds target queue max");
}
if (MultiThreadingMgr::instance().getMode()) {
lock_guard<mutex> lock(*mutex_);
assumeQueueInternal(source_sender);
} else {
assumeQueueInternal(source_sender);
}
}
void
NameChangeSender::assumeQueueInternal(NameChangeSender& source_sender) {
if (!send_queue_.empty()) {
isc_throw(NcrSenderError, "Cannot assume queue:"
" target queue is not empty");
@@ -399,6 +495,5 @@ NameChangeSender::runReadyIO() {
io_service_->get_io_service().poll_one();
}
} // namespace isc::dhcp_ddns
} // namespace isc
} // namespace dhcp_ddns
} // namespace isc