From a725726782d3d2bcae7db41d6b358b4ec60e9ab0 Mon Sep 17 00:00:00 2001 From: Francis Dupont Date: Wed, 21 Apr 2021 10:53:29 +0200 Subject: [PATCH] [(no branch, rebasing 1798-remove-tls-stream-clear-operation)] [#1798] Tried to fix MT issues --- src/lib/http/client.cc | 86 +++++++++++++++++++++++++++++------------- 1 file changed, 60 insertions(+), 26 deletions(-) diff --git a/src/lib/http/client.cc b/src/lib/http/client.cc index 4440a099a5..1eaada4cdd 100644 --- a/src/lib/http/client.cc +++ b/src/lib/http/client.cc @@ -152,14 +152,18 @@ public: /// @brief Checks if a transaction has been initiated over this connection. /// /// @return true if transaction has been initiated, false otherwise. - bool isTransactionOngoing() const; + bool isTransactionOngoing() const { + return (started_); + } /// @brief Checks if the socket has been closed. /// /// @return true if the socket has been closed. - bool isClosed() const; + bool isClosed() const { + return (closed_); + } - /// @brief Checks if the peer has closed the socket at its side. + /// @brief Checks if the peer has closed the idle socket at its side. /// /// If the socket is open but is not usable the peer has closed /// the socket at its side so we close it. @@ -224,6 +228,14 @@ private: /// Should be called in a thread safe context. void closeInternal(); + /// @brief Checks if the peer has closed the socket at its side. + /// + /// Should be called in a thread safe context. + /// + /// If the socket is open but is not usable the peer has closed + /// the socket at its side so we close it. + void isClosedByPeerInternal(); + /// @brief Checks and logs if premature transaction timeout is suspected. /// /// Should be called in a thread safe context. @@ -465,7 +477,7 @@ public: /// @param max_url_connections maximum number of concurrent /// connections allowed per URL. explicit ConnectionPool(IOService& io_service, size_t max_url_connections) - : io_service_(io_service), destinations_(), mutex_(), + : io_service_(io_service), destinations_(), pool_mutex_(), max_url_connections_(max_url_connections) { } @@ -483,7 +495,7 @@ public: /// should be processed. void processNextRequest(const Url& url, const TlsContextPtr& tls_context) { if (MultiThreadingMgr::instance().getMode()) { - std::lock_guard lk(mutex_); + std::lock_guard lk(pool_mutex_); return (processNextRequestInternal(url, tls_context)); } else { return (processNextRequestInternal(url, tls_context)); @@ -531,7 +543,7 @@ public: const HttpClient::HandshakeHandler& handshake_callback, const HttpClient::CloseHandler& close_callback) { if (MultiThreadingMgr::instance().getMode()) { - std::lock_guard lk(mutex_); + std::lock_guard lk(pool_mutex_); return (queueRequestInternal(url, tls_context, request, response, request_timeout, request_callback, connect_callback, handshake_callback, @@ -548,7 +560,7 @@ public: /// the connection pool. void closeAll() { if (MultiThreadingMgr::instance().getMode()) { - std::lock_guard lk(mutex_); + std::lock_guard lk(pool_mutex_); closeAllInternal(); } else { closeAllInternal(); @@ -569,7 +581,7 @@ public: /// @param socket_fd socket descriptor to check void closeIfOutOfBand(int socket_fd) { if (MultiThreadingMgr::instance().getMode()) { - std::lock_guard lk(mutex_); + std::lock_guard lk(pool_mutex_); closeIfOutOfBandInternal(socket_fd); } else { closeIfOutOfBandInternal(socket_fd); @@ -841,6 +853,11 @@ private: /// @brief Closes all connections and clears the list. /// @note This should be called in a thread safe context. void closeAllConnections() { + // Flush the queue. + while (!queue_.empty()) { + queue_.pop(); + } + for (auto const& connection : connections_) { connection->close(); } @@ -866,14 +883,20 @@ private: /// @brief Finds the first idle connection. /// /// Iterates over the existing connections and returns the - /// first connection which is not currently in a transaction. + /// first connection which is not currently in a transaction and + /// is not closed. + /// + /// @note @ref garbageCollectConnections should be called before + /// so the closed connections are not scanned (but as a connection + /// can have been closed since the garbage collection the flag + /// still has to be checked for). /// - /// @note @ref garbageCollectConnections should be called before. /// @return The first idle connection or an empty pointer if - /// all connections are busy. + /// all connections are busy or closed. ConnectionPtr getIdleConnection() { for (auto const& connection : connections_) { - if (!connection->isTransactionOngoing()) { + if (!connection->isTransactionOngoing() && + !connection->isClosed()) { return (connection); } } @@ -1013,6 +1036,8 @@ private: /// discards all of its queued requests while removing /// the destination from the list of known destinations. /// + /// @todo not used: remove it? + /// /// @param url URL of the destination to be removed. /// @param tls_context TLS context for the destination to be removed. /// @note Must be called from within a thread-safe context. @@ -1033,7 +1058,7 @@ private: std::map destinations_; /// @brief Mutex to protect the internal state. - std::mutex mutex_; + std::mutex pool_mutex_; /// @brief Maximum number of connections per URL and TLS context. size_t max_url_connections_; @@ -1095,6 +1120,21 @@ Connection::closeCallback(const bool clear) { void Connection::isClosedByPeer() { + // This method applies only to idle connections. + if (started_ || closed_) { + return; + } + // This code was guarded by a lock so keep this. + if (MultiThreadingMgr::instance().getMode()) { + std::lock_guard lk(mutex_); + isClosedByPeerInternal(); + } else { + isClosedByPeerInternal(); + } +} + +void +Connection::isClosedByPeerInternal() { // If the socket is open we check if it is possible to transmit // the data over this socket by reading from it with message // peeking. If the socket is not usable, we close it and then @@ -1233,16 +1273,6 @@ Connection::closeInternal() { resetState(); } -bool -Connection::isTransactionOngoing() const { - return (started_); -} - -bool -Connection::isClosed() const { - return (closed_); -} - bool Connection::isMySocket(int socket_fd) const { if (tcp_socket_) { @@ -1728,7 +1758,14 @@ public: /// @brief Close all connections, and if multi-threaded, stop internal IOService /// and the thread pool. void stop() { + // Close all the connections. + conn_pool_->closeAll(); + + // Stop the multi-threaded service. if (thread_io_service_) { + // Flush cancelled (and ready) handlers. + thread_io_service_->poll(); + // Stop the private IOService. thread_io_service_->stop(); @@ -1740,9 +1777,6 @@ public: threads_.clear(); } - // Close all the connections. - conn_pool_->closeAll(); - // Get rid of the IOService. thread_io_service_.reset(); }