diff --git a/lib/isc/Makefile.am b/lib/isc/Makefile.am index 96124a2d89..c5368309f4 100644 --- a/lib/isc/Makefile.am +++ b/lib/isc/Makefile.am @@ -11,6 +11,7 @@ libisc_la_HEADERS = \ include/isc/atomic.h \ include/isc/attributes.h \ include/isc/backtrace.h \ + include/isc/barrier.h \ include/isc/base32.h \ include/isc/base64.h \ include/isc/bind9.h \ diff --git a/lib/isc/include/isc/barrier.h b/lib/isc/include/isc/barrier.h new file mode 100644 index 0000000000..48353c69a8 --- /dev/null +++ b/lib/isc/include/isc/barrier.h @@ -0,0 +1,37 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, you can obtain one at https://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +#pragma once + +#include + +#if __SANITIZE_THREAD__ && !defined(WIN32) + +#include + +#define isc_barrier_t pthread_barrier_t + +#define isc_barrier_init(barrier, count) \ + pthread_barrier_init(barrier, NULL, count) +#define isc_barrier_destroy(barrier) pthread_barrier_destroy(barrier) +#define isc_barrier_wait(barrier) pthread_barrier_wait(barrier) + +#else + +#include + +#define isc_barrier_t uv_barrier_t + +#define isc_barrier_init(barrier, count) uv_barrier_init(barrier, count) +#define isc_barrier_destroy(barrier) uv_barrier_destroy(barrier) +#define isc_barrier_wait(barrier) uv_barrier_wait(barrier) + +#endif /* __SANITIZE_THREAD__ */ diff --git a/lib/isc/netmgr/http.c b/lib/isc/netmgr/http.c index 021feaa3c3..e64a018e5d 100644 --- a/lib/isc/netmgr/http.c +++ b/lib/isc/netmgr/http.c @@ -1199,7 +1199,7 @@ isc_nm_httpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, sock->extrahandlesize = extrahandlesize; sock->connect_timeout = timeout; - sock->result = ISC_R_DEFAULT; + sock->result = ISC_R_UNSET; sock->connect_cb = cb; sock->connect_cbarg = cbarg; atomic_init(&sock->client, true); @@ -2170,7 +2170,7 @@ isc_nm_listenhttp(isc_nm_t *mgr, isc_nmiface_t *iface, int backlog, isc__nmsocket_attach(sock, &sock->outer->h2.httpserver); sock->nchildren = sock->outer->nchildren; - sock->result = ISC_R_DEFAULT; + sock->result = ISC_R_UNSET; sock->tid = isc_random_uniform(sock->nchildren); sock->fd = (uv_os_sock_t)-1; diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 6db3f93d05..a126a4e403 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -174,7 +175,6 @@ typedef struct isc__networker { uv_async_t async; /* async channel to send * data to this networker */ isc_mutex_t lock; - isc_condition_t cond; bool paused; bool finished; isc_thread_t thread; @@ -185,6 +185,8 @@ typedef struct isc__networker { * used for listening etc. * can be processed while * worker is paused */ + isc_condition_t cond_prio; + isc_refcount_t references; atomic_int_fast64_t pktcount; char *recvbuf; @@ -671,7 +673,7 @@ struct isc_nm { isc_mutex_t evlock; uint_fast32_t workers_running; - uint_fast32_t workers_paused; + atomic_uint_fast32_t workers_paused; atomic_uint_fast32_t maxudp; atomic_bool paused; @@ -702,6 +704,9 @@ struct isc_nm { atomic_uint_fast32_t keepalive; atomic_uint_fast32_t advertised; + isc_barrier_t pausing; + isc_barrier_t resuming; + #ifdef NETMGR_TRACE ISC_LIST(isc_nmsocket_t) active_sockets; #endif @@ -836,6 +841,9 @@ struct isc_nmsocket { /*% Self socket */ isc_nmsocket_t *self; + isc_barrier_t startlistening; + isc_barrier_t stoplistening; + /*% TLS stuff */ struct tls { isc_tls_t *tls; @@ -930,7 +938,7 @@ struct isc_nmsocket { /* Atomic */ /*% Number of running (e.g. listening) child sockets */ - uint_fast32_t rchildren; + atomic_uint_fast32_t rchildren; /*% * Socket is active if it's listening, working, etc. If it's diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 56d4630266..b795dd5aba 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -135,8 +136,12 @@ nm_thread(isc_threadarg_t worker0); static void async_cb(uv_async_t *handle); static bool +process_netievent(isc__networker_t *worker, isc__netievent_t *ievent); +static bool process_queue(isc__networker_t *worker, isc_queue_t *queue, unsigned int *quantump); +static void +wait_for_priority_queue(isc__networker_t *worker); static bool process_priority_queue(isc__networker_t *worker, unsigned int *quantump); static bool @@ -146,6 +151,15 @@ process_task_queue(isc__networker_t *worker, unsigned int *quantump); static bool process_normal_queue(isc__networker_t *worker, unsigned int *quantump); +#define drain_priority_queue(worker) \ + (void)process_priority_queue(worker, &(unsigned int){ UINT_MAX }) +#define drain_privilege_queue(worker) \ + (void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX }) +#define drain_task_queue(worker) \ + (void)process_task_queue(worker, &(unsigned int){ UINT_MAX }) +#define drain_normal_queue(worker) \ + (void)process_normal_queue(worker, &(unsigned int){ UINT_MAX }) + static void isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0); static void @@ -229,6 +243,7 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) { isc_refcount_init(&mgr->references, 1); atomic_init(&mgr->maxudp, 0); atomic_init(&mgr->interlocked, ISC_NETMGR_NON_INTERLOCKED); + atomic_init(&mgr->workers_paused, 0); #ifdef NETMGR_TRACE ISC_LIST_INIT(mgr->active_sockets); @@ -258,6 +273,9 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) { isc_mempool_associatelock(mgr->evpool, &mgr->evlock); isc_mempool_setfillcount(mgr->evpool, 32); + isc_barrier_init(&mgr->pausing, workers); + isc_barrier_init(&mgr->resuming, workers); + mgr->workers = isc_mem_get(mctx, workers * sizeof(isc__networker_t)); for (size_t i = 0; i < workers; i++) { int r; @@ -277,12 +295,13 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) { RUNTIME_CHECK(r == 0); isc_mutex_init(&worker->lock); - isc_condition_init(&worker->cond); worker->ievents = isc_queue_new(mgr->mctx, 128); worker->ievents_task = isc_queue_new(mgr->mctx, 128); worker->ievents_priv = isc_queue_new(mgr->mctx, 128); worker->ievents_prio = isc_queue_new(mgr->mctx, 128); + isc_condition_init(&worker->cond_prio); + worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE); worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE); @@ -351,6 +370,7 @@ nm_destroy(isc_nm_t **mgr0) { { isc_mempool_put(mgr->evpool, ievent); } + isc_condition_destroy(&worker->cond_prio); r = uv_loop_close(&worker->loop); INSIST(r == 0); @@ -360,7 +380,6 @@ nm_destroy(isc_nm_t **mgr0) { isc_queue_destroy(worker->ievents_task); isc_queue_destroy(worker->ievents_prio); isc_mutex_destroy(&worker->lock); - isc_condition_destroy(&worker->cond); isc_mem_put(mgr->mctx, worker->sendbuf, ISC_NETMGR_SENDBUF_SIZE); @@ -373,6 +392,9 @@ nm_destroy(isc_nm_t **mgr0) { isc_stats_detach(&mgr->stats); } + isc_barrier_destroy(&mgr->resuming); + isc_barrier_destroy(&mgr->pausing); + isc_condition_destroy(&mgr->wkstatecond); isc_condition_destroy(&mgr->wkpausecond); isc_mutex_destroy(&mgr->lock); @@ -413,13 +435,18 @@ isc_nm_pause(isc_nm_t *mgr) { } } + if (isc__nm_in_netthread()) { + isc_barrier_wait(&mgr->pausing); + } + LOCK(&mgr->lock); - while (mgr->workers_paused != pausing) { + while (atomic_load(&mgr->workers_paused) != pausing) { WAIT(&mgr->wkstatecond, &mgr->lock); } + UNLOCK(&mgr->lock); + REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ false }, true)); - UNLOCK(&mgr->lock); } void @@ -439,14 +466,19 @@ isc_nm_resume(isc_nm_t *mgr) { } } + if (isc__nm_in_netthread()) { + isc_barrier_wait(&mgr->resuming); + } + LOCK(&mgr->lock); - while (mgr->workers_paused != 0) { + while (atomic_load(&mgr->workers_paused) != 0) { WAIT(&mgr->wkstatecond, &mgr->lock); } + UNLOCK(&mgr->lock); + REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ true }, false)); - BROADCAST(&mgr->wkpausecond); - UNLOCK(&mgr->lock); + isc__nm_drop_interlocked(mgr); } @@ -617,45 +649,29 @@ nm_thread(isc_threadarg_t worker0) { if (worker->paused) { INSIST(atomic_load(&mgr->interlocked) != isc_nm_tid()); - /* - * We need to lock the worker first; otherwise - * isc_nm_resume() might slip in before WAIT() in - * the while loop starts, then the signal never - * gets delivered and we are stuck forever in the - * paused loop. - */ - LOCK(&worker->lock); - LOCK(&mgr->lock); - mgr->workers_paused++; - SIGNAL(&mgr->wkstatecond); - UNLOCK(&mgr->lock); - - while (worker->paused) { - WAIT(&worker->cond, &worker->lock); - UNLOCK(&worker->lock); - (void)process_priority_queue( - worker, &(unsigned int){ UINT_MAX }); - LOCK(&worker->lock); + atomic_fetch_add(&mgr->workers_paused, 1); + if (isc_barrier_wait(&mgr->pausing) != 0) { + LOCK(&mgr->lock); + SIGNAL(&mgr->wkstatecond); + UNLOCK(&mgr->lock); } - LOCK(&mgr->lock); - mgr->workers_paused--; - SIGNAL(&mgr->wkstatecond); - UNLOCK(&mgr->lock); - UNLOCK(&worker->lock); + while (worker->paused) { + wait_for_priority_queue(worker); + } /* * All workers must drain the privileged event * queue before we resume from pause. */ - (void)process_privilege_queue( - worker, &(unsigned int){ UINT_MAX }); + drain_privilege_queue(worker); - LOCK(&mgr->lock); - while (atomic_load(&mgr->paused)) { - WAIT(&mgr->wkpausecond, &mgr->lock); + atomic_fetch_sub(&mgr->workers_paused, 1); + if (isc_barrier_wait(&mgr->resuming) != 0) { + LOCK(&mgr->lock); + SIGNAL(&mgr->wkstatecond); + UNLOCK(&mgr->lock); } - UNLOCK(&mgr->lock); } if (r == 0) { @@ -671,8 +687,8 @@ nm_thread(isc_threadarg_t worker0) { * (they may include shutdown events) but do not process * the netmgr event queue. */ - (void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX }); - (void)process_task_queue(worker, &(unsigned int){ UINT_MAX }); + drain_privilege_queue(worker); + drain_task_queue(worker); LOCK(&mgr->lock); mgr->workers_running--; @@ -792,6 +808,34 @@ isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) { } } +static void +wait_for_priority_queue(isc__networker_t *worker) { + isc_queue_t *queue = worker->ievents_prio; + isc_condition_t *cond = &worker->cond_prio; + bool wait_for_work = true; + + while (true) { + isc__netievent_t *ievent; + LOCK(&worker->lock); + ievent = (isc__netievent_t *)isc_queue_dequeue(queue); + if (wait_for_work) { + while (ievent == NULL) { + WAIT(cond, &worker->lock); + ievent = (isc__netievent_t *)isc_queue_dequeue( + queue); + } + } + UNLOCK(&worker->lock); + wait_for_work = false; + + if (ievent == NULL) { + return; + } + + (void)process_netievent(worker, ievent); + } +} + static bool process_priority_queue(isc__networker_t *worker, unsigned int *quantump) { return (process_queue(worker, worker->ievents_prio, quantump)); @@ -917,13 +961,13 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue, isc__netievent_t *ievent = (isc__netievent_t *)isc_queue_dequeue(queue); - (*quantump)--; - if (ievent == NULL) { /* We fully drained this queue */ return (true); } + (*quantump)--; + if (!process_netievent(worker, ievent)) { /* Netievent told us to stop */ return (false); @@ -1034,7 +1078,7 @@ isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) { */ LOCK(&worker->lock); isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event); - SIGNAL(&worker->cond); + SIGNAL(&worker->cond_prio); UNLOCK(&worker->lock); } else if (event->type == netievent_privilegedtask) { isc_queue_enqueue(worker->ievents_priv, (uintptr_t)event); @@ -1122,7 +1166,14 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree FLARG) { } /* - * This was a parent socket; free the children. + * This was a parent socket: destroy the listening + * barriers that synchronized the children. + */ + isc_barrier_destroy(&sock->startlistening); + isc_barrier_destroy(&sock->stoplistening); + + /* + * Now free them. */ isc_mem_put(sock->mgr->mctx, sock->children, sock->nchildren * sizeof(*sock)); @@ -1169,7 +1220,6 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree FLARG) { isc_mem_free(sock->mgr->mctx, sock->ah_frees); isc_mem_free(sock->mgr->mctx, sock->ah_handles); isc_mutex_destroy(&sock->lock); - isc_condition_destroy(&sock->cond); isc_condition_destroy(&sock->scond); isc__nm_tls_cleanup_data(sock); isc__nm_http_cleanup_data(sock); @@ -1416,7 +1466,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type, } isc_mutex_init(&sock->lock); - isc_condition_init(&sock->cond); isc_condition_init(&sock->scond); isc_refcount_init(&sock->references, 1); diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 60cf9222c5..052030bf79 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -116,7 +117,7 @@ failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) { static isc_result_t tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__networker_t *worker = NULL; - isc_result_t result = ISC_R_DEFAULT; + isc_result_t result = ISC_R_UNSET; int r; REQUIRE(VALID_NMSOCK(sock)); @@ -302,7 +303,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, sock->extrahandlesize = extrahandlesize; sock->connect_timeout = timeout; - sock->result = ISC_R_DEFAULT; + sock->result = ISC_R_UNSET; sock->fd = (uv_os_sock_t)-1; atomic_init(&sock->client, true); @@ -344,7 +345,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, (isc__netievent_t *)ievent); } LOCK(&sock->lock); - while (sock->result == ISC_R_DEFAULT) { + while (sock->result == ISC_R_UNSET) { WAIT(&sock->cond, &sock->lock); } atomic_store(&sock->active, true); @@ -375,6 +376,39 @@ isc__nm_tcp_lb_socket(sa_family_t sa_family) { return (sock); } +static void +start_tcp_child(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t *sock, + uv_os_sock_t fd, int tid) { + isc__netievent_tcplisten_t *ievent = NULL; + isc_nmsocket_t *csock = &sock->children[tid]; + + isc__nmsocket_init(csock, mgr, isc_nm_tcpsocket, iface); + csock->parent = sock; + csock->accept_cb = sock->accept_cb; + csock->accept_cbarg = sock->accept_cbarg; + csock->extrahandlesize = sock->extrahandlesize; + csock->backlog = sock->backlog; + csock->tid = tid; + /* + * We don't attach to quota, just assign - to avoid + * increasing quota unnecessarily. + */ + csock->pquota = sock->pquota; + isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock); + +#if HAVE_SO_REUSEPORT_LB || defined(WIN32) + UNUSED(fd); + csock->fd = isc__nm_tcp_lb_socket(iface->addr.type.sa.sa_family); +#else + csock->fd = dup(fd); +#endif + REQUIRE(csock->fd >= 0); + + ievent = isc__nm_get_netievent_tcplisten(mgr, csock); + isc__nm_maybe_enqueue_ievent(&mgr->workers[tid], + (isc__netievent_t *)ievent); +} + isc_result_t isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_accept_cb_t accept_cb, void *accept_cbarg, @@ -382,18 +416,15 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t **sockp) { isc_result_t result = ISC_R_SUCCESS; isc_nmsocket_t *sock = NULL; - sa_family_t sa_family = iface->addr.type.sa.sa_family; size_t children_size = 0; -#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) uv_os_sock_t fd = -1; -#endif REQUIRE(VALID_NM(mgr)); sock = isc_mem_get(mgr->mctx, sizeof(*sock)); isc__nmsocket_init(sock, mgr, isc_nm_tcplistener, iface); - sock->rchildren = 0; + atomic_init(&sock->rchildren, 0); #if defined(WIN32) sock->nchildren = 1; #else @@ -403,42 +434,36 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, sock->children = isc_mem_get(mgr->mctx, children_size); memset(sock->children, 0, children_size); - sock->result = ISC_R_DEFAULT; - sock->tid = isc_random_uniform(sock->nchildren); + sock->result = ISC_R_UNSET; + + sock->accept_cb = accept_cb; + sock->accept_cbarg = accept_cbarg; + sock->extrahandlesize = extrahandlesize; + sock->backlog = backlog; + sock->pquota = quota; + + if (isc__nm_in_netthread()) { + sock->tid = isc_nm_tid(); + } else { + sock->tid = isc_random_uniform(sock->nchildren); + } sock->fd = -1; #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) - fd = isc__nm_tcp_lb_socket(sa_family); + fd = isc__nm_tcp_lb_socket(iface->addr.type.sa.sa_family); #endif + isc_barrier_init(&sock->startlistening, sock->nchildren); + for (size_t i = 0; i < sock->nchildren; i++) { - isc__netievent_tcplisten_t *ievent = NULL; - isc_nmsocket_t *csock = &sock->children[i]; + if ((int)i == isc_nm_tid()) { + continue; + } + start_tcp_child(mgr, iface, sock, fd, i); + } - isc__nmsocket_init(csock, mgr, isc_nm_tcpsocket, iface); - csock->parent = sock; - csock->accept_cb = accept_cb; - csock->accept_cbarg = accept_cbarg; - csock->extrahandlesize = extrahandlesize; - csock->backlog = backlog; - csock->tid = i; - /* - * We don't attach to quota, just assign - to avoid - * increasing quota unnecessarily. - */ - csock->pquota = quota; - isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock); - -#if HAVE_SO_REUSEPORT_LB || defined(WIN32) - csock->fd = isc__nm_tcp_lb_socket(sa_family); -#else - csock->fd = dup(fd); -#endif - REQUIRE(csock->fd >= 0); - - ievent = isc__nm_get_netievent_tcplisten(mgr, csock); - isc__nm_maybe_enqueue_ievent(&mgr->workers[i], - (isc__netievent_t *)ievent); + if (isc__nm_in_netthread()) { + start_tcp_child(mgr, iface, sock, fd, isc_nm_tid()); } #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) @@ -446,21 +471,21 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, #endif LOCK(&sock->lock); - while (sock->rchildren != sock->nchildren) { + while (atomic_load(&sock->rchildren) != sock->nchildren) { WAIT(&sock->cond, &sock->lock); } result = sock->result; atomic_store(&sock->active, true); - BROADCAST(&sock->scond); UNLOCK(&sock->lock); - INSIST(result != ISC_R_DEFAULT); + + INSIST(result != ISC_R_UNSET); if (result == ISC_R_SUCCESS) { - REQUIRE(sock->rchildren == sock->nchildren); + REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren); *sockp = sock; } else { atomic_store(&sock->active, false); - isc__nm_tcp_stoplistening(sock); + isc_nm_stoplistening(sock); isc_nmsocket_close(&sock); } @@ -565,16 +590,14 @@ done: sock->pquota = NULL; } - sock->parent->rchildren += 1; - if (sock->parent->result == ISC_R_DEFAULT) { + atomic_fetch_add(&sock->parent->rchildren, 1); + if (sock->parent->result == ISC_R_UNSET) { sock->parent->result = result; } SIGNAL(&sock->parent->cond); - if (!atomic_load(&sock->parent->active)) { - WAIT(&sock->parent->scond, &sock->parent->lock); - } - INSIST(atomic_load(&sock->parent->active)); UNLOCK(&sock->parent->lock); + + isc_barrier_wait(&sock->parent->startlistening); } static void @@ -637,7 +660,15 @@ isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) { INSIST(0); ISC_UNREACHABLE(); } - enqueue_stoplistening(sock); + + if (!isc__nm_in_netthread()) { + enqueue_stoplistening(sock); + } else if (!isc__nm_acquire_interlocked(sock->mgr)) { + enqueue_stoplistening(sock); + } else { + stop_tcp_parent(sock); + isc__nm_drop_interlocked(sock->mgr); + } } void @@ -655,7 +686,12 @@ isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) { return; } - stop_tcp_parent(sock); + if (!isc__nm_acquire_interlocked(sock->mgr)) { + enqueue_stoplistening(sock); + } else { + stop_tcp_parent(sock); + isc__nm_drop_interlocked(sock->mgr); + } } void @@ -1194,8 +1230,6 @@ timer_close_cb(uv_handle_t *handle) { static void stop_tcp_child(isc_nmsocket_t *sock) { - bool last_child = false; - REQUIRE(sock->type == isc_nm_tcpsocket); REQUIRE(sock->tid == isc_nm_tid()); @@ -1206,33 +1240,40 @@ stop_tcp_child(isc_nmsocket_t *sock) { tcp_close_direct(sock); - LOCK(&sock->parent->lock); - sock->parent->rchildren -= 1; - last_child = (sock->parent->rchildren == 0); - UNLOCK(&sock->parent->lock); + atomic_fetch_sub(&sock->parent->rchildren, 1); - if (last_child) { - atomic_store(&sock->parent->closed, true); - isc__nmsocket_prep_destroy(sock->parent); - } + isc_barrier_wait(&sock->parent->stoplistening); } static void stop_tcp_parent(isc_nmsocket_t *sock) { + isc_nmsocket_t *csock = NULL; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tcplistener); + isc_barrier_init(&sock->stoplistening, sock->nchildren); + for (size_t i = 0; i < sock->nchildren; i++) { - isc__netievent_tcpstop_t *ievent = NULL; - isc_nmsocket_t *csock = &sock->children[i]; + csock = &sock->children[i]; REQUIRE(VALID_NMSOCK(csock)); - atomic_store(&csock->active, false); + if ((int)i == isc_nm_tid()) { + /* + * We need to schedule closing the other sockets first + */ + continue; + } - ievent = isc__nm_get_netievent_tcpstop(sock->mgr, csock); - isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid], - (isc__netievent_t *)ievent); + atomic_store(&csock->active, false); + enqueue_stoplistening(csock); } + + csock = &sock->children[isc_nm_tid()]; + atomic_store(&csock->active, false); + stop_tcp_child(csock); + + atomic_store(&sock->closed, true); + isc__nmsocket_prep_destroy(sock); } static void diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 9d2601b906..142d8b30fa 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -85,7 +86,7 @@ stop_tcpdns_child(isc_nmsocket_t *sock); static isc_result_t tcpdns_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__networker_t *worker = NULL; - isc_result_t result = ISC_R_DEFAULT; + isc_result_t result = ISC_R_UNSET; int r; REQUIRE(VALID_NMSOCK(sock)); @@ -269,7 +270,7 @@ isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, sock->extrahandlesize = extrahandlesize; sock->connect_timeout = timeout; - sock->result = ISC_R_DEFAULT; + sock->result = ISC_R_UNSET; atomic_init(&sock->client, true); req = isc__nm_uvreq_get(mgr, sock); @@ -311,7 +312,7 @@ isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, } LOCK(&sock->lock); - while (sock->result == ISC_R_DEFAULT) { + while (sock->result == ISC_R_UNSET) { WAIT(&sock->cond, &sock->lock); } atomic_store(&sock->active, true); @@ -342,6 +343,40 @@ isc__nm_tcpdns_lb_socket(sa_family_t sa_family) { return (sock); } +static void +start_tcpdns_child(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t *sock, + uv_os_sock_t fd, int tid) { + isc__netievent_tcpdnslisten_t *ievent = NULL; + isc_nmsocket_t *csock = &sock->children[tid]; + + isc__nmsocket_init(csock, mgr, isc_nm_tcpdnssocket, iface); + csock->parent = sock; + csock->accept_cb = sock->accept_cb; + csock->accept_cbarg = sock->accept_cbarg; + csock->recv_cb = sock->recv_cb; + csock->recv_cbarg = sock->recv_cbarg; + csock->extrahandlesize = sock->extrahandlesize; + csock->backlog = sock->backlog; + csock->tid = tid; + /* + * We don't attach to quota, just assign - to avoid + * increasing quota unnecessarily. + */ + csock->pquota = sock->pquota; + isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock); + +#if HAVE_SO_REUSEPORT_LB || defined(WIN32) + UNUSED(fd); + csock->fd = isc__nm_tcpdns_lb_socket(iface->addr.type.sa.sa_family); +#else + csock->fd = dup(fd); +#endif + REQUIRE(csock->fd >= 0); + + ievent = isc__nm_get_netievent_tcpdnslisten(mgr, csock); + isc__nm_maybe_enqueue_ievent(&mgr->workers[tid], + (isc__netievent_t *)ievent); +} isc_result_t isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t recv_cb, void *recv_cbarg, @@ -350,18 +385,15 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t **sockp) { isc_result_t result = ISC_R_SUCCESS; isc_nmsocket_t *sock = NULL; - sa_family_t sa_family = iface->addr.type.sa.sa_family; size_t children_size = 0; -#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) uv_os_sock_t fd = -1; -#endif REQUIRE(VALID_NM(mgr)); sock = isc_mem_get(mgr->mctx, sizeof(*sock)); isc__nmsocket_init(sock, mgr, isc_nm_tcpdnslistener, iface); - sock->rchildren = 0; + atomic_init(&sock->rchildren, 0); #if defined(WIN32) sock->nchildren = 1; #else @@ -371,44 +403,37 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, sock->children = isc_mem_get(mgr->mctx, children_size); memset(sock->children, 0, children_size); - sock->result = ISC_R_DEFAULT; - sock->tid = isc_random_uniform(sock->nchildren); + sock->result = ISC_R_UNSET; + sock->accept_cb = accept_cb; + sock->accept_cbarg = accept_cbarg; + sock->recv_cb = recv_cb; + sock->recv_cbarg = recv_cbarg; + sock->extrahandlesize = extrahandlesize; + sock->backlog = backlog; + sock->pquota = quota; + + if (isc__nm_in_netthread()) { + sock->tid = isc_nm_tid(); + } else { + sock->tid = isc_random_uniform(sock->nchildren); + } sock->fd = -1; #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) - fd = isc__nm_tcpdns_lb_socket(sa_family); + fd = isc__nm_tcpdns_lb_socket(iface->addr.type.sa.sa_family); #endif + isc_barrier_init(&sock->startlistening, sock->nchildren); + for (size_t i = 0; i < sock->nchildren; i++) { - isc__netievent_tcpdnslisten_t *ievent = NULL; - isc_nmsocket_t *csock = &sock->children[i]; + if ((int)i == isc_nm_tid()) { + continue; + } + start_tcpdns_child(mgr, iface, sock, fd, i); + } - isc__nmsocket_init(csock, mgr, isc_nm_tcpdnssocket, iface); - csock->parent = sock; - csock->accept_cb = accept_cb; - csock->accept_cbarg = accept_cbarg; - csock->recv_cb = recv_cb; - csock->recv_cbarg = recv_cbarg; - csock->extrahandlesize = extrahandlesize; - csock->backlog = backlog; - csock->tid = i; - /* - * We don't attach to quota, just assign - to avoid - * increasing quota unnecessarily. - */ - csock->pquota = quota; - isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock); - -#if HAVE_SO_REUSEPORT_LB || defined(WIN32) - csock->fd = isc__nm_tcpdns_lb_socket(sa_family); -#else - csock->fd = dup(fd); -#endif - REQUIRE(csock->fd >= 0); - - ievent = isc__nm_get_netievent_tcpdnslisten(mgr, csock); - isc__nm_maybe_enqueue_ievent(&mgr->workers[i], - (isc__netievent_t *)ievent); + if (isc__nm_in_netthread()) { + start_tcpdns_child(mgr, iface, sock, fd, isc_nm_tid()); } #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) @@ -416,21 +441,21 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, #endif LOCK(&sock->lock); - while (sock->rchildren != sock->nchildren) { + while (atomic_load(&sock->rchildren) != sock->nchildren) { WAIT(&sock->cond, &sock->lock); } result = sock->result; atomic_store(&sock->active, true); - BROADCAST(&sock->scond); UNLOCK(&sock->lock); - INSIST(result != ISC_R_DEFAULT); + + INSIST(result != ISC_R_UNSET); if (result == ISC_R_SUCCESS) { - REQUIRE(sock->rchildren == sock->nchildren); + REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren); *sockp = sock; } else { atomic_store(&sock->active, false); - isc__nm_tcpdns_stoplistening(sock); + isc_nm_stoplistening(sock); isc_nmsocket_close(&sock); } @@ -446,7 +471,7 @@ isc__nm_async_tcpdnslisten(isc__networker_t *worker, isc__netievent_t *ev0) { int r; int flags = 0; isc_nmsocket_t *sock = NULL; - isc_result_t result = ISC_R_DEFAULT; + isc_result_t result = ISC_R_UNSET; REQUIRE(VALID_NMSOCK(ievent->sock)); REQUIRE(ievent->sock->tid == isc_nm_tid()); @@ -534,16 +559,14 @@ done: sock->pquota = NULL; } - sock->parent->rchildren += 1; - if (sock->parent->result == ISC_R_DEFAULT) { + atomic_fetch_add(&sock->parent->rchildren, 1); + if (sock->parent->result == ISC_R_UNSET) { sock->parent->result = result; } SIGNAL(&sock->parent->cond); - if (!atomic_load(&sock->parent->active)) { - WAIT(&sock->parent->scond, &sock->parent->lock); - } - INSIST(atomic_load(&sock->parent->active)); UNLOCK(&sock->parent->lock); + + isc_barrier_wait(&sock->parent->startlistening); } static void @@ -607,7 +630,15 @@ isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock) { INSIST(0); ISC_UNREACHABLE(); } - enqueue_stoplistening(sock); + + if (!isc__nm_in_netthread()) { + enqueue_stoplistening(sock); + } else if (!isc__nm_acquire_interlocked(sock->mgr)) { + enqueue_stoplistening(sock); + } else { + stop_tcpdns_parent(sock); + isc__nm_drop_interlocked(sock->mgr); + } } void @@ -626,7 +657,15 @@ isc__nm_async_tcpdnsstop(isc__networker_t *worker, isc__netievent_t *ev0) { return; } - stop_tcpdns_parent(sock); + /* + * If network manager is paused, re-enqueue the event for later. + */ + if (!isc__nm_acquire_interlocked(sock->mgr)) { + enqueue_stoplistening(sock); + } else { + stop_tcpdns_parent(sock); + isc__nm_drop_interlocked(sock->mgr); + } } void @@ -1224,8 +1263,6 @@ timer_close_cb(uv_handle_t *timer) { static void stop_tcpdns_child(isc_nmsocket_t *sock) { - bool last_child = false; - REQUIRE(sock->type == isc_nm_tcpdnssocket); REQUIRE(sock->tid == isc_nm_tid()); @@ -1236,33 +1273,41 @@ stop_tcpdns_child(isc_nmsocket_t *sock) { tcpdns_close_direct(sock); - LOCK(&sock->parent->lock); - sock->parent->rchildren -= 1; - last_child = (sock->parent->rchildren == 0); - UNLOCK(&sock->parent->lock); + atomic_fetch_sub(&sock->parent->rchildren, 1); - if (last_child) { - atomic_store(&sock->parent->closed, true); - isc__nmsocket_prep_destroy(sock->parent); - } + isc_barrier_wait(&sock->parent->stoplistening); } static void stop_tcpdns_parent(isc_nmsocket_t *sock) { + isc_nmsocket_t *csock = NULL; + REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tcpdnslistener); + isc_barrier_init(&sock->stoplistening, sock->nchildren); + for (size_t i = 0; i < sock->nchildren; i++) { - isc__netievent_tcpdnsstop_t *ievent = NULL; - isc_nmsocket_t *csock = &sock->children[i]; + csock = &sock->children[i]; REQUIRE(VALID_NMSOCK(csock)); - atomic_store(&csock->active, false); + if ((int)i == isc_nm_tid()) { + /* + * We need to schedule closing the other sockets first + */ + continue; + } - ievent = isc__nm_get_netievent_tcpdnsstop(sock->mgr, csock); - isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid], - (isc__netievent_t *)ievent); + atomic_store(&csock->active, false); + enqueue_stoplistening(csock); } + + csock = &sock->children[isc_nm_tid()]; + atomic_store(&csock->active, false); + stop_tcpdns_child(csock); + + atomic_store(&sock->closed, true); + isc__nmsocket_prep_destroy(sock); } static void diff --git a/lib/isc/netmgr/tlsdns.c b/lib/isc/netmgr/tlsdns.c index bb7ea76a3c..3f68c7f513 100644 --- a/lib/isc/netmgr/tlsdns.c +++ b/lib/isc/netmgr/tlsdns.c @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -97,7 +98,7 @@ can_log_tlsdns_quota(void) { static isc_result_t tlsdns_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__networker_t *worker = NULL; - isc_result_t result = ISC_R_DEFAULT; + isc_result_t result = ISC_R_UNSET; int r; REQUIRE(VALID_NMSOCK(sock)); @@ -324,7 +325,7 @@ isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, sock->extrahandlesize = extrahandlesize; sock->connect_timeout = timeout; - sock->result = ISC_R_DEFAULT; + sock->result = ISC_R_UNSET; sock->tls.ctx = sslctx; atomic_init(&sock->client, true); atomic_init(&sock->connecting, true); @@ -364,7 +365,7 @@ isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, (isc__netievent_t *)ievent); } LOCK(&sock->lock); - while (sock->result == ISC_R_DEFAULT) { + while (sock->result == ISC_R_UNSET) { WAIT(&sock->cond, &sock->lock); } atomic_store(&sock->active, true); @@ -407,6 +408,43 @@ isc__nm_tlsdns_lb_socket(sa_family_t sa_family) { return (sock); } +static void +start_tlsdns_child(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t *sock, + uv_os_sock_t fd, int tid) { + isc__netievent_tlsdnslisten_t *ievent = NULL; + isc_nmsocket_t *csock = &sock->children[tid]; + + isc__nmsocket_init(csock, mgr, isc_nm_tlsdnssocket, iface); + csock->parent = sock; + csock->accept_cb = sock->accept_cb; + csock->accept_cbarg = sock->accept_cbarg; + csock->recv_cb = sock->recv_cb; + csock->recv_cbarg = sock->recv_cbarg; + csock->extrahandlesize = sock->extrahandlesize; + csock->backlog = sock->backlog; + csock->tid = tid; + csock->tls.ctx = sock->tls.ctx; + + /* + * We don't attach to quota, just assign - to avoid + * increasing quota unnecessarily. + */ + csock->pquota = sock->pquota; + isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock); + +#if HAVE_SO_REUSEPORT_LB || defined(WIN32) + UNUSED(fd); + csock->fd = isc__nm_tlsdns_lb_socket(iface->addr.type.sa.sa_family); +#else + csock->fd = dup(fd); +#endif + REQUIRE(csock->fd >= 0); + + ievent = isc__nm_get_netievent_tlsdnslisten(mgr, csock); + isc__nm_maybe_enqueue_ievent(&mgr->workers[tid], + (isc__netievent_t *)ievent); +} + isc_result_t isc_nm_listentlsdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t recv_cb, void *recv_cbarg, @@ -415,18 +453,15 @@ isc_nm_listentlsdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_tlsctx_t *sslctx, isc_nmsocket_t **sockp) { isc_result_t result = ISC_R_SUCCESS; isc_nmsocket_t *sock = NULL; - sa_family_t sa_family = iface->addr.type.sa.sa_family; size_t children_size = 0; -#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) uv_os_sock_t fd = -1; -#endif REQUIRE(VALID_NM(mgr)); sock = isc_mem_get(mgr->mctx, sizeof(*sock)); isc__nmsocket_init(sock, mgr, isc_nm_tlsdnslistener, iface); - sock->rchildren = 0; + atomic_init(&sock->rchildren, 0); #if defined(WIN32) sock->nchildren = 1; #else @@ -436,47 +471,39 @@ isc_nm_listentlsdns(isc_nm_t *mgr, isc_nmiface_t *iface, sock->children = isc_mem_get(mgr->mctx, children_size); memset(sock->children, 0, children_size); - sock->result = ISC_R_DEFAULT; - sock->tid = isc_random_uniform(sock->nchildren); - sock->fd = -1; + sock->result = ISC_R_UNSET; + sock->accept_cb = accept_cb; + sock->accept_cbarg = accept_cbarg; + sock->recv_cb = recv_cb; + sock->recv_cbarg = recv_cbarg; + sock->extrahandlesize = extrahandlesize; + sock->backlog = backlog; + sock->pquota = quota; + + if (isc__nm_in_netthread()) { + sock->tid = isc_nm_tid(); + } else { + sock->tid = isc_random_uniform(sock->nchildren); + } + sock->tls.ctx = sslctx; + sock->fd = -1; #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) - fd = isc__nm_tlsdns_lb_socket(sa_family); + fd = isc__nm_tlsdns_lb_socket(iface->addr.type.sa.sa_family); #endif + isc_barrier_init(&sock->startlistening, sock->nchildren); + for (size_t i = 0; i < sock->nchildren; i++) { - isc__netievent_tlsdnslisten_t *ievent = NULL; - isc_nmsocket_t *csock = &sock->children[i]; + if ((int)i == isc_nm_tid()) { + continue; + } + start_tlsdns_child(mgr, iface, sock, fd, i); + } - isc__nmsocket_init(csock, mgr, isc_nm_tlsdnssocket, iface); - csock->parent = sock; - csock->accept_cb = accept_cb; - csock->accept_cbarg = accept_cbarg; - csock->recv_cb = recv_cb; - csock->recv_cbarg = recv_cbarg; - csock->extrahandlesize = extrahandlesize; - csock->backlog = backlog; - csock->tid = i; - csock->tls.ctx = sslctx; - - /* - * We don't attach to quota, just assign - to avoid - * increasing quota unnecessarily. - */ - csock->pquota = quota; - isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock); - -#if HAVE_SO_REUSEPORT_LB || defined(WIN32) - csock->fd = isc__nm_tlsdns_lb_socket(sa_family); -#else - csock->fd = dup(fd); -#endif - REQUIRE(csock->fd >= 0); - - ievent = isc__nm_get_netievent_tlsdnslisten(mgr, csock); - isc__nm_maybe_enqueue_ievent(&mgr->workers[i], - (isc__netievent_t *)ievent); + if (isc__nm_in_netthread()) { + start_tlsdns_child(mgr, iface, sock, fd, isc_nm_tid()); } #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) @@ -484,21 +511,21 @@ isc_nm_listentlsdns(isc_nm_t *mgr, isc_nmiface_t *iface, #endif LOCK(&sock->lock); - while (sock->rchildren != sock->nchildren) { + while (atomic_load(&sock->rchildren) != sock->nchildren) { WAIT(&sock->cond, &sock->lock); } result = sock->result; atomic_store(&sock->active, true); - BROADCAST(&sock->scond); UNLOCK(&sock->lock); - INSIST(result != ISC_R_DEFAULT); + + INSIST(result != ISC_R_UNSET); if (result == ISC_R_SUCCESS) { - REQUIRE(sock->rchildren == sock->nchildren); + REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren); *sockp = sock; } else { atomic_store(&sock->active, false); - isc__nm_tlsdns_stoplistening(sock); + isc_nm_stoplistening(sock); isc_nmsocket_close(&sock); } @@ -514,7 +541,7 @@ isc__nm_async_tlsdnslisten(isc__networker_t *worker, isc__netievent_t *ev0) { int r; int flags = 0; isc_nmsocket_t *sock = NULL; - isc_result_t result = ISC_R_DEFAULT; + isc_result_t result = ISC_R_UNSET; REQUIRE(VALID_NMSOCK(ievent->sock)); REQUIRE(ievent->sock->tid == isc_nm_tid()); @@ -603,16 +630,14 @@ done: sock->pquota = NULL; } - sock->parent->rchildren += 1; - if (sock->parent->result == ISC_R_DEFAULT) { + atomic_fetch_add(&sock->parent->rchildren, 1); + if (sock->parent->result == ISC_R_UNSET) { sock->parent->result = result; } SIGNAL(&sock->parent->cond); - if (!atomic_load(&sock->parent->active)) { - WAIT(&sock->parent->scond, &sock->parent->lock); - } - INSIST(atomic_load(&sock->parent->active)); UNLOCK(&sock->parent->lock); + + isc_barrier_wait(&sock->parent->startlistening); } static void @@ -676,7 +701,15 @@ isc__nm_tlsdns_stoplistening(isc_nmsocket_t *sock) { INSIST(0); ISC_UNREACHABLE(); } - enqueue_stoplistening(sock); + + if (!isc__nm_in_netthread()) { + enqueue_stoplistening(sock); + } else if (!isc__nm_acquire_interlocked(sock->mgr)) { + enqueue_stoplistening(sock); + } else { + stop_tlsdns_parent(sock); + isc__nm_drop_interlocked(sock->mgr); + } } static void @@ -770,7 +803,15 @@ isc__nm_async_tlsdnsstop(isc__networker_t *worker, isc__netievent_t *ev0) { return; } - stop_tlsdns_parent(sock); + /* + * If network manager is paused, re-enqueue the event for later. + */ + if (!isc__nm_acquire_interlocked(sock->mgr)) { + enqueue_stoplistening(sock); + } else { + stop_tlsdns_parent(sock); + isc__nm_drop_interlocked(sock->mgr); + } } void @@ -1770,8 +1811,6 @@ timer_close_cb(uv_handle_t *handle) { static void stop_tlsdns_child(isc_nmsocket_t *sock) { - bool last_child = false; - REQUIRE(sock->type == isc_nm_tlsdnssocket); REQUIRE(sock->tid == isc_nm_tid()); @@ -1782,34 +1821,42 @@ stop_tlsdns_child(isc_nmsocket_t *sock) { tlsdns_close_direct(sock); - LOCK(&sock->parent->lock); - sock->parent->rchildren -= 1; - last_child = (sock->parent->rchildren == 0); - UNLOCK(&sock->parent->lock); + atomic_fetch_sub(&sock->parent->rchildren, 1); - if (last_child) { - atomic_store(&sock->parent->closed, true); - isc__nmsocket_prep_destroy(sock->parent); - } + isc_barrier_wait(&sock->parent->stoplistening); } static void stop_tlsdns_parent(isc_nmsocket_t *sock) { + isc_nmsocket_t *csock = NULL; + REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tlsdnslistener); + isc_barrier_init(&sock->stoplistening, sock->nchildren); + for (size_t i = 0; i < sock->nchildren; i++) { - isc__netievent_tlsdnsstop_t *ievent = NULL; - isc_nmsocket_t *csock = &sock->children[i]; + csock = &sock->children[i]; REQUIRE(VALID_NMSOCK(csock)); - atomic_store(&csock->active, false); + if ((int)i == isc_nm_tid()) { + /* + * We need to schedule closing the other sockets first + */ + continue; + } - ievent = isc__nm_get_netievent_tlsdnsstop(sock->mgr, csock); - isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid], - (isc__netievent_t *)ievent); + atomic_store(&csock->active, false); + enqueue_stoplistening(csock); } + + csock = &sock->children[isc_nm_tid()]; + atomic_store(&csock->active, false); + stop_tlsdns_child(csock); + + atomic_store(&sock->closed, true); + isc__nmsocket_prep_destroy(sock); } static void diff --git a/lib/isc/netmgr/tlsstream.c b/lib/isc/netmgr/tlsstream.c index fab2072f22..ec339a738a 100644 --- a/lib/isc/netmgr/tlsstream.c +++ b/lib/isc/netmgr/tlsstream.c @@ -621,7 +621,7 @@ isc_nm_listentls(isc_nm_t *mgr, isc_nmiface_t *iface, tlssock->tlsstream.server_iface = *iface; ISC_LINK_INIT(&tlssock->tlsstream.server_iface.addr, link); tlssock->iface = &tlssock->tlsstream.server_iface; - tlssock->result = ISC_R_DEFAULT; + tlssock->result = ISC_R_UNSET; tlssock->accept_cb = accept_cb; tlssock->accept_cbarg = accept_cbarg; tlssock->extrahandlesize = extrahandlesize; @@ -643,19 +643,12 @@ isc_nm_listentls(isc_nm_t *mgr, isc_nmiface_t *iface, /* wait for listen result */ isc__nmsocket_attach(tlssock->outer, &tsock); - LOCK(&tlssock->outer->lock); - while (tlssock->outer->rchildren != tlssock->outer->nchildren) { - WAIT(&tlssock->outer->cond, &tlssock->outer->lock); - } - result = tlssock->outer->result; tlssock->result = result; atomic_store(&tlssock->active, true); INSIST(tlssock->outer->tlsstream.tlslistener == NULL); isc__nmsocket_attach(tlssock, &tlssock->outer->tlsstream.tlslistener); - BROADCAST(&tlssock->outer->scond); - UNLOCK(&tlssock->outer->lock); isc__nmsocket_detach(&tsock); - INSIST(result != ISC_R_DEFAULT); + INSIST(result != ISC_R_UNSET); if (result == ISC_R_SUCCESS) { atomic_store(&tlssock->listening, true); @@ -891,7 +884,7 @@ isc_nm_tlsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, ISC_LINK_INIT(&nsock->tlsstream.local_iface.addr, link); nsock->iface = &nsock->tlsstream.local_iface; nsock->extrahandlesize = extrahandlesize; - nsock->result = ISC_R_DEFAULT; + nsock->result = ISC_R_UNSET; nsock->connect_cb = cb; nsock->connect_cbarg = cbarg; nsock->connect_timeout = timeout; diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index 221da22663..ca4900c5ea 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -13,6 +13,7 @@ #include #include +#include #include #include #include @@ -77,18 +78,44 @@ isc__nm_udp_lb_socket(sa_family_t sa_family) { return (sock); } +static void +start_udp_child(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t *sock, + uv_os_sock_t fd, int tid) { + isc_nmsocket_t *csock; + isc__netievent_udplisten_t *ievent = NULL; + + csock = &sock->children[tid]; + + isc__nmsocket_init(csock, mgr, isc_nm_udpsocket, iface); + csock->parent = sock; + csock->iface = sock->iface; + csock->reading = true; + csock->recv_cb = sock->recv_cb; + csock->recv_cbarg = sock->recv_cbarg; + csock->extrahandlesize = sock->extrahandlesize; + csock->tid = tid; + +#if HAVE_SO_REUSEPORT_LB || defined(WIN32) + UNUSED(fd); + csock->fd = isc__nm_udp_lb_socket(iface->addr.type.sa.sa_family); +#else + csock->fd = dup(fd); +#endif + REQUIRE(csock->fd >= 0); + + ievent = isc__nm_get_netievent_udplisten(mgr, csock); + isc__nm_maybe_enqueue_ievent(&mgr->workers[tid], + (isc__netievent_t *)ievent); +} + isc_result_t isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, void *cbarg, size_t extrahandlesize, isc_nmsocket_t **sockp) { isc_result_t result = ISC_R_SUCCESS; isc_nmsocket_t *sock = NULL; - sa_family_t sa_family = iface->addr.type.sa.sa_family; size_t children_size = 0; -#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) - uv_os_sock_t fd = -1; -#endif - REQUIRE(VALID_NM(mgr)); + uv_os_sock_t fd = -1; /* * We are creating mgr->nworkers duplicated sockets, one @@ -97,7 +124,7 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, sock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t)); isc__nmsocket_init(sock, mgr, isc_nm_udplistener, iface); - sock->rchildren = 0; + atomic_init(&sock->rchildren, 0); #if defined(WIN32) sock->nchildren = 1; #else @@ -111,37 +138,29 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, sock->recv_cb = cb; sock->recv_cbarg = cbarg; sock->extrahandlesize = extrahandlesize; - sock->result = ISC_R_DEFAULT; - sock->tid = isc_random_uniform(sock->nchildren); + sock->result = ISC_R_UNSET; + if (isc__nm_in_netthread()) { + sock->tid = isc_nm_tid(); + } else { + sock->tid = isc_random_uniform(sock->nchildren); + } sock->fd = -1; #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) - fd = isc__nm_udp_lb_socket(sa_family); + fd = isc__nm_udp_lb_socket(iface->addr.type.sa.sa_family); #endif + isc_barrier_init(&sock->startlistening, sock->nchildren); + for (size_t i = 0; i < sock->nchildren; i++) { - isc__netievent_udplisten_t *ievent = NULL; - isc_nmsocket_t *csock = &sock->children[i]; + if ((int)i == isc_nm_tid()) { + continue; + } + start_udp_child(mgr, iface, sock, fd, i); + } - isc__nmsocket_init(csock, mgr, isc_nm_udpsocket, iface); - csock->parent = sock; - csock->iface = sock->iface; - csock->reading = true; - csock->recv_cb = cb; - csock->recv_cbarg = cbarg; - csock->extrahandlesize = sock->extrahandlesize; - csock->tid = i; - -#if HAVE_SO_REUSEPORT_LB || defined(WIN32) - csock->fd = isc__nm_udp_lb_socket(sa_family); -#else - csock->fd = dup(fd); -#endif - REQUIRE(csock->fd >= 0); - - ievent = isc__nm_get_netievent_udplisten(mgr, csock); - isc__nm_maybe_enqueue_ievent(&mgr->workers[i], - (isc__netievent_t *)ievent); + if (isc__nm_in_netthread()) { + start_udp_child(mgr, iface, sock, fd, isc_nm_tid()); } #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) @@ -149,21 +168,21 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, #endif LOCK(&sock->lock); - while (sock->rchildren != sock->nchildren) { + while (atomic_load(&sock->rchildren) != sock->nchildren) { WAIT(&sock->cond, &sock->lock); } result = sock->result; atomic_store(&sock->active, true); - BROADCAST(&sock->scond); UNLOCK(&sock->lock); - INSIST(result != ISC_R_DEFAULT); + + INSIST(result != ISC_R_UNSET); if (result == ISC_R_SUCCESS) { - REQUIRE(sock->rchildren == sock->nchildren); + REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren); *sockp = sock; } else { atomic_store(&sock->active, false); - isc__nm_udp_stoplistening(sock); + isc_nm_stoplistening(sock); isc_nmsocket_close(&sock); } @@ -181,7 +200,7 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) { int r, uv_bind_flags = 0; int uv_init_flags = 0; sa_family_t sa_family; - isc_result_t result = ISC_R_DEFAULT; + isc_result_t result = ISC_R_UNSET; REQUIRE(VALID_NMSOCK(ievent->sock)); REQUIRE(ievent->sock->tid == isc_nm_tid()); @@ -269,16 +288,14 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) { done: result = isc__nm_uverr2result(r); - sock->parent->rchildren += 1; - if (sock->parent->result == ISC_R_DEFAULT) { + atomic_fetch_add(&sock->parent->rchildren, 1); + if (sock->parent->result == ISC_R_UNSET) { sock->parent->result = result; } SIGNAL(&sock->parent->cond); - if (!atomic_load(&sock->parent->active)) { - WAIT(&sock->parent->scond, &sock->parent->lock); - } - INSIST(atomic_load(&sock->parent->active)); UNLOCK(&sock->parent->lock); + + isc_barrier_wait(&sock->parent->startlistening); } static void @@ -300,7 +317,14 @@ isc__nm_udp_stoplistening(isc_nmsocket_t *sock) { ISC_UNREACHABLE(); } - enqueue_stoplistening(sock); + if (!isc__nm_in_netthread()) { + enqueue_stoplistening(sock); + } else if (!isc__nm_acquire_interlocked(sock->mgr)) { + enqueue_stoplistening(sock); + } else { + stop_udp_parent(sock); + isc__nm_drop_interlocked(sock->mgr); + } } /* @@ -324,7 +348,12 @@ isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0) { /* * If network manager is paused, re-enqueue the event for later. */ - stop_udp_parent(sock); + if (!isc__nm_acquire_interlocked(sock->mgr)) { + enqueue_stoplistening(sock); + } else { + stop_udp_parent(sock); + isc__nm_drop_interlocked(sock->mgr); + } } /* @@ -590,7 +619,7 @@ static isc_result_t udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__networker_t *worker = NULL; int uv_bind_flags = UV_UDP_REUSEADDR; - isc_result_t result = ISC_R_DEFAULT; + isc_result_t result = ISC_R_UNSET; int tries = 3; int r; @@ -733,7 +762,7 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, sock->read_timeout = timeout; sock->extrahandlesize = extrahandlesize; sock->peer = peer->addr; - sock->result = ISC_R_DEFAULT; + sock->result = ISC_R_UNSET; atomic_init(&sock->client, true); req = isc__nm_uvreq_get(mgr, sock); @@ -782,7 +811,7 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, (isc__netievent_t *)event); } LOCK(&sock->lock); - while (sock->result == ISC_R_DEFAULT) { + while (sock->result == ISC_R_UNSET) { WAIT(&sock->cond, &sock->lock); } atomic_store(&sock->active, true); @@ -970,8 +999,6 @@ stop_udp_child(isc_nmsocket_t *sock) { REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(sock->tid == isc_nm_tid()); - bool last_child = false; - if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, true)) { return; @@ -979,33 +1006,41 @@ stop_udp_child(isc_nmsocket_t *sock) { udp_close_direct(sock); - LOCK(&sock->parent->lock); - sock->parent->rchildren -= 1; - last_child = (sock->parent->rchildren == 0); - UNLOCK(&sock->parent->lock); + atomic_fetch_sub(&sock->parent->rchildren, 1); - if (last_child) { - atomic_store(&sock->parent->closed, true); - isc__nmsocket_prep_destroy(sock->parent); - } + isc_barrier_wait(&sock->parent->stoplistening); } static void stop_udp_parent(isc_nmsocket_t *sock) { + isc_nmsocket_t *csock = NULL; + REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_udplistener); + isc_barrier_init(&sock->stoplistening, sock->nchildren); + for (size_t i = 0; i < sock->nchildren; i++) { - isc__netievent_udpstop_t *ievent = NULL; - isc_nmsocket_t *csock = &sock->children[i]; + csock = &sock->children[i]; REQUIRE(VALID_NMSOCK(csock)); - atomic_store(&csock->active, false); + if ((int)i == isc_nm_tid()) { + /* + * We need to schedule closing the other sockets first + */ + continue; + } - ievent = isc__nm_get_netievent_udpstop(sock->mgr, csock); - isc__nm_enqueue_ievent(&sock->mgr->workers[i], - (isc__netievent_t *)ievent); + atomic_store(&csock->active, false); + enqueue_stoplistening(csock); } + + csock = &sock->children[isc_nm_tid()]; + atomic_store(&csock->active, false); + stop_udp_child(csock); + + atomic_store(&sock->closed, true); + isc__nmsocket_prep_destroy(sock); } static void diff --git a/lib/isc/task.c b/lib/isc/task.c index 5c217ad5b0..49e2c4fcc6 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -1014,7 +1014,7 @@ isc__taskmgr_shutdown(isc_taskmgr_t *manager) { REQUIRE(VALID_MANAGER(manager)); - XTHREADTRACE(e "isc_taskmgr_shutdown"); + XTHREADTRACE("isc_taskmgr_shutdown"); /* * Only one non-worker thread may ever call this routine. * If a worker thread wants to initiate shutdown of the diff --git a/lib/isc/win32/libisc.vcxproj.filters.in b/lib/isc/win32/libisc.vcxproj.filters.in index 6c10081745..ce7b6e7edd 100644 --- a/lib/isc/win32/libisc.vcxproj.filters.in +++ b/lib/isc/win32/libisc.vcxproj.filters.in @@ -44,6 +44,9 @@ Library Header Files + + Library Header Files + Library Header Files diff --git a/lib/isc/win32/libisc.vcxproj.in b/lib/isc/win32/libisc.vcxproj.in index 7689222868..6f73664314 100644 --- a/lib/isc/win32/libisc.vcxproj.in +++ b/lib/isc/win32/libisc.vcxproj.in @@ -269,6 +269,7 @@ copy InstallFiles ..\Build\Release\ + diff --git a/util/copyrights b/util/copyrights index ba92b26e62..96f8dd7eec 100644 --- a/util/copyrights +++ b/util/copyrights @@ -1852,6 +1852,7 @@ ./lib/isc/include/isc/atomic.h C 2018,2019,2020,2021 ./lib/isc/include/isc/attributes.h C 2020,2021 ./lib/isc/include/isc/backtrace.h C 2009,2016,2018,2019,2020,2021 +./lib/isc/include/isc/barrier.h C 2021 ./lib/isc/include/isc/base32.h C 2008,2014,2016,2018,2019,2020,2021 ./lib/isc/include/isc/base64.h C 1999,2000,2001,2004,2005,2006,2007,2016,2018,2019,2020,2021 ./lib/isc/include/isc/bind9.h C 2009,2013,2016,2018,2019,2020,2021