From bc5aae1579c773833ad2c3d17c0e42610de2822b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Thu, 28 Nov 2019 10:21:34 +0100 Subject: [PATCH 01/17] netmgr: make tcp listening multithreaded. When listening for TCP connections we create a socket, bind it and then pass it over IPC to all threads - which then listen on in and accept connections. This sounds broken, but it's the official way of dealing with multithreaded TCP listeners in libuv, and works on all platforms supported by libuv. --- lib/isc/netmgr/netmgr-int.h | 16 ++ lib/isc/netmgr/netmgr.c | 22 +++ lib/isc/netmgr/tcp.c | 312 +++++++++++++++++++++++++++++++++--- lib/isc/netmgr/udp.c | 2 +- 4 files changed, 325 insertions(+), 27 deletions(-) diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 3acacc756b..129f4001c4 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -114,7 +114,9 @@ typedef enum isc__netievent_type { netievent_tcpstartread, netievent_tcppauseread, netievent_tcplisten, + netievent_tcpchildlisten, netievent_tcpstoplisten, + netievent_tcpstopchildlisten, netievent_tcpclose, netievent_closecb, netievent_shutdown, @@ -159,6 +161,7 @@ typedef struct isc__nm_uvreq { isc_sockaddr_t peer; /* peer address */ isc__nm_cb_t cb; /* callback */ void * cbarg; /* callback argument */ + uv_pipe_t pipe; union { uv_req_t req; uv_getaddrinfo_t getaddrinfo; @@ -180,6 +183,7 @@ typedef struct isc__netievent__socket { typedef isc__netievent__socket_t isc__netievent_udplisten_t; typedef isc__netievent__socket_t isc__netievent_udpstoplisten_t; typedef isc__netievent__socket_t isc__netievent_tcpstoplisten_t; +typedef isc__netievent__socket_t isc__netievent_tcpstopchildlisten_t; typedef isc__netievent__socket_t isc__netievent_tcpclose_t; typedef isc__netievent__socket_t isc__netievent_startread_t; typedef isc__netievent__socket_t isc__netievent_pauseread_t; @@ -193,6 +197,7 @@ typedef struct isc__netievent__socket_req { typedef 
isc__netievent__socket_req_t isc__netievent_tcpconnect_t; typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t; +typedef isc__netievent__socket_req_t isc__netievent_tcpchildlisten_t; typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t; typedef struct isc__netievent_udpsend { @@ -274,6 +279,7 @@ typedef enum isc_nmsocket_type { isc_nm_udplistener, /* Aggregate of nm_udpsocks */ isc_nm_tcpsocket, isc_nm_tcplistener, + isc_nm_tcpchildlistener, isc_nm_tcpdnslistener, isc_nm_tcpdnssocket } isc_nmsocket_type; @@ -322,6 +328,11 @@ struct isc_nmsocket { isc_nmiface_t *iface; isc_nmhandle_t *tcphandle; + /* used to send listening TCP sockets to children */ + uv_pipe_t ipc; + char ipc_pipe_name[32]; + atomic_int_fast32_t schildren; + /*% extra data allocated at the end of each isc_nmhandle_t */ size_t extrahandlesize; @@ -579,9 +590,14 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0); void isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0); void +isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0); +void isc__nm_async_tcpstoplisten(isc__networker_t *worker, isc__netievent_t *ievent0); void +isc__nm_async_tcpstopchildlisten(isc__networker_t *worker, + isc__netievent_t *ievent0); +void isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ievent0); void isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0); diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 902fe2d1cc..479d327469 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -42,6 +42,12 @@ ISC_THREAD_LOCAL int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN; +#ifdef WIN32 +#define NAMED_PIPE_PREFIX "\\\\.\\pipe\\named-ipc" +#else +#define NAMED_PIPE_PREFIX ".named-ipc" +#endif + static void nmsocket_maybe_destroy(isc_nmsocket_t *sock); static void @@ -497,6 +503,9 @@ async_cb(uv_async_t *handle) { case netievent_tcplisten: 
isc__nm_async_tcplisten(worker, ievent); break; + case netievent_tcpchildlisten: + isc__nm_async_tcpchildlisten(worker, ievent); + break; case netievent_tcpstartread: isc__nm_async_startread(worker, ievent); break; @@ -509,6 +518,9 @@ async_cb(uv_async_t *handle) { case netievent_tcpstoplisten: isc__nm_async_tcpstoplisten(worker, ievent); break; + case netievent_tcpstopchildlisten: + isc__nm_async_tcpstopchildlisten(worker, ievent); + break; case netievent_tcpclose: isc__nm_async_tcpclose(worker, ievent); break; @@ -790,6 +802,16 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, sock->ah_handles[i] = NULL; } + /* + * XXXWPK Maybe it should be in tmp, maybe it should not + * be random? + */ + strcpy(sock->ipc_pipe_name, NAMED_PIPE_PREFIX); + for (int i=strlen(sock->ipc_pipe_name); i<31; i++) { + sock->ipc_pipe_name[i] = isc_random8()%24 + 'a'; + } + sock->ipc_pipe_name[31] = '\0'; + isc_mutex_init(&sock->lock); isc_condition_init(&sock->cond); isc_refcount_init(&sock->references, 1); diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 75dcd3a7e2..c30db8fe99 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -50,6 +50,21 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); static void tcp_close_cb(uv_handle_t *uvhandle); +static void +ipc_connection_cb(uv_stream_t *stream, int status); +static void +ipc_write_cb(uv_write_t* uvreq, int status); +static void +parent_pipe_close_cb(uv_handle_t *handle); +static void +childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status); +static void +childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); +static void +stoplistening(isc_nmsocket_t *sock); +static void +tcp_listenclose_cb(uv_handle_t *handle); + static int tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__networker_t *worker; @@ -71,7 +86,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { return (r); } } - + sock->uv_handle.tcp.data = sock; r = 
uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp, &req->peer.type.sa, tcp_connect_cb); return (r); @@ -134,7 +149,6 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_quota_t *quota, isc_nmsocket_t **sockp) { - isc__netievent_tcplisten_t *ievent = NULL; isc_nmsocket_t *nsock = NULL; REQUIRE(VALID_NM(mgr)); @@ -142,6 +156,11 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, nsock = isc_mem_get(mgr->mctx, sizeof(*nsock)); isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener); nsock->iface = iface; + nsock->nchildren = mgr->nworkers; + atomic_init(&nsock->rchildren, mgr->nworkers); + nsock->children = isc_mem_get(mgr->mctx, + mgr->nworkers * sizeof(*nsock)); + memset(nsock->children, 0, mgr->nworkers * sizeof(*nsock)); nsock->rcb.accept = cb; nsock->rcbarg = cbarg; nsock->extrahandlesize = extrahandlesize; @@ -156,18 +175,27 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, nsock->tid = isc_random_uniform(mgr->nworkers); /* - * Listening to TCP is rare enough not to care about the - * added overhead from passing this to another thread. - */ - ievent = isc__nm_get_ievent(mgr, netievent_tcplisten); + * Listening to TCP is rare enough not to care about the + * added overhead from passing this to another thread. + */ + isc__netievent_tcplisten_t *ievent = isc__nm_get_ievent(mgr, netievent_tcplisten); ievent->sock = nsock; isc__nm_enqueue_ievent(&mgr->workers[nsock->tid], (isc__netievent_t *) ievent); + + *sockp = nsock; return (ISC_R_SUCCESS); } +/* + * For TCP listening we create a single socket, bind it, and then pass it + * to `ncpu` child sockets - the passing is done over IPC. + * XXXWPK This design pattern is ugly but it's "the way to do it" recommended + * by libuv documentation - which also mentions that there should be + * uv_export/uv_import functions which would simplify this greatly. 
+ */ void isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { isc__netievent_tcplisten_t *ievent = @@ -184,10 +212,56 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { } uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0); - r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog, - tcp_connection_cb); - if (r != 0) { - return; + sock->uv_handle.tcp.data = sock; + /* + * This is not properly documented in libuv, and the example + * (benchmark-multi-accept) is wrong: + * 'ipc' parameter must be '0' for 'listening' IPC socket, '1' + * only for the sockets are really passing the FDs between + * threads. This works without any problems on Unices, but + * breaks horribly on Windows. + */ + r = uv_pipe_init(&worker->loop, &sock->ipc, 0); + INSIST(r == 0); + sock->ipc.data = sock; + r = uv_pipe_bind(&sock->ipc, sock->ipc_pipe_name); + INSIST(r == 0); + r = uv_listen((uv_stream_t *) &sock->ipc, sock->nchildren, + ipc_connection_cb); + INSIST(r == 0); + + /* + * We launch n 'tcpchildlistener' that will receive + * sockets to be listened on over ipc. 
+ */ + for (int i = 0; i < sock->nchildren; i++) { + isc__netievent_tcpchildlisten_t *event = NULL; + isc_nmsocket_t *csock = &sock->children[i]; + + isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener); + csock->parent = sock; + csock->iface = sock->iface; + csock->tid = i; + csock->pquota = sock->pquota; + csock->backlog = sock->backlog; + csock->extrahandlesize = sock->extrahandlesize; + + INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL); + csock->rcb.accept = sock->rcb.accept; + csock->rcbarg = sock->rcbarg; + csock->fd = -1; + + event = isc__nm_get_ievent(csock->mgr, + netievent_tcpchildlisten); + event->sock = csock; + if (csock->tid == isc_nm_tid()) { + isc__nm_async_tcpchildlisten(&sock->mgr->workers[i], + (isc__netievent_t *) event); + isc__nm_put_ievent(sock->mgr, event); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[i], + (isc__netievent_t *) event); + } } atomic_store(&sock->listening, true); @@ -195,6 +269,117 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { return; } +/* Parent got an IPC connection from child */ +static void +ipc_connection_cb(uv_stream_t *stream, int status) { + int r; + REQUIRE(status == 0); + isc_nmsocket_t *sock = stream->data; + isc__networker_t *worker = &sock->mgr->workers[isc_nm_tid()]; + isc__nm_uvreq_t *nreq = isc__nm_uvreq_get(sock->mgr, sock); + /* + * The buffer can be anything, it will be ignored, but it has to + * be something that won't disappear. 
+ */ + nreq->uvbuf = uv_buf_init((char *)nreq, 1); + uv_pipe_init(&worker->loop, &nreq->pipe, 1); + nreq->pipe.data = nreq; + + /* Failure here is critical */ + r = uv_accept((uv_stream_t *) &sock->ipc, + (uv_stream_t*) &nreq->pipe); + INSIST(r == 0); + r = uv_write2(&nreq->uv_req.write, + (uv_stream_t*) &nreq->pipe, + &nreq->uvbuf, + 1, + (uv_stream_t*) &sock->uv_handle.stream, + ipc_write_cb); + INSIST(r == 0); +} + +static void +ipc_write_cb(uv_write_t* uvreq, int status) { + UNUSED(status); + isc__nm_uvreq_t *req = uvreq->data; + /* + * We want all children to get the socket. If we're done we can stop + * listening on the IPC socket. + */ + if (atomic_fetch_add(&req->sock->schildren, 1) == + req->sock->nchildren - 1) { + uv_close((uv_handle_t*) &req->sock->ipc, NULL); + } + uv_close((uv_handle_t*) &req->pipe, parent_pipe_close_cb); +} + +static void +parent_pipe_close_cb(uv_handle_t *handle) { + isc__nm_uvreq_t *req = handle->data; + isc__nm_uvreq_put(&req, req->sock); +} + +void +isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0) { + isc__netievent_tcplisten_t *ievent = + (isc__netievent_tcplisten_t *) ievent0; + isc_nmsocket_t *sock = ievent->sock; + int r; + + REQUIRE(isc__nm_in_netthread()); + REQUIRE(sock->type == isc_nm_tcpchildlistener); + + r = uv_pipe_init(&worker->loop, &sock->ipc, 1); + INSIST(r == 0); + sock->ipc.data = sock; + isc__nm_uvreq_t * req = isc__nm_uvreq_get(sock->mgr, sock); + + uv_pipe_connect(&req->uv_req.connect, + &sock->ipc, + sock->parent->ipc_pipe_name, + childlisten_ipc_connect_cb); +} + +/* child connected to parent over IPC */ +static void +childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status) { + UNUSED(status); + isc__nm_uvreq_t *req = uvreq->data; + isc_nmsocket_t *sock = req->sock; + isc__nm_uvreq_put(&req, sock); + int r = uv_read_start((uv_stream_t*) &sock->ipc, + isc__nm_alloc_cb, + childlisten_read_cb); + INSIST(r == 0); +} + +/* child got the socket over IPC */ +static void 
+childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { + UNUSED(nread); + int r; + isc_nmsocket_t *sock = stream->data; + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(buf != NULL); + uv_pipe_t* ipc = (uv_pipe_t*) stream; + uv_handle_type type = uv_pipe_pending_type(ipc); + INSIST(type == UV_TCP); + isc__nm_free_uvbuf(sock, buf); + isc__networker_t * worker = &sock->mgr->workers[isc_nm_tid()]; + uv_tcp_init(&worker->loop, (uv_tcp_t*) &sock->uv_handle.tcp); + sock->uv_handle.tcp.data = sock; + uv_accept(stream, &sock->uv_handle.stream); + r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog, + tcp_connection_cb); + uv_close((uv_handle_t*) ipc, NULL); + if (r != 0) { + /* XXX log it? */ + return; + } +} + + void isc_nm_tcp_stoplistening(isc_nmsocket_t *sock) { isc__netievent_tcpstoplisten_t *ievent = NULL; @@ -208,21 +393,6 @@ isc_nm_tcp_stoplistening(isc_nmsocket_t *sock) { (isc__netievent_t *) ievent); } -static void -stoplistening_cb(uv_handle_t *handle) { - isc_nmsocket_t *sock = handle->data; - - LOCK(&sock->lock); - atomic_store(&sock->listening, false); - atomic_store(&sock->closed, true); - SIGNAL(&sock->cond); - UNLOCK(&sock->lock); - - sock->pquota = NULL; - - isc_nmsocket_detach(&sock); -} - void isc__nm_async_tcpstoplisten(isc__networker_t *worker, isc__netievent_t *ievent0) @@ -237,7 +407,97 @@ isc__nm_async_tcpstoplisten(isc__networker_t *worker, REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tcplistener); - uv_close(&sock->uv_handle.handle, stoplistening_cb); + /* + * If network manager is interlocked, re-enqueue the event for later. 
+ */ + if (!isc__nm_acquire_interlocked(sock->mgr)) { + isc__netievent_tcpstoplisten_t *event = NULL; + + event = isc__nm_get_ievent(sock->mgr, + netievent_tcpstoplisten); + event->sock = sock; + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) event); + } else { + stoplistening(sock); + isc__nm_drop_interlocked(sock->mgr); + } +} + +static void +stoplistening(isc_nmsocket_t *sock) { + for (int i = 0; i < sock->nchildren; i++) { + /* + * Stoplistening is a rare event, we can ignore the overhead + * caused by allocating an event, and doing it this way + * simplifies sock reference counting. + */ + isc__netievent_tcpstopchildlisten_t *event = NULL; + event = isc__nm_get_ievent(sock->mgr, + netievent_tcpstopchildlisten); + isc_nmsocket_attach(&sock->children[i], &event->sock); + + if (i == sock->tid) { + isc__nm_async_tcpstopchildlisten(&sock->mgr->workers[i], + (isc__netievent_t *) event); + isc__nm_put_ievent(sock->mgr, event); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[i], + (isc__netievent_t *) event); + } + } + + LOCK(&sock->lock); + while (atomic_load_relaxed(&sock->rchildren) > 0) { + WAIT(&sock->cond, &sock->lock); + } + UNLOCK(&sock->lock); + uv_close((uv_handle_t *) &sock->uv_handle.tcp, tcp_listenclose_cb); +} + +void +isc__nm_async_tcpstopchildlisten(isc__networker_t *worker, + isc__netievent_t *ievent0) +{ + isc__netievent_tcpstoplisten_t *ievent = + (isc__netievent_tcpstoplisten_t *) ievent0; + isc_nmsocket_t *sock = ievent->sock; + + UNUSED(worker); + + REQUIRE(isc_nm_tid() == sock->tid); + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_tcpchildlistener); + REQUIRE(sock->parent != NULL); + + /* + * rchildren is atomic but we still need to change it + * under a lock as the parent is waiting on conditional + * and without it we might deadlock. 
+ */ + LOCK(&sock->parent->lock); + atomic_fetch_sub(&sock->parent->rchildren, 1); + UNLOCK(&sock->parent->lock); + + uv_close((uv_handle_t *) &sock->uv_handle.tcp, tcp_listenclose_cb); + BROADCAST(&sock->parent->cond); +} + +/* + * This callback is used for closing child and parent listening sockets - + * that's why we need to choose the proper lock. + */ +static void +tcp_listenclose_cb(uv_handle_t *handle) { + isc_nmsocket_t *sock = handle->data; + isc_mutex_t * lock = (sock->parent != NULL) ? + &sock->parent->lock : &sock->lock; + LOCK(lock); + atomic_store(&sock->closed, true); + atomic_store(&sock->listening, false); + sock->pquota = NULL; + UNLOCK(lock); + isc_nmsocket_detach(&sock); } static void diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index 0aae5577e0..8abdd0c46f 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -171,7 +171,7 @@ stoplistening(isc_nmsocket_t *sock) { INSIST(sock->type == isc_nm_udplistener); for (int i = 0; i < sock->nchildren; i++) { - isc__netievent_udplisten_t *event = NULL; + isc__netievent_udpstoplisten_t *event = NULL; if (i == sock->tid) { stop_udp_child(&sock->children[i]); From 5a65ec0affbebbdc63726de97960eaa898c7852a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Mon, 2 Dec 2019 11:19:55 +0100 Subject: [PATCH 02/17] Add uv_handle_{get,set}_data functions that's absent in pre-1.19 libuv to make code clearer. This might be removed when we stop supporting older libuv versions. 
--- config.h.in | 6 ++++++ configure | 15 ++++++++++++++ configure.ac | 4 ++++ lib/isc/netmgr/netmgr.c | 5 +++-- lib/isc/netmgr/tcp.c | 41 +++++++++++++++++++++----------------- lib/isc/netmgr/tcpdns.c | 6 ++++-- lib/isc/netmgr/udp.c | 8 +++++--- lib/isc/netmgr/uv-compat.h | 34 +++++++++++++++++++++++++++++++ util/copyrights | 1 + 9 files changed, 95 insertions(+), 25 deletions(-) create mode 100644 lib/isc/netmgr/uv-compat.h diff --git a/config.h.in b/config.h.in index 88f46849b8..1b0a2a6a62 100644 --- a/config.h.in +++ b/config.h.in @@ -465,6 +465,12 @@ /* Define to 1 if you have the `usleep' function. */ #undef HAVE_USLEEP +/* Define to 1 if you have the `uv_handle_get_data' function. */ +#undef HAVE_UV_HANDLE_GET_DATA + +/* Define to 1 if you have the `uv_handle_set_data' function. */ +#undef HAVE_UV_HANDLE_SET_DATA + /* Use zlib library */ #undef HAVE_ZLIB diff --git a/configure b/configure index 7d1629af0e..a3eb8aa1d2 100755 --- a/configure +++ b/configure @@ -15943,6 +15943,21 @@ fi CFLAGS="$CFLAGS $LIBUV_CFLAGS" LIBS="$LIBS $LIBUV_LIBS" +# Those functions are only provided in newer versions of libuv, we'll be emulating them +# for now +for ac_func in uv_handle_get_data uv_handle_set_data +do : + as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh` +ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var" +if eval test \"x\$"$as_ac_var"\" = x"yes"; then : + cat >>confdefs.h <<_ACEOF +#define `$as_echo "HAVE_$ac_func" | $as_tr_cpp` 1 +_ACEOF + +fi +done + + # # flockfile is usually provided by pthreads # diff --git a/configure.ac b/configure.ac index 25b0ce8959..19b4816540 100644 --- a/configure.ac +++ b/configure.ac @@ -663,6 +663,10 @@ AX_SAVE_FLAGS([libuv]) CFLAGS="$CFLAGS $LIBUV_CFLAGS" LIBS="$LIBS $LIBUV_LIBS" +# Those functions are only provided in newer versions of libuv, we'll be emulating them +# for now +AC_CHECK_FUNCS([uv_handle_get_data uv_handle_set_data]) + # # flockfile is usually provided by pthreads # diff --git a/lib/isc/netmgr/netmgr.c 
b/lib/isc/netmgr/netmgr.c index 479d327469..9ef847bf2b 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -29,6 +29,7 @@ #include #include +#include "uv-compat.h" #include "netmgr-int.h" /* @@ -827,7 +828,7 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, void isc__nm_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) { - isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data; + isc_nmsocket_t *sock = uv_handle_get_data(handle); isc__networker_t *worker = NULL; REQUIRE(VALID_NMSOCK(sock)); @@ -1208,7 +1209,7 @@ shutdown_walk_cb(uv_handle_t *handle, void *arg) { switch(handle->type) { case UV_TCP: - isc__nm_tcp_shutdown((isc_nmsocket_t *) handle->data); + isc__nm_tcp_shutdown(uv_handle_get_data(handle)); break; default: break; diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index c30db8fe99..9a4746cc40 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -28,6 +28,7 @@ #include #include +#include "uv-compat.h" #include "netmgr-int.h" static int @@ -86,7 +87,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { return (r); } } - sock->uv_handle.tcp.data = sock; + uv_handle_set_data(&sock->uv_handle.handle, sock); r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp, &req->peer.type.sa, tcp_connect_cb); return (r); @@ -113,7 +114,8 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0) { static void tcp_connect_cb(uv_connect_t *uvreq, int status) { isc__nm_uvreq_t *req = (isc__nm_uvreq_t *) uvreq->data; - isc_nmsocket_t *sock = uvreq->handle->data; + isc_nmsocket_t *sock; + sock = uv_handle_get_data((uv_handle_t *) uvreq->handle); REQUIRE(VALID_UVREQ(req)); @@ -211,8 +213,11 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { return; } - uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0); - sock->uv_handle.tcp.data = sock; + r = uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0); + if (r != 0) { + return; 
+ } + uv_handle_set_data(&sock->uv_handle.handle, sock); /* * This is not properly documented in libuv, and the example * (benchmark-multi-accept) is wrong: @@ -223,7 +228,7 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { */ r = uv_pipe_init(&worker->loop, &sock->ipc, 0); INSIST(r == 0); - sock->ipc.data = sock; + uv_handle_set_data((uv_handle_t *)&sock->ipc, sock); r = uv_pipe_bind(&sock->ipc, sock->ipc_pipe_name); INSIST(r == 0); r = uv_listen((uv_stream_t *) &sock->ipc, sock->nchildren, @@ -274,7 +279,7 @@ static void ipc_connection_cb(uv_stream_t *stream, int status) { int r; REQUIRE(status == 0); - isc_nmsocket_t *sock = stream->data; + isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *) stream); isc__networker_t *worker = &sock->mgr->workers[isc_nm_tid()]; isc__nm_uvreq_t *nreq = isc__nm_uvreq_get(sock->mgr, sock); /* @@ -283,7 +288,7 @@ ipc_connection_cb(uv_stream_t *stream, int status) { */ nreq->uvbuf = uv_buf_init((char *)nreq, 1); uv_pipe_init(&worker->loop, &nreq->pipe, 1); - nreq->pipe.data = nreq; + uv_handle_set_data((uv_handle_t *)&nreq->pipe, nreq); /* Failure here is critical */ r = uv_accept((uv_stream_t *) &sock->ipc, @@ -315,7 +320,7 @@ ipc_write_cb(uv_write_t* uvreq, int status) { static void parent_pipe_close_cb(uv_handle_t *handle) { - isc__nm_uvreq_t *req = handle->data; + isc__nm_uvreq_t *req = uv_handle_get_data(handle); isc__nm_uvreq_put(&req, req->sock); } @@ -331,7 +336,7 @@ isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0 r = uv_pipe_init(&worker->loop, &sock->ipc, 1); INSIST(r == 0); - sock->ipc.data = sock; + uv_handle_set_data((uv_handle_t *) &sock->ipc, sock); isc__nm_uvreq_t * req = isc__nm_uvreq_get(sock->mgr, sock); uv_pipe_connect(&req->uv_req.connect, @@ -358,7 +363,7 @@ static void childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { UNUSED(nread); int r; - isc_nmsocket_t *sock = stream->data; + isc_nmsocket_t *sock = 
uv_handle_get_data((uv_handle_t *) stream); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(buf != NULL); @@ -368,7 +373,7 @@ childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { isc__nm_free_uvbuf(sock, buf); isc__networker_t * worker = &sock->mgr->workers[isc_nm_tid()]; uv_tcp_init(&worker->loop, (uv_tcp_t*) &sock->uv_handle.tcp); - sock->uv_handle.tcp.data = sock; + uv_handle_set_data(&sock->uv_handle.handle, sock); uv_accept(stream, &sock->uv_handle.stream); r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog, tcp_connection_cb); @@ -489,7 +494,7 @@ isc__nm_async_tcpstopchildlisten(isc__networker_t *worker, */ static void tcp_listenclose_cb(uv_handle_t *handle) { - isc_nmsocket_t *sock = handle->data; + isc_nmsocket_t *sock = uv_handle_get_data(handle); isc_mutex_t * lock = (sock->parent != NULL) ? &sock->parent->lock : &sock->lock; LOCK(lock); @@ -502,7 +507,7 @@ tcp_listenclose_cb(uv_handle_t *handle) { static void readtimeout_cb(uv_timer_t *handle) { - isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data; + isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); @@ -563,7 +568,7 @@ isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0) { if (sock->read_timeout != 0) { if (!sock->timer_initialized) { uv_timer_init(&worker->loop, &sock->timer); - sock->timer.data = sock; + uv_handle_set_data((uv_handle_t *)&sock->timer, sock); sock->timer_initialized = true; } uv_timer_start(&sock->timer, readtimeout_cb, @@ -644,7 +649,7 @@ isc_nm_resumeread(isc_nmsocket_t *sock) { static void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { - isc_nmsocket_t *sock = stream->data; + isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t*) stream); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(buf != NULL); @@ -759,7 +764,7 @@ accept_connection(isc_nmsocket_t *ssock) { static void tcp_connection_cb(uv_stream_t *server, int status) { - 
isc_nmsocket_t *ssock = server->data; + isc_nmsocket_t *ssock = uv_handle_get_data((uv_handle_t*) server); isc_result_t result; UNUSED(status); @@ -875,7 +880,7 @@ tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { static void tcp_close_cb(uv_handle_t *uvhandle) { - isc_nmsocket_t *sock = uvhandle->data; + isc_nmsocket_t *sock = uv_handle_get_data(uvhandle); REQUIRE(VALID_NMSOCK(sock)); @@ -885,7 +890,7 @@ tcp_close_cb(uv_handle_t *uvhandle) { static void timer_close_cb(uv_handle_t *uvhandle) { - isc_nmsocket_t *sock = uvhandle->data; + isc_nmsocket_t *sock = uv_handle_get_data(uvhandle); REQUIRE(VALID_NMSOCK(sock)); diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index d75fdda943..def3993800 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -26,6 +26,7 @@ #include #include +#include "uv-compat.h" #include "netmgr-int.h" #define TCPDNS_CLIENTS_PER_CONN 23 @@ -76,7 +77,7 @@ alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) { static void timer_close_cb(uv_handle_t *handle) { - isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data; + isc_nmsocket_t *sock = (isc_nmsocket_t *) uv_handle_get_data(handle); INSIST(VALID_NMSOCK(sock)); sock->timer_initialized = false; atomic_store(&sock->closed, true); @@ -85,7 +86,8 @@ timer_close_cb(uv_handle_t *handle) { static void dnstcp_readtimeout(uv_timer_t *timer) { - isc_nmsocket_t *sock = (isc_nmsocket_t *) timer->data; + isc_nmsocket_t *sock; + sock = (isc_nmsocket_t *) uv_handle_get_data((uv_handle_t *)timer); REQUIRE(VALID_NMSOCK(sock)); diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index 8abdd0c46f..97df0ef9ec 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -25,6 +25,8 @@ #include #include #include + +#include "uv-compat.h" #include "netmgr-int.h" static isc_result_t @@ -123,7 +125,7 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { REQUIRE(sock->parent != NULL); uv_udp_init(&worker->loop, &sock->uv_handle.udp); - 
sock->uv_handle.udp.data = NULL; + uv_handle_set_data(&sock->uv_handle.handle, NULL); isc_nmsocket_attach(sock, (isc_nmsocket_t **)&sock->uv_handle.udp.data); @@ -140,7 +142,7 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { static void udp_close_cb(uv_handle_t *handle) { - isc_nmsocket_t *sock = handle->data; + isc_nmsocket_t *sock = uv_handle_get_data(handle); atomic_store(&sock->closed, true); isc_nmsocket_detach((isc_nmsocket_t **)&sock->uv_handle.udp.data); @@ -271,7 +273,7 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, isc_sockaddr_t sockaddr; isc_sockaddr_t localaddr; struct sockaddr_storage laddr; - isc_nmsocket_t *sock = (isc_nmsocket_t *) handle->data; + isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); isc_region_t region; uint32_t maxudp; diff --git a/lib/isc/netmgr/uv-compat.h b/lib/isc/netmgr/uv-compat.h new file mode 100644 index 0000000000..6ec2f3237b --- /dev/null +++ b/lib/isc/netmgr/uv-compat.h @@ -0,0 +1,34 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +#pragma once +#include + +/* + * Those functions were introduced in newer libuv, we still + * want BIND9 compile on older ones so we emulate them. + * They're inline to avoid conflicts when running with a newer + * library version. 
+ */ + +#ifndef HAVE_UV_HANDLE_GET_DATA +static inline void* +uv_handle_get_data(const uv_handle_t* handle) { + return (handle->data); +} +#endif + +#ifndef HAVE_UV_HANDLE_SET_DATA +static inline void +uv_handle_set_data(uv_handle_t* handle, void* data) { + handle->data = data; +}; +#endif diff --git a/util/copyrights b/util/copyrights index d4f8105162..a14a28c124 100644 --- a/util/copyrights +++ b/util/copyrights @@ -2258,6 +2258,7 @@ ./lib/isc/netmgr/tcp.c C 2019 ./lib/isc/netmgr/tcpdns.c C 2019 ./lib/isc/netmgr/udp.c C 2019 +./lib/isc/netmgr/uv-compat.h C 2019 ./lib/isc/netmgr/uverr2result.c C 2019 ./lib/isc/netscope.c C 2002,2004,2005,2006,2007,2016,2018,2019 ./lib/isc/nonce.c C 2018,2019 From 8c5aaacbefa769454ddc2c2e7ad934415fcd933b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Mon, 2 Dec 2019 13:54:44 +0100 Subject: [PATCH 03/17] - Add separate priority event queue for events that must be processed even when worker is paused (e.g. interface reconfiguration). This is needed to prevent deadlocks when reconfiguring interfaces - as network manager is paused then, but we still need to stop/start listening. - Proper handling of TCP listen errors in netmgr - bind to the socket first, then return the error code. --- lib/isc/netmgr/netmgr-int.h | 26 ++++++++---- lib/isc/netmgr/netmgr.c | 47 +++++++++++++++++---- lib/isc/netmgr/tcp.c | 82 ++++++++++++++++++++++++------------- lib/isc/netmgr/tcpdns.c | 14 ++++--- 4 files changed, 120 insertions(+), 49 deletions(-) diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 129f4001c4..4a8f10aff7 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -46,7 +46,11 @@ typedef struct isc__networker { bool paused; bool finished; isc_thread_t thread; - isc_queue_t *ievents; /* incoming async events */ + isc_queue_t *ievents; /* incoming async events */ + isc_queue_t *ievents_prio; /* priority async events + * used for listening etc. 
+ * can be processed while + * worker is paused */ isc_refcount_t references; atomic_int_fast64_t pktcount; char recvbuf[65536]; @@ -103,9 +107,6 @@ struct isc_nmiface { }; typedef enum isc__netievent_type { - netievent_stop, - netievent_udplisten, - netievent_udpstoplisten, netievent_udpsend, netievent_udprecv, netievent_tcpconnect, @@ -113,13 +114,17 @@ typedef enum isc__netievent_type { netievent_tcprecv, netievent_tcpstartread, netievent_tcppauseread, - netievent_tcplisten, netievent_tcpchildlisten, - netievent_tcpstoplisten, netievent_tcpstopchildlisten, - netievent_tcpclose, netievent_closecb, netievent_shutdown, + netievent_stop, + netievent_udpstoplisten, + netievent_tcpstoplisten, + netievent_tcpclose, + netievent_prio = 0xff, + netievent_udplisten, + netievent_tcplisten, } isc__netievent_type; /* @@ -402,10 +407,15 @@ struct isc_nmsocket { isc_astack_t *inactivehandles; isc_astack_t *inactivereqs; - /* Used for active/rchildren during shutdown */ + /* + * Used to wait for listening event to be done and active/rchildren + * during shutdown. + */ isc_mutex_t lock; isc_condition_t cond; + isc_result_t result; + /*% * List of active handles. 
* ah - current position in 'ah_frees'; this represents the diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 9ef847bf2b..f91651c009 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -57,6 +57,8 @@ static void * nm_thread(void *worker0); static void async_cb(uv_async_t *handle); +static void +process_queue(isc__networker_t *worker, isc_queue_t *queue); int isc_nm_tid() { @@ -135,6 +137,7 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) { isc_condition_init(&worker->cond); worker->ievents = isc_queue_new(mgr->mctx, 128); + worker->ievents_prio = isc_queue_new(mgr->mctx, 128); /* * We need to do this here and not in nm_thread to avoid a @@ -182,17 +185,24 @@ nm_destroy(isc_nm_t **mgr0) { UNLOCK(&mgr->lock); for (size_t i = 0; i < mgr->nworkers; i++) { + isc__networker_t *worker = &mgr->workers[i]; /* Empty the async event queue */ isc__netievent_t *ievent; while ((ievent = (isc__netievent_t *) - isc_queue_dequeue(mgr->workers[i].ievents)) != NULL) + isc_queue_dequeue(worker->ievents)) != NULL) { isc_mempool_put(mgr->evpool, ievent); } - int r = uv_loop_close(&mgr->workers[i].loop); + while ((ievent = (isc__netievent_t *) + isc_queue_dequeue(worker->ievents_prio)) != NULL) + { + isc_mempool_put(mgr->evpool, ievent); + } + int r = uv_loop_close(&worker->loop); INSIST(r == 0); - isc_queue_destroy(mgr->workers[i].ievents); - isc_thread_join(mgr->workers[i].thread, NULL); + isc_queue_destroy(worker->ievents); + isc_queue_destroy(worker->ievents_prio); + isc_thread_join(worker->thread, NULL); } isc_condition_destroy(&mgr->wkstatecond); @@ -410,6 +420,9 @@ nm_thread(void *worker0) { UNLOCK(&worker->mgr->lock); WAIT(&worker->cond, &worker->lock); + + /* Process priority events */ + process_queue(worker, worker->ievents_prio); } if (pausing) { uint32_t wp = atomic_fetch_sub_explicit( @@ -459,7 +472,8 @@ nm_thread(void *worker0) { /* * Empty the async queue. 
*/ - async_cb(&worker->async); + process_queue(worker, worker->ievents_prio); + process_queue(worker, worker->ievents); } LOCK(&worker->mgr->lock); @@ -479,14 +493,20 @@ nm_thread(void *worker0) { static void async_cb(uv_async_t *handle) { isc__networker_t *worker = (isc__networker_t *) handle->loop->data; + process_queue(worker, worker->ievents_prio); + process_queue(worker, worker->ievents); +} + +static void +process_queue(isc__networker_t *worker, isc_queue_t *queue) { isc__netievent_t *ievent; while ((ievent = (isc__netievent_t *) - isc_queue_dequeue(worker->ievents)) != NULL) + isc_queue_dequeue(queue)) != NULL) { switch (ievent->type) { case netievent_stop: - uv_stop(handle->loop); + uv_stop(&worker->loop); isc_mempool_put(worker->mgr->evpool, ievent); return; case netievent_udplisten: @@ -557,7 +577,18 @@ isc__nm_put_ievent(isc_nm_t *mgr, void *ievent) { void isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) { - isc_queue_enqueue(worker->ievents, (uintptr_t)event); + if (event->type > netievent_prio) { + /* + * We need to make sure this signal will be delivered and + * the queue will be processed. 
+ */ + LOCK(&worker->lock); + isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event); + SIGNAL(&worker->cond); + UNLOCK(&worker->lock); + } else { + isc_queue_enqueue(worker->ievents, (uintptr_t)event); + } uv_async_send(&worker->async); } diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 9a4746cc40..63573d8921 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -167,6 +167,7 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, nsock->rcbarg = cbarg; nsock->extrahandlesize = extrahandlesize; nsock->backlog = backlog; + nsock->result = ISC_R_SUCCESS; if (quota != NULL) { /* * We don't attach to quota, just assign - to avoid @@ -174,21 +175,32 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, */ nsock->pquota = quota; } - nsock->tid = isc_random_uniform(mgr->nworkers); - /* - * Listening to TCP is rare enough not to care about the - * added overhead from passing this to another thread. - */ - isc__netievent_tcplisten_t *ievent = isc__nm_get_ievent(mgr, netievent_tcplisten); + isc__netievent_tcplisten_t *ievent; + ievent = isc__nm_get_ievent(mgr, netievent_tcplisten); ievent->sock = nsock; - isc__nm_enqueue_ievent(&mgr->workers[nsock->tid], - (isc__netievent_t *) ievent); + if (isc__nm_in_netthread()) { + nsock->tid = isc_nm_tid(); + isc__nm_async_tcplisten(&mgr->workers[nsock->tid], + (isc__netievent_t *) ievent); + isc__nm_put_ievent(mgr, ievent); + } else { + nsock->tid = isc_random_uniform(mgr->nworkers); + LOCK(&nsock->lock); + isc__nm_enqueue_ievent(&mgr->workers[nsock->tid], + (isc__netievent_t *) ievent); + WAIT(&nsock->cond, &nsock->lock); + UNLOCK(&nsock->lock); + } - - *sockp = nsock; - - return (ISC_R_SUCCESS); + if (nsock->result == ISC_R_SUCCESS) { + *sockp = nsock; + return (ISC_R_SUCCESS); + } else { + isc_result_t result = nsock->result; + isc_nmsocket_detach(&nsock); + return (result); + } } /* @@ -208,14 +220,37 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { 
REQUIRE(isc__nm_in_netthread()); REQUIRE(sock->type == isc_nm_tcplistener); + /* Initialize children now to make cleaning up easier */ + for (int i = 0; i < sock->nchildren; i++) { + isc_nmsocket_t *csock = &sock->children[i]; + + isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener); + csock->parent = sock; + csock->iface = sock->iface; + csock->tid = i; + csock->pquota = sock->pquota; + csock->backlog = sock->backlog; + csock->extrahandlesize = sock->extrahandlesize; + + INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL); + csock->rcb.accept = sock->rcb.accept; + csock->rcbarg = sock->rcbarg; + csock->fd = -1; + } + r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); if (r != 0) { - return; + /* It was never opened */ + atomic_store(&sock->closed, true); + sock->result = isc__nm_uverr2result(r); + goto fini; } r = uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0); if (r != 0) { - return; + uv_close(&sock->uv_handle.handle, tcp_close_cb); + sock->result = isc__nm_uverr2result(r); + goto fini; } uv_handle_set_data(&sock->uv_handle.handle, sock); /* @@ -240,22 +275,9 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { * sockets to be listened on over ipc. 
*/ for (int i = 0; i < sock->nchildren; i++) { - isc__netievent_tcpchildlisten_t *event = NULL; isc_nmsocket_t *csock = &sock->children[i]; - isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener); - csock->parent = sock; - csock->iface = sock->iface; - csock->tid = i; - csock->pquota = sock->pquota; - csock->backlog = sock->backlog; - csock->extrahandlesize = sock->extrahandlesize; - - INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL); - csock->rcb.accept = sock->rcb.accept; - csock->rcbarg = sock->rcbarg; - csock->fd = -1; - + isc__netievent_tcpchildlisten_t *event; event = isc__nm_get_ievent(csock->mgr, netievent_tcpchildlisten); event->sock = csock; @@ -271,6 +293,10 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { atomic_store(&sock->listening, true); +fini: + LOCK(&sock->lock); + SIGNAL(&sock->cond); + UNLOCK(&sock->lock); return; } diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index def3993800..cf3456c175 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -298,11 +298,15 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, result = isc_nm_listentcp(mgr, iface, dnslisten_acceptcb, dnslistensock, extrahandlesize, backlog, quota, &dnslistensock->outer); - - atomic_store(&dnslistensock->listening, true); - *sockp = dnslistensock; - - return (result); + if (result == ISC_R_SUCCESS) { + atomic_store(&dnslistensock->listening, true); + *sockp = dnslistensock; + return (ISC_R_SUCCESS); + } else { + atomic_store(&dnslistensock->closed, true); + isc_nmsocket_detach(&dnslistensock); + return (result); + } } void From b05194160bc1387542baa6b12d6252df0e6165ea Mon Sep 17 00:00:00 2001 From: Evan Hunt Date: Tue, 3 Dec 2019 00:07:59 -0800 Subject: [PATCH 04/17] style, comments --- lib/isc/netmgr/netmgr-int.h | 45 +++++++----- lib/isc/netmgr/netmgr.c | 17 +++-- lib/isc/netmgr/tcp.c | 143 ++++++++++++++++++++++-------------- lib/isc/netmgr/tcpdns.c | 8 +- lib/isc/netmgr/udp.c | 2 +- 5 
files changed, 130 insertions(+), 85 deletions(-) diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 4a8f10aff7..e882f3586b 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -122,7 +122,11 @@ typedef enum isc__netievent_type { netievent_udpstoplisten, netievent_tcpstoplisten, netievent_tcpclose, - netievent_prio = 0xff, + netievent_prio = 0xff, /* event type values higher than this + * will be treated as high-priority + * events, which can be processed + * while the netmgr is paused. + */ netievent_udplisten, netievent_tcplisten, } isc__netievent_type; @@ -304,7 +308,7 @@ struct isc_nmsocket { isc_nm_t *mgr; isc_nmsocket_t *parent; - /* + /*% * quota is the TCP client, attached when a TCP connection * is established. pquota is a non-attached pointer to the * TCP client quota, stored in listening sockets but only @@ -314,7 +318,7 @@ struct isc_nmsocket { isc_quota_t *pquota; bool overquota; - /* + /*% * TCP read timeout timer. */ uv_timer_t timer; @@ -327,18 +331,18 @@ struct isc_nmsocket { /*% server socket for connections */ isc_nmsocket_t *server; - /*% children sockets for multi-socket setups */ + /*% Child sockets for multi-socket setups */ isc_nmsocket_t *children; int nchildren; isc_nmiface_t *iface; isc_nmhandle_t *tcphandle; - /* used to send listening TCP sockets to children */ + /*% Used to transfer listening TCP sockets to children */ uv_pipe_t ipc; char ipc_pipe_name[32]; atomic_int_fast32_t schildren; - /*% extra data allocated at the end of each isc_nmhandle_t */ + /*% Extra data allocated at the end of each isc_nmhandle_t */ size_t extrahandlesize; /*% TCP backlog */ @@ -348,16 +352,17 @@ struct isc_nmsocket { uv_os_sock_t fd; union uv_any_handle uv_handle; + /*% Peer address */ isc_sockaddr_t peer; /* Atomic */ - /*% Number of running (e.g. listening) children sockets */ + /*% Number of running (e.g. 
listening) child sockets */ atomic_int_fast32_t rchildren; /*% - * Socket if active if it's listening, working, etc., if we're - * closing a socket it doesn't make any sense to e.g. still - * push handles or reqs for reuse + * Socket is active if it's listening, working, etc. If it's + * closing, then it doesn't make sense, for example, to + * push handles or reqs for reuse. */ atomic_bool active; atomic_bool destroying; @@ -365,7 +370,8 @@ struct isc_nmsocket { /*% * Socket is closed if it's not active and all the possible * callbacks were fired, there are no active handles, etc. - * active==false, closed==false means the socket is closing. + * If active==false but closed==false, that means the socket + * is closing. */ atomic_bool closed; atomic_bool listening; @@ -407,13 +413,17 @@ struct isc_nmsocket { isc_astack_t *inactivehandles; isc_astack_t *inactivereqs; - /* - * Used to wait for listening event to be done and active/rchildren - * during shutdown. + /*% + * Used to wait for TCP listening events to complete, and + * for the number of running children to reach zero during + * shutdown. */ isc_mutex_t lock; isc_condition_t cond; + /*% + * Used to pass a result back from TCP listening events. + */ isc_result_t result; /*% @@ -442,12 +452,12 @@ struct isc_nmsocket { size_t *ah_frees; isc_nmhandle_t **ah_handles; - /* Buffer for TCPDNS processing, optional */ + /*% Buffer for TCPDNS processing */ size_t buf_size; size_t buf_len; unsigned char *buf; - /* + /* * This function will be called with handle->sock * as the argument whenever a handle's references drop * to zero, after its reset callback has been called. 
@@ -600,7 +610,8 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0); void isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0); void -isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_tcpchildlisten(isc__networker_t *worker, + isc__netievent_t *ievent0); void isc__nm_async_tcpstoplisten(isc__networker_t *worker, isc__netievent_t *ievent0); diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index f91651c009..ab9f956b22 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -186,20 +186,25 @@ nm_destroy(isc_nm_t **mgr0) { for (size_t i = 0; i < mgr->nworkers; i++) { isc__networker_t *worker = &mgr->workers[i]; - /* Empty the async event queue */ - isc__netievent_t *ievent; + isc__netievent_t *ievent = NULL; + int r; + + /* Empty the async event queues */ while ((ievent = (isc__netievent_t *) isc_queue_dequeue(worker->ievents)) != NULL) { isc_mempool_put(mgr->evpool, ievent); } + while ((ievent = (isc__netievent_t *) isc_queue_dequeue(worker->ievents_prio)) != NULL) { isc_mempool_put(mgr->evpool, ievent); } - int r = uv_loop_close(&worker->loop); + + r = uv_loop_close(&worker->loop); INSIST(r == 0); + isc_queue_destroy(worker->ievents); isc_queue_destroy(worker->ievents_prio); isc_thread_join(worker->thread, NULL); @@ -499,7 +504,7 @@ async_cb(uv_async_t *handle) { static void process_queue(isc__networker_t *worker, isc_queue_t *queue) { - isc__netievent_t *ievent; + isc__netievent_t *ievent = NULL; while ((ievent = (isc__netievent_t *) isc_queue_dequeue(queue)) != NULL) @@ -839,8 +844,8 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, * be random? 
*/ strcpy(sock->ipc_pipe_name, NAMED_PIPE_PREFIX); - for (int i=strlen(sock->ipc_pipe_name); i<31; i++) { - sock->ipc_pipe_name[i] = isc_random8()%24 + 'a'; + for (int i = strlen(sock->ipc_pipe_name); i < 31; i++) { + sock->ipc_pipe_name[i] = isc_random8() % 24 + 'a'; } sock->ipc_pipe_name[31] = '\0'; diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 63573d8921..22d21d57bb 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -54,7 +54,7 @@ tcp_close_cb(uv_handle_t *uvhandle); static void ipc_connection_cb(uv_stream_t *stream, int status); static void -ipc_write_cb(uv_write_t* uvreq, int status); +ipc_write_cb(uv_write_t *uvreq, int status); static void parent_pipe_close_cb(uv_handle_t *handle); static void @@ -68,7 +68,7 @@ tcp_listenclose_cb(uv_handle_t *handle); static int tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { - isc__networker_t *worker; + isc__networker_t *worker = NULL; int r; REQUIRE(isc__nm_in_netthread()); @@ -114,7 +114,7 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0) { static void tcp_connect_cb(uv_connect_t *uvreq, int status) { isc__nm_uvreq_t *req = (isc__nm_uvreq_t *) uvreq->data; - isc_nmsocket_t *sock; + isc_nmsocket_t *sock = NULL; sock = uv_handle_get_data((uv_handle_t *) uvreq->handle); REQUIRE(VALID_UVREQ(req)); @@ -152,6 +152,7 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t **sockp) { isc_nmsocket_t *nsock = NULL; + isc__netievent_tcplisten_t *ievent = NULL; REQUIRE(VALID_NM(mgr)); @@ -176,7 +177,6 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, nsock->pquota = quota; } - isc__netievent_tcplisten_t *ievent; ievent = isc__nm_get_ievent(mgr, netievent_tcplisten); ievent->sock = nsock; if (isc__nm_in_netthread()) { @@ -204,11 +204,12 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, } /* - * For TCP listening we create a single socket, bind it, and then pass it - * to `ncpu` child sockets - the passing is done over IPC. 
+ * For TCP listening, we create a single socket, bind it, and then + * pass it to `ncpu` child sockets - the passing is done over IPC. + * * XXXWPK This design pattern is ugly but it's "the way to do it" recommended * by libuv documentation - which also mentions that there should be - * uv_export/uv_import functions which would simplify this greatly. + * uv_export/uv_import functions, which would simplify this greatly. */ void isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { @@ -243,19 +244,21 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { /* It was never opened */ atomic_store(&sock->closed, true); sock->result = isc__nm_uverr2result(r); - goto fini; + goto done; } r = uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0); if (r != 0) { uv_close(&sock->uv_handle.handle, tcp_close_cb); sock->result = isc__nm_uverr2result(r); - goto fini; + goto done; } uv_handle_set_data(&sock->uv_handle.handle, sock); + /* * This is not properly documented in libuv, and the example * (benchmark-multi-accept) is wrong: + * * 'ipc' parameter must be '0' for 'listening' IPC socket, '1' * only for the sockets are really passing the FDs between * threads. 
This works without any problems on Unices, but @@ -263,9 +266,11 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { */ r = uv_pipe_init(&worker->loop, &sock->ipc, 0); INSIST(r == 0); + uv_handle_set_data((uv_handle_t *)&sock->ipc, sock); r = uv_pipe_bind(&sock->ipc, sock->ipc_pipe_name); INSIST(r == 0); + r = uv_listen((uv_stream_t *) &sock->ipc, sock->nchildren, ipc_connection_cb); INSIST(r == 0); @@ -276,8 +281,8 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { */ for (int i = 0; i < sock->nchildren; i++) { isc_nmsocket_t *csock = &sock->children[i]; + isc__netievent_tcpchildlisten_t *event = NULL; - isc__netievent_tcpchildlisten_t *event; event = isc__nm_get_ievent(csock->mgr, netievent_tcpchildlisten); event->sock = csock; @@ -293,21 +298,25 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { atomic_store(&sock->listening, true); -fini: + done: LOCK(&sock->lock); SIGNAL(&sock->cond); UNLOCK(&sock->lock); return; } -/* Parent got an IPC connection from child */ +/* + * Parent received an IPC connection from child + */ static void ipc_connection_cb(uv_stream_t *stream, int status) { - int r; - REQUIRE(status == 0); isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *) stream); isc__networker_t *worker = &sock->mgr->workers[isc_nm_tid()]; isc__nm_uvreq_t *nreq = isc__nm_uvreq_get(sock->mgr, sock); + int r; + + REQUIRE(status == 0); + /* * The buffer can be anything, it will be ignored, but it has to * be something that won't disappear. 
@@ -318,30 +327,32 @@ ipc_connection_cb(uv_stream_t *stream, int status) { /* Failure here is critical */ r = uv_accept((uv_stream_t *) &sock->ipc, - (uv_stream_t*) &nreq->pipe); + (uv_stream_t *) &nreq->pipe); INSIST(r == 0); + r = uv_write2(&nreq->uv_req.write, - (uv_stream_t*) &nreq->pipe, - &nreq->uvbuf, - 1, - (uv_stream_t*) &sock->uv_handle.stream, + (uv_stream_t *) &nreq->pipe, &nreq->uvbuf, 1, + (uv_stream_t *) &sock->uv_handle.stream, ipc_write_cb); INSIST(r == 0); } static void -ipc_write_cb(uv_write_t* uvreq, int status) { - UNUSED(status); +ipc_write_cb(uv_write_t *uvreq, int status) { isc__nm_uvreq_t *req = uvreq->data; + + UNUSED(status); + /* - * We want all children to get the socket. If we're done we can stop - * listening on the IPC socket. + * We want all children to get the socket. If we're done, we + * can stop listening on the IPC socket. */ if (atomic_fetch_add(&req->sock->schildren, 1) == - req->sock->nchildren - 1) { - uv_close((uv_handle_t*) &req->sock->ipc, NULL); + req->sock->nchildren - 1) + { + uv_close((uv_handle_t *) &req->sock->ipc, NULL); } - uv_close((uv_handle_t*) &req->pipe, parent_pipe_close_cb); + uv_close((uv_handle_t *) &req->pipe, parent_pipe_close_cb); } static void @@ -351,10 +362,13 @@ parent_pipe_close_cb(uv_handle_t *handle) { } void -isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_tcpchildlisten(isc__networker_t *worker, + isc__netievent_t *ievent0) +{ isc__netievent_tcplisten_t *ievent = (isc__netievent_tcplisten_t *) ievent0; isc_nmsocket_t *sock = ievent->sock; + isc__nm_uvreq_t *req = NULL; int r; REQUIRE(isc__nm_in_netthread()); @@ -362,48 +376,58 @@ isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0 r = uv_pipe_init(&worker->loop, &sock->ipc, 1); INSIST(r == 0); - uv_handle_set_data((uv_handle_t *) &sock->ipc, sock); - isc__nm_uvreq_t * req = isc__nm_uvreq_get(sock->mgr, sock); - uv_pipe_connect(&req->uv_req.connect, - &sock->ipc, + 
uv_handle_set_data((uv_handle_t *) &sock->ipc, sock); + + req = isc__nm_uvreq_get(sock->mgr, sock); + uv_pipe_connect(&req->uv_req.connect, &sock->ipc, sock->parent->ipc_pipe_name, childlisten_ipc_connect_cb); } -/* child connected to parent over IPC */ +/* Child connected to parent over IPC */ static void childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status) { - UNUSED(status); isc__nm_uvreq_t *req = uvreq->data; isc_nmsocket_t *sock = req->sock; + int r; + + UNUSED(status); + isc__nm_uvreq_put(&req, sock); - int r = uv_read_start((uv_stream_t*) &sock->ipc, - isc__nm_alloc_cb, - childlisten_read_cb); + + r = uv_read_start((uv_stream_t *) &sock->ipc, isc__nm_alloc_cb, + childlisten_read_cb); INSIST(r == 0); } -/* child got the socket over IPC */ +/* Child received the socket via IPC */ static void childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { - UNUSED(nread); - int r; isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *) stream); + isc__networker_t *worker = NULL; + uv_pipe_t *ipc = NULL; + uv_handle_type type; + int r; + + UNUSED(nread); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(buf != NULL); - uv_pipe_t* ipc = (uv_pipe_t*) stream; - uv_handle_type type = uv_pipe_pending_type(ipc); + + ipc = (uv_pipe_t *) stream; + type = uv_pipe_pending_type(ipc); INSIST(type == UV_TCP); + isc__nm_free_uvbuf(sock, buf); - isc__networker_t * worker = &sock->mgr->workers[isc_nm_tid()]; - uv_tcp_init(&worker->loop, (uv_tcp_t*) &sock->uv_handle.tcp); + worker = &sock->mgr->workers[isc_nm_tid()]; + uv_tcp_init(&worker->loop, (uv_tcp_t *) &sock->uv_handle.tcp); uv_handle_set_data(&sock->uv_handle.handle, sock); + uv_accept(stream, &sock->uv_handle.stream); r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog, tcp_connection_cb); - uv_close((uv_handle_t*) ipc, NULL); + uv_close((uv_handle_t *) ipc, NULL); if (r != 0) { /* XXX log it? 
*/ return; @@ -458,19 +482,21 @@ isc__nm_async_tcpstoplisten(isc__networker_t *worker, static void stoplistening(isc_nmsocket_t *sock) { for (int i = 0; i < sock->nchildren; i++) { + isc__netievent_tcpstopchildlisten_t *event = NULL; + /* - * Stoplistening is a rare event, we can ignore the overhead - * caused by allocating an event, and doing it this way + * We can ignore the overhead of event allocation because + * stoplistening is a rare event, and doing it this way * simplifies sock reference counting. */ - isc__netievent_tcpstopchildlisten_t *event = NULL; event = isc__nm_get_ievent(sock->mgr, netievent_tcpstopchildlisten); isc_nmsocket_attach(&sock->children[i], &event->sock); if (i == sock->tid) { - isc__nm_async_tcpstopchildlisten(&sock->mgr->workers[i], - (isc__netievent_t *) event); + isc__nm_async_tcpstopchildlisten( + &sock->mgr->workers[i], + (isc__netievent_t *) event); isc__nm_put_ievent(sock->mgr, event); } else { isc__nm_enqueue_ievent(&sock->mgr->workers[i], @@ -502,8 +528,8 @@ isc__nm_async_tcpstopchildlisten(isc__networker_t *worker, REQUIRE(sock->parent != NULL); /* - * rchildren is atomic but we still need to change it - * under a lock as the parent is waiting on conditional + * rchildren is atomic, but we still need to change it + * under a lock because the parent is waiting on conditional * and without it we might deadlock. */ LOCK(&sock->parent->lock); @@ -515,19 +541,22 @@ isc__nm_async_tcpstopchildlisten(isc__networker_t *worker, } /* - * This callback is used for closing child and parent listening sockets - - * that's why we need to choose the proper lock. + * This callback is used for closing both child and parent listening + * sockets; that's why we need to choose the proper lock. */ static void tcp_listenclose_cb(uv_handle_t *handle) { isc_nmsocket_t *sock = uv_handle_get_data(handle); - isc_mutex_t * lock = (sock->parent != NULL) ? - &sock->parent->lock : &sock->lock; + isc_mutex_t *lock = ((sock->parent != NULL) + ? 
&sock->parent->lock + : &sock->lock); + LOCK(lock); atomic_store(&sock->closed, true); atomic_store(&sock->listening, false); sock->pquota = NULL; UNLOCK(lock); + isc_nmsocket_detach(&sock); } @@ -675,7 +704,7 @@ isc_nm_resumeread(isc_nmsocket_t *sock) { static void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { - isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t*) stream); + isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *) stream); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(buf != NULL); @@ -790,7 +819,7 @@ accept_connection(isc_nmsocket_t *ssock) { static void tcp_connection_cb(uv_stream_t *server, int status) { - isc_nmsocket_t *ssock = uv_handle_get_data((uv_handle_t*) server); + isc_nmsocket_t *ssock = uv_handle_get_data((uv_handle_t *) server); isc_result_t result; UNUSED(status); diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index cf3456c175..2b719e8191 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -86,13 +86,13 @@ timer_close_cb(uv_handle_t *handle) { static void dnstcp_readtimeout(uv_timer_t *timer) { - isc_nmsocket_t *sock; - sock = (isc_nmsocket_t *) uv_handle_get_data((uv_handle_t *)timer); + isc_nmsocket_t *sock = + (isc_nmsocket_t *) uv_handle_get_data((uv_handle_t *) timer); REQUIRE(VALID_NMSOCK(sock)); isc_nmsocket_detach(&sock->outer); - uv_close((uv_handle_t*) &sock->timer, timer_close_cb); + uv_close((uv_handle_t *) &sock->timer, timer_close_cb); } /* @@ -494,5 +494,5 @@ isc__nm_tcpdns_close(isc_nmsocket_t *sock) { if (sock->outer != NULL) { isc_nmsocket_detach(&sock->outer); } - uv_close((uv_handle_t*) &sock->timer, timer_close_cb); + uv_close((uv_handle_t *) &sock->timer, timer_close_cb); } diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index 97df0ef9ec..102ff0dbda 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -339,7 +339,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nmsocket_t *psock = NULL, *rsock = NULL; isc_nmsocket_t 
*sock = handle->sock; isc_sockaddr_t *peer = &handle->peer; - isc__netievent_udpsend_t *ievent; + isc__netievent_udpsend_t *ievent = NULL; isc__nm_uvreq_t *uvreq = NULL; int ntid; uint32_t maxudp = atomic_load(&sock->mgr->maxudp); From 0bf74ac79284688c0b1cefbf8972235a75ee51e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Tue, 3 Dec 2019 19:48:10 +0100 Subject: [PATCH 05/17] netmgr: - make tcp listening IPC pipe name saner - put the pipe in /tmp on unices - add pid to the pipe name to avoid conflicts between processes - fsync directory in which the pipe resides to make sure that the child threads will see it and be able to open it --- lib/isc/netmgr/netmgr-int.h | 2 +- lib/isc/netmgr/netmgr.c | 18 +++++++++--------- lib/isc/netmgr/tcp.c | 32 +++++++++++++++++++++++++++++++- 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index e882f3586b..1d7b09db32 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -339,7 +339,7 @@ struct isc_nmsocket { /*% Used to transfer listening TCP sockets to children */ uv_pipe_t ipc; - char ipc_pipe_name[32]; + char ipc_pipe_name[64]; atomic_int_fast32_t schildren; /*% Extra data allocated at the end of each isc_nmhandle_t */ diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index ab9f956b22..1216918d26 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -44,9 +44,9 @@ ISC_THREAD_LOCAL int isc__nm_tid_v = ISC_NETMGR_TID_UNKNOWN; #ifdef WIN32 -#define NAMED_PIPE_PREFIX "\\\\.\\pipe\\named-ipc" +#define NAMED_PIPE_PATTERN "\\\\.\\pipe\\named-%d-%u.pipe" #else -#define NAMED_PIPE_PREFIX ".named-ipc" +#define NAMED_PIPE_PATTERN "/tmp/named-%d-%u.pipe" #endif static void @@ -840,14 +840,14 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, } /* - * XXXWPK Maybe it should be in tmp, maybe it should not - * be random? + * Use a random number in the named pipe name. 
Also add getpid() + * to the name to make sure we don't get a conflict between + * different unit tests running at the same time, where the PRNG + * is initialized to a constant seed. */ - strcpy(sock->ipc_pipe_name, NAMED_PIPE_PREFIX); - for (int i = strlen(sock->ipc_pipe_name); i < 31; i++) { - sock->ipc_pipe_name[i] = isc_random8() % 24 + 'a'; - } - sock->ipc_pipe_name[31] = '\0'; + snprintf(sock->ipc_pipe_name, sizeof(sock->ipc_pipe_name), + NAMED_PIPE_PATTERN, getpid(), isc_random32()); + sock->ipc_pipe_name[sizeof(sock->ipc_pipe_name) - 1] = '\0'; isc_mutex_init(&sock->lock); isc_condition_init(&sock->cond); diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 22d21d57bb..8a9eac74f1 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -10,6 +10,7 @@ */ #include +#include #include #include @@ -203,6 +204,25 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, } } +#ifndef WIN32 +/* + * Run fsync() on the directory containing a socket's IPC pipe, to + * ensure that all threads can see the pipe. + */ +static void +syncdir(const isc_nmsocket_t *sock) { + char *pipe = isc_mem_strdup(sock->mgr->mctx, sock->ipc_pipe_name); + int fd = open(dirname(pipe), O_RDONLY); + + RUNTIME_CHECK(fd >= 0); + + isc_mem_free(sock->mgr->mctx, pipe); + + fsync(fd); + close(fd); +} +#endif + /* * For TCP listening, we create a single socket, bind it, and then * pass it to `ncpu` child sockets - the passing is done over IPC. @@ -274,7 +294,17 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { r = uv_listen((uv_stream_t *) &sock->ipc, sock->nchildren, ipc_connection_cb); INSIST(r == 0); - +#ifndef WIN32 + /* + * On Unices a child thread might not see the pipe yet; + * that happened quite often in unit tests on FreeBSD. + * Syncing the directory ensures that the pipe is visible + * to everyone. + * This isn't done on Windows because named pipes exist + * within a different namespace, not on VFS. 
+ */ + syncdir(sock); +#endif /* * We launch n 'tcpchildlistener' that will receive * sockets to be listened on over ipc. From 23ab349bbd47cc168050caabe9e859b931bdff62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Fri, 6 Dec 2019 20:45:00 +0100 Subject: [PATCH 06/17] netmgr: fix a race in socket destruction, happening if we close the socket externally and, at the same time, a timeout timer callback was called. --- lib/isc/netmgr/netmgr.c | 58 ++++++++++++++++++++++++----------------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 1216918d26..09f5ef3b79 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -1039,9 +1039,38 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) { handle->doreset(handle->opaque); } + + + /* + * The handle is closed. If the socket has a callback configured + * for that (e.g., to perform cleanup after request processing), + * call it now. + */ + bool do_close = true; + if (sock->closehandle_cb != NULL) { + if (sock->tid == isc_nm_tid()) { + sock->closehandle_cb(sock); + } else { + isc__netievent_closecb_t * event = + isc__nm_get_ievent(sock->mgr, + netievent_closecb); + isc_nmsocket_attach(sock, &event->sock); + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) event); + /* + * If we do this asynchronously then the async event + * will clean the socket, so clean up the handle from + * socket and exit. + */ + do_close = false; + } + } + /* * We do all of this under lock to avoid races with socket * destruction. + * We have to do this now otherwise we might race - at this point + * the socket is either unused or attached to event->sock. 
*/ LOCK(&sock->lock); @@ -1058,36 +1087,17 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) { reuse = isc_astack_trypush(sock->inactivehandles, handle); } - - UNLOCK(&sock->lock); - if (!reuse) { nmhandle_free(sock, handle); } + UNLOCK(&sock->lock); - /* - * The handle is closed. If the socket has a callback configured - * for that (e.g., to perform cleanup after request processing), - * call it now. - */ - if (sock->closehandle_cb != NULL) { - if (sock->tid == isc_nm_tid()) { - sock->closehandle_cb(sock); - } else { - isc__netievent_closecb_t * event = - isc__nm_get_ievent(sock->mgr, - netievent_closecb); - isc_nmsocket_attach(sock, &event->sock); - isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], - (isc__netievent_t *) event); - /* - * If we do this asynchronously then the async event - * will clean the socket, so just exit. - */ - return; - } + /* Close callback will clean everything up */ + if (!do_close) { + return; } + if (atomic_load(&sock->ah) == 0 && !atomic_load(&sock->active) && !atomic_load(&sock->destroying)) From 3e66b7ba1c76a5d7da7558d0e1d1ac785580c71d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Fri, 6 Dec 2019 22:25:52 +0100 Subject: [PATCH 07/17] Fix a race in tcpdns close with uv_close on timer stop timers before closing netmgr: tcpdns_close needs to be asynchronous, it manipulates sock->timer --- lib/isc/netmgr/netmgr-int.h | 5 +++++ lib/isc/netmgr/netmgr.c | 39 +++++++++++++++++-------------------- lib/isc/netmgr/tcp.c | 3 ++- lib/isc/netmgr/tcpdns.c | 39 +++++++++++++++++++++++++++++++++---- 4 files changed, 60 insertions(+), 26 deletions(-) diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 1d7b09db32..b2a201c553 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -122,6 +122,7 @@ typedef enum isc__netievent_type { netievent_udpstoplisten, netievent_tcpstoplisten, netievent_tcpclose, + netievent_tcpdnsclose, netievent_prio = 0xff, /* event type values higher 
than this * will be treated as high-priority * events, which can be processed @@ -194,6 +195,7 @@ typedef isc__netievent__socket_t isc__netievent_udpstoplisten_t; typedef isc__netievent__socket_t isc__netievent_tcpstoplisten_t; typedef isc__netievent__socket_t isc__netievent_tcpstopchildlisten_t; typedef isc__netievent__socket_t isc__netievent_tcpclose_t; +typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t; typedef isc__netievent__socket_t isc__netievent_startread_t; typedef isc__netievent__socket_t isc__netievent_pauseread_t; typedef isc__netievent__socket_t isc__netievent_closecb_t; @@ -644,6 +646,9 @@ isc__nm_tcpdns_close(isc_nmsocket_t *sock); * Close a TCPDNS socket. */ +void +isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ievent0); + #define isc__nm_uverr2result(x) \ isc___nm_uverr2result(x, true, __FILE__, __LINE__) isc_result_t diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 09f5ef3b79..8d1c711683 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -550,6 +550,9 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { case netievent_tcpclose: isc__nm_async_tcpclose(worker, ievent); break; + case netievent_tcpdnsclose: + isc__nm_async_tcpdnsclose(worker, ievent); + break; case netievent_closecb: isc__nm_async_closecb(worker, ievent); break; @@ -675,8 +678,9 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) { sock->pquota = NULL; if (sock->timer_initialized) { - uv_close((uv_handle_t *)&sock->timer, NULL); sock->timer_initialized = false; + uv_timer_stop(&sock->timer); + uv_close((uv_handle_t *)&sock->timer, NULL); } isc_astack_destroy(sock->inactivehandles); @@ -1022,6 +1026,7 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) { isc_nmsocket_t *sock = NULL; size_t handlenum; bool reuse = false; + bool do_close = true; int refs; REQUIRE(VALID_NMHANDLE(handle)); @@ -1039,28 +1044,28 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) { handle->doreset(handle->opaque); } - - /* * The 
handle is closed. If the socket has a callback configured * for that (e.g., to perform cleanup after request processing), - * call it now. + * call it now, or schedule it to run asynchronously. */ - bool do_close = true; if (sock->closehandle_cb != NULL) { if (sock->tid == isc_nm_tid()) { sock->closehandle_cb(sock); } else { - isc__netievent_closecb_t * event = + isc__netievent_closecb_t *event = isc__nm_get_ievent(sock->mgr, netievent_closecb); isc_nmsocket_attach(sock, &event->sock); isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *) event); + /* - * If we do this asynchronously then the async event - * will clean the socket, so clean up the handle from - * socket and exit. + * If we're doing this asynchronously, then the + * async event will take care of closing the + * socket, so we can clean up the handle + * from the socket, but skip calling + * nmsocket_maybe_destory() */ do_close = false; } @@ -1068,9 +1073,8 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) { /* * We do all of this under lock to avoid races with socket - * destruction. - * We have to do this now otherwise we might race - at this point - * the socket is either unused or attached to event->sock. + * destruction. We have to do this now, because at this point the + * socket is either unused or still attached to event->sock. 
*/ LOCK(&sock->lock); @@ -1092,15 +1096,8 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) { } UNLOCK(&sock->lock); - /* Close callback will clean everything up */ - if (!do_close) { - return; - } - - - if (atomic_load(&sock->ah) == 0 && - !atomic_load(&sock->active) && - !atomic_load(&sock->destroying)) + if (do_close && atomic_load(&sock->ah) == 0 && + !atomic_load(&sock->active) && !atomic_load(&sock->destroying)) { nmsocket_maybe_destroy(sock); } diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 8a9eac74f1..72a402643e 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -1003,8 +1003,9 @@ tcp_close_direct(isc_nmsocket_t *sock) { } } if (sock->timer_initialized) { - uv_close((uv_handle_t *)&sock->timer, timer_close_cb); sock->timer_initialized = false; + uv_timer_stop(&sock->timer); + uv_close((uv_handle_t *)&sock->timer, timer_close_cb); } else { isc_nmsocket_detach(&sock->server); uv_close(&sock->uv_handle.handle, tcp_close_cb); diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 2b719e8191..1dd19daf34 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -79,7 +79,6 @@ static void timer_close_cb(uv_handle_t *handle) { isc_nmsocket_t *sock = (isc_nmsocket_t *) uv_handle_get_data(handle); INSIST(VALID_NMSOCK(sock)); - sock->timer_initialized = false; atomic_store(&sock->closed, true); isc_nmsocket_detach(&sock); } @@ -489,10 +488,42 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region, } -void -isc__nm_tcpdns_close(isc_nmsocket_t *sock) { +static void +tcpdns_close_direct(isc_nmsocket_t *sock) { if (sock->outer != NULL) { isc_nmsocket_detach(&sock->outer); } - uv_close((uv_handle_t *) &sock->timer, timer_close_cb); + /* We don't need atomics here, it's all in single network thread */ + if (sock->timer_initialized) { + sock->timer_initialized = false; + uv_timer_stop(&sock->timer); + uv_close((uv_handle_t *) &sock->timer, timer_close_cb); + } +} + +void +isc__nm_tcpdns_close(isc_nmsocket_t 
*sock) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_tcpdnssocket); + + if (sock->tid == isc_nm_tid()) { + tcpdns_close_direct(sock); + } else { + isc__netievent_tcpdnsclose_t *ievent = + isc__nm_get_ievent(sock->mgr, netievent_tcpdnsclose); + + ievent->sock = sock; + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *) ievent); + } +} + +void +isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ievent0) { + isc__netievent_tcpdnsclose_t *ievent = + (isc__netievent_tcpdnsclose_t *) ievent0; + + REQUIRE(worker->id == ievent->sock->tid); + + tcpdns_close_direct(ievent->sock); } From c7b86d1cac2bf4a1d3a5dea58e57459a27fc5a7d Mon Sep 17 00:00:00 2001 From: Evan Hunt Date: Sat, 7 Dec 2019 01:48:11 +0100 Subject: [PATCH 08/17] Style fixes --- lib/isc/netmgr/netmgr-int.h | 5 +- lib/isc/netmgr/netmgr.c | 4 +- lib/isc/netmgr/tcp.c | 99 ++++++++++++++++++++++--------------- 3 files changed, 65 insertions(+), 43 deletions(-) diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index b2a201c553..852c65cdb2 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -166,12 +166,13 @@ typedef struct isc__nm_uvreq { isc_nmsocket_t * sock; isc_nmhandle_t * handle; uv_buf_t uvbuf; /* translated isc_region_t, to be - sent or received */ + * sent or received */ isc_sockaddr_t local; /* local address */ isc_sockaddr_t peer; /* peer address */ isc__nm_cb_t cb; /* callback */ void * cbarg; /* callback argument */ - uv_pipe_t pipe; + uv_pipe_t ipc; /* used for sending socket + * uv_handles to other threads */ union { uv_req_t req; uv_getaddrinfo_t getaddrinfo; diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 8d1c711683..636c919730 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -338,9 +338,9 @@ isc_nm_destroy(isc_nm_t **mgr0) { */ while (isc_refcount_current(&mgr->references) > 1) { #ifdef WIN32 - _sleep(1000); + _sleep(1000); #else - usleep(1000000); 
+ usleep(1000000); #endif } diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 72a402643e..7e3972d07a 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -57,7 +57,7 @@ ipc_connection_cb(uv_stream_t *stream, int status); static void ipc_write_cb(uv_write_t *uvreq, int status); static void -parent_pipe_close_cb(uv_handle_t *handle); +ipc_close_cb(uv_handle_t *handle); static void childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status); static void @@ -149,8 +149,7 @@ isc_result_t isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize, int backlog, - isc_quota_t *quota, - isc_nmsocket_t **sockp) + isc_quota_t *quota, isc_nmsocket_t **sockp) { isc_nmsocket_t *nsock = NULL; isc__netievent_tcplisten_t *ievent = NULL; @@ -224,12 +223,15 @@ syncdir(const isc_nmsocket_t *sock) { #endif /* - * For TCP listening, we create a single socket, bind it, and then - * pass it to `ncpu` child sockets - the passing is done over IPC. + * For multi-threaded TCP listening, we create a single "parent" socket, + * bind to it, and then pass its uv_handle to a set of child sockets, one + * per worker. For thread safety, the passing of the socket's uv_handle has + * to be done via IPC socket. * - * XXXWPK This design pattern is ugly but it's "the way to do it" recommended - * by libuv documentation - which also mentions that there should be - * uv_export/uv_import functions, which would simplify this greatly. + * This design pattern is ugly but it's what's recommended by the libuv + * documentation. (A prior version of libuv had uv_export() and + * uv_import() functions which would have simplified this greatly, but + * they have been deprecated and removed.) 
*/ void isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { @@ -276,24 +278,26 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { uv_handle_set_data(&sock->uv_handle.handle, sock); /* - * This is not properly documented in libuv, and the example - * (benchmark-multi-accept) is wrong: + * uv_pipe_init() is incorrectly documented in libuv, and the + * example in benchmark-multi-accept.c is also wrong. * - * 'ipc' parameter must be '0' for 'listening' IPC socket, '1' - * only for the sockets are really passing the FDs between - * threads. This works without any problems on Unices, but - * breaks horribly on Windows. + * The third parameter ('ipc') indicates that the pipe will be + * used to *send* a handle to other threads. Therefore, it must + * must be set to 0 for a 'listening' IPC socket, and 1 + * only for sockets that are really passing FDs between + * threads. */ r = uv_pipe_init(&worker->loop, &sock->ipc, 0); - INSIST(r == 0); + RUNTIME_CHECK(r == 0); uv_handle_set_data((uv_handle_t *)&sock->ipc, sock); r = uv_pipe_bind(&sock->ipc, sock->ipc_pipe_name); - INSIST(r == 0); + RUNTIME_CHECK(r == 0); r = uv_listen((uv_stream_t *) &sock->ipc, sock->nchildren, ipc_connection_cb); - INSIST(r == 0); + RUNTIME_CHECK(r == 0); + #ifndef WIN32 /* * On Unices a child thread might not see the pipe yet; @@ -305,9 +309,11 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { */ syncdir(sock); #endif + /* - * We launch n 'tcpchildlistener' that will receive - * sockets to be listened on over ipc. + * For each worker we send a 'tcpchildlisten' event. The child + * listener will then receive its version of the socket + * uv_handle via IPC in isc__nm_async_tcpchildlisten(). 
*/ for (int i = 0; i < sock->nchildren; i++) { isc_nmsocket_t *csock = &sock->children[i]; @@ -336,7 +342,8 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { } /* - * Parent received an IPC connection from child + * The parent received an IPC connection from a child and can now send + * the uv_handle to the child for listening. */ static void ipc_connection_cb(uv_stream_t *stream, int status) { @@ -352,21 +359,22 @@ ipc_connection_cb(uv_stream_t *stream, int status) { * be something that won't disappear. */ nreq->uvbuf = uv_buf_init((char *)nreq, 1); - uv_pipe_init(&worker->loop, &nreq->pipe, 1); - uv_handle_set_data((uv_handle_t *)&nreq->pipe, nreq); + uv_pipe_init(&worker->loop, &nreq->ipc, 1); + uv_handle_set_data((uv_handle_t *)&nreq->ipc, nreq); - /* Failure here is critical */ - r = uv_accept((uv_stream_t *) &sock->ipc, - (uv_stream_t *) &nreq->pipe); - INSIST(r == 0); + r = uv_accept((uv_stream_t *) &sock->ipc, (uv_stream_t *) &nreq->ipc); + RUNTIME_CHECK(r == 0); r = uv_write2(&nreq->uv_req.write, - (uv_stream_t *) &nreq->pipe, &nreq->uvbuf, 1, - (uv_stream_t *) &sock->uv_handle.stream, - ipc_write_cb); - INSIST(r == 0); + (uv_stream_t *) &nreq->ipc, &nreq->uvbuf, 1, + (uv_stream_t *) &sock->uv_handle.stream, ipc_write_cb); + RUNTIME_CHECK(r == 0); } +/* + * The call to send a socket uv_handle is complete; we may be able + * to close the IPC connection. + */ static void ipc_write_cb(uv_write_t *uvreq, int status) { isc__nm_uvreq_t *req = uvreq->data; @@ -374,23 +382,30 @@ ipc_write_cb(uv_write_t *uvreq, int status) { UNUSED(status); /* - * We want all children to get the socket. If we're done, we - * can stop listening on the IPC socket. + * We want all children to get the socket. Once that's done, + * we can stop listening on the IPC socket. 
*/ if (atomic_fetch_add(&req->sock->schildren, 1) == req->sock->nchildren - 1) { uv_close((uv_handle_t *) &req->sock->ipc, NULL); } - uv_close((uv_handle_t *) &req->pipe, parent_pipe_close_cb); + uv_close((uv_handle_t *) &req->ipc, ipc_close_cb); } +/* + * The IPC socket is closed: free its resources. + */ static void -parent_pipe_close_cb(uv_handle_t *handle) { +ipc_close_cb(uv_handle_t *handle) { isc__nm_uvreq_t *req = uv_handle_get_data(handle); isc__nm_uvreq_put(&req, req->sock); } +/* + * Connect to the parent socket and be ready to receive the uv_handle + * for the socket we'll be listening on. + */ void isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0) @@ -405,7 +420,7 @@ isc__nm_async_tcpchildlisten(isc__networker_t *worker, REQUIRE(sock->type == isc_nm_tcpchildlistener); r = uv_pipe_init(&worker->loop, &sock->ipc, 1); - INSIST(r == 0); + RUNTIME_CHECK(r == 0); uv_handle_set_data((uv_handle_t *) &sock->ipc, sock); @@ -415,7 +430,7 @@ isc__nm_async_tcpchildlisten(isc__networker_t *worker, childlisten_ipc_connect_cb); } -/* Child connected to parent over IPC */ +/* Child is now connected to parent via IPC and can begin reading. */ static void childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status) { isc__nm_uvreq_t *req = uvreq->data; @@ -428,10 +443,13 @@ childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status) { r = uv_read_start((uv_stream_t *) &sock->ipc, isc__nm_alloc_cb, childlisten_read_cb); - INSIST(r == 0); + RUNTIME_CHECK(r == 0); } -/* Child received the socket via IPC */ +/* + * Child has received the socket uv_handle via IPC, and can now begin + * listening for connections and can close the IPC socket. 
+ */ static void childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *) stream); @@ -459,7 +477,10 @@ childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { tcp_connection_cb); uv_close((uv_handle_t *) ipc, NULL); if (r != 0) { - /* XXX log it? */ + isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, + "IPC socket close failed: %s", + isc_result_totext(isc__nm_uverr2result(r))); return; } } From ef2dff5c7a34b59630045fa9c72fd0e6e0a75bf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Sun, 8 Dec 2019 21:14:08 +0100 Subject: [PATCH 09/17] pause and unpause netmgr in isc_nm_destroy to flush all events from worker queues --- lib/isc/netmgr/netmgr.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 636c919730..0713aa42fc 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -337,6 +337,14 @@ isc_nm_destroy(isc_nm_t **mgr0) { * Wait for the manager to be dereferenced elsewhere. */ while (isc_refcount_current(&mgr->references) > 1) { + /* + * Sometimes libuv gets stuck, pausing and unpausing + * netmgr goes over all events in async queue for all + * the workers, and since it's done only on shutdown it + * doesn't cost us anything. + */ + isc_nm_pause(mgr); + isc_nm_resume(mgr); #ifdef WIN32 _sleep(1000); #else From b0779cc429a5c68364d31af1039bb2a6172babcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Sat, 7 Dec 2019 23:43:52 +0100 Subject: [PATCH 10/17] netmgr: Add more DbC checks for asynchronous calls. 
--- lib/isc/netmgr/tcpdns.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 1dd19daf34..ad53aeface 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -89,6 +89,7 @@ dnstcp_readtimeout(uv_timer_t *timer) { (isc_nmsocket_t *) uv_handle_get_data((uv_handle_t *) timer); REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_nm_tid()); isc_nmsocket_detach(&sock->outer); uv_close((uv_handle_t *) &sock->timer, timer_close_cb); @@ -202,6 +203,7 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) { REQUIRE(VALID_NMSOCK(dnssock)); REQUIRE(VALID_NMHANDLE(handle)); + REQUIRE(dnssock->tid == isc_nm_tid()); if (region == NULL) { /* Connection closed */ @@ -490,6 +492,7 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region, static void tcpdns_close_direct(isc_nmsocket_t *sock) { + REQUIRE(sock->tid == isc_nm_tid()); if (sock->outer != NULL) { isc_nmsocket_detach(&sock->outer); } From b804d3a395fd0038670f51df6269efcbeb67088a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Sat, 7 Dec 2019 23:44:16 +0100 Subject: [PATCH 11/17] always return true in ns_interfacemgr_listeningon if interfacemgr is shutting down to avoid deadlocks on shutdown. 
--- lib/ns/interfacemgr.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/ns/interfacemgr.c b/lib/ns/interfacemgr.c index 2dd03a0eee..3bbeee2a50 100644 --- a/lib/ns/interfacemgr.c +++ b/lib/ns/interfacemgr.c @@ -83,6 +83,7 @@ struct ns_interfacemgr { ISC_LIST(isc_sockaddr_t) listenon; int backlog; /*%< Listen queue size */ unsigned int udpdisp; /*%< UDP dispatch count */ + atomic_bool shuttingdown; /*%< Interfacemgr is shutting down */ #ifdef USE_ROUTE_SOCKET isc_task_t * task; isc_socket_t * route; @@ -217,6 +218,7 @@ ns_interfacemgr_create(isc_mem_t *mctx, mgr->listenon4 = NULL; mgr->listenon6 = NULL; mgr->udpdisp = udpdisp; + atomic_init(&mgr->shuttingdown, false); ISC_LIST_INIT(mgr->interfaces); ISC_LIST_INIT(mgr->listenon); @@ -360,6 +362,7 @@ ns_interfacemgr_shutdown(ns_interfacemgr_t *mgr) { * consider all interfaces "old". */ mgr->generation++; + atomic_store(&mgr->shuttingdown, true); #ifdef USE_ROUTE_SOCKET LOCK(&mgr->lock); if (mgr->route != NULL) { @@ -1230,7 +1233,13 @@ ns_interfacemgr_listeningon(ns_interfacemgr_t *mgr, bool result = false; REQUIRE(NS_INTERFACEMGR_VALID(mgr)); - + /* + * If the manager is shutting down it's safer to + * return true. 
+ */ + if (atomic_load(&mgr->shuttingdown)) { + return (true); + } LOCK(&mgr->lock); for (old = ISC_LIST_HEAD(mgr->listenon); old != NULL; From 86a847314a16c7fcd6960aec3c0790efa3232e42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Sun, 8 Dec 2019 22:44:08 +0100 Subject: [PATCH 12/17] Fix a race in socket destruction - we need to remove handle from socket in async close callback or we might race between destruction in the callback and in the original nmhandle_unref --- lib/isc/netmgr/netmgr-int.h | 10 ++++- lib/isc/netmgr/netmgr.c | 90 ++++++++++++++++++------------------- 2 files changed, 54 insertions(+), 46 deletions(-) diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 852c65cdb2..bed5622f8b 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -199,7 +199,6 @@ typedef isc__netievent__socket_t isc__netievent_tcpclose_t; typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t; typedef isc__netievent__socket_t isc__netievent_startread_t; typedef isc__netievent__socket_t isc__netievent_pauseread_t; -typedef isc__netievent__socket_t isc__netievent_closecb_t; typedef struct isc__netievent__socket_req { isc__netievent_type type; @@ -212,6 +211,15 @@ typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t; typedef isc__netievent__socket_req_t isc__netievent_tcpchildlisten_t; typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t; +typedef struct isc__netievent__socket_handle { + isc__netievent_type type; + isc_nmsocket_t *sock; + isc_nmhandle_t *handle; +} isc__netievent__socket_handle_t; + +typedef isc__netievent__socket_handle_t isc__netievent_closecb_t; + + typedef struct isc__netievent_udpsend { isc__netievent_type type; isc_nmsocket_t *sock; diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 0713aa42fc..78a596bb50 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -713,11 +713,9 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) 
{ static void nmsocket_maybe_destroy(isc_nmsocket_t *sock) { - int active_handles = 0; + int active_handles; bool destroy = false; - REQUIRE(!isc__nmsocket_active(sock)); - if (sock->parent != NULL) { /* * This is a child socket and cannot be destroyed except @@ -734,7 +732,13 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) { * accept destruction. */ LOCK(&sock->lock); - active_handles += atomic_load(&sock->ah); + if (atomic_load(&sock->active) || atomic_load(&sock->destroying) || + !atomic_load(&sock->closed) || atomic_load(&sock->references) != 0) { + UNLOCK(&sock->lock); + return; + } + + active_handles = atomic_load(&sock->ah); if (sock->children != NULL) { for (int i = 0; i < sock->nchildren; i++) { LOCK(&sock->children[i].lock); @@ -743,9 +747,7 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) { } } - if (atomic_load(&sock->closed) && - atomic_load(&sock->references) == 0 && - (active_handles == 0 || sock->tcphandle != NULL)) + if (active_handles == 0 || sock->tcphandle != NULL) { destroy = true; } @@ -1029,12 +1031,37 @@ nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { isc_mem_put(sock->mgr->mctx, handle, sizeof(isc_nmhandle_t) + extra); } +static void +nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { + /* + * We do all of this under lock to avoid races with socket + * destruction. We have to do this now, because at this point the + * socket is either unused or still attached to event->sock. 
+ */ + LOCK(&sock->lock); + + INSIST(sock->ah_handles[handle->ah_pos] == handle); + INSIST(sock->ah_size > handle->ah_pos); + INSIST(atomic_load(&sock->ah) > 0); + + sock->ah_handles[handle->ah_pos] = NULL; + size_t handlenum = atomic_fetch_sub(&sock->ah, 1) - 1; + sock->ah_frees[handlenum] = handle->ah_pos; + handle->ah_pos = 0; + bool reuse = false; + if (atomic_load(&sock->active)) { + reuse = isc_astack_trypush(sock->inactivehandles, + handle); + } + if (!reuse) { + nmhandle_free(sock, handle); + } + UNLOCK(&sock->lock); +} + void isc_nmhandle_unref(isc_nmhandle_t *handle) { isc_nmsocket_t *sock = NULL; - size_t handlenum; - bool reuse = false; - bool do_close = true; int refs; REQUIRE(VALID_NMHANDLE(handle)); @@ -1065,50 +1092,21 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) { isc__nm_get_ievent(sock->mgr, netievent_closecb); isc_nmsocket_attach(sock, &event->sock); + event->handle = handle; isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *) event); /* * If we're doing this asynchronously, then the - * async event will take care of closing the - * socket, so we can clean up the handle - * from the socket, but skip calling - * nmsocket_maybe_destory() + * async event will take care of cleaning up the + * handle and closing the socket. */ - do_close = false; + return; } } - /* - * We do all of this under lock to avoid races with socket - * destruction. We have to do this now, because at this point the - * socket is either unused or still attached to event->sock. 
- */ - LOCK(&sock->lock); - - INSIST(sock->ah_handles[handle->ah_pos] == handle); - INSIST(sock->ah_size > handle->ah_pos); - INSIST(atomic_load(&sock->ah) > 0); - - sock->ah_handles[handle->ah_pos] = NULL; - handlenum = atomic_fetch_sub(&sock->ah, 1) - 1; - sock->ah_frees[handlenum] = handle->ah_pos; - handle->ah_pos = 0; - - if (atomic_load(&sock->active)) { - reuse = isc_astack_trypush(sock->inactivehandles, - handle); - } - if (!reuse) { - nmhandle_free(sock, handle); - } - UNLOCK(&sock->lock); - - if (do_close && atomic_load(&sock->ah) == 0 && - !atomic_load(&sock->active) && !atomic_load(&sock->destroying)) - { - nmsocket_maybe_destroy(sock); - } + nmhandle_deactivate(sock, handle); + nmsocket_maybe_destroy(sock); } void * @@ -1250,6 +1248,8 @@ isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0) { UNUSED(worker); + nmhandle_deactivate(ievent->sock, ievent->handle); + ievent->sock->closehandle_cb(ievent->sock); isc_nmsocket_detach(&ievent->sock); } From a34ced776e83fb13afa55c71894dfd8f25020d5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Sun, 8 Dec 2019 23:09:16 +0100 Subject: [PATCH 13/17] Remove read callback before detaching from inner socket in tcpdns --- lib/isc/netmgr/tcp.c | 26 +++++++++++++++++++------- lib/isc/netmgr/tcpdns.c | 1 + 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 7e3972d07a..852c3cd4d6 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -765,10 +765,14 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { .base = (unsigned char *) buf->base, .length = nread }; - - INSIST(sock->rcb.recv != NULL); - sock->rcb.recv(sock->tcphandle, ®ion, sock->rcbarg); - + /* + * This might happen if the inner socket is closing. + * It means that it's detached, so the socket will + * be closed. 
+ */ + if (sock->rcb.recv != NULL) { + sock->rcb.recv(sock->tcphandle, ®ion, sock->rcbarg); + } sock->read_timeout = (atomic_load(&sock->keepalive) ? sock->mgr->keepalive : sock->mgr->idle); @@ -786,8 +790,14 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { if (sock->quota) { isc_quota_detach(&sock->quota); } - sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg); - + /* + * This might happen if the inner socket is closing. + * It means that it's detached, so the socket will + * be closed. + */ + if (sock->rcb.recv != NULL) { + sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg); + } /* * We don't need to clean up now; the socket will be closed and * resources and quota reclaimed when handle is freed in @@ -1067,7 +1077,9 @@ void isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); - if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL) { + if (sock->type == isc_nm_tcpsocket && + sock->tcphandle != NULL && + sock->rcb.recv != NULL) { sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg); } } diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index ad53aeface..94fb021bc6 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -494,6 +494,7 @@ static void tcpdns_close_direct(isc_nmsocket_t *sock) { REQUIRE(sock->tid == isc_nm_tid()); if (sock->outer != NULL) { + sock->outer->rcb.recv = NULL; isc_nmsocket_detach(&sock->outer); } /* We don't need atomics here, it's all in single network thread */ From 35679aef9bb7a009267dc865ea5e4d4183ab568a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Mon, 9 Dec 2019 14:39:38 +0100 Subject: [PATCH 14/17] unittest: Allow for 32 (not 16) mock nmhandles in ns tests --- lib/ns/tests/nstest.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/ns/tests/nstest.c b/lib/ns/tests/nstest.c index 927bcab5d3..6a6bd45e72 100644 --- a/lib/ns/tests/nstest.c +++ b/lib/ns/tests/nstest.c @@ -75,8 +75,8 @@ static dns_zone_t 
*served_zone = NULL; /* * We don't want to use netmgr-based client accounting, we need to emulate it. */ -atomic_uint_fast32_t client_refs[16]; -atomic_uintptr_t client_addrs[16]; +atomic_uint_fast32_t client_refs[32]; +atomic_uintptr_t client_addrs[32]; void __wrap_isc_nmhandle_unref(isc_nmhandle_t *handle); @@ -86,12 +86,12 @@ __wrap_isc_nmhandle_unref(isc_nmhandle_t *handle) { ns_client_t *client = (ns_client_t *)handle; int i; - for (i = 0; i < 16; i++) { + for (i = 0; i < 32; i++) { if (atomic_load(&client_addrs[i]) == (uintptr_t) client) { break; } } - REQUIRE(i < 16); + REQUIRE(i < 32); if (atomic_fetch_sub(&client_refs[i], 1) == 1) { dns_view_detach(&client->view); @@ -561,14 +561,14 @@ ns_test_getclient(ns_interface_t *ifp0, bool tcp, result = ns__client_setup(client, clientmgr, true); - for (i = 0; i < 16; i++) { + for (i = 0; i < 32; i++) { if (atomic_load(&client_addrs[i]) == (uintptr_t) NULL || atomic_load(&client_addrs[i]) == (uintptr_t) client) { break; } } - REQUIRE(i < 16); + REQUIRE(i < 32); atomic_store(&client_refs[i], 2); atomic_store(&client_addrs[i], (uintptr_t) client); From 8c0792723d45c04b9ea2526c4ffafa0815e0925d Mon Sep 17 00:00:00 2001 From: Evan Hunt Date: Mon, 9 Dec 2019 10:49:37 -0800 Subject: [PATCH 15/17] style nits --- lib/isc/netmgr/netmgr.c | 6 +++--- lib/isc/netmgr/tcp.c | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 78a596bb50..7964d55c72 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -733,7 +733,8 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) { */ LOCK(&sock->lock); if (atomic_load(&sock->active) || atomic_load(&sock->destroying) || - !atomic_load(&sock->closed) || atomic_load(&sock->references) != 0) { + !atomic_load(&sock->closed) || atomic_load(&sock->references) != 0) + { UNLOCK(&sock->lock); return; } @@ -747,8 +748,7 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) { } } - if (active_handles == 0 || sock->tcphandle != 
NULL) - { + if (active_handles == 0 || sock->tcphandle != NULL) { destroy = true; } UNLOCK(&sock->lock); diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 852c3cd4d6..4567f9343a 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -1079,7 +1079,8 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL && - sock->rcb.recv != NULL) { + sock->rcb.recv != NULL) + { sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg); } } From 31b3980ef07a3809dcda54f17795d7e2895f5bc2 Mon Sep 17 00:00:00 2001 From: Evan Hunt Date: Mon, 9 Dec 2019 12:24:46 -0800 Subject: [PATCH 16/17] shorten some names reduce line breaks and general unwieldiness by changing some function, type, and parameter names. --- lib/isc/netmgr/netmgr-int.h | 46 ++++++++++++-------------- lib/isc/netmgr/netmgr.c | 20 ++++++------ lib/isc/netmgr/tcp.c | 64 ++++++++++++++++--------------------- lib/isc/netmgr/tcpdns.c | 4 +-- lib/isc/netmgr/udp.c | 27 +++++++--------- 5 files changed, 72 insertions(+), 89 deletions(-) diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index bed5622f8b..daae5fd880 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -115,12 +115,12 @@ typedef enum isc__netievent_type { netievent_tcpstartread, netievent_tcppauseread, netievent_tcpchildlisten, - netievent_tcpstopchildlisten, + netievent_tcpchildstop, netievent_closecb, netievent_shutdown, netievent_stop, - netievent_udpstoplisten, - netievent_tcpstoplisten, + netievent_udpstop, + netievent_tcpstop, netievent_tcpclose, netievent_tcpdnsclose, netievent_prio = 0xff, /* event type values higher than this @@ -192,9 +192,9 @@ typedef struct isc__netievent__socket { } isc__netievent__socket_t; typedef isc__netievent__socket_t isc__netievent_udplisten_t; -typedef isc__netievent__socket_t isc__netievent_udpstoplisten_t; -typedef isc__netievent__socket_t isc__netievent_tcpstoplisten_t; -typedef isc__netievent__socket_t 
isc__netievent_tcpstopchildlisten_t; +typedef isc__netievent__socket_t isc__netievent_udpstop_t; +typedef isc__netievent__socket_t isc__netievent_tcpstop_t; +typedef isc__netievent__socket_t isc__netievent_tcpchildstop_t; typedef isc__netievent__socket_t isc__netievent_tcpclose_t; typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t; typedef isc__netievent__socket_t isc__netievent_startread_t; @@ -566,13 +566,13 @@ isc__nmsocket_prep_destroy(isc_nmsocket_t *sock); */ void -isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0); /*%< * Issue a 'handle closed' callback on the socket. */ void -isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ev0); /*%< * Walk through all uv handles, get the underlying sockets and issue * close on them. @@ -586,13 +586,12 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, */ void -isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_udpstoplisten(isc__networker_t *worker, - isc__netievent_t *ievent0); +isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0); /*%< * Callback handlers for asynchronous UDP events (listen, stoplisten, send). 
*/ @@ -617,26 +616,23 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock); */ void -isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_tcpchildlisten(isc__networker_t *worker, - isc__netievent_t *ievent0); +isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_tcpstoplisten(isc__networker_t *worker, - isc__netievent_t *ievent0); +isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_tcpstopchildlisten(isc__networker_t *worker, - isc__netievent_t *ievent0); +isc__nm_async_tcpchildstop(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0); /*%< * Callback handlers for asynchronous TCP events (connect, listen, * stoplisten, send, read, pause, close). 
@@ -656,7 +652,7 @@ isc__nm_tcpdns_close(isc_nmsocket_t *sock); */ void -isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0); #define isc__nm_uverr2result(x) \ isc___nm_uverr2result(x, true, __FILE__, __LINE__) diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 7964d55c72..0c1cba4561 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -525,8 +525,8 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { case netievent_udplisten: isc__nm_async_udplisten(worker, ievent); break; - case netievent_udpstoplisten: - isc__nm_async_udpstoplisten(worker, ievent); + case netievent_udpstop: + isc__nm_async_udpstop(worker, ievent); break; case netievent_udpsend: isc__nm_async_udpsend(worker, ievent); @@ -549,11 +549,11 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { case netievent_tcpsend: isc__nm_async_tcpsend(worker, ievent); break; - case netievent_tcpstoplisten: - isc__nm_async_tcpstoplisten(worker, ievent); + case netievent_tcpstop: + isc__nm_async_tcpstop(worker, ievent); break; - case netievent_tcpstopchildlisten: - isc__nm_async_tcpstopchildlisten(worker, ievent); + case netievent_tcpchildstop: + isc__nm_async_tcpchildstop(worker, ievent); break; case netievent_tcpclose: isc__nm_async_tcpclose(worker, ievent); @@ -1238,9 +1238,9 @@ isc_nm_send(isc_nmhandle_t *handle, isc_region_t *region, } void -isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_closecb_t *ievent = - (isc__netievent_closecb_t *) ievent0; + (isc__netievent_closecb_t *) ev0; REQUIRE(VALID_NMSOCK(ievent->sock)); REQUIRE(ievent->sock->tid == isc_nm_tid()); @@ -1268,8 +1268,8 @@ shutdown_walk_cb(uv_handle_t *handle, void *arg) { } void -isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ievent0) { - UNUSED(ievent0); 
+isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ev0) { + UNUSED(ev0); uv_walk(&worker->loop, shutdown_walk_cb, NULL); } diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 4567f9343a..a996acfb01 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -95,9 +95,9 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { } void -isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcpconnect_t *ievent = - (isc__netievent_tcpconnect_t *) ievent0; + (isc__netievent_tcpconnect_t *) ev0; isc_nmsocket_t *sock = ievent->sock; isc__nm_uvreq_t *req = ievent->req; int r; @@ -234,9 +234,9 @@ syncdir(const isc_nmsocket_t *sock) { * they have been deprecated and removed.) */ void -isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcplisten_t *ievent = - (isc__netievent_tcplisten_t *) ievent0; + (isc__netievent_tcplisten_t *) ev0; isc_nmsocket_t *sock = ievent->sock; int r; @@ -407,11 +407,9 @@ ipc_close_cb(uv_handle_t *handle) { * for the socket we'll be listening on. 
*/ void -isc__nm_async_tcpchildlisten(isc__networker_t *worker, - isc__netievent_t *ievent0) -{ - isc__netievent_tcplisten_t *ievent = - (isc__netievent_tcplisten_t *) ievent0; +isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_tcpchildlisten_t *ievent = + (isc__netievent_tcpchildlisten_t *) ev0; isc_nmsocket_t *sock = ievent->sock; isc__nm_uvreq_t *req = NULL; int r; @@ -488,23 +486,20 @@ childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { void isc_nm_tcp_stoplistening(isc_nmsocket_t *sock) { - isc__netievent_tcpstoplisten_t *ievent = NULL; + isc__netievent_tcpstop_t *ievent = NULL; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(!isc__nm_in_netthread()); - ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstoplisten); + ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstop); isc_nmsocket_attach(sock, &ievent->sock); isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *) ievent); } void -isc__nm_async_tcpstoplisten(isc__networker_t *worker, - isc__netievent_t *ievent0) -{ - isc__netievent_tcpstoplisten_t *ievent = - (isc__netievent_tcpstoplisten_t *) ievent0; +isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_tcpstop_t *ievent = (isc__netievent_tcpstop_t *) ev0; isc_nmsocket_t *sock = ievent->sock; UNUSED(worker); @@ -517,10 +512,10 @@ isc__nm_async_tcpstoplisten(isc__networker_t *worker, * If network manager is interlocked, re-enqueue the event for later. 
*/ if (!isc__nm_acquire_interlocked(sock->mgr)) { - isc__netievent_tcpstoplisten_t *event = NULL; + isc__netievent_tcpstop_t *event = NULL; event = isc__nm_get_ievent(sock->mgr, - netievent_tcpstoplisten); + netievent_tcpstop); event->sock = sock; isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *) event); @@ -533,20 +528,18 @@ isc__nm_async_tcpstoplisten(isc__networker_t *worker, static void stoplistening(isc_nmsocket_t *sock) { for (int i = 0; i < sock->nchildren; i++) { - isc__netievent_tcpstopchildlisten_t *event = NULL; + isc__netievent_tcpchildstop_t *event = NULL; /* * We can ignore the overhead of event allocation because * stoplistening is a rare event, and doing it this way * simplifies sock reference counting. */ - event = isc__nm_get_ievent(sock->mgr, - netievent_tcpstopchildlisten); + event = isc__nm_get_ievent(sock->mgr, netievent_tcpchildstop); isc_nmsocket_attach(&sock->children[i], &event->sock); if (i == sock->tid) { - isc__nm_async_tcpstopchildlisten( - &sock->mgr->workers[i], + isc__nm_async_tcpchildstop(&sock->mgr->workers[i], (isc__netievent_t *) event); isc__nm_put_ievent(sock->mgr, event); } else { @@ -564,11 +557,9 @@ stoplistening(isc_nmsocket_t *sock) { } void -isc__nm_async_tcpstopchildlisten(isc__networker_t *worker, - isc__netievent_t *ievent0) -{ - isc__netievent_tcpstoplisten_t *ievent = - (isc__netievent_tcpstoplisten_t *) ievent0; +isc__nm_async_tcpchildstop(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_tcpchildstop_t *ievent = + (isc__netievent_tcpchildstop_t *) ev0; isc_nmsocket_t *sock = ievent->sock; UNUSED(worker); @@ -665,9 +656,9 @@ isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { } void -isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_startread_t *ievent = - (isc__netievent_startread_t *) ievent0; + (isc__netievent_startread_t *) ev0; 
isc_nmsocket_t *sock = ievent->sock; REQUIRE(worker->id == isc_nm_tid()); @@ -711,9 +702,9 @@ isc_nm_pauseread(isc_nmsocket_t *sock) { } void -isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_pauseread(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_pauseread_t *ievent = - (isc__netievent_pauseread_t *) ievent0; + (isc__netievent_pauseread_t *) ev0; isc_nmsocket_t *sock = ievent->sock; REQUIRE(VALID_NMSOCK(sock)); @@ -957,9 +948,9 @@ tcp_send_cb(uv_write_t *req, int status) { * Handle 'tcpsend' async event - send a packet on the socket */ void -isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0) { isc_result_t result; - isc__netievent_tcpsend_t *ievent = (isc__netievent_tcpsend_t *) ievent0; + isc__netievent_tcpsend_t *ievent = (isc__netievent_tcpsend_t *) ev0; REQUIRE(worker->id == ievent->sock->tid); @@ -1064,9 +1055,8 @@ isc__nm_tcp_close(isc_nmsocket_t *sock) { } void -isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ievent0) { - isc__netievent_tcpclose_t *ievent = - (isc__netievent_tcpclose_t *) ievent0; +isc__nm_async_tcpclose(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_tcpclose_t *ievent = (isc__netievent_tcpclose_t *) ev0; REQUIRE(worker->id == ievent->sock->tid); diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 94fb021bc6..5048200aca 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -523,9 +523,9 @@ isc__nm_tcpdns_close(isc_nmsocket_t *sock) { } void -isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_tcpdnsclose(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcpdnsclose_t *ievent = - (isc__netievent_tcpdnsclose_t *) ievent0; + (isc__netievent_tcpdnsclose_t *) ev0; REQUIRE(worker->id == ievent->sock->tid); diff --git a/lib/isc/netmgr/udp.c 
b/lib/isc/netmgr/udp.c index 102ff0dbda..abd950a279 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -115,9 +115,9 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, * handle 'udplisten' async call - start listening on a socket. */ void -isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udplisten_t *ievent = - (isc__netievent_udplisten_t *) ievent0; + (isc__netievent_udplisten_t *) ev0; isc_nmsocket_t *sock = ievent->sock; REQUIRE(sock->type == isc_nm_udpsocket); @@ -173,14 +173,14 @@ stoplistening(isc_nmsocket_t *sock) { INSIST(sock->type == isc_nm_udplistener); for (int i = 0; i < sock->nchildren; i++) { - isc__netievent_udpstoplisten_t *event = NULL; + isc__netievent_udpstop_t *event = NULL; if (i == sock->tid) { stop_udp_child(&sock->children[i]); continue; } - event = isc__nm_get_ievent(sock->mgr, netievent_udpstoplisten); + event = isc__nm_get_ievent(sock->mgr, netievent_udpstop); event->sock = &sock->children[i]; isc__nm_enqueue_ievent(&sock->mgr->workers[i], (isc__netievent_t *) event); @@ -198,7 +198,7 @@ stoplistening(isc_nmsocket_t *sock) { void isc_nm_udp_stoplistening(isc_nmsocket_t *sock) { - isc__netievent_udpstoplisten_t *ievent = NULL; + isc__netievent_udpstop_t *ievent = NULL; /* We can't be launched from network thread, we'd deadlock */ REQUIRE(!isc__nm_in_netthread()); @@ -210,7 +210,7 @@ isc_nm_udp_stoplistening(isc_nmsocket_t *sock) { * event. Otherwise, go ahead and stop listening right away. 
*/ if (!isc__nm_acquire_interlocked(sock->mgr)) { - ievent = isc__nm_get_ievent(sock->mgr, netievent_udpstoplisten); + ievent = isc__nm_get_ievent(sock->mgr, netievent_udpstop); ievent->sock = sock; isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *) ievent); @@ -221,14 +221,11 @@ isc_nm_udp_stoplistening(isc_nmsocket_t *sock) { } /* - * handle 'udpstoplisten' async call - stop listening on a socket. + * handle 'udpstop' async call - stop listening on a socket. */ void -isc__nm_async_udpstoplisten(isc__networker_t *worker, - isc__netievent_t *ievent0) -{ - isc__netievent_udplisten_t *ievent = - (isc__netievent_udplisten_t *) ievent0; +isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_udpstop_t *ievent = (isc__netievent_udpstop_t *) ev0; isc_nmsocket_t *sock = ievent->sock; REQUIRE(sock->iface != NULL); @@ -248,7 +245,7 @@ isc__nm_async_udpstoplisten(isc__networker_t *worker, if (!isc__nm_acquire_interlocked(sock->mgr)) { isc__netievent_udplisten_t *event = NULL; - event = isc__nm_get_ievent(sock->mgr, netievent_udpstoplisten); + event = isc__nm_get_ievent(sock->mgr, netievent_udpstop); event->sock = sock; isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *) event); @@ -407,9 +404,9 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, * handle 'udpsend' async event - send a packet on the socket */ void -isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udpsend_t *ievent = - (isc__netievent_udpsend_t *) ievent0; + (isc__netievent_udpsend_t *) ev0; REQUIRE(worker->id == ievent->sock->tid); From 83e54f906de5cc98bd0014ddbe73b7f316033221 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Mon, 9 Dec 2019 13:18:05 +0100 Subject: [PATCH 17/17] CHANGES entry --- CHANGES | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES b/CHANGES 
index da00868330..52577f1f06 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,5 @@ +5335. [func] Make TCP listening code multithreaded. [GL !2659] + 5334. [doc] Update documentation with dnssec-policy clarifications. Also change some defaults.