diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 28b62853d4..2124815f8d 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -471,7 +471,7 @@ struct isc_nmsocket { atomic_bool connecting; atomic_bool connected; atomic_bool connect_error; - atomic_bool reading; + bool accepting; isc_refcount_t references; /*% @@ -719,7 +719,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, * Back-end implementation of isc_nm_send() for UDP handles. */ -isc_result_t +void isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg); /* * Back-end implementation of isc_nm_read() for UDP handles. diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index b13847cc77..fb511f198b 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -1913,8 +1913,8 @@ nmsocket_dump(isc_nmsocket_t *sock) { fprintf(stderr, "Active socket %p, type %s, refs %lu\n", sock, nmsocket_type_totext(sock->type), isc_refcount_current(&sock->references)); - fprintf(stderr, "Parent %p, listener %p\n", sock->parent, - sock->listener); + fprintf(stderr, "Parent %p, listener %p, server %p\n", sock->parent, + sock->listener, sock->server); fprintf(stderr, "Created by:\n"); backtrace_symbols_fd(sock->backtrace, sock->backtrace_size, STDERR_FILENO); diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 2147e5ad15..a74f1ed992 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -77,6 +77,42 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota); static void quota_accept_cb(isc_quota_t *quota, void *sock0); +static void +failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult); + +static void +failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) { + /* + * Detach the quota early to make room for other connections; + * otherwise it'd be detached later asynchronously, and clog + * the quota unnecessarily. + */ + if (sock->quota != NULL) { + isc_quota_detach(&sock->quota); + } + + if (!sock->accepting) { + return; + } + sock->accepting = false; + + switch (eresult) { + case ISC_R_NOTCONNECTED: + /* IGNORE: The client disconnected before we could accept */ + break; + default: + isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, + "Accepting TCP connection failed: %s", + isc_result_totext(eresult)); + } + + /* + * Detach the socket properly to make sure uv_close() is called. + */ + isc__nmsocket_detach(&sock); +} + static void failed_connect_cb(isc_nmsocket_t *sock, isc_result_t eresult) { isc__nm_uvreq_t *req; @@ -87,10 +123,11 @@ failed_connect_cb(isc_nmsocket_t *sock, isc_result_t eresult) { sock->timer_running = false; } - if (!sock->connecting) { + if (!atomic_load(&sock->connecting)) { return; } - sock->connecting = false; + + atomic_store(&sock->connecting, false); req = uv_handle_get_data((uv_handle_t *)&sock->timer); @@ -131,7 +168,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { sock->timer_initialized = true; } - sock->connecting = true; + atomic_store(&sock->connecting, true); r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); if (r != 0) { @@ -225,10 +262,11 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) { sock->timer_running = false; } - if (!sock->connecting) { + if (!atomic_load(&sock->connecting)) { return; } - sock->connecting = false; + + atomic_store(&sock->connecting, false); REQUIRE(VALID_UVREQ(req)); @@ -514,6 +552,23 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(isc__nm_in_netthread()); REQUIRE(sock->tid == isc_nm_tid()); + if (!sock->accepting) { + return; + } + + /* Socket was closed midflight by isc__nm_tcp_shutdown() */ + if (!isc__nmsocket_active(sock)) { + failed_accept_cb(sock, ISC_R_CANCELED); + return; + } + + INSIST(sock->server != NULL); + + if (!isc__nmsocket_active(sock->server)) { + failed_accept_cb(sock, ISC_R_CANCELED); + return; + } + sock->quota = ievent->quota; ievent->quota = NULL; @@ -553,6 +608,7 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { if (result != ISC_R_SUCCESS) { goto error; } + sock->accepting = false; handle = isc__nmhandle_get(sock, NULL, &local); @@ -576,30 +632,7 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { return; error: - /* - * Detach the quota early to make room for other connections; - * otherwise it'd be detached later asynchronously, and clog - * the quota unnecessarily. - */ - if (sock->quota != NULL) { - isc_quota_detach(&sock->quota); - } - - switch (result) { - case ISC_R_NOTCONNECTED: - /* IGNORE: The client disconnected before we could accept */ - break; - default: - isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, - ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, - "Accepting TCP connection failed: %s", - isc_result_totext(result)); - } - - /* - * Detach the socket properly to make sure uv_close() is called. - */ - isc__nmsocket_detach(&sock); + failed_accept_cb(sock, result); } void @@ -666,8 +699,6 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->statichandle != NULL); - uv_read_stop(&sock->uv_handle.stream); - if (sock->timer_initialized) { uv_timer_stop(&sock->timer); sock->timer_running = false; @@ -677,6 +708,8 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { isc_quota_detach(&sock->quota); } + uv_read_stop(&sock->uv_handle.stream); + cb = sock->recv_cb; cbarg = sock->recv_cbarg; isc__nmsocket_clearcb(sock); @@ -699,6 +732,7 @@ readtimeout_cb(uv_timer_t *handle) { */ if (atomic_load(&sock->processing)) { uv_timer_start(handle, readtimeout_cb, sock->read_timeout, 0); + sock->timer_running = true; return; } @@ -722,6 +756,18 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { return; } + if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); + cb(handle, ISC_R_CANCELED, NULL, cbarg); + return; + } + + if (atomic_load(&sock->mgr->closing)) { + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); + cb(handle, ISC_R_CANCELED, NULL, cbarg); + return; + } + REQUIRE(sock->tid == isc_nm_tid()); sock->recv_cb = cb; @@ -774,6 +820,21 @@ isc__nm_async_tcp_startread(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(worker->id == isc_nm_tid()); + if (!isc__nmsocket_active(sock)) { + failed_read_cb(sock, ISC_R_CANCELED); + return; + } + + if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { + failed_read_cb(sock, ISC_R_CANCELED); + return; + } + + if (atomic_load(&sock->mgr->closing)) { + failed_read_cb(sock, ISC_R_CANCELED); + return; + } + r = uv_read_start(&sock->uv_handle.stream, tcp_alloc_cb, read_cb); if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]); @@ -894,6 +955,7 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { /* The timer will be updated */ uv_timer_start(&sock->timer, readtimeout_cb, sock->read_timeout, 0); + sock->timer_running = true; } } else { /* @@ -1053,6 +1115,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { isc__nmsocket_attach(ssock, &csock->server); csock->accept_cb = ssock->accept_cb; csock->accept_cbarg = ssock->accept_cbarg; + csock->accepting = true; event->sock = csock; event->quota = quota; @@ -1086,6 +1149,18 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, return; } + if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); + cb(handle, ISC_R_CANCELED, cbarg); + return; + } + + if (atomic_load(&sock->mgr->closing)) { + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); + cb(handle, ISC_R_CANCELED, cbarg); + return; + } + uvreq = isc__nm_uvreq_get(sock->mgr, sock); uvreq->uvbuf.base = (char *)region->base; uvreq->uvbuf.len = region->length; @@ -1172,6 +1247,12 @@ tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { if (!isc__nmsocket_active(sock)) { return (ISC_R_CANCELED); } + if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { + return (ISC_R_CANCELED); + } + if (atomic_load(&sock->mgr->closing)) { + return (ISC_R_CANCELED); + } r = uv_write(&req->uv_req.write, &sock->uv_handle.stream, &req->uvbuf, 1, tcp_send_cb); @@ -1269,6 +1350,15 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); + if (sock->type != isc_nm_tcpsocket) { + return; + } + + if (atomic_load(&sock->connecting)) { + failed_connect_cb(sock, ISC_R_CANCELED); + return; + } + /* * If the socket is active, mark it inactive and * continue. If it isn't active, stop now. @@ -1277,12 +1367,12 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { return; } - if (sock->connecting) { - failed_connect_cb(sock, ISC_R_CANCELED); + if (sock->accepting) { + failed_accept_cb(sock, ISC_R_CANCELED); return; } - if (sock->type == isc_nm_tcpsocket && sock->statichandle != NULL) { + if (sock->statichandle != NULL) { failed_read_cb(sock, ISC_R_CANCELED); } } @@ -1296,6 +1386,7 @@ isc__nm_tcp_cancelread(isc_nmhandle_t *handle) { sock = handle->sock; + REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tcpsocket); ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpcancel); diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index 5d9faf305a..2f604b6a9b 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -50,6 +50,10 @@ udp_close_direct(isc_nmsocket_t *sock); static void failed_read_cb(isc_nmsocket_t *sock, isc_result_t result); +static void +failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_result_t eresult); + isc_result_t isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, void *cbarg, size_t extrahandlesize, isc_nmsocket_t **sockp) { @@ -208,7 +212,6 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) { &(int){ ISC_SEND_BUFFER_SIZE }); #endif uv_udp_recv_start(&sock->uv_handle.udp, udp_alloc_cb, udp_recv_cb); - atomic_store(&sock->reading, true); } static void @@ -225,7 +228,7 @@ stop_udp_child(isc_nmsocket_t *sock) { REQUIRE(sock->tid == isc_nm_tid()); uv_udp_recv_stop(&sock->uv_handle.udp); - atomic_store(&sock->reading, false); + uv_close((uv_handle_t *)&sock->uv_handle.udp, udp_stop_cb); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]); @@ -273,14 +276,6 @@ isc__nm_udp_stoplistening(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_udplistener); - /* - * If the socket is active, mark it inactive and - * continue. If it isn't active, stop now. - */ - if (!isc__nmsocket_deactivate(sock)) { - return; - } - /* * If the manager is interlocked, re-enqueue this as an asynchronous * event. Otherwise, go ahead and stop listening right away. @@ -440,6 +435,18 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, return; } + if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); + cb(handle, ISC_R_CANCELED, cbarg); + return; + } + + if (atomic_load(&sock->mgr->closing)) { + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); + cb(handle, ISC_R_CANCELED, cbarg); + return; + } + /* * We're simulating a firewall blocking UDP packets bigger than * 'maxudp' bytes, for testing purposes. @@ -580,6 +587,12 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, if (!isc__nmsocket_active(sock)) { return (ISC_R_CANCELED); } + if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { + return (ISC_R_CANCELED); + } + if (atomic_load(&sock->mgr->closing)) { + return (ISC_R_CANCELED); + } #ifdef HAVE_UV_UDP_CONNECT /* @@ -701,13 +714,17 @@ isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { r = udp_connect_direct(sock, req); if (r != 0) { - result = isc__nm_uverr2result(r); - } else { - atomic_store(&sock->connected, true); - atomic_store(&sock->result, ISC_R_SUCCESS); - result = atomic_load(&sock->result); + failed_connect_cb(sock, req, isc__nm_uverr2result(r)); + LOCK(&sock->lock); + SIGNAL(&sock->cond); + UNLOCK(&sock->lock); + return; } + atomic_store(&sock->connected, true); + atomic_store(&sock->result, ISC_R_SUCCESS); + result = atomic_load(&sock->result); + handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr); cb(handle, result, cbarg); @@ -821,10 +838,10 @@ udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, if (sock->timer_running) { uv_timer_stop(&sock->timer); + sock->timer_running = false; } udp_recv_cb(handle, nrecv, buf, addr, flags); uv_udp_recv_stop(&sock->uv_handle.udp); - atomic_store(&sock->reading, false); } static void @@ -842,15 +859,12 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { uv_udp_recv_stop(&sock->uv_handle.udp); - if (atomic_compare_exchange_strong(&sock->reading, &(bool){ true }, - false)) { - cb = sock->recv_cb; - cbarg = sock->recv_cbarg; - isc__nmsocket_clearcb(sock); + cb = sock->recv_cb; + cbarg = sock->recv_cbarg; + isc__nmsocket_clearcb(sock); - if (cb != NULL) { - cb(sock->statichandle, result, NULL, cbarg); - } + if (cb != NULL) { + cb(sock->statichandle, result, NULL, cbarg); } } @@ -876,6 +890,21 @@ isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0; isc_nmsocket_t *sock = ievent->sock; + if (!isc__nmsocket_active(sock)) { + failed_read_cb(sock, ISC_R_CANCELED); + return; + } + + if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { + failed_read_cb(sock, ISC_R_CANCELED); + return; + } + + if (atomic_load(&sock->mgr->closing)) { + failed_read_cb(sock, ISC_R_CANCELED); + return; + } + REQUIRE(worker->id == isc_nm_tid()); if (sock->read_timeout != 0) { if (!sock->timer_initialized) { @@ -889,19 +918,34 @@ isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) { } uv_udp_recv_start(&sock->uv_handle.udp, udp_alloc_cb, udp_read_cb); - atomic_store(&sock->reading, true); } -isc_result_t +void isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { - isc_nmsocket_t *sock = NULL; + isc_nmsocket_t *sock = handle->sock; isc__netievent_startread_t *ievent = NULL; REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMSOCK(handle->sock)); REQUIRE(handle->sock->type == isc_nm_udpsocket); - sock = handle->sock; + if (!isc__nmsocket_active(sock)) { + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]); + cb(handle, ISC_R_CANCELED, NULL, cbarg); + return; + } + + if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); + cb(handle, ISC_R_CANCELED, NULL, cbarg); + return; + } + + if (atomic_load(&sock->mgr->closing)) { + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); + cb(handle, ISC_R_CANCELED, NULL, cbarg); + return; + } REQUIRE(sock->tid == isc_nm_tid()); sock->recv_cb = cb; @@ -918,8 +962,6 @@ isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *)ievent); } - - return (ISC_R_SUCCESS); } static void @@ -988,37 +1030,60 @@ isc__nm_udp_close(isc_nmsocket_t *sock) { } } +static void +failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_result_t eresult) { + REQUIRE(sock->tid == isc_nm_tid()); + + if (sock->timer_running) { + uv_timer_stop(&sock->timer); + sock->timer_running = false; + } + + if (!atomic_load(&sock->connecting)) { + return; + } + + atomic_store(&sock->connecting, false); + + INSIST(req != NULL); + + req = uv_handle_get_data((uv_handle_t *)&sock->timer); + + isc__nmsocket_clearcb(sock); + + if (req->cb.connect != NULL) { + req->cb.connect(NULL, eresult, req->cbarg); + } + req->cb.connect = NULL; + req->cbarg = NULL; + + isc__nmsocket_detach(&sock); +} + void isc__nm_udp_shutdown(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); + if (sock->type != isc_nm_udpsocket) { + return; + } + if (atomic_load(&sock->connecting)) { - isc__nm_uvreq_t *req = NULL; + failed_connect_cb(sock, NULL, ISC_R_CANCELED); + return; + } - atomic_store(&sock->connecting, false); - req = uv_handle_get_data((uv_handle_t *)&sock->timer); - uv_timer_stop(&sock->timer); - sock->timer_running = false; - - isc__nmsocket_clearcb(sock); - if (sock->connect_cb != NULL) { - sock->connect_cb(NULL, ISC_R_CANCELED, - sock->connect_cbarg); - } - - isc__nm_uvreq_put(&req, sock); - isc__nmsocket_detach(&sock); - } else if (sock->type == isc_nm_udpsocket && sock->statichandle != NULL) - { - /* - * If the socket is active, mark it inactive and - * continue. If it isn't active, stop now. - */ - if (!isc__nmsocket_deactivate(sock)) { - return; - } + /* + * If the socket is active, mark it inactive and + * continue. If it isn't active, stop now. + */ + if (!isc__nmsocket_deactivate(sock)) { + return; + } + if (sock->statichandle != NULL) { failed_read_cb(sock, ISC_R_CANCELED); } }