From 2825e316a0604f6c4740ceb65efd8d96dc9ddcbb Mon Sep 17 00:00:00 2001 From: Thomas Markwalder Date: Fri, 26 Mar 2021 16:43:37 -0400 Subject: [PATCH] [#1732] Refactored internal HttpClient classes src/lib/http/client.cc Refactored, connections and request queue are now managed together as part of a URL destination. src/lib/http/url.cc Url::operator<(const Url& url) - compares original unparsed string rather then reconstructing a new string for both operands every time --- src/lib/http/client.cc | 487 +++++++++++++++++++++-------------------- src/lib/http/url.cc | 4 +- src/lib/http/url.h | 4 + 3 files changed, 261 insertions(+), 234 deletions(-) diff --git a/src/lib/http/client.cc b/src/lib/http/client.cc index b0baf631eb..b8411bf817 100644 --- a/src/lib/http/client.cc +++ b/src/lib/http/client.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -452,145 +453,13 @@ private: /// @brief Shared pointer to the connection. typedef boost::shared_ptr ConnectionPtr; -/// @brief Container of Connections for a given URL -class ConnectionList { -public: - /// @brief Constructor - /// - /// @param url URL associated with this connection list. - /// @param max_connections maximum number of connections - /// allowed for in the list URL - ConnectionList(Url url, size_t max_connections) - : url_(url), max_connections_(max_connections) { - } - - /// @brief Add a new connection to the list - /// - /// @param connection the connection to add - /// - /// @throw BadValue if the maximum number of connections already - /// exist. - void addConnection(ConnectionPtr connection) { - if (full()) { - isc_throw(BadValue, "URL: " << url_.toText() - << ", already at maximum connections: " - << max_connections_); - } - - connections_.push_back(connection); - } - - /// @brief Closes a connection and removes it from the list. - /// - /// @param connection the connection to remove - void closeConnection(ConnectionPtr connection) { - for (auto it = connections_.begin(); it != connections_.end(); ++it) { - if (*it == connection) { - (*it)->close(); - connections_.erase(it); - break; - } - } - } - - /// @brief Closes all connections and clears the list. - void closeAllConnections() { - for (auto connection : connections_) { - connection->close(); - } - - connections_.clear(); - } - - /// @brief Find the first idle connection. - /// - /// Iterates over the existing connections and returns the - /// first connection which is not currently in a transaction. - /// - /// @return The first idle connection or an empty pointer if - /// all connections are busy. - ConnectionPtr getIdleConnection() { - for (auto connection : connections_) { - if (!connection->isTransactionOngoing()) { - return(connection); - } - } - - return(ConnectionPtr()); - } - - /// @brief Find a connection by its socket descriptor. - /// - /// @param socket_fd socket descriptor to find - /// - /// @return The connection or an empty pointer if no matching - /// connection exists. - ConnectionPtr findBySocketFd(int socket_fd) { - for (auto connection : connections_) { - if (connection->isMySocket(socket_fd)) { - return(connection); - } - } - - return(ConnectionPtr()); - } - - /// @brief Indicates if there are no connections in the list. - /// - /// @return true if the list is empty. - bool empty() { - return(connections_.empty()); - } - - /// @brief Indicates if list contains the maximum number. - /// - /// @return true if the list is full. - bool full() { - return(connections_.size() >= max_connections_); - } - - /// @brief Fetches the number of connections in the list. - /// - /// @return the number of connections in the list. - size_t size() { - return connections_.size(); - } - - /// @brief Fetches the maximum number of connections. - /// - /// @return the maxim number of connections. - size_t max_connections() const { - return max_connections_; - } - - /// @brief Fetches the URL. - /// - /// @return the URL. - const Url& getUrl() const { - return url_; - } - -private: - /// @brief URL supported by the list. - Url url_; - - /// @brief Maximum number of concurrent connections allowed in the list. - size_t max_connections_; - - /// @brief List of concurrent connections. - std::vector connections_; -}; - -/// @brief Defines a pointer to a ConnectionList instance. -typedef boost::shared_ptr ConnectionListPtr; - /// @brief Connection pool for managing multiple connections. /// -/// Connection pool creates and destroys connections. It holds pointers -/// to all created connections and can verify whether the particular -/// connection is currently busy or idle. If the connection is idle, it -/// uses this connection for new requests. If the connection is busy, it -/// queues new requests until the connection becomes available. +/// Connection pool creates and destroys URL destinations. t manages +/// connections to and requests for URLs. Each time a request is +/// submitted for a URL, it assigns it to an available idle connection, +/// or if no idle connections are available, pushes the request on the queue +/// for that URL. class ConnectionPool : public boost::enable_shared_from_this { public: @@ -601,7 +470,7 @@ public: /// @param max_url_connections maximum number of concurrent /// connections allowed per URL. explicit ConnectionPool(IOService& io_service, size_t max_url_connections) - : io_service_(io_service), conns_(), queue_(), mutex_(), + : io_service_(io_service), destinations_(), mutex_(), max_url_connections_(max_url_connections) { } @@ -667,19 +536,6 @@ public: } } - /// @brief Closes a URL's connections and removes associated information - /// from the connection pool. - /// - /// @param url URL for which connection should be closed. - void closeUrlConnections(const Url& url) { - if (MultiThreadingMgr::instance().getMode()) { - std::lock_guard lk(mutex_); - closeUrlConnectionsInternal(url); - } else { - closeUrlConnectionsInternal(url); - } - } - /// @brief Closes all URLS and removes associated information from /// the connection pool. void closeAll() { @@ -722,18 +578,12 @@ private: void processNextRequestInternal(const Url& url) { // Check if there is a queue for this URL. If there is no queue, there // is no request queued either. - auto queue_it = queue_.find(url); - if (queue_it != queue_.end()) { - if (!queue_it->second.empty()) { + DestinationPtr destination = findDestination(url); + if (destination) { + if (!destination->queueEmpty()) { // We have at least one queued request. Do we have an // idle connection? - auto conns_it = conns_.find(url); - if (conns_it == conns_.end()) { - isc_throw(Unexpected, "no connection list for :" << url.toText()); - } - - // Now, look for an idle connection. - ConnectionPtr connection = conns_it->second->getIdleConnection(); + ConnectionPtr connection = destination->getIdleConnection(); if (!connection) { TOMS_TRACE_LOG("*** No idle connections, don't dequeue?"); // @todo Resolve this, throw or just return, possibly log and return @@ -750,9 +600,7 @@ private: // Dequeue the oldest request and start a transaction for it using // the idle connection. - RequestDescriptor desc = queue_it->second.front(); - queue_it->second.pop(); - desc.conn_ = connection; + RequestDescriptor desc = destination->popNextRequest(); connection->doTransaction(desc.request_, desc.response_, desc.request_timeout_, desc.callback_, desc.connect_callback_, @@ -793,37 +641,35 @@ private: const HttpClient::ConnectHandler& connect_callback, const HttpClient::HandshakeHandler& handshake_callback, const HttpClient::CloseHandler& close_callback) { - // Find the connection list for the url - ConnectionListPtr url_connections; - auto it = conns_.find(url); - if (it != conns_.end()) { - url_connections = it->second; + ConnectionPtr connection; + // Find the destination for the requested URL. + DestinationPtr destination = findDestination(url); + if (destination) { + // Found it, look for an idle connection. + connection = destination->getIdleConnection(); } else { - // Doesn't exist yet, so create it. - url_connections.reset(new ConnectionList(url, max_url_connections_)); - conns_[url] = url_connections; + // Doesn't exist yet so it's a new destination/ + destination = addDestination(url); } - // Now, look for an idle connection. - ConnectionPtr connection = url_connections->getIdleConnection(); if (!connection) { - if (url_connections->full()) { + if (destination->connectionsFull()) { TOMS_TRACE_LOG("no idle connections, queue request"); // All connections busy, queue it. - queue_[url].push(RequestDescriptor(ConnectionPtr(), request, response, - request_timeout, - request_callback, - connect_callback, - handshake_callback, - close_callback)); + destination->pushRequest(RequestDescriptor(ConnectionPtr(), request, response, + request_timeout, + request_callback, + connect_callback, + handshake_callback, + close_callback)); return; } // Room to make another connection with this destination, so make one. TOMS_TRACE_LOG("creating a new connection"); - connection.reset(new Connection(io_service_, tls_context, + connection.reset(new Connection(io_service_, tls_context, shared_from_this(), url)); - url_connections->addConnection(connection); + destination->addConnection(connection); } // Use the connection to start the transaction. @@ -832,40 +678,16 @@ private: connect_callback, handshake_callback, close_callback); } - /// @brief Closes a URL's connections and clears its queue - /// - /// This method should be called in a thread safe context. - /// - /// @param url URL for which connection should be closed. - void closeUrlConnectionsInternal(const Url& url) { - // Remove requests from the queue. - auto queue_it = queue_.find(url); - if (queue_it != queue_.end()) { - queue_.erase(queue_it); - } - - // Close connections for the URL. - auto it = conns_.find(url); - if (it != conns_.end()) { - it->second->closeAllConnections(); - } - } - /// @brief Closes all connections for all URLs and removes associated /// information from the connection pool. /// /// This method should be called in a thread safe context. void closeAllInternal() { - // Remove all requests from the queue. - queue_.clear(); - - // Close connections for each URL. - for (auto it = conns_.begin(); it != conns_.end(); ++it) { - it->second->closeAllConnections(); + for (auto const& destination : destinations_) { + destination.second->closeAllConnections(); } - // Remove all of the connections. - conns_.clear(); + destinations_.clear(); } /// @brief Closes a connection if it has an out-of-band socket event @@ -883,24 +705,21 @@ private: /// /// @param socket_fd socket descriptor to check void closeIfOutOfBandInternal(int socket_fd) { - // First we look for a connection with the socket. - for (auto conns_it = conns_.begin(); conns_it != conns_.end(); - ++conns_it) { + for (auto const& destination : destinations_) { + // First we look for a connection with the socket. + ConnectionPtr connection = destination.second->findBySocketFd(socket_fd); + if (connection) { + if (!connection->isTransactionOngoing()) { + // Socket has no transaction, so any ready event is + // out-of-band (other end probably closed), so + // let's close it. Note we do not remove any queued + // requests, as this might somehow be occurring in + // between them. + destination.second->closeConnection(connection); + } - ConnectionListPtr url_connections = conns_it->second; - ConnectionPtr connection = url_connections->findBySocketFd(socket_fd); - if (connection && connection->isTransactionOngoing()) { - // Matches but is in a transaction, all is well. return; } - - // Socket has no transaction, so any ready event is - // out-of-band (other end probably closed), so - // let's close it. Note we do not remove any queued - // requests, as this might somehow be occurring in - // between them. - url_connections->closeConnection(connection); - break; } } @@ -961,14 +780,218 @@ private: HttpClient::CloseHandler close_callback_; }; + /// @brief Encapsulates connections and requests for a given URL + class Destination { + public: + /// @brief Constructor + /// + /// @param url server URL of this destination + /// @param max_connections maximum number of concurrent connections + /// allowed for in the list URL + Destination(Url url, size_t max_connections) + : url_(url), max_connections_(max_connections), connections_(), queue_() { } + + /// @brief Destructor + ~Destination() { + closeAllConnections(); + } + + /// @brief Add a new connection + /// + /// @param connection the connection to add + /// + /// @throw BadValue if the maximum number of connections already + /// exist. + void addConnection(ConnectionPtr connection) { + if (connectionsFull()) { + isc_throw(BadValue, "URL: " << url_.toText() + << ", already at maximum connections: " + << max_connections_); + } + + connections_.push_back(connection); + } + + /// @brief Closes a connection and removes it from the list. + /// + /// @param connection the connection to remove + void closeConnection(ConnectionPtr connection) { + for (auto it = connections_.begin(); it != connections_.end(); ++it) { + if (*it == connection) { + (*it)->close(); + connections_.erase(it); + break; + } + } + } + + /// @brief Closes all connections and clears the list. + void closeAllConnections() { + for (auto connection : connections_) { + connection->close(); + } + + connections_.clear(); + } + + /// @brief Find the first idle connection. + /// + /// Iterates over the existing connections and returns the + /// first connection which is not currently in a transaction. + /// + /// @return The first idle connection or an empty pointer if + /// all connections are busy. + ConnectionPtr getIdleConnection() { + for (auto connection : connections_) { + if (!connection->isTransactionOngoing()) { + return(connection); + } + } + + return(ConnectionPtr()); + } + + /// @brief Find a connection by its socket descriptor. + /// + /// @param socket_fd socket descriptor to find + /// + /// @return The connection or an empty pointer if no matching + /// connection exists. + ConnectionPtr findBySocketFd(int socket_fd) { + for (auto connection : connections_) { + if (connection->isMySocket(socket_fd)) { + return(connection); + } + } + + return(ConnectionPtr()); + } + + /// @brief Indicates if there are no connections in the list. + /// + /// @return true if the list is empty. + bool connectionsEmpty() { + return(connections_.empty()); + } + + /// @brief Indicates if list contains the maximum number. + /// + /// @return true if the list is full. + bool connectionsFull() { + return (connections_.size() >= max_connections_); + } + + /// @brief Fetches the number of connections in the list. + /// + /// @return the number of connections in the list. + size_t connectionCount() { + return (connections_.size()); + } + + /// @brief Fetches the maximum number of connections. + /// + /// @return the maxim number of connections. + size_t getMaxConnections() const { + return (max_connections_); + } + + /// @brief Fetches the URL. + /// + /// @return the URL. + const Url& getUrl() const { + return (url_); + } + + /// @brief Indicates if request queue is empty. + /// + /// @return true if there are no requests queued. + bool queueEmpty() const { + return (queue_.empty()); + + } + + /// @brief Adds a request to the end of the request queue. + /// + /// @param desc RequestDescriptor to queue. + void pushRequest(RequestDescriptor desc) { + queue_.push(desc); + } + + /// @brief Adds a request to the end of the request queue. + /// + /// @return desc RequestDescriptor to queue. + RequestDescriptor popNextRequest() { + if (queue_.empty()) { + isc_throw(InvalidOperation, "cannot pop, queue is empty"); + } + + RequestDescriptor desc = queue_.front(); + queue_.pop(); + return(desc); + } + + private: + /// @brief URL supported by the list. + Url url_; + + /// @brief Maximum number of concurrent connections for this destination. + size_t max_connections_; + + /// @brief List of concurrent connections. + std::vector connections_; + + /// @brief Holds the queue of request for this destination. + std::queue queue_; + }; + + /// @brief Pointer to a Destination. + typedef boost::shared_ptr DestinationPtr; + + /// @brief Creates a new destination for the given URL. + /// + /// @param url URL of the new destination. + /// + /// @return Pointer to the newly created destination. + /// @note Must be called from within a thread-safe context. + DestinationPtr addDestination(const Url& url) { + DestinationPtr destination(new Destination(url, max_url_connections_)); + destinations_[url] = destination; + return(destination); + } + + /// @brief Fetches a destination by URL + /// + /// @param url Url of the destination desired. + /// + /// @return pointer the desired destination, empty pointer + /// if the destination does not exist. + DestinationPtr findDestination(const Url& url) const { + auto it = destinations_.find(url); + if (it != destinations_.end()) { + return (it->second); + } + + return (DestinationPtr()); + } + + /// @brief Removes a destination by URL + /// + /// @param url Url of the destination to be removed. + /// @note Must be called from within a thread-safe context. + void removeDestination(const Url& url) { + // Remove requests from the queue. + auto it = destinations_.find(url); + if (it != destinations_.end()) { + it->second->closeAllConnections(); + destinations_.erase(it); + } + } + /// @brief A reference to the IOService that drives socket IO. IOService& io_service_; - /// @brief Holds lists of connections for each URL. - std::map conns_; - - /// @brief Holds the queue of requests for each URL. - std::map > queue_; + /// @brief Map of Destinations by URL. + std::map destinations_; /// @brief Mutex to protect the internal state. std::mutex mutex_; @@ -1222,7 +1245,7 @@ Connection::terminate(const boost::system::error_code& ec, void Connection::terminateInternal(const boost::system::error_code& ec, const std::string& parsing_error) { - TOMS_TRACE_LOG("terminate on: " << getSocketFd() + TOMS_TRACE_LOG("terminate on: " << getSocketFd() << ", isTransactionOngoing? " << isTransactionOngoing()); HttpResponsePtr response; @@ -1296,7 +1319,7 @@ Connection::terminateInternal(const boost::system::error_code& ec, // another transaction if there is at least one. ConnectionPoolPtr conn_pool = conn_pool_.lock(); if (conn_pool) { - TOMS_TRACE_LOG(" more work on? " << getSocketFd() + TOMS_TRACE_LOG(" more work on? " << getSocketFd() << ", isTransactionOngoing? " << isTransactionOngoing()); if (MultiThreadingMgr::instance().getMode()) { UnlockGuard lock(mutex_); diff --git a/src/lib/http/url.cc b/src/lib/http/url.cc index d7b161d28c..427b11afc7 100644 --- a/src/lib/http/url.cc +++ b/src/lib/http/url.cc @@ -1,4 +1,4 @@ -// Copyright (C) 2017-2018 Internet Systems Consortium, Inc. ("ISC") +// Copyright (C) 2017-2021 Internet Systems Consortium, Inc. ("ISC") // // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this @@ -24,7 +24,7 @@ Url::Url(const std::string& url) bool Url::operator<(const Url& url) const { - return (toText() < url.toText()); + return (url_ < url.rawUrl()); } Url::Scheme diff --git a/src/lib/http/url.h b/src/lib/http/url.h index c96641b120..a9e80694bb 100644 --- a/src/lib/http/url.h +++ b/src/lib/http/url.h @@ -83,6 +83,10 @@ public: /// @brief Returns textual representation of the URL. std::string toText() const; + const std::string& rawUrl() const { + return (url_); + } + private: /// @brief Returns boolean value indicating if the URL is valid.