diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 7c02d1e6da..633e1a0e96 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -146,13 +146,6 @@ typedef enum isc__netievent_type { typedef union { isc_nm_recv_cb_t recv; - isc_nm_cb_t connect; - isc_nm_accept_cb_t accept; -} isc__nm_readcb_t; - -typedef union { - isc_nm_recv_cb_t recv; - isc_nm_accept_cb_t accept; isc_nm_cb_t send; isc_nm_cb_t connect; } isc__nm_cb_t; @@ -526,10 +519,10 @@ struct isc_nmsocket { */ isc_nm_opaquecb_t closehandle_cb; - isc__nm_readcb_t rcb; - void *rcbarg; + isc_nm_recv_cb_t recv_cb; + void *recv_cbarg; - isc__nm_cb_t accept_cb; + isc_nm_accept_cb_t accept_cb; void *accept_cbarg; }; diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index ab8852d8d0..4c6fdde1b2 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -1000,9 +1000,9 @@ void isc__nmsocket_clearcb(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); - sock->rcb.recv = NULL; - sock->rcbarg = NULL; - sock->accept_cb.accept = NULL; + sock->recv_cb = NULL; + sock->recv_cbarg = NULL; + sock->accept_cb = NULL; sock->accept_cbarg = NULL; } diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index e4dd3fecec..822823cb81 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -262,7 +262,7 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, nsock = isc_mem_get(mgr->mctx, sizeof(*nsock)); isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener, iface); - nsock->accept_cb.accept = accept_cb; + nsock->accept_cb = accept_cb; nsock->accept_cbarg = accept_cbarg; nsock->extrahandlesize = extrahandlesize; nsock->backlog = backlog; @@ -434,6 +434,8 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { struct sockaddr_storage ss; isc_sockaddr_t local; int r; + isc_nm_accept_cb_t accept_cb; + void *accept_cbarg; REQUIRE(isc__nm_in_netthread()); REQUIRE(ssock->type == isc_nm_tcplistener); @@ -488,9 +490,14 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { handle = isc__nmhandle_get(csock, NULL, &local); - INSIST(ssock->accept_cb.accept != NULL); + LOCK(&ssock->lock); + INSIST(ssock->accept_cb != NULL); + accept_cb = ssock->accept_cb; + accept_cbarg = ssock->accept_cbarg; + UNLOCK(&ssock->lock); + csock->read_timeout = ssock->mgr->init; - ssock->accept_cb.accept(handle, ISC_R_SUCCESS, ssock->accept_cbarg); + accept_cb(handle, ISC_R_SUCCESS, accept_cbarg); /* * csock is now attached to the handle. @@ -583,6 +590,8 @@ tcp_listenclose_cb(uv_handle_t *handle) { static void readtimeout_cb(uv_timer_t *handle) { isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); + isc_nm_recv_cb_t cb; + void *cbarg; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); @@ -603,10 +612,15 @@ readtimeout_cb(uv_timer_t *handle) { if (sock->quota) { isc_quota_detach(&sock->quota); } - if (sock->rcb.recv != NULL) { - sock->rcb.recv(sock->statichandle, ISC_R_TIMEDOUT, NULL, - sock->rcbarg); - isc__nmsocket_clearcb(sock); + + LOCK(&sock->lock); + cb = sock->recv_cb; + cbarg = sock->recv_cbarg; + isc__nmsocket_clearcb(sock); + UNLOCK(&sock->lock); + + if (cb != NULL) { + cb(sock->statichandle, ISC_R_TIMEDOUT, NULL, cbarg); } } @@ -619,8 +633,11 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { REQUIRE(VALID_NMSOCK(handle->sock)); sock = handle->sock; - sock->rcb.recv = cb; - sock->rcbarg = cbarg; + + LOCK(&sock->lock); + sock->recv_cb = cb; + sock->recv_cbarg = cbarg; + UNLOCK(&sock->lock); ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstartread); ievent->sock = sock; @@ -729,9 +746,12 @@ isc__nm_tcp_resumeread(isc_nmsocket_t *sock) { isc__netievent_startread_t *ievent = NULL; REQUIRE(VALID_NMSOCK(sock)); - if (sock->rcb.recv == NULL) { + LOCK(&sock->lock); + if (sock->recv_cb == NULL) { + UNLOCK(&sock->lock); return (ISC_R_CANCELED); } + UNLOCK(&sock->lock); if (!atomic_load(&sock->readpaused)) { return (ISC_R_SUCCESS); @@ -757,17 +777,23 @@ isc__nm_tcp_resumeread(isc_nmsocket_t *sock) { static void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)stream); + isc_nm_recv_cb_t cb; + void *cbarg; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(buf != NULL); + LOCK(&sock->lock); + cb = sock->recv_cb; + cbarg = sock->recv_cbarg; + UNLOCK(&sock->lock); + if (nread >= 0) { isc_region_t region = { .base = (unsigned char *)buf->base, .length = nread }; - if (sock->rcb.recv != NULL) { - sock->rcb.recv(sock->statichandle, ISC_R_SUCCESS, - ®ion, sock->rcbarg); + if (cb != NULL) { + cb(sock->statichandle, ISC_R_SUCCESS, ®ion, cbarg); } sock->read_timeout = (atomic_load(&sock->keepalive) @@ -790,11 +816,10 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { * This might happen if the inner socket is closing. It means that * it's detached, so the socket will be closed. */ - if (sock->rcb.recv != NULL) { + if (cb != NULL) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]); - sock->rcb.recv(sock->statichandle, ISC_R_EOF, NULL, - sock->rcbarg); isc__nmsocket_clearcb(sock); + cb(sock->statichandle, ISC_R_EOF, NULL, cbarg); } /* @@ -1122,12 +1147,19 @@ void isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); - if (sock->type == isc_nm_tcpsocket && sock->statichandle != NULL && - sock->rcb.recv != NULL) - { - sock->rcb.recv(sock->statichandle, ISC_R_CANCELED, NULL, - sock->rcbarg); + if (sock->type == isc_nm_tcpsocket && sock->statichandle != NULL) { + isc_nm_recv_cb_t cb; + void *cbarg; + + LOCK(&sock->lock); + cb = sock->recv_cb; + cbarg = sock->recv_cbarg; isc__nmsocket_clearcb(sock); + UNLOCK(&sock->lock); + + if (cb != NULL) { + cb(sock->statichandle, ISC_R_CANCELED, NULL, cbarg); + } } } @@ -1142,8 +1174,16 @@ isc__nm_tcp_cancelread(isc_nmhandle_t *handle) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_tcpsocket); - if (atomic_load(&sock->client) && sock->rcb.recv != NULL) { - sock->rcb.recv(handle, ISC_R_EOF, NULL, sock->rcbarg); + if (atomic_load(&sock->client)) { + isc_nm_recv_cb_t cb; + void *cbarg; + + LOCK(&sock->lock); + cb = sock->recv_cb; + cbarg = sock->recv_cbarg; isc__nmsocket_clearcb(sock); + UNLOCK(&sock->lock); + + cb(handle, ISC_R_EOF, NULL, cbarg); } } diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 84bf28d206..e086ed5e79 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -109,6 +109,8 @@ dnslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { isc_nmsocket_t *dnslistensock = (isc_nmsocket_t *)cbarg; isc_nmsocket_t *dnssock = NULL; isc_nmhandle_t *readhandle = NULL; + isc_nm_accept_cb_t accept_cb; + void *accept_cbarg; REQUIRE(VALID_NMSOCK(dnslistensock)); REQUIRE(dnslistensock->type == isc_nm_tcpdnslistener); @@ -117,9 +119,13 @@ dnslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { return (result); } - if (dnslistensock->accept_cb.accept != NULL) { - result = dnslistensock->accept_cb.accept( - handle, ISC_R_SUCCESS, dnslistensock->accept_cbarg); + LOCK(&dnslistensock->lock); + accept_cb = dnslistensock->accept_cb; + accept_cbarg = dnslistensock->accept_cbarg; + UNLOCK(&dnslistensock->lock); + + if (accept_cb != NULL) { + result = accept_cb(handle, ISC_R_SUCCESS, accept_cbarg); if (result != ISC_R_SUCCESS) { return (result); } @@ -196,6 +202,8 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) { if (len <= dnssock->buf_len - 2) { isc_nmhandle_t *dnshandle = NULL; isc_nmsocket_t *listener = NULL; + isc_nm_recv_cb_t cb = NULL; + void *cbarg = NULL; if (atomic_load(&dnssock->client) && dnssock->statichandle != NULL) { @@ -205,22 +213,25 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) { } listener = dnssock->listener; - if (listener != NULL && listener->rcb.recv != NULL) { - listener->rcb.recv( - dnshandle, ISC_R_SUCCESS, - &(isc_region_t){ .base = dnssock->buf + 2, - .length = len }, - listener->rcbarg); - } else if (dnssock->rcb.recv != NULL) { - isc_nm_recv_cb_t cb = dnssock->rcb.recv; - void *cbarg = dnssock->rcbarg; - + if (listener != NULL) { + LOCK(&listener->lock); + cb = listener->recv_cb; + cbarg = listener->recv_cbarg; + UNLOCK(&listener->lock); + } else if (dnssock->recv_cb != NULL) { + LOCK(&dnssock->lock); + cb = dnssock->recv_cb; + cbarg = dnssock->recv_cbarg; /* * We need to clear the read callback *before* * calling it, because it might make another * call to isc_nm_read() and set up a new callback. */ isc__nmsocket_clearcb(dnssock); + UNLOCK(&dnssock->lock); + } + + if (cb != NULL) { cb(dnshandle, ISC_R_SUCCESS, &(isc_region_t){ .base = dnssock->buf + 2, .length = len }, @@ -303,8 +314,10 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult, uv_timer_stop(&dnssock->timer); } + LOCK(&dnssock->lock); if (atomic_load(&dnssock->sequential) || - dnssock->rcb.recv == NULL) { + dnssock->recv_cb == NULL) { + UNLOCK(&dnssock->lock); /* * There are two reasons we might want to pause here: * - We're in sequential mode and we've received @@ -315,6 +328,7 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult, isc_nm_pauseread(dnssock->outerhandle); done = true; } else { + UNLOCK(&dnssock->lock); /* * We're pipelining, so we now resume processing * packets until the clients-per-connection limit @@ -350,10 +364,12 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, REQUIRE(VALID_NM(mgr)); isc__nmsocket_init(dnslistensock, mgr, isc_nm_tcpdnslistener, iface); - dnslistensock->rcb.recv = cb; - dnslistensock->rcbarg = cbarg; - dnslistensock->accept_cb.accept = accept_cb; + LOCK(&dnslistensock->lock); + dnslistensock->recv_cb = cb; + dnslistensock->recv_cbarg = cbarg; + dnslistensock->accept_cb = accept_cb; dnslistensock->accept_cbarg = accept_cbarg; + UNLOCK(&dnslistensock->lock); dnslistensock->extrahandlesize = extrahandlesize; /* diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index d167b91727..e820b24405 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -59,9 +59,9 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, mgr->nworkers * sizeof(*nsock)); memset(nsock->children, 0, mgr->nworkers * sizeof(*nsock)); - INSIST(nsock->rcb.recv == NULL && nsock->rcbarg == NULL); - nsock->rcb.recv = cb; - nsock->rcbarg = cbarg; + INSIST(nsock->recv_cb == NULL && nsock->recv_cbarg == NULL); + nsock->recv_cb = cb; + nsock->recv_cbarg = cbarg; nsock->extrahandlesize = extrahandlesize; for (size_t i = 0; i < mgr->nworkers; i++) { @@ -76,9 +76,9 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, csock->tid = i; csock->extrahandlesize = extrahandlesize; - INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL); - csock->rcb.recv = cb; - csock->rcbarg = cbarg; + INSIST(csock->recv_cb == NULL && csock->recv_cbarg == NULL); + csock->recv_cb = cb; + csock->recv_cbarg = cbarg; csock->fd = socket(family, SOCK_DGRAM, 0); RUNTIME_CHECK(csock->fd >= 0); @@ -360,6 +360,8 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, isc_region_t region; uint32_t maxudp; bool free_buf = true; + isc_nm_recv_cb_t cb; + void *cbarg; /* * Even though destruction of the socket can only happen from the @@ -399,8 +401,18 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, region.base = (unsigned char *)buf->base; region.length = nrecv; - INSIST(sock->rcb.recv != NULL); - sock->rcb.recv(nmhandle, ISC_R_SUCCESS, ®ion, sock->rcbarg); + /* + * In tcp.c and tcpdns.c, this would need to be locked + * by sock->lock because callbacks may be set to NULL + * unexpectedly when the connection drops, but that isn't + * a factor in the UDP case. + */ + INSIST(sock->recv_cb != NULL); + cb = sock->recv_cb; + cbarg = sock->recv_cbarg; + + cb(nmhandle, ISC_R_SUCCESS, ®ion, cbarg); + if (free_buf) { isc__nm_free_uvbuf(sock, buf); }