diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 4a8f10aff7..e882f3586b 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -122,7 +122,11 @@ typedef enum isc__netievent_type { netievent_udpstoplisten, netievent_tcpstoplisten, netievent_tcpclose, - netievent_prio = 0xff, + netievent_prio = 0xff, /* event type values higher than this + * will be treated as high-priority + * events, which can be processed + * while the netmgr is paused. + */ netievent_udplisten, netievent_tcplisten, } isc__netievent_type; @@ -304,7 +308,7 @@ struct isc_nmsocket { isc_nm_t *mgr; isc_nmsocket_t *parent; - /* + /*% * quota is the TCP client, attached when a TCP connection * is established. pquota is a non-attached pointer to the * TCP client quota, stored in listening sockets but only @@ -314,7 +318,7 @@ struct isc_nmsocket { isc_quota_t *pquota; bool overquota; - /* + /*% * TCP read timeout timer. */ uv_timer_t timer; @@ -327,18 +331,18 @@ struct isc_nmsocket { /*% server socket for connections */ isc_nmsocket_t *server; - /*% children sockets for multi-socket setups */ + /*% Child sockets for multi-socket setups */ isc_nmsocket_t *children; int nchildren; isc_nmiface_t *iface; isc_nmhandle_t *tcphandle; - /* used to send listening TCP sockets to children */ + /*% Used to transfer listening TCP sockets to children */ uv_pipe_t ipc; char ipc_pipe_name[32]; atomic_int_fast32_t schildren; - /*% extra data allocated at the end of each isc_nmhandle_t */ + /*% Extra data allocated at the end of each isc_nmhandle_t */ size_t extrahandlesize; /*% TCP backlog */ @@ -348,16 +352,17 @@ struct isc_nmsocket { uv_os_sock_t fd; union uv_any_handle uv_handle; + /*% Peer address */ isc_sockaddr_t peer; /* Atomic */ - /*% Number of running (e.g. listening) children sockets */ + /*% Number of running (e.g. 
listening) child sockets */ atomic_int_fast32_t rchildren; /*% - * Socket if active if it's listening, working, etc., if we're - * closing a socket it doesn't make any sense to e.g. still - * push handles or reqs for reuse + * Socket is active if it's listening, working, etc. If it's + * closing, then it doesn't make sense, for example, to + * push handles or reqs for reuse. */ atomic_bool active; atomic_bool destroying; @@ -365,7 +370,8 @@ struct isc_nmsocket { /*% * Socket is closed if it's not active and all the possible * callbacks were fired, there are no active handles, etc. - * active==false, closed==false means the socket is closing. + * If active==false but closed==false, that means the socket + * is closing. */ atomic_bool closed; atomic_bool listening; @@ -407,13 +413,17 @@ struct isc_nmsocket { isc_astack_t *inactivehandles; isc_astack_t *inactivereqs; - /* - * Used to wait for listening event to be done and active/rchildren - * during shutdown. + /*% + * Used to wait for TCP listening events to complete, and + * for the number of running children to reach zero during + * shutdown. */ isc_mutex_t lock; isc_condition_t cond; + /*% + * Used to pass a result back from TCP listening events. + */ isc_result_t result; /*% @@ -442,12 +452,12 @@ struct isc_nmsocket { size_t *ah_frees; isc_nmhandle_t **ah_handles; - /* Buffer for TCPDNS processing, optional */ + /*% Buffer for TCPDNS processing */ size_t buf_size; size_t buf_len; unsigned char *buf; - /* + /*% * This function will be called with handle->sock * as the argument whenever a handle's references drop * to zero, after its reset callback has been called. 
@@ -600,7 +610,8 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0); void isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0); void -isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0); +isc__nm_async_tcpchildlisten(isc__networker_t *worker, + isc__netievent_t *ievent0); void isc__nm_async_tcpstoplisten(isc__networker_t *worker, isc__netievent_t *ievent0); diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index f91651c009..ab9f956b22 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -186,20 +186,25 @@ nm_destroy(isc_nm_t **mgr0) { for (size_t i = 0; i < mgr->nworkers; i++) { isc__networker_t *worker = &mgr->workers[i]; - /* Empty the async event queue */ - isc__netievent_t *ievent; + isc__netievent_t *ievent = NULL; + int r; + + /* Empty the async event queues */ while ((ievent = (isc__netievent_t *) isc_queue_dequeue(worker->ievents)) != NULL) { isc_mempool_put(mgr->evpool, ievent); } + while ((ievent = (isc__netievent_t *) isc_queue_dequeue(worker->ievents_prio)) != NULL) { isc_mempool_put(mgr->evpool, ievent); } - int r = uv_loop_close(&worker->loop); + + r = uv_loop_close(&worker->loop); INSIST(r == 0); + isc_queue_destroy(worker->ievents); isc_queue_destroy(worker->ievents_prio); isc_thread_join(worker->thread, NULL); @@ -499,7 +504,7 @@ async_cb(uv_async_t *handle) { static void process_queue(isc__networker_t *worker, isc_queue_t *queue) { - isc__netievent_t *ievent; + isc__netievent_t *ievent = NULL; while ((ievent = (isc__netievent_t *) isc_queue_dequeue(queue)) != NULL) @@ -839,8 +844,8 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, * be random? 
*/ strcpy(sock->ipc_pipe_name, NAMED_PIPE_PREFIX); - for (int i=strlen(sock->ipc_pipe_name); i<31; i++) { - sock->ipc_pipe_name[i] = isc_random8()%24 + 'a'; + for (int i = strlen(sock->ipc_pipe_name); i < 31; i++) { + sock->ipc_pipe_name[i] = isc_random8() % 24 + 'a'; } sock->ipc_pipe_name[31] = '\0'; diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 63573d8921..22d21d57bb 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -54,7 +54,7 @@ tcp_close_cb(uv_handle_t *uvhandle); static void ipc_connection_cb(uv_stream_t *stream, int status); static void -ipc_write_cb(uv_write_t* uvreq, int status); +ipc_write_cb(uv_write_t *uvreq, int status); static void parent_pipe_close_cb(uv_handle_t *handle); static void @@ -68,7 +68,7 @@ tcp_listenclose_cb(uv_handle_t *handle); static int tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { - isc__networker_t *worker; + isc__networker_t *worker = NULL; int r; REQUIRE(isc__nm_in_netthread()); @@ -114,7 +114,7 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ievent0) { static void tcp_connect_cb(uv_connect_t *uvreq, int status) { isc__nm_uvreq_t *req = (isc__nm_uvreq_t *) uvreq->data; - isc_nmsocket_t *sock; + isc_nmsocket_t *sock = NULL; sock = uv_handle_get_data((uv_handle_t *) uvreq->handle); REQUIRE(VALID_UVREQ(req)); @@ -152,6 +152,7 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t **sockp) { isc_nmsocket_t *nsock = NULL; + isc__netievent_tcplisten_t *ievent = NULL; REQUIRE(VALID_NM(mgr)); @@ -176,7 +177,6 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, nsock->pquota = quota; } - isc__netievent_tcplisten_t *ievent; ievent = isc__nm_get_ievent(mgr, netievent_tcplisten); ievent->sock = nsock; if (isc__nm_in_netthread()) { @@ -204,11 +204,12 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, } /* - * For TCP listening we create a single socket, bind it, and then pass it - * to `ncpu` child sockets - the passing is done over IPC. 
+ * For TCP listening, we create a single socket, bind it, and then + * pass it to `ncpu` child sockets - the passing is done over IPC. + * * XXXWPK This design pattern is ugly but it's "the way to do it" recommended * by libuv documentation - which also mentions that there should be - * uv_export/uv_import functions which would simplify this greatly. + * uv_export/uv_import functions, which would simplify this greatly. */ void isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { @@ -243,19 +244,21 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { /* It was never opened */ atomic_store(&sock->closed, true); sock->result = isc__nm_uverr2result(r); - goto fini; + goto done; } r = uv_tcp_bind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, 0); if (r != 0) { uv_close(&sock->uv_handle.handle, tcp_close_cb); sock->result = isc__nm_uverr2result(r); - goto fini; + goto done; } uv_handle_set_data(&sock->uv_handle.handle, sock); + /* * This is not properly documented in libuv, and the example * (benchmark-multi-accept) is wrong: + * * 'ipc' parameter must be '0' for 'listening' IPC socket, '1' * only for the sockets are really passing the FDs between * threads. 
This works without any problems on Unices, but @@ -263,9 +266,11 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { */ r = uv_pipe_init(&worker->loop, &sock->ipc, 0); INSIST(r == 0); + uv_handle_set_data((uv_handle_t *)&sock->ipc, sock); r = uv_pipe_bind(&sock->ipc, sock->ipc_pipe_name); INSIST(r == 0); + r = uv_listen((uv_stream_t *) &sock->ipc, sock->nchildren, ipc_connection_cb); INSIST(r == 0); @@ -276,8 +281,8 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { */ for (int i = 0; i < sock->nchildren; i++) { isc_nmsocket_t *csock = &sock->children[i]; + isc__netievent_tcpchildlisten_t *event = NULL; - isc__netievent_tcpchildlisten_t *event; event = isc__nm_get_ievent(csock->mgr, netievent_tcpchildlisten); event->sock = csock; @@ -293,21 +298,25 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ievent0) { atomic_store(&sock->listening, true); -fini: + done: LOCK(&sock->lock); SIGNAL(&sock->cond); UNLOCK(&sock->lock); return; } -/* Parent got an IPC connection from child */ +/* + * Parent received an IPC connection from child + */ static void ipc_connection_cb(uv_stream_t *stream, int status) { - int r; - REQUIRE(status == 0); isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *) stream); isc__networker_t *worker = &sock->mgr->workers[isc_nm_tid()]; isc__nm_uvreq_t *nreq = isc__nm_uvreq_get(sock->mgr, sock); + int r; + + REQUIRE(status == 0); + /* * The buffer can be anything, it will be ignored, but it has to * be something that won't disappear. 
@@ -318,30 +327,32 @@ ipc_connection_cb(uv_stream_t *stream, int status) { /* Failure here is critical */ r = uv_accept((uv_stream_t *) &sock->ipc, - (uv_stream_t*) &nreq->pipe); + (uv_stream_t *) &nreq->pipe); INSIST(r == 0); + r = uv_write2(&nreq->uv_req.write, - (uv_stream_t*) &nreq->pipe, - &nreq->uvbuf, - 1, - (uv_stream_t*) &sock->uv_handle.stream, + (uv_stream_t *) &nreq->pipe, &nreq->uvbuf, 1, + (uv_stream_t *) &sock->uv_handle.stream, ipc_write_cb); INSIST(r == 0); } static void -ipc_write_cb(uv_write_t* uvreq, int status) { - UNUSED(status); +ipc_write_cb(uv_write_t *uvreq, int status) { isc__nm_uvreq_t *req = uvreq->data; + + UNUSED(status); + /* - * We want all children to get the socket. If we're done we can stop - * listening on the IPC socket. + * We want all children to get the socket. If we're done, we + * can stop listening on the IPC socket. */ if (atomic_fetch_add(&req->sock->schildren, 1) == - req->sock->nchildren - 1) { - uv_close((uv_handle_t*) &req->sock->ipc, NULL); + req->sock->nchildren - 1) + { + uv_close((uv_handle_t *) &req->sock->ipc, NULL); } - uv_close((uv_handle_t*) &req->pipe, parent_pipe_close_cb); + uv_close((uv_handle_t *) &req->pipe, parent_pipe_close_cb); } static void @@ -351,10 +362,13 @@ parent_pipe_close_cb(uv_handle_t *handle) { } void -isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0) { +isc__nm_async_tcpchildlisten(isc__networker_t *worker, + isc__netievent_t *ievent0) +{ isc__netievent_tcplisten_t *ievent = (isc__netievent_tcplisten_t *) ievent0; isc_nmsocket_t *sock = ievent->sock; + isc__nm_uvreq_t *req = NULL; int r; REQUIRE(isc__nm_in_netthread()); @@ -362,48 +376,58 @@ isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ievent0 r = uv_pipe_init(&worker->loop, &sock->ipc, 1); INSIST(r == 0); - uv_handle_set_data((uv_handle_t *) &sock->ipc, sock); - isc__nm_uvreq_t * req = isc__nm_uvreq_get(sock->mgr, sock); - uv_pipe_connect(&req->uv_req.connect, - &sock->ipc, + 
uv_handle_set_data((uv_handle_t *) &sock->ipc, sock); + + req = isc__nm_uvreq_get(sock->mgr, sock); + uv_pipe_connect(&req->uv_req.connect, &sock->ipc, sock->parent->ipc_pipe_name, childlisten_ipc_connect_cb); } -/* child connected to parent over IPC */ +/* Child connected to parent over IPC */ static void childlisten_ipc_connect_cb(uv_connect_t *uvreq, int status) { - UNUSED(status); isc__nm_uvreq_t *req = uvreq->data; isc_nmsocket_t *sock = req->sock; + int r; + + UNUSED(status); + isc__nm_uvreq_put(&req, sock); - int r = uv_read_start((uv_stream_t*) &sock->ipc, - isc__nm_alloc_cb, - childlisten_read_cb); + + r = uv_read_start((uv_stream_t *) &sock->ipc, isc__nm_alloc_cb, + childlisten_read_cb); INSIST(r == 0); } -/* child got the socket over IPC */ +/* Child received the socket via IPC */ static void childlisten_read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { - UNUSED(nread); - int r; isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *) stream); + isc__networker_t *worker = NULL; + uv_pipe_t *ipc = NULL; + uv_handle_type type; + int r; + + UNUSED(nread); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(buf != NULL); - uv_pipe_t* ipc = (uv_pipe_t*) stream; - uv_handle_type type = uv_pipe_pending_type(ipc); + + ipc = (uv_pipe_t *) stream; + type = uv_pipe_pending_type(ipc); INSIST(type == UV_TCP); + isc__nm_free_uvbuf(sock, buf); - isc__networker_t * worker = &sock->mgr->workers[isc_nm_tid()]; - uv_tcp_init(&worker->loop, (uv_tcp_t*) &sock->uv_handle.tcp); + worker = &sock->mgr->workers[isc_nm_tid()]; + uv_tcp_init(&worker->loop, (uv_tcp_t *) &sock->uv_handle.tcp); uv_handle_set_data(&sock->uv_handle.handle, sock); + uv_accept(stream, &sock->uv_handle.stream); r = uv_listen((uv_stream_t *) &sock->uv_handle.tcp, sock->backlog, tcp_connection_cb); - uv_close((uv_handle_t*) ipc, NULL); + uv_close((uv_handle_t *) ipc, NULL); if (r != 0) { /* XXX log it? 
*/ return; @@ -458,19 +482,21 @@ isc__nm_async_tcpstoplisten(isc__networker_t *worker, static void stoplistening(isc_nmsocket_t *sock) { for (int i = 0; i < sock->nchildren; i++) { + isc__netievent_tcpstopchildlisten_t *event = NULL; + /* - * Stoplistening is a rare event, we can ignore the overhead - * caused by allocating an event, and doing it this way + * We can ignore the overhead of event allocation because + * stoplistening is a rare event, and doing it this way * simplifies sock reference counting. */ - isc__netievent_tcpstopchildlisten_t *event = NULL; event = isc__nm_get_ievent(sock->mgr, netievent_tcpstopchildlisten); isc_nmsocket_attach(&sock->children[i], &event->sock); if (i == sock->tid) { - isc__nm_async_tcpstopchildlisten(&sock->mgr->workers[i], - (isc__netievent_t *) event); + isc__nm_async_tcpstopchildlisten( + &sock->mgr->workers[i], + (isc__netievent_t *) event); isc__nm_put_ievent(sock->mgr, event); } else { isc__nm_enqueue_ievent(&sock->mgr->workers[i], @@ -502,8 +528,8 @@ isc__nm_async_tcpstopchildlisten(isc__networker_t *worker, REQUIRE(sock->parent != NULL); /* - * rchildren is atomic but we still need to change it - * under a lock as the parent is waiting on conditional + * rchildren is atomic, but we still need to change it + * under a lock because the parent is waiting on conditional * and without it we might deadlock. */ LOCK(&sock->parent->lock); @@ -515,19 +541,22 @@ isc__nm_async_tcpstopchildlisten(isc__networker_t *worker, } /* - * This callback is used for closing child and parent listening sockets - - * that's why we need to choose the proper lock. + * This callback is used for closing both child and parent listening + * sockets; that's why we need to choose the proper lock. */ static void tcp_listenclose_cb(uv_handle_t *handle) { isc_nmsocket_t *sock = uv_handle_get_data(handle); - isc_mutex_t * lock = (sock->parent != NULL) ? - &sock->parent->lock : &sock->lock; + isc_mutex_t *lock = ((sock->parent != NULL) + ? 
&sock->parent->lock + : &sock->lock); + LOCK(lock); atomic_store(&sock->closed, true); atomic_store(&sock->listening, false); sock->pquota = NULL; UNLOCK(lock); + isc_nmsocket_detach(&sock); } @@ -675,7 +704,7 @@ isc_nm_resumeread(isc_nmsocket_t *sock) { static void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { - isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t*) stream); + isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *) stream); REQUIRE(VALID_NMSOCK(sock)); REQUIRE(buf != NULL); @@ -790,7 +819,7 @@ accept_connection(isc_nmsocket_t *ssock) { static void tcp_connection_cb(uv_stream_t *server, int status) { - isc_nmsocket_t *ssock = uv_handle_get_data((uv_handle_t*) server); + isc_nmsocket_t *ssock = uv_handle_get_data((uv_handle_t *) server); isc_result_t result; UNUSED(status); diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index cf3456c175..2b719e8191 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -86,13 +86,13 @@ timer_close_cb(uv_handle_t *handle) { static void dnstcp_readtimeout(uv_timer_t *timer) { - isc_nmsocket_t *sock; - sock = (isc_nmsocket_t *) uv_handle_get_data((uv_handle_t *)timer); + isc_nmsocket_t *sock = + (isc_nmsocket_t *) uv_handle_get_data((uv_handle_t *) timer); REQUIRE(VALID_NMSOCK(sock)); isc_nmsocket_detach(&sock->outer); - uv_close((uv_handle_t*) &sock->timer, timer_close_cb); + uv_close((uv_handle_t *) &sock->timer, timer_close_cb); } /* @@ -494,5 +494,5 @@ isc__nm_tcpdns_close(isc_nmsocket_t *sock) { if (sock->outer != NULL) { isc_nmsocket_detach(&sock->outer); } - uv_close((uv_handle_t*) &sock->timer, timer_close_cb); + uv_close((uv_handle_t *) &sock->timer, timer_close_cb); } diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index 97df0ef9ec..102ff0dbda 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -339,7 +339,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nmsocket_t *psock = NULL, *rsock = NULL; isc_nmsocket_t 
*sock = handle->sock; isc_sockaddr_t *peer = &handle->peer; - isc__netievent_udpsend_t *ievent; + isc__netievent_udpsend_t *ievent = NULL; isc__nm_uvreq_t *uvreq = NULL; int ntid; uint32_t maxudp = atomic_load(&sock->mgr->maxudp);