diff --git a/CHANGES b/CHANGES index da00868330..52577f1f06 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,5 @@ +5335. [func] Make TCP listening code multithreaded. [GL !2659] + 5334. [doc] Update documentation with dnssec-policy clarifications. Also change some defaults. diff --git a/config.h.in b/config.h.in index 88f46849b8..1b0a2a6a62 100644 --- a/config.h.in +++ b/config.h.in @@ -465,6 +465,12 @@ /* Define to 1 if you have the `usleep' function. */ #undef HAVE_USLEEP +/* Define to 1 if you have the `uv_handle_get_data' function. */ +#undef HAVE_UV_HANDLE_GET_DATA + +/* Define to 1 if you have the `uv_handle_set_data' function. */ +#undef HAVE_UV_HANDLE_SET_DATA + /* Use zlib library */ #undef HAVE_ZLIB diff --git a/configure b/configure index 7d1629af0e..a3eb8aa1d2 100755 --- a/configure +++ b/configure @@ -15943,6 +15943,21 @@ fi CFLAGS="$CFLAGS $LIBUV_CFLAGS" LIBS="$LIBS $LIBUV_LIBS" +# Those functions are only provided in newer versions of libuv, we'll be emulating them +# for now +for ac_func in uv_handle_get_data uv_handle_set_data +do : + as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh` +ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var" +if eval test \"x\$"$as_ac_var"\" = x"yes"; then : + cat >>confdefs.h <<_ACEOF +#define `$as_echo "HAVE_$ac_func" | $as_tr_cpp` 1 +_ACEOF + +fi +done + + # # flockfile is usually provided by pthreads # diff --git a/configure.ac b/configure.ac index 25b0ce8959..19b4816540 100644 --- a/configure.ac +++ b/configure.ac @@ -663,6 +663,10 @@ AX_SAVE_FLAGS([libuv]) CFLAGS="$CFLAGS $LIBUV_CFLAGS" LIBS="$LIBS $LIBUV_LIBS" +# Those functions are only provided in newer versions of libuv, we'll be emulating them +# for now +AC_CHECK_FUNCS([uv_handle_get_data uv_handle_set_data]) + # # flockfile is usually provided by pthreads # diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 3acacc756b..daae5fd880 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -46,7 +46,11 @@ typedef 
struct isc__networker { bool paused; bool finished; isc_thread_t thread; - isc_queue_t *ievents; /* incoming async events */ + isc_queue_t *ievents; /* incoming async events */ + isc_queue_t *ievents_prio; /* priority async events + * used for listening etc. + * can be processed while + * worker is paused */ isc_refcount_t references; atomic_int_fast64_t pktcount; char recvbuf[65536]; @@ -103,9 +107,6 @@ struct isc_nmiface { }; typedef enum isc__netievent_type { - netievent_stop, - netievent_udplisten, - netievent_udpstoplisten, netievent_udpsend, netievent_udprecv, netievent_tcpconnect, @@ -113,11 +114,22 @@ typedef enum isc__netievent_type { netievent_tcprecv, netievent_tcpstartread, netievent_tcppauseread, - netievent_tcplisten, - netievent_tcpstoplisten, - netievent_tcpclose, + netievent_tcpchildlisten, + netievent_tcpchildstop, netievent_closecb, netievent_shutdown, + netievent_stop, + netievent_udpstop, + netievent_tcpstop, + netievent_tcpclose, + netievent_tcpdnsclose, + netievent_prio = 0xff, /* event type values higher than this + * will be treated as high-priority + * events, which can be processed + * while the netmgr is paused. 
+ */ + netievent_udplisten, + netievent_tcplisten, } isc__netievent_type; /* @@ -154,11 +166,13 @@ typedef struct isc__nm_uvreq { isc_nmsocket_t * sock; isc_nmhandle_t * handle; uv_buf_t uvbuf; /* translated isc_region_t, to be - sent or received */ + * sent or received */ isc_sockaddr_t local; /* local address */ isc_sockaddr_t peer; /* peer address */ isc__nm_cb_t cb; /* callback */ void * cbarg; /* callback argument */ + uv_pipe_t ipc; /* used for sending socket + * uv_handles to other threads */ union { uv_req_t req; uv_getaddrinfo_t getaddrinfo; @@ -178,12 +192,13 @@ typedef struct isc__netievent__socket { } isc__netievent__socket_t; typedef isc__netievent__socket_t isc__netievent_udplisten_t; -typedef isc__netievent__socket_t isc__netievent_udpstoplisten_t; -typedef isc__netievent__socket_t isc__netievent_tcpstoplisten_t; +typedef isc__netievent__socket_t isc__netievent_udpstop_t; +typedef isc__netievent__socket_t isc__netievent_tcpstop_t; +typedef isc__netievent__socket_t isc__netievent_tcpchildstop_t; typedef isc__netievent__socket_t isc__netievent_tcpclose_t; +typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t; typedef isc__netievent__socket_t isc__netievent_startread_t; typedef isc__netievent__socket_t isc__netievent_pauseread_t; -typedef isc__netievent__socket_t isc__netievent_closecb_t; typedef struct isc__netievent__socket_req { isc__netievent_type type; @@ -193,8 +208,18 @@ typedef struct isc__netievent__socket_req { typedef isc__netievent__socket_req_t isc__netievent_tcpconnect_t; typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t; +typedef isc__netievent__socket_req_t isc__netievent_tcpchildlisten_t; typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t; +typedef struct isc__netievent__socket_handle { + isc__netievent_type type; + isc_nmsocket_t *sock; + isc_nmhandle_t *handle; +} isc__netievent__socket_handle_t; + +typedef isc__netievent__socket_handle_t isc__netievent_closecb_t; + + typedef struct 
isc__netievent_udpsend { isc__netievent_type type; isc_nmsocket_t *sock; @@ -274,6 +299,7 @@ typedef enum isc_nmsocket_type { isc_nm_udplistener, /* Aggregate of nm_udpsocks */ isc_nm_tcpsocket, isc_nm_tcplistener, + isc_nm_tcpchildlistener, isc_nm_tcpdnslistener, isc_nm_tcpdnssocket } isc_nmsocket_type; @@ -293,7 +319,7 @@ struct isc_nmsocket { isc_nm_t *mgr; isc_nmsocket_t *parent; - /* + /*% * quota is the TCP client, attached when a TCP connection * is established. pquota is a non-attached pointer to the * TCP client quota, stored in listening sockets but only @@ -303,7 +329,7 @@ struct isc_nmsocket { isc_quota_t *pquota; bool overquota; - /* + /*% * TCP read timeout timer. */ uv_timer_t timer; @@ -316,13 +342,18 @@ struct isc_nmsocket { /*% server socket for connections */ isc_nmsocket_t *server; - /*% children sockets for multi-socket setups */ + /*% Child sockets for multi-socket setups */ isc_nmsocket_t *children; int nchildren; isc_nmiface_t *iface; isc_nmhandle_t *tcphandle; - /*% extra data allocated at the end of each isc_nmhandle_t */ + /*% Used to transfer listening TCP sockets to children */ + uv_pipe_t ipc; + char ipc_pipe_name[64]; + atomic_int_fast32_t schildren; + + /*% Extra data allocated at the end of each isc_nmhandle_t */ size_t extrahandlesize; /*% TCP backlog */ @@ -332,16 +363,17 @@ struct isc_nmsocket { uv_os_sock_t fd; union uv_any_handle uv_handle; + /*% Peer address */ isc_sockaddr_t peer; /* Atomic */ - /*% Number of running (e.g. listening) children sockets */ + /*% Number of running (e.g. listening) child sockets */ atomic_int_fast32_t rchildren; /*% - * Socket if active if it's listening, working, etc., if we're - * closing a socket it doesn't make any sense to e.g. still - * push handles or reqs for reuse + * Socket is active if it's listening, working, etc. If it's + * closing, then it doesn't make sense, for example, to + * push handles or reqs for reuse. 
*/ atomic_bool active; atomic_bool destroying; @@ -349,7 +381,8 @@ struct isc_nmsocket { /*% * Socket is closed if it's not active and all the possible * callbacks were fired, there are no active handles, etc. - * active==false, closed==false means the socket is closing. + * If active==false but closed==false, that means the socket + * is closing. */ atomic_bool closed; atomic_bool listening; @@ -391,10 +424,19 @@ struct isc_nmsocket { isc_astack_t *inactivehandles; isc_astack_t *inactivereqs; - /* Used for active/rchildren during shutdown */ + /*% + * Used to wait for TCP listening events to complete, and + * for the number of running children to reach zero during + * shutdown. + */ isc_mutex_t lock; isc_condition_t cond; + /*% + * Used to pass a result back from TCP listening events. + */ + isc_result_t result; + /*% * List of active handles. * ah - current position in 'ah_frees'; this represents the @@ -421,12 +463,12 @@ struct isc_nmsocket { size_t *ah_frees; isc_nmhandle_t **ah_handles; - /* Buffer for TCPDNS processing, optional */ + /*% Buffer for TCPDNS processing */ size_t buf_size; size_t buf_len; unsigned char *buf; - /* + /*% * This function will be called with handle->sock * as the argument whenever a handle's references drop * to zero, after its reset callback has been called. @@ -524,13 +566,13 @@ isc__nmsocket_prep_destroy(isc_nmsocket_t *sock); */ void -isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0); /*%< * Issue a 'handle closed' callback on the socket. */ void -isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ev0); /*%< * Walk through all uv handles, get the underlying sockets and issue * close on them. 
@@ -544,13 +586,12 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, */ void -isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_udpstoplisten(isc__networker_t *worker, - isc__netievent_t *ievent0); +isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0); /*%< * Callback handlers for asynchronous UDP events (listen, stoplisten, send). */ @@ -575,20 +616,23 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock); */ void -isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_tcpstoplisten(isc__networker_t *worker, - isc__netievent_t *ievent0); +isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_tcpchildstop(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ev0); +void +isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ev0); +void +isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t 
*ev0); /*%< * Callback handlers for asynchronous TCP events (connect, listen, * stoplisten, send, read, pause, close). @@ -607,6 +651,9 @@ isc__nm_tcpdns_close(isc_nmsocket_t *sock); * Close a TCPDNS socket. */ +void +isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0); + #define isc__nm_uverr2result(x) \ isc___nm_uverr2result(x, true, __FILE__, __LINE__) isc_result_t diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 902fe2d1cc..0c1cba4561 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -29,6 +29,7 @@ #include #include +#include "uv-compat.h" #include "netmgr-int.h" /* @@ -42,6 +43,12 @@ ISC_THREAD_LOCAL int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN; +#ifdef WIN32 +#define NAMED_PIPE_PATTERN "\\\\.\\pipe\\named-%d-%u.pipe" +#else +#define NAMED_PIPE_PATTERN "/tmp/named-%d-%u.pipe" +#endif + static void nmsocket_maybe_destroy(isc_nmsocket_t *sock); static void @@ -50,6 +57,8 @@ static void * nm_thread(void *worker0); static void async_cb(uv_async_t *handle); +static void +process_queue(isc__networker_t *worker, isc_queue_t *queue); int isc_nm_tid() { @@ -128,6 +137,7 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) { isc_condition_init(&worker->cond); worker->ievents = isc_queue_new(mgr->mctx, 128); + worker->ievents_prio = isc_queue_new(mgr->mctx, 128); /* * We need to do this here and not in nm_thread to avoid a @@ -175,17 +185,29 @@ nm_destroy(isc_nm_t **mgr0) { UNLOCK(&mgr->lock); for (size_t i = 0; i < mgr->nworkers; i++) { - /* Empty the async event queue */ - isc__netievent_t *ievent; + isc__networker_t *worker = &mgr->workers[i]; + isc__netievent_t *ievent = NULL; + int r; + + /* Empty the async event queues */ while ((ievent = (isc__netievent_t *) - isc_queue_dequeue(mgr->workers[i].ievents)) != NULL) + isc_queue_dequeue(worker->ievents)) != NULL) { isc_mempool_put(mgr->evpool, ievent); } - int r = uv_loop_close(&mgr->workers[i].loop); + + while ((ievent = (isc__netievent_t *) + 
isc_queue_dequeue(worker->ievents_prio)) != NULL) + { + isc_mempool_put(mgr->evpool, ievent); + } + + r = uv_loop_close(&worker->loop); INSIST(r == 0); - isc_queue_destroy(mgr->workers[i].ievents); - isc_thread_join(mgr->workers[i].thread, NULL); + + isc_queue_destroy(worker->ievents); + isc_queue_destroy(worker->ievents_prio); + isc_thread_join(worker->thread, NULL); } isc_condition_destroy(&mgr->wkstatecond); @@ -315,10 +337,18 @@ isc_nm_destroy(isc_nm_t **mgr0) { * Wait for the manager to be dereferenced elsewhere. */ while (isc_refcount_current(&mgr->references) > 1) { + /* + * Sometimes libuv gets stuck, pausing and unpausing + * netmgr goes over all events in async queue for all + * the workers, and since it's done only on shutdown it + * doesn't cost us anything. + */ + isc_nm_pause(mgr); + isc_nm_resume(mgr); #ifdef WIN32 - _sleep(1000); + _sleep(1000); #else - usleep(1000000); + usleep(1000000); #endif } @@ -403,6 +433,9 @@ nm_thread(void *worker0) { UNLOCK(&worker->mgr->lock); WAIT(&worker->cond, &worker->lock); + + /* Process priority events */ + process_queue(worker, worker->ievents_prio); } if (pausing) { uint32_t wp = atomic_fetch_sub_explicit( @@ -452,7 +485,8 @@ nm_thread(void *worker0) { /* * Empty the async queue. 
*/ - async_cb(&worker->async); + process_queue(worker, worker->ievents_prio); + process_queue(worker, worker->ievents); } LOCK(&worker->mgr->lock); @@ -472,21 +506,27 @@ nm_thread(void *worker0) { static void async_cb(uv_async_t *handle) { isc__networker_t *worker = (isc__networker_t *) handle->loop->data; - isc__netievent_t *ievent; + process_queue(worker, worker->ievents_prio); + process_queue(worker, worker->ievents); +} + +static void +process_queue(isc__networker_t *worker, isc_queue_t *queue) { + isc__netievent_t *ievent = NULL; while ((ievent = (isc__netievent_t *) - isc_queue_dequeue(worker->ievents)) != NULL) + isc_queue_dequeue(queue)) != NULL) { switch (ievent->type) { case netievent_stop: - uv_stop(handle->loop); + uv_stop(&worker->loop); isc_mempool_put(worker->mgr->evpool, ievent); return; case netievent_udplisten: isc__nm_async_udplisten(worker, ievent); break; - case netievent_udpstoplisten: - isc__nm_async_udpstoplisten(worker, ievent); + case netievent_udpstop: + isc__nm_async_udpstop(worker, ievent); break; case netievent_udpsend: isc__nm_async_udpsend(worker, ievent); @@ -497,6 +537,9 @@ async_cb(uv_async_t *handle) { case netievent_tcplisten: isc__nm_async_tcplisten(worker, ievent); break; + case netievent_tcpchildlisten: + isc__nm_async_tcpchildlisten(worker, ievent); + break; case netievent_tcpstartread: isc__nm_async_startread(worker, ievent); break; @@ -506,12 +549,18 @@ async_cb(uv_async_t *handle) { case netievent_tcpsend: isc__nm_async_tcpsend(worker, ievent); break; - case netievent_tcpstoplisten: - isc__nm_async_tcpstoplisten(worker, ievent); + case netievent_tcpstop: + isc__nm_async_tcpstop(worker, ievent); + break; + case netievent_tcpchildstop: + isc__nm_async_tcpchildstop(worker, ievent); break; case netievent_tcpclose: isc__nm_async_tcpclose(worker, ievent); break; + case netievent_tcpdnsclose: + isc__nm_async_tcpdnsclose(worker, ievent); + break; case netievent_closecb: isc__nm_async_closecb(worker, ievent); break; @@ -544,7 
+593,18 @@ isc__nm_put_ievent(isc_nm_t *mgr, void *ievent) { void isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) { - isc_queue_enqueue(worker->ievents, (uintptr_t)event); + if (event->type > netievent_prio) { + /* + * We need to make sure this signal will be delivered and + * the queue will be processed. + */ + LOCK(&worker->lock); + isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event); + SIGNAL(&worker->cond); + UNLOCK(&worker->lock); + } else { + isc_queue_enqueue(worker->ievents, (uintptr_t)event); + } uv_async_send(&worker->async); } @@ -626,8 +686,9 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) { sock->pquota = NULL; if (sock->timer_initialized) { - uv_close((uv_handle_t *)&sock->timer, NULL); sock->timer_initialized = false; + uv_timer_stop(&sock->timer); + uv_close((uv_handle_t *)&sock->timer, NULL); } isc_astack_destroy(sock->inactivehandles); @@ -652,11 +713,9 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) { static void nmsocket_maybe_destroy(isc_nmsocket_t *sock) { - int active_handles = 0; + int active_handles; bool destroy = false; - REQUIRE(!isc__nmsocket_active(sock)); - if (sock->parent != NULL) { /* * This is a child socket and cannot be destroyed except @@ -673,7 +732,14 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) { * accept destruction. 
*/ LOCK(&sock->lock); - active_handles += atomic_load(&sock->ah); + if (atomic_load(&sock->active) || atomic_load(&sock->destroying) || + !atomic_load(&sock->closed) || atomic_load(&sock->references) != 0) + { + UNLOCK(&sock->lock); + return; + } + + active_handles = atomic_load(&sock->ah); if (sock->children != NULL) { for (int i = 0; i < sock->nchildren; i++) { LOCK(&sock->children[i].lock); @@ -682,10 +748,7 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) { } } - if (atomic_load(&sock->closed) && - atomic_load(&sock->references) == 0 && - (active_handles == 0 || sock->tcphandle != NULL)) - { + if (active_handles == 0 || sock->tcphandle != NULL) { destroy = true; } UNLOCK(&sock->lock); @@ -790,6 +853,16 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, sock->ah_handles[i] = NULL; } + /* + * Use a random number in the named pipe name. Also add getpid() + * to the name to make sure we don't get a conflict between + * different unit tests running at the same time, where the PRNG + * is initialized to a constant seed. + */ + snprintf(sock->ipc_pipe_name, sizeof(sock->ipc_pipe_name), + NAMED_PIPE_PATTERN, getpid(), isc_random32()); + sock->ipc_pipe_name[sizeof(sock->ipc_pipe_name) - 1] = '\0'; + isc_mutex_init(&sock->lock); isc_condition_init(&sock->cond); isc_refcount_init(&sock->references, 1); @@ -805,7 +878,7 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, void isc__nm_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) { - isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data; + isc_nmsocket_t *sock = uv_handle_get_data(handle); isc__networker_t *worker = NULL; REQUIRE(VALID_NMSOCK(sock)); @@ -958,11 +1031,37 @@ nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { isc_mem_put(sock->mgr->mctx, handle, sizeof(isc_nmhandle_t) + extra); } +static void +nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { + /* + * We do all of this under lock to avoid races with socket + * destruction. 
We have to do this now, because at this point the + * socket is either unused or still attached to event->sock. + */ + LOCK(&sock->lock); + + INSIST(sock->ah_handles[handle->ah_pos] == handle); + INSIST(sock->ah_size > handle->ah_pos); + INSIST(atomic_load(&sock->ah) > 0); + + sock->ah_handles[handle->ah_pos] = NULL; + size_t handlenum = atomic_fetch_sub(&sock->ah, 1) - 1; + sock->ah_frees[handlenum] = handle->ah_pos; + handle->ah_pos = 0; + bool reuse = false; + if (atomic_load(&sock->active)) { + reuse = isc_astack_trypush(sock->inactivehandles, + handle); + } + if (!reuse) { + nmhandle_free(sock, handle); + } + UNLOCK(&sock->lock); +} + void isc_nmhandle_unref(isc_nmhandle_t *handle) { isc_nmsocket_t *sock = NULL; - size_t handlenum; - bool reuse = false; int refs; REQUIRE(VALID_NMHANDLE(handle)); @@ -980,61 +1079,34 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) { handle->doreset(handle->opaque); } - /* - * We do all of this under lock to avoid races with socket - * destruction. - */ - LOCK(&sock->lock); - - INSIST(sock->ah_handles[handle->ah_pos] == handle); - INSIST(sock->ah_size > handle->ah_pos); - INSIST(atomic_load(&sock->ah) > 0); - - sock->ah_handles[handle->ah_pos] = NULL; - handlenum = atomic_fetch_sub(&sock->ah, 1) - 1; - sock->ah_frees[handlenum] = handle->ah_pos; - handle->ah_pos = 0; - - if (atomic_load(&sock->active)) { - reuse = isc_astack_trypush(sock->inactivehandles, - handle); - } - - UNLOCK(&sock->lock); - - if (!reuse) { - nmhandle_free(sock, handle); - } - /* * The handle is closed. If the socket has a callback configured * for that (e.g., to perform cleanup after request processing), - * call it now. + * call it now, or schedule it to run asynchronously. 
*/ if (sock->closehandle_cb != NULL) { if (sock->tid == isc_nm_tid()) { sock->closehandle_cb(sock); } else { - isc__netievent_closecb_t * event = + isc__netievent_closecb_t *event = isc__nm_get_ievent(sock->mgr, netievent_closecb); isc_nmsocket_attach(sock, &event->sock); + event->handle = handle; isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *) event); + /* - * If we do this asynchronously then the async event - * will clean the socket, so just exit. + * If we're doing this asynchronously, then the + * async event will take care of cleaning up the + * handle and closing the socket. */ return; } } - if (atomic_load(&sock->ah) == 0 && - !atomic_load(&sock->active) && - !atomic_load(&sock->destroying)) - { - nmsocket_maybe_destroy(sock); - } + nmhandle_deactivate(sock, handle); + nmsocket_maybe_destroy(sock); } void * @@ -1166,9 +1238,9 @@ isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region, } void -isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_closecb_t *ievent = - (isc__netievent_closecb_t *) ievent0; + (isc__netievent_closecb_t *) ev0; REQUIRE(VALID_NMSOCK(ievent->sock)); REQUIRE(ievent->sock->tid == isc_nm_tid()); @@ -1176,6 +1248,8 @@ isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0) { UNUSED(worker); + nmhandle_deactivate(ievent->sock, ievent->handle); + ievent->sock->closehandle_cb(ievent->sock); isc_nmsocket_detach(&ievent->sock); } @@ -1186,7 +1260,7 @@ shutdown_walk_cb(uv_handle_t *handle, void *arg) { switch(handle->type) { case UV_TCP: - isc__nm_tcp_shutdown((isc_nmsocket_t *) handle->data); + isc__nm_tcp_shutdown(uv_handle_get_data(handle)); break; default: break; @@ -1194,8 +1268,8 @@ shutdown_walk_cb(uv_handle_t *handle, void *arg) { } void -isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ievent0) { - UNUSED(ievent0); +isc__nm_async_shutdown(isc__networker_t 
*worker, isc__netievent_t *ev0) { + UNUSED(ev0); uv_walk(&worker->loop, shutdown_walk_cb, NULL); } diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 75dcd3a7e2..a996acfb01 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -10,6 +10,7 @@ */ #include +#include #include #include @@ -28,6 +29,7 @@ #include #include +#include "uv-compat.h" #include "netmgr-int.h" static int @@ -50,9 +52,24 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); static void tcp_close_cb(uv_handle_t *uvhandle); +static void +ipc_connection_cb(uv_stream_t *stream, int status); +static void +ipc_write_cb(uv_write_t *uvreq, int status); +static void +ipc_close_cb(uv_handle_t *handle); +static void +childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status); +static void +childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); +static void +stoplistening(isc_nmsocket_t *sock); +static void +tcp_listenclose_cb(uv_handle_t *handle); + static int tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { - isc__networker_t *worker; + isc__networker_t *worker = NULL; int r; REQUIRE(isc__nm_in_netthread()); @@ -71,16 +88,16 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { return (r); } } - + uv_handle_set_data(&sock->uv_handle.handle, sock); r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp, &req->peer.type.sa, tcp_connect_cb); return (r); } void -isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcpconnect_t *ievent = - (isc__netievent_tcpconnect_t *) ievent0; + (isc__netievent_tcpconnect_t *) ev0; isc_nmsocket_t *sock = ievent->sock; isc__nm_uvreq_t *req = ievent->req; int r; @@ -98,7 +115,8 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0) { static void tcp_connect_cb(uv_connect_t *uvreq, int status) { isc__nm_uvreq_t *req = (isc__nm_uvreq_t *) 
uvreq->data; - isc_nmsocket_t *sock = uvreq->handle->data; + isc_nmsocket_t *sock = NULL; + sock = uv_handle_get_data((uv_handle_t *) uvreq->handle); REQUIRE(VALID_UVREQ(req)); @@ -131,21 +149,26 @@ isc_result_t isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize, int backlog, - isc_quota_t *quota, - isc_nmsocket_t **sockp) + isc_quota_t *quota, isc_nmsocket_t **sockp) { - isc__netievent_tcplisten_t *ievent = NULL; isc_nmsocket_t *nsock = NULL; + isc__netievent_tcplisten_t *ievent = NULL; REQUIRE(VALID_NM(mgr)); nsock = isc_mem_get(mgr->mctx, sizeof(*nsock)); isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener); nsock->iface = iface; + nsock->nchildren = mgr->nworkers; + atomic_init(&nsock->rchildren, mgr->nworkers); + nsock->children = isc_mem_get(mgr->mctx, + mgr->nworkers * sizeof(*nsock)); + memset(nsock->children, 0, mgr->nworkers * sizeof(*nsock)); nsock->rcb.accept = cb; nsock->rcbarg = cbarg; nsock->extrahandlesize = extrahandlesize; nsock->backlog = backlog; + nsock->result = ISC_R_SUCCESS; if (quota != NULL) { /* * We don't attach to quota, just assign - to avoid @@ -153,82 +176,330 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, */ nsock->pquota = quota; } - nsock->tid = isc_random_uniform(mgr->nworkers); - /* - * Listening to TCP is rare enough not to care about the - * added overhead from passing this to another thread. 
- */ ievent = isc__nm_get_ievent(mgr, netievent_tcplisten); ievent->sock = nsock; - isc__nm_enqueue_ievent(&mgr->workers[nsock->tid], - (isc__netievent_t *) ievent); - *sockp = nsock; + if (isc__nm_in_netthread()) { + nsock->tid = isc_nm_tid(); + isc__nm_async_tcplisten(&mgr->workers[nsock->tid], + (isc__netievent_t *) ievent); + isc__nm_put_ievent(mgr, ievent); + } else { + nsock->tid = isc_random_uniform(mgr->nworkers); + LOCK(&nsock->lock); + isc__nm_enqueue_ievent(&mgr->workers[nsock->tid], + (isc__netievent_t *) ievent); + WAIT(&nsock->cond, &nsock->lock); + UNLOCK(&nsock->lock); + } - return (ISC_R_SUCCESS); + if (nsock->result == ISC_R_SUCCESS) { + *sockp = nsock; + return (ISC_R_SUCCESS); + } else { + isc_result_t result = nsock->result; + isc_nmsocket_detach(&nsock); + return (result); + } } +#ifndef WIN32 +/* + * Run fsync() on the directory containing a socket's IPC pipe, to + * ensure that all threads can see the pipe. + */ +static void +syncdir(const isc_nmsocket_t *sock) { + char *pipe = isc_mem_strdup(sock->mgr->mctx, sock->ipc_pipe_name); + int fd = open(dirname(pipe), O_RDONLY); + + RUNTIME_CHECK(fd >= 0); + + isc_mem_free(sock->mgr->mctx, pipe); + + fsync(fd); + close(fd); +} +#endif + +/* + * For multi-threaded TCP listening, we create a single "parent" socket, + * bind to it, and then pass its uv_handle to a set of child sockets, one + * per worker. For thread safety, the passing of the socket's uv_handle has + * to be done via IPC socket. + * + * This design pattern is ugly but it's what's recommended by the libuv + * documentation. (A prior version of libuv had uv_export() and + * uv_import() functions which would have simplified this greatly, but + * they have been deprecated and removed.) 
+ */ void -isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcplisten_t *ievent = - (isc__netievent_tcplisten_t *) ievent0; + (isc__netievent_tcplisten_t *) ev0; isc_nmsocket_t *sock = ievent->sock; int r; REQUIRE(isc__nm_in_netthread()); REQUIRE(sock->type == isc_nm_tcplistener); - r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); - if (r != 0) { - return; + /* Initialize children now to make cleaning up easier */ + for (int i = 0; i < sock->nchildren; i++) { + isc_nmsocket_t *csock = &sock->children[i]; + + isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener); + csock->parent = sock; + csock->iface = sock->iface; + csock->tid = i; + csock->pquota = sock->pquota; + csock->backlog = sock->backlog; + csock->extrahandlesize = sock->extrahandlesize; + + INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL); + csock->rcb.accept = sock->rcb.accept; + csock->rcbarg = sock->rcbarg; + csock->fd = -1; } - uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0); - r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog, - tcp_connection_cb); + r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); if (r != 0) { - return; + /* It was never opened */ + atomic_store(&sock->closed, true); + sock->result = isc__nm_uverr2result(r); + goto done; + } + + r = uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0); + if (r != 0) { + uv_close(&sock->uv_handle.handle, tcp_close_cb); + sock->result = isc__nm_uverr2result(r); + goto done; + } + uv_handle_set_data(&sock->uv_handle.handle, sock); + + /* + * uv_pipe_init() is incorrectly documented in libuv, and the + * example in benchmark-multi-accept.c is also wrong. + * + * The third parameter ('ipc') indicates that the pipe will be + * used to *send* a handle to other threads. 
Therefore, it must + * be set to 0 for a 'listening' IPC socket, and 1 + * only for sockets that are really passing FDs between + * threads. + */ + r = uv_pipe_init(&worker->loop, &sock->ipc, 0); + RUNTIME_CHECK(r == 0); + + uv_handle_set_data((uv_handle_t *)&sock->ipc, sock); + r = uv_pipe_bind(&sock->ipc, sock->ipc_pipe_name); + RUNTIME_CHECK(r == 0); + + r = uv_listen((uv_stream_t *) &sock->ipc, sock->nchildren, + ipc_connection_cb); + RUNTIME_CHECK(r == 0); + +#ifndef WIN32 + /* + * On Unices a child thread might not see the pipe yet; + * that happened quite often in unit tests on FreeBSD. + * Syncing the directory ensures that the pipe is visible + * to everyone. + * This isn't done on Windows because named pipes exist + * within a different namespace, not on VFS. + */ + syncdir(sock); +#endif + + /* + * For each worker we send a 'tcpchildlisten' event. The child + * listener will then receive its version of the socket + * uv_handle via IPC in isc__nm_async_tcpchildlisten(). + */ + for (int i = 0; i < sock->nchildren; i++) { + isc_nmsocket_t *csock = &sock->children[i]; + isc__netievent_tcpchildlisten_t *event = NULL; + + event = isc__nm_get_ievent(csock->mgr, + netievent_tcpchildlisten); + event->sock = csock; + if (csock->tid == isc_nm_tid()) { + isc__nm_async_tcpchildlisten(&sock->mgr->workers[i], + (isc__netievent_t *) event); + isc__nm_put_ievent(sock->mgr, event); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[i], + (isc__netievent_t *) event); + } } atomic_store(&sock->listening, true); + done: + LOCK(&sock->lock); + SIGNAL(&sock->cond); + UNLOCK(&sock->lock); return; } +/* + * The parent received an IPC connection from a child and can now send + * the uv_handle to the child for listening. 
+ */ +static void +ipc_connection_cb(uv_stream_t *stream, int status) { + isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *) stream); + isc__networker_t *worker = &sock->mgr->workers[isc_nm_tid()]; + isc__nm_uvreq_t *nreq = isc__nm_uvreq_get(sock->mgr, sock); + int r; + + REQUIRE(status == 0); + + /* + * The buffer can be anything, it will be ignored, but it has to + * be something that won't disappear. + */ + nreq->uvbuf = uv_buf_init((char *)nreq, 1); + uv_pipe_init(&worker->loop, &nreq->ipc, 1); + uv_handle_set_data((uv_handle_t *)&nreq->ipc, nreq); + + r = uv_accept((uv_stream_t *) &sock->ipc, (uv_stream_t *) &nreq->ipc); + RUNTIME_CHECK(r == 0); + + r = uv_write2(&nreq->uv_req.write, + (uv_stream_t *) &nreq->ipc, &nreq->uvbuf, 1, + (uv_stream_t *) &sock->uv_handle.stream, ipc_write_cb); + RUNTIME_CHECK(r == 0); +} + +/* + * The call to send a socket uv_handle is complete; we may be able + * to close the IPC connection. + */ +static void +ipc_write_cb(uv_write_t *uvreq, int status) { + isc__nm_uvreq_t *req = uvreq->data; + + UNUSED(status); + + /* + * We want all children to get the socket. Once that's done, + * we can stop listening on the IPC socket. + */ + if (atomic_fetch_add(&req->sock->schildren, 1) == + req->sock->nchildren - 1) + { + uv_close((uv_handle_t *) &req->sock->ipc, NULL); + } + uv_close((uv_handle_t *) &req->ipc, ipc_close_cb); +} + +/* + * The IPC socket is closed: free its resources. + */ +static void +ipc_close_cb(uv_handle_t *handle) { + isc__nm_uvreq_t *req = uv_handle_get_data(handle); + isc__nm_uvreq_put(&req, req->sock); +} + +/* + * Connect to the parent socket and be ready to receive the uv_handle + * for the socket we'll be listening on. 
+ */ +void +isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_tcpchildlisten_t *ievent = + (isc__netievent_tcpchildlisten_t *) ev0; + isc_nmsocket_t *sock = ievent->sock; + isc__nm_uvreq_t *req = NULL; + int r; + + REQUIRE(isc__nm_in_netthread()); + REQUIRE(sock->type == isc_nm_tcpchildlistener); + + r = uv_pipe_init(&worker->loop, &sock->ipc, 1); + RUNTIME_CHECK(r == 0); + + uv_handle_set_data((uv_handle_t *) &sock->ipc, sock); + + req = isc__nm_uvreq_get(sock->mgr, sock); + uv_pipe_connect(&req->uv_req.connect, &sock->ipc, + sock->parent->ipc_pipe_name, + childlisten_ipc_connect_cb); +} + +/* Child is now connected to parent via IPC and can begin reading. */ +static void +childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status) { + isc__nm_uvreq_t *req = uvreq->data; + isc_nmsocket_t *sock = req->sock; + int r; + + UNUSED(status); + + isc__nm_uvreq_put(&req, sock); + + r = uv_read_start((uv_stream_t *) &sock->ipc, isc__nm_alloc_cb, + childlisten_read_cb); + RUNTIME_CHECK(r == 0); +} + +/* + * Child has received the socket uv_handle via IPC, and can now begin + * listening for connections and can close the IPC socket. 
+ */ +static void +childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { + isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *) stream); + isc__networker_t *worker = NULL; + uv_pipe_t *ipc = NULL; + uv_handle_type type; + int r; + + UNUSED(nread); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(buf != NULL); + + ipc = (uv_pipe_t *) stream; + type = uv_pipe_pending_type(ipc); + INSIST(type == UV_TCP); + + isc__nm_free_uvbuf(sock, buf); + worker = &sock->mgr->workers[isc_nm_tid()]; + uv_tcp_init(&worker->loop, (uv_tcp_t *) &sock->uv_handle.tcp); + uv_handle_set_data(&sock->uv_handle.handle, sock); + + uv_accept(stream, &sock->uv_handle.stream); + r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog, + tcp_connection_cb); + uv_close((uv_handle_t *) ipc, NULL); + if (r != 0) { + isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, + "IPC socket close failed: %s", + isc_result_totext(isc__nm_uverr2result(r))); + return; + } +} + + void isc_nm_tcp_stoplistening(isc_nmsocket_t *sock) { - isc__netievent_tcpstoplisten_t *ievent = NULL; + isc__netievent_tcpstop_t *ievent = NULL; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(!isc__nm_in_netthread()); - ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstoplisten); + ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstop); isc_nmsocket_attach(sock, &ievent->sock); isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *) ievent); } -static void -stoplistening_cb(uv_handle_t *handle) { - isc_nmsocket_t *sock = handle->data; - - LOCK(&sock->lock); - atomic_store(&sock->listening, false); - atomic_store(&sock->closed, true); - SIGNAL(&sock->cond); - UNLOCK(&sock->lock); - - sock->pquota = NULL; - - isc_nmsocket_detach(&sock); -} - void -isc__nm_async_tcpstoplisten(isc__networker_t *worker, - isc__netievent_t *ievent0) -{ - isc__netievent_tcpstoplisten_t *ievent = - (isc__netievent_tcpstoplisten_t *) ievent0; 
+isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_tcpstop_t *ievent = (isc__netievent_tcpstop_t *) ev0; isc_nmsocket_t *sock = ievent->sock; UNUSED(worker); @@ -237,12 +508,103 @@ isc__nm_async_tcpstoplisten(isc__networker_t *worker, REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tcplistener); - uv_close(&sock->uv_handle.handle, stoplistening_cb); + /* + * If network manager is interlocked, re-enqueue the event for later. + */ + if (!isc__nm_acquire_interlocked(sock->mgr)) { + isc__netievent_tcpstop_t *event = NULL; + + event = isc__nm_get_ievent(sock->mgr, + netievent_tcpstop); + event->sock = sock; + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) event); + } else { + stoplistening(sock); + isc__nm_drop_interlocked(sock->mgr); + } +} + +static void +stoplistening(isc_nmsocket_t *sock) { + for (int i = 0; i < sock->nchildren; i++) { + isc__netievent_tcpchildstop_t *event = NULL; + + /* + * We can ignore the overhead of event allocation because + * stoplistening is a rare event, and doing it this way + * simplifies sock reference counting. 
+ */ + event = isc__nm_get_ievent(sock->mgr, netievent_tcpchildstop); + isc_nmsocket_attach(&sock->children[i], &event->sock); + + if (i == sock->tid) { + isc__nm_async_tcpchildstop(&sock->mgr->workers[i], + (isc__netievent_t *) event); + isc__nm_put_ievent(sock->mgr, event); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[i], + (isc__netievent_t *) event); + } + } + + LOCK(&sock->lock); + while (atomic_load_relaxed(&sock->rchildren) > 0) { + WAIT(&sock->cond, &sock->lock); + } + UNLOCK(&sock->lock); + uv_close((uv_handle_t *) &sock->uv_handle.tcp, tcp_listenclose_cb); +} + +void +isc__nm_async_tcpchildstop(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_tcpchildstop_t *ievent = + (isc__netievent_tcpchildstop_t *) ev0; + isc_nmsocket_t *sock = ievent->sock; + + UNUSED(worker); + + REQUIRE(isc_nm_tid() == sock->tid); + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_tcpchildlistener); + REQUIRE(sock->parent != NULL); + + /* + * rchildren is atomic, but we still need to change it + * under a lock because the parent is waiting on conditional + * and without it we might deadlock. + */ + LOCK(&sock->parent->lock); + atomic_fetch_sub(&sock->parent->rchildren, 1); + UNLOCK(&sock->parent->lock); + + uv_close((uv_handle_t *) &sock->uv_handle.tcp, tcp_listenclose_cb); + BROADCAST(&sock->parent->cond); +} + +/* + * This callback is used for closing both child and parent listening + * sockets; that's why we need to choose the proper lock. + */ +static void +tcp_listenclose_cb(uv_handle_t *handle) { + isc_nmsocket_t *sock = uv_handle_get_data(handle); + isc_mutex_t *lock = ((sock->parent != NULL) + ? 
&sock->parent->lock + : &sock->lock); + + LOCK(lock); + atomic_store(&sock->closed, true); + atomic_store(&sock->listening, false); + sock->pquota = NULL; + UNLOCK(lock); + + isc_nmsocket_detach(&sock); } static void readtimeout_cb(uv_timer_t *handle) { - isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data; + isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); @@ -294,16 +656,16 @@ isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { } void -isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_startread_t *ievent = - (isc__netievent_startread_t *) ievent0; + (isc__netievent_startread_t *) ev0; isc_nmsocket_t *sock = ievent->sock; REQUIRE(worker->id == isc_nm_tid()); if (sock->read_timeout != 0) { if (!sock->timer_initialized) { uv_timer_init(&worker->loop, &sock->timer); - sock->timer.data = sock; + uv_handle_set_data((uv_handle_t *)&sock->timer, sock); sock->timer_initialized = true; } uv_timer_start(&sock->timer, readtimeout_cb, @@ -340,9 +702,9 @@ isc_nm_pauseread(isc_nmsocket_t *sock) { } void -isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_pauseread_t *ievent = - (isc__netievent_pauseread_t *) ievent0; + (isc__netievent_pauseread_t *) ev0; isc_nmsocket_t *sock = ievent->sock; REQUIRE(VALID_NMSOCK(sock)); @@ -384,7 +746,7 @@ isc_nm_resumeread(isc_nmsocket_t *sock) { static void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { - isc_nmsocket_t *sock = stream->data; + isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *) stream); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(buf != NULL); @@ -394,10 +756,14 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { .base = (unsigned char *) buf->base, 
.length = nread }; - - INSIST(sock->rcb.recv != NULL); - sock->rcb.recv(sock->tcphandle, &region, sock->rcbarg); - + /* + * This might happen if the inner socket is closing. + * It means that it's detached, so the socket will + * be closed. + */ + if (sock->rcb.recv != NULL) { + sock->rcb.recv(sock->tcphandle, &region, sock->rcbarg); + } sock->read_timeout = (atomic_load(&sock->keepalive) ? sock->mgr->keepalive : sock->mgr->idle); @@ -415,8 +781,14 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { if (sock->quota) { isc_quota_detach(&sock->quota); } - sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg); - + /* + * This might happen if the inner socket is closing. + * It means that it's detached, so the socket will + * be closed. + */ + if (sock->rcb.recv != NULL) { + sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg); + } /* * We don't need to clean up now; the socket will be closed and * resources and quota reclaimed when handle is freed in @@ -499,7 +871,7 @@ accept_connection(isc_nmsocket_t *ssock) { static void tcp_connection_cb(uv_stream_t *server, int status) { - isc_nmsocket_t *ssock = server->data; + isc_nmsocket_t *ssock = uv_handle_get_data((uv_handle_t *) server); isc_result_t result; UNUSED(status); @@ -576,9 +948,9 @@ tcp_send_cb(uv_write_t *req, int status) { * Handle 'tcpsend' async event - send a packet on the socket */ void -isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0) { isc_result_t result; - isc__netievent_tcpsend_t *ievent = (isc__netievent_tcpsend_t *) ievent0; + isc__netievent_tcpsend_t *ievent = (isc__netievent_tcpsend_t *) ev0; REQUIRE(worker->id == ievent->sock->tid); @@ -615,7 +987,7 @@ tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { static void tcp_close_cb(uv_handle_t *uvhandle) { - isc_nmsocket_t *sock = uvhandle->data; + isc_nmsocket_t *sock = uv_handle_get_data(uvhandle); REQUIRE(VALID_NMSOCK(sock)); @@ 
-625,7 +997,7 @@ tcp_close_cb(uv_handle_t *uvhandle) { static void timer_close_cb(uv_handle_t *uvhandle) { - isc_nmsocket_t *sock = uvhandle->data; + isc_nmsocket_t *sock = uv_handle_get_data(uvhandle); REQUIRE(VALID_NMSOCK(sock)); @@ -653,8 +1025,9 @@ tcp_close_direct(isc_nmsocket_t *sock) { } } if (sock->timer_initialized) { - uv_close((uv_handle_t *)&sock->timer, timer_close_cb); sock->timer_initialized = false; + uv_timer_stop(&sock->timer); + uv_close((uv_handle_t *)&sock->timer, timer_close_cb); } else { isc_nmsocket_detach(&sock->server); uv_close(&sock->uv_handle.handle, tcp_close_cb); @@ -682,9 +1055,8 @@ isc__nm_tcp_close(isc_nmsocket_t *sock) { } void -isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ievent0) { - isc__netievent_tcpclose_t *ievent = - (isc__netievent_tcpclose_t *) ievent0; +isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_tcpclose_t *ievent = (isc__netievent_tcpclose_t *) ev0; REQUIRE(worker->id == ievent->sock->tid); @@ -695,7 +1067,10 @@ void isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); - if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL) { + if (sock->type == isc_nm_tcpsocket && + sock->tcphandle != NULL && + sock->rcb.recv != NULL) + { sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg); } } diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index d75fdda943..5048200aca 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -26,6 +26,7 @@ #include #include +#include "uv-compat.h" #include "netmgr-int.h" #define TCPDNS_CLIENTS_PER_CONN 23 @@ -76,21 +77,22 @@ alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) { static void timer_close_cb(uv_handle_t *handle) { - isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data; + isc_nmsocket_t *sock = (isc_nmsocket_t *) uv_handle_get_data(handle); INSIST(VALID_NMSOCK(sock)); - sock->timer_initialized = false; atomic_store(&sock->closed, true); isc_nmsocket_detach(&sock); } 
static void dnstcp_readtimeout(uv_timer_t *timer) { - isc_nmsocket_t *sock = (isc_nmsocket_t *) timer->data; + isc_nmsocket_t *sock = + (isc_nmsocket_t *) uv_handle_get_data((uv_handle_t *) timer); REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_nm_tid()); isc_nmsocket_detach(&sock->outer); - uv_close((uv_handle_t*) &sock->timer, timer_close_cb); + uv_close((uv_handle_t *) &sock->timer, timer_close_cb); } /* @@ -201,6 +203,7 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) { REQUIRE(VALID_NMSOCK(dnssock)); REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(dnssock->tid == isc_nm_tid()); if (region == NULL) { /* Connection closed */ @@ -296,11 +299,15 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, result = isc_nm_listentcp(mgr, iface, dnslisten_acceptcb, dnslistensock, extrahandlesize, backlog, quota, &dnslistensock->outer); - - atomic_store(&dnslistensock->listening, true); - *sockp = dnslistensock; - - return (result); + if (result == ISC_R_SUCCESS) { + atomic_store(&dnslistensock->listening, true); + *sockp = dnslistensock; + return (ISC_R_SUCCESS); + } else { + atomic_store(&dnslistensock->closed, true); + isc_nmsocket_detach(&dnslistensock); + return (result); + } } void @@ -483,10 +490,44 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region, } -void -isc__nm_tcpdns_close(isc_nmsocket_t *sock) { +static void +tcpdns_close_direct(isc_nmsocket_t *sock) { + REQUIRE(sock->tid == isc_nm_tid()); if (sock->outer != NULL) { + sock->outer->rcb.recv = NULL; isc_nmsocket_detach(&sock->outer); } - uv_close((uv_handle_t*) &sock->timer, timer_close_cb); + /* We don't need atomics here, it's all in single network thread */ + if (sock->timer_initialized) { + sock->timer_initialized = false; + uv_timer_stop(&sock->timer); + uv_close((uv_handle_t *) &sock->timer, timer_close_cb); + } +} + +void +isc__nm_tcpdns_close(isc_nmsocket_t *sock) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_tcpdnssocket); + + if 
(sock->tid == isc_nm_tid()) { + tcpdns_close_direct(sock); + } else { + isc__netievent_tcpdnsclose_t *ievent = + isc__nm_get_ievent(sock->mgr, netievent_tcpdnsclose); + + ievent->sock = sock; + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) ievent); + } +} + +void +isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_tcpdnsclose_t *ievent = + (isc__netievent_tcpdnsclose_t *) ev0; + + REQUIRE(worker->id == ievent->sock->tid); + + tcpdns_close_direct(ievent->sock); } diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index 0aae5577e0..abd950a279 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -25,6 +25,8 @@ #include #include #include + +#include "uv-compat.h" #include "netmgr-int.h" static isc_result_t @@ -113,9 +115,9 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, * handle 'udplisten' async call - start listening on a socket. */ void -isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udplisten_t *ievent = - (isc__netievent_udplisten_t *) ievent0; + (isc__netievent_udplisten_t *) ev0; isc_nmsocket_t *sock = ievent->sock; REQUIRE(sock->type == isc_nm_udpsocket); @@ -123,7 +125,7 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { REQUIRE(sock->parent != NULL); uv_udp_init(&worker->loop, &sock->uv_handle.udp); - sock->uv_handle.udp.data = NULL; + uv_handle_set_data(&sock->uv_handle.handle, NULL); isc_nmsocket_attach(sock, (isc_nmsocket_t **)&sock->uv_handle.udp.data); @@ -140,7 +142,7 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { static void udp_close_cb(uv_handle_t *handle) { - isc_nmsocket_t *sock = handle->data; + isc_nmsocket_t *sock = uv_handle_get_data(handle); atomic_store(&sock->closed, true); isc_nmsocket_detach((isc_nmsocket_t **)&sock->uv_handle.udp.data); @@ -171,14 
+173,14 @@ stoplistening(isc_nmsocket_t *sock) { INSIST(sock->type == isc_nm_udplistener); for (int i = 0; i < sock->nchildren; i++) { - isc__netievent_udplisten_t *event = NULL; + isc__netievent_udpstop_t *event = NULL; if (i == sock->tid) { stop_udp_child(&sock->children[i]); continue; } - event = isc__nm_get_ievent(sock->mgr, netievent_udpstoplisten); + event = isc__nm_get_ievent(sock->mgr, netievent_udpstop); event->sock = &sock->children[i]; isc__nm_enqueue_ievent(&sock->mgr->workers[i], (isc__netievent_t *) event); @@ -196,7 +198,7 @@ stoplistening(isc_nmsocket_t *sock) { void isc_nm_udp_stoplistening(isc_nmsocket_t *sock) { - isc__netievent_udpstoplisten_t *ievent = NULL; + isc__netievent_udpstop_t *ievent = NULL; /* We can't be launched from network thread, we'd deadlock */ REQUIRE(!isc__nm_in_netthread()); @@ -208,7 +210,7 @@ isc_nm_udp_stoplistening(isc_nmsocket_t *sock) { * event. Otherwise, go ahead and stop listening right away. */ if (!isc__nm_acquire_interlocked(sock->mgr)) { - ievent = isc__nm_get_ievent(sock->mgr, netievent_udpstoplisten); + ievent = isc__nm_get_ievent(sock->mgr, netievent_udpstop); ievent->sock = sock; isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *) ievent); @@ -219,14 +221,11 @@ isc_nm_udp_stoplistening(isc_nmsocket_t *sock) { } /* - * handle 'udpstoplisten' async call - stop listening on a socket. + * handle 'udpstop' async call - stop listening on a socket. 
*/ void -isc__nm_async_udpstoplisten(isc__networker_t *worker, - isc__netievent_t *ievent0) -{ - isc__netievent_udplisten_t *ievent = - (isc__netievent_udplisten_t *) ievent0; +isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_udpstop_t *ievent = (isc__netievent_udpstop_t *) ev0; isc_nmsocket_t *sock = ievent->sock; REQUIRE(sock->iface != NULL); @@ -246,7 +245,7 @@ isc__nm_async_udpstoplisten(isc__networker_t *worker, if (!isc__nm_acquire_interlocked(sock->mgr)) { isc__netievent_udplisten_t *event = NULL; - event = isc__nm_get_ievent(sock->mgr, netievent_udpstoplisten); + event = isc__nm_get_ievent(sock->mgr, netievent_udpstop); event->sock = sock; isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *) event); @@ -271,7 +270,7 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, isc_sockaddr_t sockaddr; isc_sockaddr_t localaddr; struct sockaddr_storage laddr; - isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data; + isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); isc_region_t region; uint32_t maxudp; @@ -337,7 +336,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nmsocket_t *psock = NULL, *rsock = NULL; isc_nmsocket_t *sock = handle->sock; isc_sockaddr_t *peer = &handle->peer; - isc__netievent_udpsend_t *ievent; + isc__netievent_udpsend_t *ievent = NULL; isc__nm_uvreq_t *uvreq = NULL; int ntid; uint32_t maxudp = atomic_load(&sock->mgr->maxudp); @@ -405,9 +404,9 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, * handle 'udpsend' async event - send a packet on the socket */ void -isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udpsend_t *ievent = - (isc__netievent_udpsend_t *) ievent0; + (isc__netievent_udpsend_t *) ev0; REQUIRE(worker->id == ievent->sock->tid); diff --git a/lib/isc/netmgr/uv-compat.h 
b/lib/isc/netmgr/uv-compat.h new file mode 100644 index 0000000000..6ec2f3237b --- /dev/null +++ b/lib/isc/netmgr/uv-compat.h @@ -0,0 +1,34 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +#pragma once +#include <uv.h> + +/* + * Those functions were introduced in newer libuv, we still + * want BIND9 compile on older ones so we emulate them. + * They're inline to avoid conflicts when running with a newer + * library version. + */ + +#ifndef HAVE_UV_HANDLE_GET_DATA +static inline void* +uv_handle_get_data(const uv_handle_t* handle) { + return (handle->data); +} +#endif + +#ifndef HAVE_UV_HANDLE_SET_DATA +static inline void +uv_handle_set_data(uv_handle_t* handle, void* data) { + handle->data = data; +}; +#endif diff --git a/lib/ns/interfacemgr.c b/lib/ns/interfacemgr.c index 2dd03a0eee..3bbeee2a50 100644 --- a/lib/ns/interfacemgr.c +++ b/lib/ns/interfacemgr.c @@ -83,6 +83,7 @@ struct ns_interfacemgr { ISC_LIST(isc_sockaddr_t) listenon; int backlog; /*%< Listen queue size */ unsigned int udpdisp; /*%< UDP dispatch count */ + atomic_bool shuttingdown; /*%< Interfacemgr is shutting down */ #ifdef USE_ROUTE_SOCKET isc_task_t * task; isc_socket_t * route; @@ -217,6 +218,7 @@ ns_interfacemgr_create(isc_mem_t *mctx, mgr->listenon4 = NULL; mgr->listenon6 = NULL; mgr->udpdisp = udpdisp; + atomic_init(&mgr->shuttingdown, false); ISC_LIST_INIT(mgr->interfaces); ISC_LIST_INIT(mgr->listenon); @@ -360,6 +362,7 @@ ns_interfacemgr_shutdown(ns_interfacemgr_t *mgr) { * consider all interfaces "old". 
*/ mgr->generation++; + atomic_store(&mgr->shuttingdown, true); #ifdef USE_ROUTE_SOCKET LOCK(&mgr->lock); if (mgr->route != NULL) { @@ -1230,7 +1233,13 @@ ns_interfacemgr_listeningon(ns_interfacemgr_t *mgr, bool result = false; REQUIRE(NS_INTERFACEMGR_VALID(mgr)); - + /* + * If the manager is shutting down it's safer to + * return true. + */ + if (atomic_load(&mgr->shuttingdown)) { + return (true); + } LOCK(&mgr->lock); for (old = ISC_LIST_HEAD(mgr->listenon); old != NULL; diff --git a/lib/ns/tests/nstest.c b/lib/ns/tests/nstest.c index 927bcab5d3..6a6bd45e72 100644 --- a/lib/ns/tests/nstest.c +++ b/lib/ns/tests/nstest.c @@ -75,8 +75,8 @@ static dns_zone_t *served_zone = NULL; /* * We don't want to use netmgr-based client accounting, we need to emulate it. */ -atomic_uint_fast32_t client_refs[16]; -atomic_uintptr_t client_addrs[16]; +atomic_uint_fast32_t client_refs[32]; +atomic_uintptr_t client_addrs[32]; void __wrap_isc_nmhandle_unref(isc_nmhandle_t *handle); @@ -86,12 +86,12 @@ __wrap_isc_nmhandle_unref(isc_nmhandle_t *handle) { ns_client_t *client = (ns_client_t *)handle; int i; - for (i = 0; i < 16; i++) { + for (i = 0; i < 32; i++) { if (atomic_load(&client_addrs[i]) == (uintptr_t) client) { break; } } - REQUIRE(i < 16); + REQUIRE(i < 32); if (atomic_fetch_sub(&client_refs[i], 1) == 1) { dns_view_detach(&client->view); @@ -561,14 +561,14 @@ ns_test_getclient(ns_interface_t *ifp0, bool tcp, result = ns__client_setup(client, clientmgr, true); - for (i = 0; i < 16; i++) { + for (i = 0; i < 32; i++) { if (atomic_load(&client_addrs[i]) == (uintptr_t) NULL || atomic_load(&client_addrs[i]) == (uintptr_t) client) { break; } } - REQUIRE(i < 16); + REQUIRE(i < 32); atomic_store(&client_refs[i], 2); atomic_store(&client_addrs[i], (uintptr_t) client); diff --git a/util/copyrights b/util/copyrights index d4f8105162..a14a28c124 100644 --- a/util/copyrights +++ b/util/copyrights @@ -2258,6 +2258,7 @@ ./lib/isc/netmgr/tcp.c C 2019 ./lib/isc/netmgr/tcpdns.c C 2019 
./lib/isc/netmgr/udp.c C 2019 +./lib/isc/netmgr/uv-compat.h C 2019 ./lib/isc/netmgr/uverr2result.c C 2019 ./lib/isc/netscope.c C 2002,2004,2005,2006,2007,2016,2018,2019 ./lib/isc/nonce.c C 2018,2019