From 5ab2c0ebb386f94384c1d16b44d61f71276fe59a Mon Sep 17 00:00:00 2001 From: Artem Boldariev Date: Fri, 14 Oct 2022 20:45:40 +0300 Subject: [PATCH] Synchronise stop listening operation for multi-layer transports This commit introduces a primitive isc__nmsocket_stop() which performs shutting down on a multilayered socket ensuring the proper order of the operations. The shared data within the socket object can be destroyed after the call completed, as it is guaranteed to not be used from within the context of other worker threads. --- lib/isc/netmgr/http.c | 18 +++------- lib/isc/netmgr/netmgr-int.h | 28 ++++++++++++++++ lib/isc/netmgr/netmgr.c | 66 +++++++++++++++++++++++++++++++++++++ lib/isc/netmgr/tcp.c | 2 +- lib/isc/netmgr/tcpdns.c | 2 +- lib/isc/netmgr/tlsdns.c | 2 +- lib/isc/netmgr/tlsstream.c | 26 +++++++-------- lib/isc/netmgr/udp.c | 2 +- 8 files changed, 114 insertions(+), 32 deletions(-) diff --git a/lib/isc/netmgr/http.c b/lib/isc/netmgr/http.c index 2a8677126e..3576bc6279 100644 --- a/lib/isc/netmgr/http.c +++ b/lib/isc/netmgr/http.c @@ -2536,6 +2536,9 @@ isc_nm_listenhttp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, sock->nchildren = sock->outer->nchildren; sock->fd = (uv_os_sock_t)-1; + isc__nmsocket_barrier_init(sock); + atomic_init(&sock->rchildren, sock->nchildren); + atomic_store(&sock->listening, true); *sockp = sock; return (ISC_R_SUCCESS); @@ -2702,20 +2705,7 @@ isc__nm_http_stoplistening(isc_nmsocket_t *sock) { REQUIRE(sock->type == isc_nm_httplistener); REQUIRE(isc_tid() == sock->tid); - if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, - true)) { - UNREACHABLE(); - } - - atomic_store(&sock->listening, false); - atomic_store(&sock->closed, true); - sock->recv_cb = NULL; - sock->recv_cbarg = NULL; - - if (sock->outer != NULL) { - isc_nm_stoplistening(sock->outer); - isc_nmsocket_close(&sock->outer); - } + isc__nmsocket_stop(sock); } static void diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index aa23797c49..da6d28836d 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -286,6 +286,7 @@ typedef enum isc__netievent_type { netievent_sendcb, netievent_settlsctx, + netievent_sockstop, /* for multilayer sockets */ netievent_udplisten, netievent_udpstop, @@ -1060,6 +1061,7 @@ struct isc_nmsocket { atomic_int_fast32_t active_child_connections; + bool barrier_initialised; #ifdef NETMGR_TRACE void *backtrace[TRACE_SIZE]; int backtrace_size; @@ -1678,6 +1680,9 @@ isc__nm_http_set_max_streams(isc_nmsocket_t *listener, void isc__nm_async_settlsctx(isc__networker_t *worker, isc__netievent_t *ev0); +void +isc__nm_async_sockstop(isc__networker_t *worker, isc__netievent_t *ev0); + void isc__nm_incstats(isc_nmsocket_t *sock, isc__nm_statid_t id); /*%< @@ -1764,6 +1769,27 @@ isc__nm_set_network_buffers(isc_nm_t *nm, uv_handle_t *handle); * Sets the pre-configured network buffers size on the handle. */ +void +isc__nmsocket_barrier_init(isc_nmsocket_t *listener); +/*%> + * Initialise the socket synchronisation barrier according to the + * number of children. + */ + +void +isc__nmsocket_stop(isc_nmsocket_t *listener); +/*%> + * Broadcast "stop" event for a listener socket across all workers and + * wait its processing completion - then, stop and close the underlying + * transport listener socket. + * + * The primitive is used in multi-layer transport listener sockets to + * implement shutdown properly: after the broadcasted events has been + * processed it is safe to destroy the shared data within the listener + * socket (including shutting down the underlying transport listener + * socket). + */ + /* * typedef all the netievent types */ @@ -1815,6 +1841,7 @@ NETIEVENT_SOCKET_HANDLE_TYPE(udpcancel); NETIEVENT_SOCKET_QUOTA_TYPE(tcpaccept); NETIEVENT_SOCKET_TLSCTX_TYPE(settlsctx); +NETIEVENT_SOCKET_TYPE(sockstop); /* Now declared the helper functions */ @@ -1864,6 +1891,7 @@ NETIEVENT_SOCKET_DECL(detach); NETIEVENT_SOCKET_QUOTA_DECL(tcpaccept); NETIEVENT_SOCKET_TLSCTX_DECL(settlsctx); +NETIEVENT_SOCKET_DECL(sockstop); void isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result); diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index e33dbbc4ca..c4f1d9d3ad 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -483,6 +483,7 @@ process_netievent(void *arg) { NETIEVENT_CASE(httpendpoints); NETIEVENT_CASE(settlsctx); #endif + NETIEVENT_CASE(sockstop); NETIEVENT_CASE(connectcb); NETIEVENT_CASE(readcb); @@ -558,6 +559,7 @@ NETIEVENT_SOCKET_DEF(detach); NETIEVENT_SOCKET_QUOTA_DEF(tcpaccept); NETIEVENT_SOCKET_TLSCTX_DEF(settlsctx); +NETIEVENT_SOCKET_DEF(sockstop); void isc__nm_process_ievent(isc__networker_t *worker, isc__netievent_t *event) { @@ -715,6 +717,10 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree FLARG) { isc__nm_http_cleanup_data(sock); #endif + if (sock->barrier_initialised) { + isc_barrier_destroy(&sock->barrier); + } + sock->magic = 0; #ifdef NETMGR_TRACE @@ -2103,6 +2109,66 @@ isc_nm_stoplistening(isc_nmsocket_t *sock) { } } +void +isc__nmsocket_stop(isc_nmsocket_t *listener) { + isc__netievent_sockstop_t ievent = { .sock = listener }; + + REQUIRE(VALID_NMSOCK(listener)); + REQUIRE(listener->tid == isc_tid()); + REQUIRE(listener->tid == 0); + + if (!atomic_compare_exchange_strong(&listener->closing, + &(bool){ false }, true)) { + UNREACHABLE(); + } + + for (size_t i = 1; i < listener->nchildren; i++) { + isc__networker_t *worker = + &listener->worker->netmgr->workers[i]; + isc__netievent_sockstop_t *ev = + isc__nm_get_netievent_sockstop(worker, listener); + isc__nm_enqueue_ievent(worker, (isc__netievent_t *)ev); + } + + isc__nm_async_sockstop(listener->worker, (isc__netievent_t *)&ievent); + INSIST(atomic_load(&listener->rchildren) == 0); + + if (!atomic_compare_exchange_strong(&listener->listening, + &(bool){ true }, false)) + { + UNREACHABLE(); + } + + listener->accept_cb = NULL; + listener->accept_cbarg = NULL; + listener->recv_cb = NULL; + listener->recv_cbarg = NULL; + + if (listener->outer != NULL) { + isc_nm_stoplistening(listener->outer); + isc__nmsocket_detach(&listener->outer); + } + + atomic_store(&listener->closed, true); +} + +void +isc__nmsocket_barrier_init(isc_nmsocket_t *listener) { + REQUIRE(listener->nchildren > 0); + isc_barrier_init(&listener->barrier, listener->nchildren); + listener->barrier_initialised = true; +} + +void +isc__nm_async_sockstop(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_sockstop_t *ievent = (isc__netievent_sockstop_t *)ev0; + isc_nmsocket_t *listener = ievent->sock; + UNUSED(worker); + + (void)atomic_fetch_sub(&listener->rchildren, 1); + isc_barrier_wait(&listener->barrier); +} + void isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, isc_result_t eresult, bool async) { diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 9525b23f2b..0283596e01 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -408,7 +408,7 @@ isc_nm_listentcp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, sock->children = isc_mem_getx(worker->mctx, children_size, ISC_MEM_ZERO); - isc_barrier_init(&sock->barrier, sock->nchildren); + isc__nmsocket_barrier_init(sock); sock->accept_cb = accept_cb; sock->accept_cbarg = accept_cbarg; diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index fa42f8e233..069b5e748d 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -398,7 +398,7 @@ isc_nm_listentcpdns(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, sock->children = isc_mem_getx(worker->mctx, children_size, ISC_MEM_ZERO); - isc_barrier_init(&sock->barrier, sock->nchildren); + isc__nmsocket_barrier_init(sock); sock->accept_cb = accept_cb; sock->accept_cbarg = accept_cbarg; diff --git a/lib/isc/netmgr/tlsdns.c b/lib/isc/netmgr/tlsdns.c index 7ec144941e..e5737c253d 100644 --- a/lib/isc/netmgr/tlsdns.c +++ b/lib/isc/netmgr/tlsdns.c @@ -511,7 +511,7 @@ isc_nm_listentlsdns(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, sock->children = isc_mem_getx(worker->mctx, children_size, ISC_MEM_ZERO); - isc_barrier_init(&sock->barrier, sock->nchildren); + isc__nmsocket_barrier_init(sock); sock->accept_cb = accept_cb; sock->accept_cbarg = accept_cbarg; diff --git a/lib/isc/netmgr/tlsstream.c b/lib/isc/netmgr/tlsstream.c index bd562f3313..d99e842ca3 100644 --- a/lib/isc/netmgr/tlsstream.c +++ b/lib/isc/netmgr/tlsstream.c @@ -675,6 +675,13 @@ tlslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { REQUIRE(VALID_NMSOCK(tlslistensock)); REQUIRE(tlslistensock->type == isc_nm_tlslistener); + if (isc__nmsocket_closing(handle->sock) || + isc__nmsocket_closing(tlslistensock) || + !atomic_load(&tlslistensock->listening)) + { + return (ISC_R_CANCELED); + } + /* * We need to create a 'wrapper' tlssocket for this connection. */ @@ -763,6 +770,10 @@ isc_nm_listentls(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, isc__nmsocket_attach(tlssock, &tlssock->outer->tlsstream.tlslistener); isc__nmsocket_detach(&tsock); INSIST(result != ISC_R_UNSET); + tlssock->nchildren = tlssock->outer->nchildren; + + isc__nmsocket_barrier_init(tlssock); + atomic_init(&tlssock->rchildren, tlssock->nchildren); if (result == ISC_R_SUCCESS) { atomic_store(&tlssock->listening, true); @@ -933,20 +944,7 @@ isc__nm_tls_stoplistening(isc_nmsocket_t *sock) { REQUIRE(sock->tlsstream.tls == NULL); REQUIRE(sock->tlsstream.ctx == NULL); - if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, - true)) { - UNREACHABLE(); - } - - atomic_store(&sock->listening, false); - atomic_store(&sock->closed, true); - sock->recv_cb = NULL; - sock->recv_cbarg = NULL; - - if (sock->outer != NULL) { - isc_nm_stoplistening(sock->outer); - isc__nmsocket_detach(&sock->outer); - } + isc__nmsocket_stop(sock); } static void diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index cda3d9d98c..3900372173 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -155,7 +155,7 @@ isc_nm_listenudp(isc_nm_t *mgr, uint32_t workers, isc_sockaddr_t *iface, sock->children = isc_mem_getx(worker->mctx, children_size, ISC_MEM_ZERO); - isc_barrier_init(&sock->barrier, sock->nchildren); + isc__nmsocket_barrier_init(sock); sock->recv_cb = cb; sock->recv_cbarg = cbarg;