From 5fcd52209a9eeb7c6575b77c3bd4070ad9ba1c2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Sur=C3=BD?= Date: Mon, 26 Oct 2020 17:31:55 +0100 Subject: [PATCH] Refactor udp_recv_cb() - more logical code flow. - propagate errors back to the caller. - add a 'reading' flag and call the callback from failed_read_cb() only when it the socket was actively reading. --- lib/isc/netmgr/netmgr-int.h | 1 + lib/isc/netmgr/udp.c | 112 +++++++++++++++++++----------------- 2 files changed, 61 insertions(+), 52 deletions(-) diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 56271e733d..28b62853d4 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -471,6 +471,7 @@ struct isc_nmsocket { atomic_bool connecting; atomic_bool connected; atomic_bool connect_error; + atomic_bool reading; isc_refcount_t references; /*% diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index 716303f2d6..5d9faf305a 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -47,6 +47,9 @@ udp_close_cb(uv_handle_t *uvhandle); static void udp_close_direct(isc_nmsocket_t *sock); +static void +failed_read_cb(isc_nmsocket_t *sock, isc_result_t result); + isc_result_t isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, void *cbarg, size_t extrahandlesize, isc_nmsocket_t **sockp) { @@ -205,6 +208,7 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) { &(int){ ISC_SEND_BUFFER_SIZE }); #endif uv_udp_recv_start(&sock->uv_handle.udp, udp_alloc_cb, udp_recv_cb); + atomic_store(&sock->reading, true); } static void @@ -221,6 +225,7 @@ stop_udp_child(isc_nmsocket_t *sock) { REQUIRE(sock->tid == isc_nm_tid()); uv_udp_recv_stop(&sock->uv_handle.udp); + atomic_store(&sock->reading, false); uv_close((uv_handle_t *)&sock->uv_handle.udp, udp_stop_cb); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]); @@ -337,24 +342,22 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, isc_result_t result; isc_nmhandle_t *nmhandle = NULL; isc_sockaddr_t sockaddr; - isc_nmsocket_t *sock = NULL; + isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); isc_region_t region; uint32_t maxudp; - bool free_buf = true; + bool free_buf; isc_nm_recv_cb_t cb; void *cbarg; - bool connected; - /* - * Even though destruction of the socket can only happen from the - * network thread that we're in, we still attach to the socket here - * to ensure it won't be destroyed by the recv callback. - */ - isc__nmsocket_attach(uv_handle_get_data((uv_handle_t *)handle), &sock); + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->tid == isc_nm_tid()); -#ifdef UV_UDP_MMSG_CHUNK +#ifdef UV_UDP_MMSG_FREE + free_buf = ((flags & UV_UDP_MMSG_FREE) == UV_UDP_MMSG_FREE); +#elif UV_UDP_MMSG_CHUNK free_buf = ((flags & UV_UDP_MMSG_CHUNK) == 0); #else + free_buf = true; UNUSED(flags); #endif @@ -362,56 +365,56 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, * Three possible reasons to return now without processing: * - If addr == NULL, in which case it's the end of stream; * we can free the buffer and bail. + */ + if (addr == NULL) { + goto done; + } + /* * - If we're simulating a firewall blocking UDP packets * bigger than 'maxudp' bytes for testing purposes. - * - If the socket is no longer active. */ maxudp = atomic_load(&sock->mgr->maxudp); - if ((addr == NULL) || (maxudp != 0 && (uint32_t)nrecv > maxudp) || - (!isc__nmsocket_active(sock))) - { - if (free_buf) { - isc__nm_free_uvbuf(sock, buf); - } - isc__nmsocket_detach(&sock); - return; + if ((maxudp != 0 && (uint32_t)nrecv > maxudp)) { + goto done; + } + /* + * - If the socket is no longer active. + */ + if (!isc__nmsocket_active(sock)) { + goto done; } - result = isc_sockaddr_fromsockaddr(&sockaddr, addr); - RUNTIME_CHECK(result == ISC_R_SUCCESS); - connected = atomic_load(&sock->connected); - - if (!connected) { - nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL); - } else { - nmhandle = sock->statichandle; - } region.base = (unsigned char *)buf->base; region.length = nrecv; - INSIST(sock->tid == isc_nm_tid()); - INSIST(sock->recv_cb != NULL); - cb = sock->recv_cb; cbarg = sock->recv_cbarg; - cb(nmhandle, ISC_R_SUCCESS, ®ion, cbarg); + if (atomic_load(&sock->client)) { + if (nrecv < 0) { + failed_read_cb(sock, isc__nm_uverr2result(nrecv)); + return; + } - if (free_buf) { - isc__nm_free_uvbuf(sock, buf); + cb(sock->statichandle, ISC_R_SUCCESS, ®ion, cbarg); + } else { + result = isc_sockaddr_fromsockaddr(&sockaddr, addr); + RUNTIME_CHECK(result == ISC_R_SUCCESS); + + nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL); + + cb(nmhandle, ISC_R_SUCCESS, ®ion, cbarg); + + /* + * If the recv callback wants to hold on to the handle, + * it needs to attach to it. + */ + isc_nmhandle_detach(&nmhandle); } - /* - * The sock is now attached to the handle, we can detach our ref. - */ - isc__nmsocket_detach(&sock); - - /* - * If the recv callback wants to hold on to the handle, - * it needs to attach to it. - */ - if (!connected) { - isc_nmhandle_detach(&nmhandle); +done: + if (free_buf) { + isc__nm_free_uvbuf(sock, buf); } } @@ -821,6 +824,7 @@ udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, } udp_recv_cb(handle, nrecv, buf, addr, flags); uv_udp_recv_stop(&sock->uv_handle.udp); + atomic_store(&sock->reading, false); } static void @@ -831,19 +835,22 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->statichandle != NULL); - uv_udp_recv_stop(&sock->uv_handle.udp); - if (sock->timer_initialized) { uv_timer_stop(&sock->timer); sock->timer_running = false; } - cb = sock->recv_cb; - cbarg = sock->recv_cbarg; - isc__nmsocket_clearcb(sock); + uv_udp_recv_stop(&sock->uv_handle.udp); - if (cb != NULL) { - cb(sock->statichandle, result, NULL, cbarg); + if (atomic_compare_exchange_strong(&sock->reading, &(bool){ true }, + false)) { + cb = sock->recv_cb; + cbarg = sock->recv_cbarg; + isc__nmsocket_clearcb(sock); + + if (cb != NULL) { + cb(sock->statichandle, result, NULL, cbarg); + } } } @@ -882,6 +889,7 @@ isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) { } uv_udp_recv_start(&sock->uv_handle.udp, udp_alloc_cb, udp_read_cb); + atomic_store(&sock->reading, true); } isc_result_t