mirror of
https://gitlab.isc.org/isc-projects/kea
synced 2025-08-30 13:37:55 +00:00
[#1732] Addressed review comments
Largely cosmetic and minor code clean ups: modified: src/lib/http/client.* src/lib/http/client.h src/lib/http/url.h src/lib/http/tests/mt_client_unittests.cc
This commit is contained in:
parent
948d709a8a
commit
cb1e7b9ce3
@ -371,21 +371,6 @@ private:
|
||||
/// after invocation. Defaults to false.
|
||||
void closeCallback(const bool clear = false);
|
||||
|
||||
/// @brief Fetches the current socket descriptor, if one.
|
||||
///
|
||||
/// @return The socket descriptor or -1.
|
||||
int getSocketFd() const {
|
||||
int fd = -1;
|
||||
|
||||
if (tcp_socket_) {
|
||||
fd = tcp_socket_->getNative();
|
||||
} else if (tls_socket_) {
|
||||
fd = tls_socket_->getNative();
|
||||
}
|
||||
|
||||
return (fd);
|
||||
}
|
||||
|
||||
/// @brief Pointer to the connection pool owning this connection.
|
||||
///
|
||||
/// This is a weak pointer to avoid circular dependency between the
|
||||
@ -443,7 +428,7 @@ typedef boost::shared_ptr<Connection> ConnectionPtr;
|
||||
|
||||
/// @brief Connection pool for managing multiple connections.
|
||||
///
|
||||
/// Connection pool creates and destroys URL destinations. t manages
|
||||
/// Connection pool creates and destroys URL destinations. It manages
|
||||
/// connections to and requests for URLs. Each time a request is
|
||||
/// submitted for a URL, it assigns it to an available idle connection,
|
||||
/// or if no idle connections are available, pushes the request on the queue
|
||||
@ -459,7 +444,7 @@ public:
|
||||
/// connections allowed per URL.
|
||||
explicit ConnectionPool(IOService& io_service, size_t max_url_connections)
|
||||
: io_service_(io_service), destinations_(), mutex_(),
|
||||
max_url_connections_(max_url_connections) {
|
||||
max_url_connections_(max_url_connections) {
|
||||
}
|
||||
|
||||
/// @brief Destructor.
|
||||
@ -524,7 +509,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
/// @brief Closes all URLS and removes associated information from
|
||||
/// @brief Closes all URLs and removes associated information from
|
||||
/// the connection pool.
|
||||
void closeAll() {
|
||||
if (MultiThreadingMgr::instance().getMode()) {
|
||||
@ -573,15 +558,7 @@ private:
|
||||
// idle connection?
|
||||
ConnectionPtr connection = destination->getIdleConnection();
|
||||
if (!connection) {
|
||||
// @todo Resolve this, throw or just return, possibly log and return
|
||||
//
|
||||
// We shouldn't be in this function w/o an idle connection as it is called
|
||||
// from by terminate() after completion of a transaction? It should not be
|
||||
// possible for the connection that got us here to not be busy.
|
||||
// Do we throw or just not dequeue ther request? It was TSAN tested and
|
||||
// perf tested with just the return.
|
||||
// isc_throw(Unexpected, "no idle connections for :" << url.toText());
|
||||
// Let's leave it on the queue, nothing idle yet?
|
||||
// No idle connections, so just return.
|
||||
return;
|
||||
}
|
||||
|
||||
@ -642,7 +619,7 @@ private:
|
||||
if (!connection) {
|
||||
if (destination->connectionsFull()) {
|
||||
// All connections busy, queue it.
|
||||
destination->pushRequest(RequestDescriptor(ConnectionPtr(), request, response,
|
||||
destination->pushRequest(RequestDescriptor(request, response,
|
||||
request_timeout,
|
||||
request_callback,
|
||||
connect_callback,
|
||||
@ -712,7 +689,6 @@ private:
|
||||
struct RequestDescriptor {
|
||||
/// @brief Constructor.
|
||||
///
|
||||
/// @param conn Pointer to the connection.
|
||||
/// @param request Pointer to the request to be sent.
|
||||
/// @param response Pointer to the object into which the response will
|
||||
/// be stored.
|
||||
@ -724,24 +700,20 @@ private:
|
||||
/// performs the TLS handshake with the server.
|
||||
/// @param close_callback pointer to the user callback to be invoked
|
||||
/// when the client closes the connection to the server.
|
||||
RequestDescriptor(const ConnectionPtr& conn,
|
||||
const HttpRequestPtr& request,
|
||||
RequestDescriptor(const HttpRequestPtr& request,
|
||||
const HttpResponsePtr& response,
|
||||
const long& request_timeout,
|
||||
const HttpClient::RequestHandler& callback,
|
||||
const HttpClient::ConnectHandler& connect_callback,
|
||||
const HttpClient::HandshakeHandler& handshake_callback,
|
||||
const HttpClient::CloseHandler& close_callback)
|
||||
: conn_(conn), request_(request), response_(response),
|
||||
: request_(request), response_(response),
|
||||
request_timeout_(request_timeout), callback_(callback),
|
||||
connect_callback_(connect_callback),
|
||||
handshake_callback_(handshake_callback),
|
||||
close_callback_(close_callback) {
|
||||
}
|
||||
|
||||
/// @brief Holds the connection.
|
||||
ConnectionPtr conn_;
|
||||
|
||||
/// @brief Holds pointer to the request.
|
||||
HttpRequestPtr request_;
|
||||
|
||||
@ -780,12 +752,13 @@ private:
|
||||
closeAllConnections();
|
||||
}
|
||||
|
||||
/// @brief Add a new connection
|
||||
/// @brief Adds a new connection
|
||||
///
|
||||
/// @param connection the connection to add
|
||||
///
|
||||
/// @throw BadValue if the maximum number of connections already
|
||||
/// exist.
|
||||
/// @note This should be called in a thread safe context.
|
||||
void addConnection(ConnectionPtr connection) {
|
||||
if (connectionsFull()) {
|
||||
isc_throw(BadValue, "URL: " << url_.toText()
|
||||
@ -799,6 +772,7 @@ private:
|
||||
/// @brief Closes a connection and removes it from the list.
|
||||
///
|
||||
/// @param connection the connection to remove
|
||||
/// @note This should be called in a thread safe context.
|
||||
void closeConnection(ConnectionPtr connection) {
|
||||
for (auto it = connections_.begin(); it != connections_.end(); ++it) {
|
||||
if (*it == connection) {
|
||||
@ -810,15 +784,16 @@ private:
|
||||
}
|
||||
|
||||
/// @brief Closes all connections and clears the list.
|
||||
/// @note This should be called in a thread safe context.
|
||||
void closeAllConnections() {
|
||||
for (auto connection : connections_) {
|
||||
for (auto const& connection : connections_) {
|
||||
connection->close();
|
||||
}
|
||||
|
||||
connections_.clear();
|
||||
}
|
||||
|
||||
/// @brief Find the first idle connection.
|
||||
/// @brief Finds the first idle connection.
|
||||
///
|
||||
/// Iterates over the existing connections and returns the
|
||||
/// first connection which is not currently in a transaction.
|
||||
@ -826,13 +801,13 @@ private:
|
||||
/// @return The first idle connection or an empty pointer if
|
||||
/// all connections are busy.
|
||||
ConnectionPtr getIdleConnection() {
|
||||
for (auto connection : connections_) {
|
||||
for (auto const& connection : connections_) {
|
||||
if (!connection->isTransactionOngoing()) {
|
||||
return(connection);
|
||||
return (connection);
|
||||
}
|
||||
}
|
||||
|
||||
return(ConnectionPtr());
|
||||
return (ConnectionPtr());
|
||||
}
|
||||
|
||||
/// @brief Find a connection by its socket descriptor.
|
||||
@ -844,18 +819,18 @@ private:
|
||||
ConnectionPtr findBySocketFd(int socket_fd) {
|
||||
for (auto connection : connections_) {
|
||||
if (connection->isMySocket(socket_fd)) {
|
||||
return(connection);
|
||||
return (connection);
|
||||
}
|
||||
}
|
||||
|
||||
return(ConnectionPtr());
|
||||
return (ConnectionPtr());
|
||||
}
|
||||
|
||||
/// @brief Indicates if there are no connections in the list.
|
||||
///
|
||||
/// @return true if the list is empty.
|
||||
bool connectionsEmpty() {
|
||||
return(connections_.empty());
|
||||
return (connections_.empty());
|
||||
}
|
||||
|
||||
/// @brief Indicates if list contains the maximum number.
|
||||
@ -891,7 +866,6 @@ private:
|
||||
/// @return true if there are no requests queued.
|
||||
bool queueEmpty() const {
|
||||
return (queue_.empty());
|
||||
|
||||
}
|
||||
|
||||
/// @brief Adds a request to the end of the request queue.
|
||||
@ -901,9 +875,9 @@ private:
|
||||
queue_.push(desc);
|
||||
}
|
||||
|
||||
/// @brief Adds a request to the end of the request queue.
|
||||
/// @brief Removes a request from the front of the request queue.
|
||||
///
|
||||
/// @return desc RequestDescriptor to queue.
|
||||
/// @return desc RequestDescriptor of the removed request.
|
||||
RequestDescriptor popNextRequest() {
|
||||
if (queue_.empty()) {
|
||||
isc_throw(InvalidOperation, "cannot pop, queue is empty");
|
||||
@ -911,11 +885,11 @@ private:
|
||||
|
||||
RequestDescriptor desc = queue_.front();
|
||||
queue_.pop();
|
||||
return(desc);
|
||||
return (desc);
|
||||
}
|
||||
|
||||
private:
|
||||
/// @brief URL supported by the list.
|
||||
/// @brief URL supported by this destination.
|
||||
Url url_;
|
||||
|
||||
/// @brief Maximum number of concurrent connections for this destination.
|
||||
@ -940,12 +914,12 @@ private:
|
||||
DestinationPtr addDestination(const Url& url) {
|
||||
DestinationPtr destination(new Destination(url, max_url_connections_));
|
||||
destinations_[url] = destination;
|
||||
return(destination);
|
||||
return (destination);
|
||||
}
|
||||
|
||||
/// @brief Fetches a destination by URL
|
||||
///
|
||||
/// @param url Url of the destination desired.
|
||||
/// @param url URL of the destination desired.
|
||||
///
|
||||
/// @return pointer the desired destination, empty pointer
|
||||
/// if the destination does not exist.
|
||||
@ -960,10 +934,13 @@ private:
|
||||
|
||||
/// @brief Removes a destination by URL
|
||||
///
|
||||
/// @param url Url of the destination to be removed.
|
||||
/// Closes all of the destination's connections and
|
||||
/// discards all of its queued requests while removing
|
||||
/// the destination from the list of known destinations.
|
||||
///
|
||||
/// @param url URL of the destination to be removed.
|
||||
/// @note Must be called from within a thread-safe context.
|
||||
void removeDestination(const Url& url) {
|
||||
// Remove requests from the queue.
|
||||
auto it = destinations_.find(url);
|
||||
if (it != destinations_.end()) {
|
||||
it->second->closeAllConnections();
|
||||
@ -1631,20 +1608,19 @@ public:
|
||||
thread_pool_size_(thread_pool_size) {
|
||||
if (thread_pool_size_ > 0) {
|
||||
// Create our own private IOService.
|
||||
my_io_service_.reset(new IOService());
|
||||
thread_io_service_.reset(new IOService());
|
||||
|
||||
// Create a pool of threads, each calls run on the same, private
|
||||
// io_service instance
|
||||
for (std::size_t i = 0; i < thread_pool_size_; ++i)
|
||||
{
|
||||
for (std::size_t i = 0; i < thread_pool_size_; ++i) {
|
||||
boost::shared_ptr<std::thread> thread(new std::thread(std::bind(&IOService::run,
|
||||
my_io_service_)));
|
||||
thread_io_service_)));
|
||||
threads_.push_back(thread);
|
||||
}
|
||||
|
||||
// Create the connection pool. Note that we use the thread_pool_size
|
||||
// as the maximum connections per URL value.
|
||||
conn_pool_.reset(new ConnectionPool(*my_io_service_, thread_pool_size_));
|
||||
conn_pool_.reset(new ConnectionPool(*thread_io_service_, thread_pool_size_));
|
||||
|
||||
LOG_DEBUG(http_logger, isc::log::DBGLVL_TRACE_BASIC, HTTP_CLIENT_MT_STARTED)
|
||||
.arg(threads_.size());
|
||||
@ -1665,13 +1641,13 @@ public:
|
||||
/// @brief Close all connections, and if multi-threaded, stop internal IOService
|
||||
/// and the thread pool.
|
||||
void stop() {
|
||||
if (my_io_service_) {
|
||||
if (thread_io_service_) {
|
||||
// Stop the private IOService.
|
||||
my_io_service_->stop();
|
||||
thread_io_service_->stop();
|
||||
|
||||
// Shutdown the threads.
|
||||
for (std::size_t i = 0; i < threads_.size(); ++i) {
|
||||
threads_[i]->join();
|
||||
for(auto const& thread : threads_) {
|
||||
thread->join();
|
||||
}
|
||||
|
||||
threads_.clear();
|
||||
@ -1681,14 +1657,14 @@ public:
|
||||
conn_pool_->closeAll();
|
||||
|
||||
// Get rid of the IOService.
|
||||
my_io_service_.reset();
|
||||
thread_io_service_.reset();
|
||||
}
|
||||
|
||||
/// @brief Fetches the internal IOService used in multi-threaded mode.
|
||||
///
|
||||
/// @return A pointer to the IOService, or an empty pointer when
|
||||
/// in single-threaded mode.
|
||||
asiolink::IOServicePtr getMyIOService() { return(my_io_service_); };
|
||||
asiolink::IOServicePtr getThreadIOService() { return (thread_io_service_); };
|
||||
|
||||
/// @brief Fetches the maximum size of the thread pool.
|
||||
///
|
||||
@ -1716,7 +1692,7 @@ private:
|
||||
std::vector<boost::shared_ptr<std::thread> > threads_;
|
||||
|
||||
/// @brief Pointer to private IOService used in multi-threaded mode.
|
||||
asiolink::IOServicePtr my_io_service_;
|
||||
asiolink::IOServicePtr thread_io_service_;
|
||||
};
|
||||
|
||||
HttpClient::HttpClient(IOService& io_service, size_t thread_pool_size) {
|
||||
@ -1732,7 +1708,6 @@ HttpClient::HttpClient(IOService& io_service, size_t thread_pool_size) {
|
||||
}
|
||||
|
||||
HttpClient::~HttpClient() {
|
||||
stop();
|
||||
}
|
||||
|
||||
void
|
||||
@ -1782,8 +1757,8 @@ HttpClient::stop() {
|
||||
}
|
||||
|
||||
const IOServicePtr
|
||||
HttpClient::getMyIOService() const {
|
||||
return (impl_->getMyIOService());
|
||||
HttpClient::getThreadIOService() const {
|
||||
return (impl_->getThreadIOService());
|
||||
}
|
||||
|
||||
uint16_t
|
||||
|
@ -136,7 +136,7 @@ public:
|
||||
///
|
||||
/// @param io_service IO service to be used by the HTTP client.
|
||||
/// @param thread_pool_size maximum number of threads in the thread pool.
|
||||
/// A value greater than zero enables multi-threaded mode as sets the
|
||||
/// A value greater than zero enables multi-threaded mode and sets the
|
||||
/// maximum number of concurrent connections per URL. A value of zero
|
||||
/// (default) enables single-threaded mode with one connection per URL.
|
||||
explicit HttpClient(asiolink::IOService& io_service, size_t thread_pool_size = 0);
|
||||
@ -148,7 +148,7 @@ public:
|
||||
///
|
||||
/// The client maintains an internal connection pool which manages lists
|
||||
/// of connections per URL. In single-threaded mode, each URL is limited
|
||||
/// to a single /connection. In multi-threaded mode, each URL may have
|
||||
/// to a single connection. In multi-threaded mode, each URL may have
|
||||
/// more than one open connection per URL, enabling the client to carry
|
||||
/// on multiple concurrent requests per URL.
|
||||
///
|
||||
@ -268,16 +268,16 @@ public:
|
||||
///
|
||||
/// @return pointer to the IOService instance, or an empty pointer
|
||||
/// in single-threaded mode.
|
||||
const asiolink::IOServicePtr getMyIOService() const;
|
||||
const asiolink::IOServicePtr getThreadIOService() const;
|
||||
|
||||
/// @brief Fetches the maximum size of the thread pool.
|
||||
///
|
||||
/// @return unit16_t containing the maximum size of the thread pool.
|
||||
/// @return maximum size of the thread pool.
|
||||
uint16_t getThreadPoolSize() const;
|
||||
|
||||
/// @brief Fetches the number of threads in the pool.
|
||||
///
|
||||
/// @return unit16_t containing the number of running threads.
|
||||
/// @return number of running threads.
|
||||
uint16_t getThreadCount() const;
|
||||
|
||||
private:
|
||||
|
@ -411,10 +411,10 @@ public:
|
||||
|
||||
if (num_threads_ == 0) {
|
||||
// If we single-threaded client should not have it's own IOService.
|
||||
ASSERT_FALSE(client_->getMyIOService());
|
||||
ASSERT_FALSE(client_->getThreadIOService());
|
||||
} else {
|
||||
// If we multi-threaded client should have it's own IOService.
|
||||
ASSERT_TRUE(client_->getMyIOService());
|
||||
ASSERT_TRUE(client_->getThreadIOService());
|
||||
}
|
||||
|
||||
// Verify the pool size and number of threads are as expected.
|
||||
@ -587,7 +587,7 @@ TEST_F(MtHttpClientTest, basics) {
|
||||
ASSERT_NO_THROW_LOG(client.reset(new HttpClient(io_service_, 0)));
|
||||
ASSERT_TRUE(client);
|
||||
|
||||
ASSERT_FALSE(client->getMyIOService());
|
||||
ASSERT_FALSE(client->getThreadIOService());
|
||||
ASSERT_EQ(client->getThreadPoolSize(), 0);
|
||||
ASSERT_EQ(client->getThreadCount(), 0);
|
||||
|
||||
@ -609,7 +609,7 @@ TEST_F(MtHttpClientTest, basics) {
|
||||
|
||||
// Verify that it has an internal IOService and that thread pool size
|
||||
// and thread count match.
|
||||
ASSERT_TRUE(client->getMyIOService());
|
||||
ASSERT_TRUE(client->getThreadIOService());
|
||||
ASSERT_EQ(client->getThreadPoolSize(), 3);
|
||||
ASSERT_EQ(client->getThreadCount(), 3);
|
||||
|
||||
@ -617,7 +617,7 @@ TEST_F(MtHttpClientTest, basics) {
|
||||
ASSERT_NO_THROW_LOG(client->stop());
|
||||
|
||||
// Verify we're stopped.
|
||||
ASSERT_FALSE(client->getMyIOService());
|
||||
ASSERT_FALSE(client->getThreadIOService());
|
||||
ASSERT_EQ(client->getThreadPoolSize(), 3);
|
||||
ASSERT_EQ(client->getThreadCount(), 0);
|
||||
|
||||
|
@ -83,6 +83,7 @@ public:
|
||||
/// @brief Returns textual representation of the URL.
|
||||
std::string toText() const;
|
||||
|
||||
/// @brief Returns the raw, unparsed url string.
|
||||
const std::string& rawUrl() const {
|
||||
return (url_);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user