2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-08-31 22:45:39 +00:00

Prevent unexpected UDP client read callbacks

The network manager UDP code was misinterpreting when the libuv called
the udp_recv_cb with nrecv == 0 and addr == NULL -> this doesn't really
mean that the "stream" has ended, but the libuv indicates that the
receive buffer can be freed.  This could lead to assertion failure in
the code that calls isc_nm_read() from the network manager read callback
due to the extra spurious callbacks.

Properly handle the extra callback calls from the libuv in the client
read callback, and refactor the UDP isc_nm_read() implementation to be
synchronous, so no datagram is lost between the time that we stop the
reading from the UDP socket and we restart it again in the asychronous
udpread event.

Add a unit test that tests the isc_nm_read() call from the read
callback to receive two datagrams.
This commit is contained in:
Ondřej Surý
2022-09-15 09:48:34 +02:00
parent 94b32f2e0b
commit eac8bc5c1a
4 changed files with 218 additions and 90 deletions

View File

@@ -297,7 +297,6 @@ typedef enum isc__netievent_type {
netievent_udplisten, netievent_udplisten,
netievent_udpstop, netievent_udpstop,
netievent_udpread,
netievent_tcplisten, netievent_tcplisten,
netievent_tcpstop, netievent_tcpstop,
@@ -1313,8 +1312,6 @@ void
isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0); isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0);
void void
isc__nm_async_udpcancel(isc__networker_t *worker, isc__netievent_t *ev0); isc__nm_async_udpcancel(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0);
/*%< /*%<
* Callback handlers for asynchronous UDP events (listen, stoplisten, send). * Callback handlers for asynchronous UDP events (listen, stoplisten, send).
*/ */
@@ -1834,7 +1831,6 @@ NETIEVENT_SOCKET_TYPE(tlsstartread);
NETIEVENT_SOCKET_HANDLE_TYPE(tlscancel); NETIEVENT_SOCKET_HANDLE_TYPE(tlscancel);
NETIEVENT_SOCKET_TYPE(udplisten); NETIEVENT_SOCKET_TYPE(udplisten);
NETIEVENT_SOCKET_TYPE(udpstop); NETIEVENT_SOCKET_TYPE(udpstop);
NETIEVENT_SOCKET_TYPE(udpread);
NETIEVENT_SOCKET_TYPE(tcpdnsclose); NETIEVENT_SOCKET_TYPE(tcpdnsclose);
NETIEVENT_SOCKET_TYPE(tcpdnsread); NETIEVENT_SOCKET_TYPE(tcpdnsread);
@@ -1893,7 +1889,6 @@ NETIEVENT_SOCKET_DECL(tlsstartread);
NETIEVENT_SOCKET_HANDLE_DECL(tlscancel); NETIEVENT_SOCKET_HANDLE_DECL(tlscancel);
NETIEVENT_SOCKET_DECL(udplisten); NETIEVENT_SOCKET_DECL(udplisten);
NETIEVENT_SOCKET_DECL(udpstop); NETIEVENT_SOCKET_DECL(udpstop);
NETIEVENT_SOCKET_DECL(udpread);
NETIEVENT_SOCKET_DECL(tcpdnsclose); NETIEVENT_SOCKET_DECL(tcpdnsclose);
NETIEVENT_SOCKET_DECL(tcpdnsread); NETIEVENT_SOCKET_DECL(tcpdnsread);

View File

@@ -449,7 +449,6 @@ process_netievent(void *arg) {
NETIEVENT_CASE(udplisten); NETIEVENT_CASE(udplisten);
NETIEVENT_CASE(udpstop); NETIEVENT_CASE(udpstop);
NETIEVENT_CASE(udpcancel); NETIEVENT_CASE(udpcancel);
NETIEVENT_CASE(udpread);
NETIEVENT_CASE(tcpaccept); NETIEVENT_CASE(tcpaccept);
NETIEVENT_CASE(tcpconnect); NETIEVENT_CASE(tcpconnect);
@@ -536,7 +535,6 @@ NETIEVENT_SOCKET_HANDLE_DEF(tlscancel);
NETIEVENT_SOCKET_DEF(udplisten); NETIEVENT_SOCKET_DEF(udplisten);
NETIEVENT_SOCKET_DEF(udpstop); NETIEVENT_SOCKET_DEF(udpstop);
NETIEVENT_SOCKET_HANDLE_DEF(udpcancel); NETIEVENT_SOCKET_HANDLE_DEF(udpcancel);
NETIEVENT_SOCKET_DEF(udpread);
NETIEVENT_SOCKET_DEF(tcpdnsclose); NETIEVENT_SOCKET_DEF(tcpdnsclose);
NETIEVENT_SOCKET_DEF(tcpdnsread); NETIEVENT_SOCKET_DEF(tcpdnsread);

View File

@@ -59,10 +59,6 @@
#endif /* if defined(HAVE_LINUX_NETLINK_H) && defined(HAVE_LINUX_RTNETLINK_H) \ #endif /* if defined(HAVE_LINUX_NETLINK_H) && defined(HAVE_LINUX_RTNETLINK_H) \
*/ */
static void
udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
const struct sockaddr *addr, unsigned flags);
static void static void
udp_send_cb(uv_udp_send_t *req, int status); udp_send_cb(uv_udp_send_t *req, int status);
@@ -413,7 +409,7 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__nm_set_network_buffers(mgr, &sock->uv_handle.handle); isc__nm_set_network_buffers(mgr, &sock->uv_handle.handle);
r = uv_udp_recv_start(&sock->uv_handle.udp, isc__nm_alloc_cb, r = uv_udp_recv_start(&sock->uv_handle.udp, isc__nm_alloc_cb,
udp_recv_cb); isc__nm_udp_read_cb);
if (r != 0) { if (r != 0) {
isc__nm_incstats(sock, STATID_BINDFAIL); isc__nm_incstats(sock, STATID_BINDFAIL);
goto done; goto done;
@@ -511,9 +507,9 @@ isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0) {
* reused for a series of packets, so we need to allocate a new one. * reused for a series of packets, so we need to allocate a new one.
* This new one can be reused to send the response then. * This new one can be reused to send the response then.
*/ */
static void void
udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
const struct sockaddr *addr, unsigned flags) { const struct sockaddr *addr, unsigned flags) {
isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
isc__nm_uvreq_t *req = NULL; isc__nm_uvreq_t *req = NULL;
uint32_t maxudp; uint32_t maxudp;
@@ -562,15 +558,6 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
goto free; goto free;
} }
/*
* - If addr == NULL, in which case it's the end of stream;
* we can free the buffer and bail.
*/
if (addr == NULL) {
isc__nm_failed_read_cb(sock, ISC_R_EOF, false);
goto free;
}
/* /*
* - If the network manager is shutting down * - If the network manager is shutting down
*/ */
@@ -587,6 +574,23 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
goto free; goto free;
} }
/*
* End of the current (iteration) datagram stream, just free the buffer.
* The callback with nrecv == 0 and addr == NULL is called for both
* normal UDP sockets and recvmmsg sockets at the end of every event
* loop iteration.
*/
if (nrecv == 0 && addr == NULL) {
INSIST(flags == 0);
goto free;
}
/*
* We could receive an empty datagram in which case:
* nrecv == 0 and addr != NULL
*/
INSIST(addr != NULL);
if (!sock->route_sock) { if (!sock->route_sock) {
result = isc_sockaddr_fromsockaddr(&sockaddr, addr); result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
RUNTIME_CHECK(result == ISC_R_SUCCESS); RUNTIME_CHECK(result == ISC_R_SUCCESS);
@@ -604,6 +608,16 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
sock->recv_read = false; sock->recv_read = false;
/*
* The client isc_nm_read() expects just a single message, so we need to
* stop reading now. The reading could be restarted in the read
* callback with another isc_nm_read() call.
*/
if (atomic_load(&sock->client)) {
isc__nmsocket_timer_stop(sock);
isc__nm_stop_reading(sock);
}
REQUIRE(!sock->processing); REQUIRE(!sock->processing);
sock->processing = true; sock->processing = true;
isc__nm_readcb(sock, req, ISC_R_SUCCESS); isc__nm_readcb(sock, req, ISC_R_SUCCESS);
@@ -870,31 +884,6 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_sockaddr_t *local, isc_sockaddr_t *peer,
isc__nmsocket_detach(&sock); isc__nmsocket_detach(&sock);
} }
void
isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
const struct sockaddr *addr, unsigned flags) {
isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(atomic_load(&sock->client));
REQUIRE(sock->parent == NULL);
/*
* This function can only be reached when calling isc_nm_read() on
* a UDP client socket. There's no point calling isc_nm_read() on
* a UDP listener socket; those are always reading.
*
* The reason why we stop the timer and the reading after calling the
* callback is because there's a time window where a second UDP packet
* might be received between isc__nm_stop_reading() call and
* isc_nm_read() call from the callback and such UDP datagram would be
* lost like tears in the rain.
*/
udp_recv_cb(handle, nrecv, buf, addr, flags);
isc__nmsocket_timer_stop(sock);
isc__nm_stop_reading(sock);
}
void void
isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
@@ -940,23 +929,30 @@ isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
} }
} }
/*
* Asynchronous 'udpread' call handler: start or resume reading on a
* socket; pause reading and call the 'recv' callback after each
* datagram.
*/
void void
isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) { isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0; isc_nmsocket_t *sock = NULL;
isc_nmsocket_t *sock = ievent->sock;
isc_result_t result; isc_result_t result;
UNUSED(worker); REQUIRE(VALID_NMHANDLE(handle));
sock = handle->sock;
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_udpsocket);
REQUIRE(sock->statichandle == handle);
REQUIRE(!sock->recv_read);
REQUIRE(sock->tid == isc_tid()); REQUIRE(sock->tid == isc_tid());
if (isc__nm_closing(worker)) { /*
* We need to initialize the callback before checking for shutdown
* conditions, so the callback is always called even on error condition.
*/
sock->recv_cb = cb;
sock->recv_cbarg = cbarg;
sock->recv_read = true;
if (isc__nm_closing(sock->worker)) {
result = ISC_R_SHUTTINGDOWN; result = ISC_R_SHUTTINGDOWN;
goto fail; goto fail;
} }
@@ -971,7 +967,7 @@ isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) {
goto fail; goto fail;
} }
isc__nmsocket_timer_start(sock); isc__nmsocket_timer_restart(sock);
return; return;
fail: fail:
@@ -979,35 +975,6 @@ fail:
isc__nm_failed_read_cb(sock, result, false); isc__nm_failed_read_cb(sock, result, false);
} }
void
isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
isc_nmsocket_t *sock = NULL;
REQUIRE(VALID_NMHANDLE(handle));
sock = handle->sock;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_udpsocket);
REQUIRE(sock->statichandle == handle);
REQUIRE(!sock->recv_read);
sock->recv_cb = cb;
sock->recv_cbarg = cbarg;
sock->recv_read = true;
if (!atomic_load(&sock->reading) && sock->tid == isc_tid()) {
isc__netievent_udpread_t ievent = { .sock = sock };
isc__nm_async_udpread(sock->worker,
(isc__netievent_t *)&ievent);
} else {
isc__netievent_udpread_t *ievent =
isc__nm_get_netievent_udpread(sock->worker, sock);
isc__nm_enqueue_ievent(sock->worker,
(isc__netievent_t *)ievent);
}
}
static void static void
udp_close_cb(uv_handle_t *handle) { udp_close_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle); isc_nmsocket_t *sock = uv_handle_get_data(handle);

View File

@@ -100,6 +100,8 @@ setup_test(void **state) {
isc_nonce_buf(&send_magic, sizeof(send_magic)); isc_nonce_buf(&send_magic, sizeof(send_magic));
connect_readcb = connect_read_cb;
return (0); return (0);
} }
@@ -797,7 +799,7 @@ udp__connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
isc_refcount_increment0(&active_creads); isc_refcount_increment0(&active_creads);
isc_nmhandle_attach(handle, &readhandle); isc_nmhandle_attach(handle, &readhandle);
isc_nm_read(handle, connect_read_cb, cbarg); isc_nm_read(handle, connect_readcb, cbarg);
isc_refcount_increment0(&active_csends); isc_refcount_increment0(&active_csends);
isc_nmhandle_attach(handle, &sendhandle); isc_nmhandle_attach(handle, &sendhandle);
@@ -931,6 +933,171 @@ ISC_LOOP_TEST_IMPL(udp_recv_send) {
} }
} }
static void
double_read_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
UNUSED(cbarg);
UNUSED(eresult);
assert_non_null(handle);
F();
isc_refcount_decrement(&active_ssends);
switch (eresult) {
case ISC_R_SUCCESS:
if (have_expected_ssends(atomic_fetch_add(&ssends, 1) + 1)) {
do_ssends_shutdown(loopmgr);
} else {
isc_nmhandle_t *sendhandle = NULL;
isc_nmhandle_attach(handle, &sendhandle);
isc_nmhandle_setwritetimeout(sendhandle, T_IDLE);
isc_refcount_increment0(&active_ssends);
isc_nm_send(sendhandle, &send_msg, double_read_send_cb,
cbarg);
break;
}
break;
case ISC_R_CANCELED:
break;
default:
fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
isc_result_totext(eresult), cbarg);
assert_int_equal(eresult, ISC_R_SUCCESS);
}
isc_nmhandle_detach(&handle);
}
static void
double_read_listen_cb(isc_nmhandle_t *handle, isc_result_t eresult,
isc_region_t *region, void *cbarg) {
uint64_t magic = 0;
assert_non_null(handle);
F();
switch (eresult) {
case ISC_R_EOF:
case ISC_R_SHUTTINGDOWN:
case ISC_R_CANCELED:
break;
case ISC_R_SUCCESS:
memmove(&magic, region->base, sizeof(magic));
assert_true(magic == send_magic);
assert_true(region->length >= sizeof(magic));
memmove(&magic, region->base, sizeof(magic));
assert_true(magic == send_magic);
isc_nmhandle_t *sendhandle = NULL;
isc_nmhandle_attach(handle, &sendhandle);
isc_nmhandle_setwritetimeout(sendhandle, T_IDLE);
isc_refcount_increment0(&active_ssends);
isc_nm_send(sendhandle, &send_msg, double_read_send_cb, cbarg);
return;
default:
fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
isc_result_totext(eresult), cbarg);
assert_int_equal(eresult, ISC_R_SUCCESS);
}
isc_refcount_decrement(&active_sreads);
isc_nmhandle_detach(&handle);
}
static void
double_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
isc_region_t *region, void *cbarg) {
uint64_t magic = 0;
bool detach = false;
UNUSED(cbarg);
assert_non_null(handle);
F();
switch (eresult) {
case ISC_R_SUCCESS:
assert_true(region->length >= sizeof(magic));
memmove(&magic, region->base, sizeof(magic));
assert_true(magic == send_magic);
if (have_expected_creads(atomic_fetch_add(&creads, 1) + 1)) {
do_creads_shutdown(loopmgr);
detach = true;
}
if (magic == send_magic && allow_send_back) {
connect_send(handle);
return;
}
break;
case ISC_R_TIMEDOUT:
case ISC_R_EOF:
case ISC_R_SHUTTINGDOWN:
case ISC_R_CANCELED:
case ISC_R_CONNECTIONRESET:
detach = true;
break;
default:
fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
isc_result_totext(eresult), cbarg);
assert_int_equal(eresult, ISC_R_SUCCESS);
}
if (detach) {
isc_refcount_decrement(&active_creads);
isc_nmhandle_detach(&handle);
} else {
isc_nm_read(handle, connect_readcb, cbarg);
}
}
ISC_SETUP_TEST_IMPL(udp_double_read) {
setup_test(state);
expected_cconnects = 1;
cconnects_shutdown = false;
expected_csends = 1;
csends_shutdown = false;
expected_sreads = 1;
sreads_shutdown = false;
expected_ssends = 2;
ssends_shutdown = false;
expected_creads = 2;
creads_shutdown = true;
connect_readcb = double_read_cb;
return (0);
}
ISC_TEARDOWN_TEST_IMPL(udp_double_read) {
atomic_assert_int_eq(creads, expected_creads);
teardown_test(state);
return (0);
}
ISC_LOOP_TEST_IMPL(udp_double_read) {
start_listening(ISC_NM_LISTEN_ALL, double_read_listen_cb);
udp__connect(NULL);
}
ISC_TEST_LIST_START ISC_TEST_LIST_START
/* Mock tests are unreliable on OpenBSD */ /* Mock tests are unreliable on OpenBSD */
@@ -956,6 +1123,7 @@ ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_shutdown_connect)
ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_shutdown_read) ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_shutdown_read)
ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_cancel_read) ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_cancel_read)
ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_timeout_recovery) ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_timeout_recovery)
ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_double_read)
ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_recv_one) ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_recv_one)
ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_recv_two) ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_recv_two)
ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_recv_send) ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_recv_send)