2
0
mirror of https://gitlab.isc.org/isc-projects/kea synced 2025-08-30 13:37:55 +00:00

[#1732] Refactored internal HttpClient classes

src/lib/http/client.cc
    Refactored, connections and request queue are now managed together
    as part of a URL destination.

src/lib/http/url.cc
    Url::operator<(const Url& url) - compares original
    unparsed string rather then reconstructing a new string for
    both operands every time
This commit is contained in:
Thomas Markwalder
2021-03-26 16:43:37 -04:00
parent 804e98ca4b
commit 2825e316a0
3 changed files with 261 additions and 234 deletions

View File

@@ -22,6 +22,7 @@
#include <atomic>
#include <array>
#include <unordered_set>
#include <functional>
#include <iostream>
#include <map>
@@ -452,145 +453,13 @@ private:
/// @brief Shared pointer to the connection.
typedef boost::shared_ptr<Connection> ConnectionPtr;
/// @brief Container of Connections for a given URL
class ConnectionList {
public:
/// @brief Constructor
///
/// @param url URL associated with this connection list.
/// @param max_connections maximum number of connections
/// allowed for in the list URL
ConnectionList(Url url, size_t max_connections)
: url_(url), max_connections_(max_connections) {
}
/// @brief Add a new connection to the list
///
/// @param connection the connection to add
///
/// @throw BadValue if the maximum number of connections already
/// exist.
void addConnection(ConnectionPtr connection) {
if (full()) {
isc_throw(BadValue, "URL: " << url_.toText()
<< ", already at maximum connections: "
<< max_connections_);
}
connections_.push_back(connection);
}
/// @brief Closes a connection and removes it from the list.
///
/// @param connection the connection to remove
void closeConnection(ConnectionPtr connection) {
for (auto it = connections_.begin(); it != connections_.end(); ++it) {
if (*it == connection) {
(*it)->close();
connections_.erase(it);
break;
}
}
}
/// @brief Closes all connections and clears the list.
void closeAllConnections() {
for (auto connection : connections_) {
connection->close();
}
connections_.clear();
}
/// @brief Find the first idle connection.
///
/// Iterates over the existing connections and returns the
/// first connection which is not currently in a transaction.
///
/// @return The first idle connection or an empty pointer if
/// all connections are busy.
ConnectionPtr getIdleConnection() {
for (auto connection : connections_) {
if (!connection->isTransactionOngoing()) {
return(connection);
}
}
return(ConnectionPtr());
}
/// @brief Find a connection by its socket descriptor.
///
/// @param socket_fd socket descriptor to find
///
/// @return The connection or an empty pointer if no matching
/// connection exists.
ConnectionPtr findBySocketFd(int socket_fd) {
for (auto connection : connections_) {
if (connection->isMySocket(socket_fd)) {
return(connection);
}
}
return(ConnectionPtr());
}
/// @brief Indicates if there are no connections in the list.
///
/// @return true if the list is empty.
bool empty() {
return(connections_.empty());
}
/// @brief Indicates if list contains the maximum number.
///
/// @return true if the list is full.
bool full() {
return(connections_.size() >= max_connections_);
}
/// @brief Fetches the number of connections in the list.
///
/// @return the number of connections in the list.
size_t size() {
return connections_.size();
}
/// @brief Fetches the maximum number of connections.
///
/// @return the maxim number of connections.
size_t max_connections() const {
return max_connections_;
}
/// @brief Fetches the URL.
///
/// @return the URL.
const Url& getUrl() const {
return url_;
}
private:
/// @brief URL supported by the list.
Url url_;
/// @brief Maximum number of concurrent connections allowed in the list.
size_t max_connections_;
/// @brief List of concurrent connections.
std::vector<ConnectionPtr> connections_;
};
/// @brief Defines a pointer to a ConnectionList instance.
typedef boost::shared_ptr<ConnectionList> ConnectionListPtr;
/// @brief Connection pool for managing multiple connections.
///
/// Connection pool creates and destroys connections. It holds pointers
/// to all created connections and can verify whether the particular
/// connection is currently busy or idle. If the connection is idle, it
/// uses this connection for new requests. If the connection is busy, it
/// queues new requests until the connection becomes available.
/// Connection pool creates and destroys URL destinations. t manages
/// connections to and requests for URLs. Each time a request is
/// submitted for a URL, it assigns it to an available idle connection,
/// or if no idle connections are available, pushes the request on the queue
/// for that URL.
class ConnectionPool : public boost::enable_shared_from_this<ConnectionPool> {
public:
@@ -601,7 +470,7 @@ public:
/// @param max_url_connections maximum number of concurrent
/// connections allowed per URL.
explicit ConnectionPool(IOService& io_service, size_t max_url_connections)
: io_service_(io_service), conns_(), queue_(), mutex_(),
: io_service_(io_service), destinations_(), mutex_(),
max_url_connections_(max_url_connections) {
}
@@ -667,19 +536,6 @@ public:
}
}
/// @brief Closes a URL's connections and removes associated information
/// from the connection pool.
///
/// @param url URL for which connection should be closed.
void closeUrlConnections(const Url& url) {
if (MultiThreadingMgr::instance().getMode()) {
std::lock_guard<std::mutex> lk(mutex_);
closeUrlConnectionsInternal(url);
} else {
closeUrlConnectionsInternal(url);
}
}
/// @brief Closes all URLS and removes associated information from
/// the connection pool.
void closeAll() {
@@ -722,18 +578,12 @@ private:
void processNextRequestInternal(const Url& url) {
// Check if there is a queue for this URL. If there is no queue, there
// is no request queued either.
auto queue_it = queue_.find(url);
if (queue_it != queue_.end()) {
if (!queue_it->second.empty()) {
DestinationPtr destination = findDestination(url);
if (destination) {
if (!destination->queueEmpty()) {
// We have at least one queued request. Do we have an
// idle connection?
auto conns_it = conns_.find(url);
if (conns_it == conns_.end()) {
isc_throw(Unexpected, "no connection list for :" << url.toText());
}
// Now, look for an idle connection.
ConnectionPtr connection = conns_it->second->getIdleConnection();
ConnectionPtr connection = destination->getIdleConnection();
if (!connection) {
TOMS_TRACE_LOG("*** No idle connections, don't dequeue?");
// @todo Resolve this, throw or just return, possibly log and return
@@ -750,9 +600,7 @@ private:
// Dequeue the oldest request and start a transaction for it using
// the idle connection.
RequestDescriptor desc = queue_it->second.front();
queue_it->second.pop();
desc.conn_ = connection;
RequestDescriptor desc = destination->popNextRequest();
connection->doTransaction(desc.request_, desc.response_,
desc.request_timeout_, desc.callback_,
desc.connect_callback_,
@@ -793,37 +641,35 @@ private:
const HttpClient::ConnectHandler& connect_callback,
const HttpClient::HandshakeHandler& handshake_callback,
const HttpClient::CloseHandler& close_callback) {
// Find the connection list for the url
ConnectionListPtr url_connections;
auto it = conns_.find(url);
if (it != conns_.end()) {
url_connections = it->second;
ConnectionPtr connection;
// Find the destination for the requested URL.
DestinationPtr destination = findDestination(url);
if (destination) {
// Found it, look for an idle connection.
connection = destination->getIdleConnection();
} else {
// Doesn't exist yet, so create it.
url_connections.reset(new ConnectionList(url, max_url_connections_));
conns_[url] = url_connections;
// Doesn't exist yet so it's a new destination/
destination = addDestination(url);
}
// Now, look for an idle connection.
ConnectionPtr connection = url_connections->getIdleConnection();
if (!connection) {
if (url_connections->full()) {
if (destination->connectionsFull()) {
TOMS_TRACE_LOG("no idle connections, queue request");
// All connections busy, queue it.
queue_[url].push(RequestDescriptor(ConnectionPtr(), request, response,
request_timeout,
request_callback,
connect_callback,
handshake_callback,
close_callback));
destination->pushRequest(RequestDescriptor(ConnectionPtr(), request, response,
request_timeout,
request_callback,
connect_callback,
handshake_callback,
close_callback));
return;
}
// Room to make another connection with this destination, so make one.
TOMS_TRACE_LOG("creating a new connection");
connection.reset(new Connection(io_service_, tls_context,
connection.reset(new Connection(io_service_, tls_context,
shared_from_this(), url));
url_connections->addConnection(connection);
destination->addConnection(connection);
}
// Use the connection to start the transaction.
@@ -832,40 +678,16 @@ private:
connect_callback, handshake_callback, close_callback);
}
/// @brief Closes a URL's connections and clears its queue
///
/// This method should be called in a thread safe context.
///
/// @param url URL for which connection should be closed.
void closeUrlConnectionsInternal(const Url& url) {
// Remove requests from the queue.
auto queue_it = queue_.find(url);
if (queue_it != queue_.end()) {
queue_.erase(queue_it);
}
// Close connections for the URL.
auto it = conns_.find(url);
if (it != conns_.end()) {
it->second->closeAllConnections();
}
}
/// @brief Closes all connections for all URLs and removes associated
/// information from the connection pool.
///
/// This method should be called in a thread safe context.
void closeAllInternal() {
// Remove all requests from the queue.
queue_.clear();
// Close connections for each URL.
for (auto it = conns_.begin(); it != conns_.end(); ++it) {
it->second->closeAllConnections();
for (auto const& destination : destinations_) {
destination.second->closeAllConnections();
}
// Remove all of the connections.
conns_.clear();
destinations_.clear();
}
/// @brief Closes a connection if it has an out-of-band socket event
@@ -883,24 +705,21 @@ private:
///
/// @param socket_fd socket descriptor to check
void closeIfOutOfBandInternal(int socket_fd) {
// First we look for a connection with the socket.
for (auto conns_it = conns_.begin(); conns_it != conns_.end();
++conns_it) {
for (auto const& destination : destinations_) {
// First we look for a connection with the socket.
ConnectionPtr connection = destination.second->findBySocketFd(socket_fd);
if (connection) {
if (!connection->isTransactionOngoing()) {
// Socket has no transaction, so any ready event is
// out-of-band (other end probably closed), so
// let's close it. Note we do not remove any queued
// requests, as this might somehow be occurring in
// between them.
destination.second->closeConnection(connection);
}
ConnectionListPtr url_connections = conns_it->second;
ConnectionPtr connection = url_connections->findBySocketFd(socket_fd);
if (connection && connection->isTransactionOngoing()) {
// Matches but is in a transaction, all is well.
return;
}
// Socket has no transaction, so any ready event is
// out-of-band (other end probably closed), so
// let's close it. Note we do not remove any queued
// requests, as this might somehow be occurring in
// between them.
url_connections->closeConnection(connection);
break;
}
}
@@ -961,14 +780,218 @@ private:
HttpClient::CloseHandler close_callback_;
};
/// @brief Encapsulates connections and requests for a given URL
class Destination {
public:
/// @brief Constructor
///
/// @param url server URL of this destination
/// @param max_connections maximum number of concurrent connections
/// allowed for in the list URL
Destination(Url url, size_t max_connections)
: url_(url), max_connections_(max_connections), connections_(), queue_() { }
/// @brief Destructor
~Destination() {
closeAllConnections();
}
/// @brief Add a new connection
///
/// @param connection the connection to add
///
/// @throw BadValue if the maximum number of connections already
/// exist.
void addConnection(ConnectionPtr connection) {
if (connectionsFull()) {
isc_throw(BadValue, "URL: " << url_.toText()
<< ", already at maximum connections: "
<< max_connections_);
}
connections_.push_back(connection);
}
/// @brief Closes a connection and removes it from the list.
///
/// @param connection the connection to remove
void closeConnection(ConnectionPtr connection) {
for (auto it = connections_.begin(); it != connections_.end(); ++it) {
if (*it == connection) {
(*it)->close();
connections_.erase(it);
break;
}
}
}
/// @brief Closes all connections and clears the list.
void closeAllConnections() {
for (auto connection : connections_) {
connection->close();
}
connections_.clear();
}
/// @brief Find the first idle connection.
///
/// Iterates over the existing connections and returns the
/// first connection which is not currently in a transaction.
///
/// @return The first idle connection or an empty pointer if
/// all connections are busy.
ConnectionPtr getIdleConnection() {
for (auto connection : connections_) {
if (!connection->isTransactionOngoing()) {
return(connection);
}
}
return(ConnectionPtr());
}
/// @brief Find a connection by its socket descriptor.
///
/// @param socket_fd socket descriptor to find
///
/// @return The connection or an empty pointer if no matching
/// connection exists.
ConnectionPtr findBySocketFd(int socket_fd) {
for (auto connection : connections_) {
if (connection->isMySocket(socket_fd)) {
return(connection);
}
}
return(ConnectionPtr());
}
/// @brief Indicates if there are no connections in the list.
///
/// @return true if the list is empty.
bool connectionsEmpty() {
return(connections_.empty());
}
/// @brief Indicates if list contains the maximum number.
///
/// @return true if the list is full.
bool connectionsFull() {
return (connections_.size() >= max_connections_);
}
/// @brief Fetches the number of connections in the list.
///
/// @return the number of connections in the list.
size_t connectionCount() {
return (connections_.size());
}
/// @brief Fetches the maximum number of connections.
///
/// @return the maxim number of connections.
size_t getMaxConnections() const {
return (max_connections_);
}
/// @brief Fetches the URL.
///
/// @return the URL.
const Url& getUrl() const {
return (url_);
}
/// @brief Indicates if request queue is empty.
///
/// @return true if there are no requests queued.
bool queueEmpty() const {
return (queue_.empty());
}
/// @brief Adds a request to the end of the request queue.
///
/// @param desc RequestDescriptor to queue.
void pushRequest(RequestDescriptor desc) {
queue_.push(desc);
}
/// @brief Adds a request to the end of the request queue.
///
/// @return desc RequestDescriptor to queue.
RequestDescriptor popNextRequest() {
if (queue_.empty()) {
isc_throw(InvalidOperation, "cannot pop, queue is empty");
}
RequestDescriptor desc = queue_.front();
queue_.pop();
return(desc);
}
private:
/// @brief URL supported by the list.
Url url_;
/// @brief Maximum number of concurrent connections for this destination.
size_t max_connections_;
/// @brief List of concurrent connections.
std::vector<ConnectionPtr> connections_;
/// @brief Holds the queue of request for this destination.
std::queue<RequestDescriptor> queue_;
};
/// @brief Pointer to a Destination.
typedef boost::shared_ptr<Destination> DestinationPtr;
/// @brief Creates a new destination for the given URL.
///
/// @param url URL of the new destination.
///
/// @return Pointer to the newly created destination.
/// @note Must be called from within a thread-safe context.
DestinationPtr addDestination(const Url& url) {
DestinationPtr destination(new Destination(url, max_url_connections_));
destinations_[url] = destination;
return(destination);
}
/// @brief Fetches a destination by URL
///
/// @param url Url of the destination desired.
///
/// @return pointer the desired destination, empty pointer
/// if the destination does not exist.
DestinationPtr findDestination(const Url& url) const {
auto it = destinations_.find(url);
if (it != destinations_.end()) {
return (it->second);
}
return (DestinationPtr());
}
/// @brief Removes a destination by URL
///
/// @param url Url of the destination to be removed.
/// @note Must be called from within a thread-safe context.
void removeDestination(const Url& url) {
// Remove requests from the queue.
auto it = destinations_.find(url);
if (it != destinations_.end()) {
it->second->closeAllConnections();
destinations_.erase(it);
}
}
/// @brief A reference to the IOService that drives socket IO.
IOService& io_service_;
/// @brief Holds lists of connections for each URL.
std::map<Url, ConnectionListPtr> conns_;
/// @brief Holds the queue of requests for each URL.
std::map<Url, std::queue<RequestDescriptor> > queue_;
/// @brief Map of Destinations by URL.
std::map<Url, DestinationPtr> destinations_;
/// @brief Mutex to protect the internal state.
std::mutex mutex_;
@@ -1222,7 +1245,7 @@ Connection::terminate(const boost::system::error_code& ec,
void
Connection::terminateInternal(const boost::system::error_code& ec,
const std::string& parsing_error) {
TOMS_TRACE_LOG("terminate on: " << getSocketFd()
TOMS_TRACE_LOG("terminate on: " << getSocketFd()
<< ", isTransactionOngoing? " << isTransactionOngoing());
HttpResponsePtr response;
@@ -1296,7 +1319,7 @@ Connection::terminateInternal(const boost::system::error_code& ec,
// another transaction if there is at least one.
ConnectionPoolPtr conn_pool = conn_pool_.lock();
if (conn_pool) {
TOMS_TRACE_LOG(" more work on? " << getSocketFd()
TOMS_TRACE_LOG(" more work on? " << getSocketFd()
<< ", isTransactionOngoing? " << isTransactionOngoing());
if (MultiThreadingMgr::instance().getMode()) {
UnlockGuard<std::mutex> lock(mutex_);

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2017-2018 Internet Systems Consortium, Inc. ("ISC")
// Copyright (C) 2017-2021 Internet Systems Consortium, Inc. ("ISC")
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -24,7 +24,7 @@ Url::Url(const std::string& url)
bool
Url::operator<(const Url& url) const {
return (toText() < url.toText());
return (url_ < url.rawUrl());
}
Url::Scheme

View File

@@ -83,6 +83,10 @@ public:
/// @brief Returns textual representation of the URL.
std::string toText() const;
const std::string& rawUrl() const {
return (url_);
}
private:
/// @brief Returns boolean value indicating if the URL is valid.