mirror of
https://gitlab.isc.org/isc-projects/kea
synced 2025-08-31 14:05:33 +00:00
[(no branch, rebasing 1798-remove-tls-stream-clear-operation)] [#1798] Tried to fix MT issues
This commit is contained in:
@@ -152,14 +152,18 @@ public:
|
||||
/// @brief Checks if a transaction has been initiated over this connection.
|
||||
///
|
||||
/// @return true if transaction has been initiated, false otherwise.
|
||||
bool isTransactionOngoing() const;
|
||||
bool isTransactionOngoing() const {
|
||||
return (started_);
|
||||
}
|
||||
|
||||
/// @brief Checks if the socket has been closed.
|
||||
///
|
||||
/// @return true if the socket has been closed.
|
||||
bool isClosed() const;
|
||||
bool isClosed() const {
|
||||
return (closed_);
|
||||
}
|
||||
|
||||
/// @brief Checks if the peer has closed the socket at its side.
|
||||
/// @brief Checks if the peer has closed the idle socket at its side.
|
||||
///
|
||||
/// If the socket is open but is not usable the peer has closed
|
||||
/// the socket at its side so we close it.
|
||||
@@ -224,6 +228,14 @@ private:
|
||||
/// Should be called in a thread safe context.
|
||||
void closeInternal();
|
||||
|
||||
/// @brief Checks if the peer has closed the socket at its side.
|
||||
///
|
||||
/// Should be called in a thread safe context.
|
||||
///
|
||||
/// If the socket is open but is not usable the peer has closed
|
||||
/// the socket at its side so we close it.
|
||||
void isClosedByPeerInternal();
|
||||
|
||||
/// @brief Checks and logs if premature transaction timeout is suspected.
|
||||
///
|
||||
/// Should be called in a thread safe context.
|
||||
@@ -465,7 +477,7 @@ public:
|
||||
/// @param max_url_connections maximum number of concurrent
|
||||
/// connections allowed per URL.
|
||||
explicit ConnectionPool(IOService& io_service, size_t max_url_connections)
|
||||
: io_service_(io_service), destinations_(), mutex_(),
|
||||
: io_service_(io_service), destinations_(), pool_mutex_(),
|
||||
max_url_connections_(max_url_connections) {
|
||||
}
|
||||
|
||||
@@ -483,7 +495,7 @@ public:
|
||||
/// should be processed.
|
||||
void processNextRequest(const Url& url, const TlsContextPtr& tls_context) {
|
||||
if (MultiThreadingMgr::instance().getMode()) {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
std::lock_guard<std::mutex> lk(pool_mutex_);
|
||||
return (processNextRequestInternal(url, tls_context));
|
||||
} else {
|
||||
return (processNextRequestInternal(url, tls_context));
|
||||
@@ -531,7 +543,7 @@ public:
|
||||
const HttpClient::HandshakeHandler& handshake_callback,
|
||||
const HttpClient::CloseHandler& close_callback) {
|
||||
if (MultiThreadingMgr::instance().getMode()) {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
std::lock_guard<std::mutex> lk(pool_mutex_);
|
||||
return (queueRequestInternal(url, tls_context, request, response,
|
||||
request_timeout, request_callback,
|
||||
connect_callback, handshake_callback,
|
||||
@@ -548,7 +560,7 @@ public:
|
||||
/// the connection pool.
|
||||
void closeAll() {
|
||||
if (MultiThreadingMgr::instance().getMode()) {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
std::lock_guard<std::mutex> lk(pool_mutex_);
|
||||
closeAllInternal();
|
||||
} else {
|
||||
closeAllInternal();
|
||||
@@ -569,7 +581,7 @@ public:
|
||||
/// @param socket_fd socket descriptor to check
|
||||
void closeIfOutOfBand(int socket_fd) {
|
||||
if (MultiThreadingMgr::instance().getMode()) {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
std::lock_guard<std::mutex> lk(pool_mutex_);
|
||||
closeIfOutOfBandInternal(socket_fd);
|
||||
} else {
|
||||
closeIfOutOfBandInternal(socket_fd);
|
||||
@@ -841,6 +853,11 @@ private:
|
||||
/// @brief Closes all connections and clears the list.
|
||||
/// @note This should be called in a thread safe context.
|
||||
void closeAllConnections() {
|
||||
// Flush the queue.
|
||||
while (!queue_.empty()) {
|
||||
queue_.pop();
|
||||
}
|
||||
|
||||
for (auto const& connection : connections_) {
|
||||
connection->close();
|
||||
}
|
||||
@@ -866,14 +883,20 @@ private:
|
||||
/// @brief Finds the first idle connection.
|
||||
///
|
||||
/// Iterates over the existing connections and returns the
|
||||
/// first connection which is not currently in a transaction.
|
||||
/// first connection which is not currently in a transaction and
|
||||
/// is not closed.
|
||||
///
|
||||
/// @note @ref garbageCollectConnections should be called before
|
||||
/// so the closed connections are not scanned (but as a connection
|
||||
/// can have been closed since the garbage collection the flag
|
||||
/// still has to be checked for).
|
||||
///
|
||||
/// @note @ref garbageCollectConnections should be called before.
|
||||
/// @return The first idle connection or an empty pointer if
|
||||
/// all connections are busy.
|
||||
/// all connections are busy or closed.
|
||||
ConnectionPtr getIdleConnection() {
|
||||
for (auto const& connection : connections_) {
|
||||
if (!connection->isTransactionOngoing()) {
|
||||
if (!connection->isTransactionOngoing() &&
|
||||
!connection->isClosed()) {
|
||||
return (connection);
|
||||
}
|
||||
}
|
||||
@@ -1013,6 +1036,8 @@ private:
|
||||
/// discards all of its queued requests while removing
|
||||
/// the destination from the list of known destinations.
|
||||
///
|
||||
/// @todo not used: remove it?
|
||||
///
|
||||
/// @param url URL of the destination to be removed.
|
||||
/// @param tls_context TLS context for the destination to be removed.
|
||||
/// @note Must be called from within a thread-safe context.
|
||||
@@ -1033,7 +1058,7 @@ private:
|
||||
std::map<DestinationDescriptor, DestinationPtr> destinations_;
|
||||
|
||||
/// @brief Mutex to protect the internal state.
|
||||
std::mutex mutex_;
|
||||
std::mutex pool_mutex_;
|
||||
|
||||
/// @brief Maximum number of connections per URL and TLS context.
|
||||
size_t max_url_connections_;
|
||||
@@ -1095,6 +1120,21 @@ Connection::closeCallback(const bool clear) {
|
||||
|
||||
void
|
||||
Connection::isClosedByPeer() {
|
||||
// This method applies only to idle connections.
|
||||
if (started_ || closed_) {
|
||||
return;
|
||||
}
|
||||
// This code was guarded by a lock so keep this.
|
||||
if (MultiThreadingMgr::instance().getMode()) {
|
||||
std::lock_guard<std::mutex> lk(mutex_);
|
||||
isClosedByPeerInternal();
|
||||
} else {
|
||||
isClosedByPeerInternal();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
Connection::isClosedByPeerInternal() {
|
||||
// If the socket is open we check if it is possible to transmit
|
||||
// the data over this socket by reading from it with message
|
||||
// peeking. If the socket is not usable, we close it and then
|
||||
@@ -1233,16 +1273,6 @@ Connection::closeInternal() {
|
||||
resetState();
|
||||
}
|
||||
|
||||
bool
|
||||
Connection::isTransactionOngoing() const {
|
||||
return (started_);
|
||||
}
|
||||
|
||||
bool
|
||||
Connection::isClosed() const {
|
||||
return (closed_);
|
||||
}
|
||||
|
||||
bool
|
||||
Connection::isMySocket(int socket_fd) const {
|
||||
if (tcp_socket_) {
|
||||
@@ -1728,7 +1758,14 @@ public:
|
||||
/// @brief Close all connections, and if multi-threaded, stop internal IOService
|
||||
/// and the thread pool.
|
||||
void stop() {
|
||||
// Close all the connections.
|
||||
conn_pool_->closeAll();
|
||||
|
||||
// Stop the multi-threaded service.
|
||||
if (thread_io_service_) {
|
||||
// Flush cancelled (and ready) handlers.
|
||||
thread_io_service_->poll();
|
||||
|
||||
// Stop the private IOService.
|
||||
thread_io_service_->stop();
|
||||
|
||||
@@ -1740,9 +1777,6 @@ public:
|
||||
threads_.clear();
|
||||
}
|
||||
|
||||
// Close all the connections.
|
||||
conn_pool_->closeAll();
|
||||
|
||||
// Get rid of the IOService.
|
||||
thread_io_service_.reset();
|
||||
}
|
||||
|
Reference in New Issue
Block a user