2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-09-04 16:45:24 +00:00

Refactor udp_recv_cb()

- more logical code flow.
- propagate errors back to the caller.
- add a 'reading' flag and call the callback from failed_read_cb()
  only when it the socket was actively reading.
This commit is contained in:
Ondřej Surý
2020-10-26 17:31:55 +01:00
committed by Ondřej Surý
parent cdccac4993
commit 5fcd52209a
2 changed files with 61 additions and 52 deletions

View File

@@ -471,6 +471,7 @@ struct isc_nmsocket {
atomic_bool connecting; atomic_bool connecting;
atomic_bool connected; atomic_bool connected;
atomic_bool connect_error; atomic_bool connect_error;
atomic_bool reading;
isc_refcount_t references; isc_refcount_t references;
/*% /*%

View File

@@ -47,6 +47,9 @@ udp_close_cb(uv_handle_t *uvhandle);
static void static void
udp_close_direct(isc_nmsocket_t *sock); udp_close_direct(isc_nmsocket_t *sock);
static void
failed_read_cb(isc_nmsocket_t *sock, isc_result_t result);
isc_result_t isc_result_t
isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
void *cbarg, size_t extrahandlesize, isc_nmsocket_t **sockp) { void *cbarg, size_t extrahandlesize, isc_nmsocket_t **sockp) {
@@ -205,6 +208,7 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
&(int){ ISC_SEND_BUFFER_SIZE }); &(int){ ISC_SEND_BUFFER_SIZE });
#endif #endif
uv_udp_recv_start(&sock->uv_handle.udp, udp_alloc_cb, udp_recv_cb); uv_udp_recv_start(&sock->uv_handle.udp, udp_alloc_cb, udp_recv_cb);
atomic_store(&sock->reading, true);
} }
static void static void
@@ -221,6 +225,7 @@ stop_udp_child(isc_nmsocket_t *sock) {
REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->tid == isc_nm_tid());
uv_udp_recv_stop(&sock->uv_handle.udp); uv_udp_recv_stop(&sock->uv_handle.udp);
atomic_store(&sock->reading, false);
uv_close((uv_handle_t *)&sock->uv_handle.udp, udp_stop_cb); uv_close((uv_handle_t *)&sock->uv_handle.udp, udp_stop_cb);
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);
@@ -337,24 +342,22 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
isc_result_t result; isc_result_t result;
isc_nmhandle_t *nmhandle = NULL; isc_nmhandle_t *nmhandle = NULL;
isc_sockaddr_t sockaddr; isc_sockaddr_t sockaddr;
isc_nmsocket_t *sock = NULL; isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
isc_region_t region; isc_region_t region;
uint32_t maxudp; uint32_t maxudp;
bool free_buf = true; bool free_buf;
isc_nm_recv_cb_t cb; isc_nm_recv_cb_t cb;
void *cbarg; void *cbarg;
bool connected;
/* REQUIRE(VALID_NMSOCK(sock));
* Even though destruction of the socket can only happen from the REQUIRE(sock->tid == isc_nm_tid());
* network thread that we're in, we still attach to the socket here
* to ensure it won't be destroyed by the recv callback.
*/
isc__nmsocket_attach(uv_handle_get_data((uv_handle_t *)handle), &sock);
#ifdef UV_UDP_MMSG_CHUNK #ifdef UV_UDP_MMSG_FREE
free_buf = ((flags & UV_UDP_MMSG_FREE) == UV_UDP_MMSG_FREE);
#elif UV_UDP_MMSG_CHUNK
free_buf = ((flags & UV_UDP_MMSG_CHUNK) == 0); free_buf = ((flags & UV_UDP_MMSG_CHUNK) == 0);
#else #else
free_buf = true;
UNUSED(flags); UNUSED(flags);
#endif #endif
@@ -362,56 +365,56 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
* Three possible reasons to return now without processing: * Three possible reasons to return now without processing:
* - If addr == NULL, in which case it's the end of stream; * - If addr == NULL, in which case it's the end of stream;
* we can free the buffer and bail. * we can free the buffer and bail.
*/
if (addr == NULL) {
goto done;
}
/*
* - If we're simulating a firewall blocking UDP packets * - If we're simulating a firewall blocking UDP packets
* bigger than 'maxudp' bytes for testing purposes. * bigger than 'maxudp' bytes for testing purposes.
* - If the socket is no longer active.
*/ */
maxudp = atomic_load(&sock->mgr->maxudp); maxudp = atomic_load(&sock->mgr->maxudp);
if ((addr == NULL) || (maxudp != 0 && (uint32_t)nrecv > maxudp) || if ((maxudp != 0 && (uint32_t)nrecv > maxudp)) {
(!isc__nmsocket_active(sock))) goto done;
{ }
if (free_buf) { /*
isc__nm_free_uvbuf(sock, buf); * - If the socket is no longer active.
} */
isc__nmsocket_detach(&sock); if (!isc__nmsocket_active(sock)) {
return; goto done;
} }
result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
connected = atomic_load(&sock->connected);
if (!connected) {
nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL);
} else {
nmhandle = sock->statichandle;
}
region.base = (unsigned char *)buf->base; region.base = (unsigned char *)buf->base;
region.length = nrecv; region.length = nrecv;
INSIST(sock->tid == isc_nm_tid());
INSIST(sock->recv_cb != NULL);
cb = sock->recv_cb; cb = sock->recv_cb;
cbarg = sock->recv_cbarg; cbarg = sock->recv_cbarg;
cb(nmhandle, ISC_R_SUCCESS, &region, cbarg); if (atomic_load(&sock->client)) {
if (nrecv < 0) {
failed_read_cb(sock, isc__nm_uverr2result(nrecv));
return;
}
if (free_buf) { cb(sock->statichandle, ISC_R_SUCCESS, &region, cbarg);
isc__nm_free_uvbuf(sock, buf); } else {
result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL);
cb(nmhandle, ISC_R_SUCCESS, &region, cbarg);
/*
* If the recv callback wants to hold on to the handle,
* it needs to attach to it.
*/
isc_nmhandle_detach(&nmhandle);
} }
/* done:
* The sock is now attached to the handle, we can detach our ref. if (free_buf) {
*/ isc__nm_free_uvbuf(sock, buf);
isc__nmsocket_detach(&sock);
/*
* If the recv callback wants to hold on to the handle,
* it needs to attach to it.
*/
if (!connected) {
isc_nmhandle_detach(&nmhandle);
} }
} }
@@ -821,6 +824,7 @@ udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
} }
udp_recv_cb(handle, nrecv, buf, addr, flags); udp_recv_cb(handle, nrecv, buf, addr, flags);
uv_udp_recv_stop(&sock->uv_handle.udp); uv_udp_recv_stop(&sock->uv_handle.udp);
atomic_store(&sock->reading, false);
} }
static void static void
@@ -831,19 +835,22 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->statichandle != NULL); REQUIRE(sock->statichandle != NULL);
uv_udp_recv_stop(&sock->uv_handle.udp);
if (sock->timer_initialized) { if (sock->timer_initialized) {
uv_timer_stop(&sock->timer); uv_timer_stop(&sock->timer);
sock->timer_running = false; sock->timer_running = false;
} }
cb = sock->recv_cb; uv_udp_recv_stop(&sock->uv_handle.udp);
cbarg = sock->recv_cbarg;
isc__nmsocket_clearcb(sock);
if (cb != NULL) { if (atomic_compare_exchange_strong(&sock->reading, &(bool){ true },
cb(sock->statichandle, result, NULL, cbarg); false)) {
cb = sock->recv_cb;
cbarg = sock->recv_cbarg;
isc__nmsocket_clearcb(sock);
if (cb != NULL) {
cb(sock->statichandle, result, NULL, cbarg);
}
} }
} }
@@ -882,6 +889,7 @@ isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) {
} }
uv_udp_recv_start(&sock->uv_handle.udp, udp_alloc_cb, udp_read_cb); uv_udp_recv_start(&sock->uv_handle.udp, udp_alloc_cb, udp_read_cb);
atomic_store(&sock->reading, true);
} }
isc_result_t isc_result_t