diff --git a/bin/rndc/rndc.c b/bin/rndc/rndc.c index 53a1cdccd6..de6b1850fa 100644 --- a/bin/rndc/rndc.c +++ b/bin/rndc/rndc.c @@ -1009,7 +1009,7 @@ main(int argc, char **argv) { isc_mem_create(&rndc_mctx); netmgr = isc_nm_start(rndc_mctx, 1); DO("create task manager", - isc_taskmgr_create(rndc_mctx, 1, 0, NULL, &taskmgr)); + isc_taskmgr_create(rndc_mctx, 1, 0, netmgr, &taskmgr)); DO("create task", isc_task_create(taskmgr, 0, &rndc_task)); isc_log_create(rndc_mctx, &log, &logconfig); isc_log_setcontext(log); diff --git a/bin/tests/system/pipelined/tests.sh b/bin/tests/system/pipelined/tests.sh index be2b469a22..77e8b0dec6 100644 --- a/bin/tests/system/pipelined/tests.sh +++ b/bin/tests/system/pipelined/tests.sh @@ -26,12 +26,10 @@ $DIFF ref output > /dev/null && { ret=1 ; echo_i "diff out of order failed"; } if [ $ret != 0 ]; then echo_i "failed"; fi status=`expr $status + $ret` -# flush resolver so queries will be from others again -$RNDCCMD 10.53.0.4 flush -sleep 1 - echo_i "check pipelined TCP queries using mdig" ret=0 +$RNDCCMD 10.53.0.4 flush +sleep 1 $MDIG $MDIGOPTS +noall +answer +vc -f input -b 10.53.0.4 @10.53.0.4 > raw.mdig awk '{ print $1 " " $5 }' < raw.mdig > output.mdig sort < output.mdig > output-sorted.mdig @@ -42,6 +40,8 @@ status=`expr $status + $ret` echo_i "check keep-response-order" ret=0 +$RNDCCMD 10.53.0.4 flush +sleep 1 $PIPEQUERIES -p ${PORT} ++ < inputb > rawb || ret=1 awk '{ print $1 " " $5 }' < rawb > outputb $DIFF refb outputb || ret=1 @@ -50,6 +50,8 @@ status=`expr $status + $ret` echo_i "check keep-response-order using mdig" ret=0 +$RNDCCMD 10.53.0.4 flush +sleep 1 $MDIG $MDIGOPTS +noall +answer +vc -f inputb -b 10.53.0.7 @10.53.0.4 > rawb.mdig awk '{ print $1 " " $5 }' < rawb.mdig > outputb.mdig $DIFF refb outputb.mdig || ret=1 @@ -58,6 +60,8 @@ status=`expr $status + $ret` echo_i "check mdig -4 -6" ret=0 +$RNDCCMD 10.53.0.4 flush +sleep 1 $MDIG $MDIGOPTS -4 -6 -f input @10.53.0.4 > output46.mdig 2>&1 && ret=1 grep "only one of -4 and -6 allowed" output46.mdig > /dev/null || ret=1 if [ $ret != 0 ]; then echo_i "failed"; fi diff --git a/lib/isc/include/isc/netmgr.h b/lib/isc/include/isc/netmgr.h index 3f6d8f6f05..b1c92ce348 100644 --- a/lib/isc/include/isc/netmgr.h +++ b/lib/isc/include/isc/netmgr.h @@ -176,8 +176,8 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, * as its argument. * * When handles are allocated for the socket, 'extrasize' additional bytes - * will be allocated along with the handle for an associated object - * (typically ns_client). + * can be allocated along with the handle for an associated object, which + * can then be freed automatically when the handle is destroyed. */ void @@ -196,12 +196,17 @@ isc_nm_pause(isc_nm_t *mgr); void isc_nm_resume(isc_nm_t *mgr); /*%< - * Resume paused processing. It will return immediately - * after signalling workers to resume. + * Resume paused processing. It will return immediately after signalling + * workers to resume. */ isc_result_t isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg); +/* + * Begin (or continue) reading on the socket associated with 'handle', and + * update its recv callback to 'cb', which will be called as soon as there + * is data to process. + */ isc_result_t isc_nm_pauseread(isc_nmhandle_t *handle); diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 6f4d1ff3e7..7c02d1e6da 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -117,12 +117,10 @@ struct isc_nmiface { typedef enum isc__netievent_type { netievent_udpsend, - netievent_udprecv, netievent_udpstop, netievent_tcpconnect, netievent_tcpsend, - netievent_tcprecv, netievent_tcpstartread, netievent_tcppauseread, netievent_tcpchildaccept, @@ -130,8 +128,8 @@ typedef enum isc__netievent_type { netievent_tcpstop, netievent_tcpclose, - netievent_tcpdnsclose, netievent_tcpdnssend, + netievent_tcpdnsclose, netievent_closecb, netievent_shutdown, @@ -146,20 +144,12 @@ typedef enum isc__netievent_type { netievent_tcplisten, } isc__netievent_type; -/* - * We have to split it because we can read and write on a socket - * simultaneously. - */ typedef union { isc_nm_recv_cb_t recv; + isc_nm_cb_t connect; isc_nm_accept_cb_t accept; } isc__nm_readcb_t; -typedef union { - isc_nm_cb_t send; - isc_nm_cb_t connect; -} isc__nm_writecb_t; - typedef union { isc_nm_recv_cb_t recv; isc_nm_accept_cb_t accept; @@ -209,10 +199,10 @@ typedef isc__netievent__socket_t isc__netievent_udplisten_t; typedef isc__netievent__socket_t isc__netievent_udpstop_t; typedef isc__netievent__socket_t isc__netievent_tcpstop_t; typedef isc__netievent__socket_t isc__netievent_tcpclose_t; -typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t; typedef isc__netievent__socket_t isc__netievent_startread_t; typedef isc__netievent__socket_t isc__netievent_pauseread_t; typedef isc__netievent__socket_t isc__netievent_closecb_t; +typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t; typedef struct isc__netievent__socket_req { isc__netievent_type type; @@ -333,7 +323,7 @@ typedef enum isc_nmsocket_type { isc_nm_tcpsocket, isc_nm_tcplistener, isc_nm_tcpdnslistener, - isc_nm_tcpdnssocket + isc_nm_tcpdnssocket, } isc_nmsocket_type; /*% @@ -403,7 +393,7 @@ struct isc_nmsocket { isc_nmsocket_t *children; int nchildren; isc_nmiface_t *iface; - isc_nmhandle_t *tcphandle; + isc_nmhandle_t *statichandle; isc_nmhandle_t *outerhandle; /*% Extra data allocated at the end of each isc_nmhandle_t */ @@ -445,7 +435,12 @@ struct isc_nmsocket { isc_refcount_t references; /*% - * TCPDNS socket has been set not to pipeliine. + * Established an outgoing connection, as client not server. + */ + atomic_bool client; + + /*% + * TCPDNS socket has been set not to pipeline. */ atomic_bool sequential; @@ -686,6 +681,9 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, isc_result_t isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg); +/* + * Back-end implementation of isc_nm_read() for TCP handles. + */ void isc__nm_tcp_close(isc_nmsocket_t *sock); @@ -713,9 +711,9 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock); */ void -isc__nm_tcp_cancelread(isc_nmsocket_t *sock); +isc__nm_tcp_cancelread(isc_nmhandle_t *handle); /*%< - * Stop reading on a connected socket. + * Stop reading on a connected TCP handle. */ void diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index bb10557e9c..abd4566669 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -591,6 +591,7 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { uv_stop(&worker->loop); isc_mempool_put(worker->mgr->evpool, ievent); return; + case netievent_udplisten: isc__nm_async_udplisten(worker, ievent); break; @@ -600,6 +601,7 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { case netievent_udpsend: isc__nm_async_udpsend(worker, ievent); break; + case netievent_tcpconnect: isc__nm_async_tcpconnect(worker, ievent); break; @@ -630,9 +632,11 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { case netievent_tcpclose: isc__nm_async_tcpclose(worker, ievent); break; + case netievent_tcpdnsclose: isc__nm_async_tcpdnsclose(worker, ievent); break; + case netievent_closecb: isc__nm_async_closecb(worker, ievent); break; @@ -739,7 +743,7 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) { isc__nm_decstats(sock->mgr, sock->statsindex[STATID_ACTIVE]); } - sock->tcphandle = NULL; + sock->statichandle = NULL; if (sock->outerhandle != NULL) { isc_nmhandle_unref(sock->outerhandle); @@ -833,7 +837,7 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) { } } - if (active_handles == 0 || sock->tcphandle != NULL) { + if (active_handles == 0 || sock->statichandle != NULL) { destroy = true; } @@ -1051,7 +1055,7 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer, if (handle == NULL) { handle = alloc_handle(sock); } else { - isc_refcount_increment0(&handle->references); + isc_refcount_init(&handle->references, 1); INSIST(VALID_NMHANDLE(handle)); } @@ -1099,9 +1103,11 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer, handle->ah_pos = pos; UNLOCK(&sock->lock); - if (sock->type == isc_nm_tcpsocket) { - INSIST(sock->tcphandle == NULL); - sock->tcphandle = handle; + if (sock->type == isc_nm_tcpsocket || + (sock->type == isc_nm_udpsocket && atomic_load(&sock->client))) + { + INSIST(sock->statichandle == NULL); + sock->statichandle = handle; } return (handle); @@ -1208,6 +1214,10 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) { } } + if (handle == sock->statichandle) { + sock->statichandle = NULL; + } + isc__nmsocket_detach(&sock); } @@ -1353,7 +1363,7 @@ isc_nm_cancelread(isc_nmhandle_t *handle) { switch (handle->sock->type) { case isc_nm_tcpsocket: - isc__nm_tcp_cancelread(handle->sock); + isc__nm_tcp_cancelread(handle); break; default: INSIST(0); diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 73b8b1fa0b..114279710c 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -147,49 +147,46 @@ done: static void tcp_connect_cb(uv_connect_t *uvreq, int status) { + isc_result_t result; isc__nm_uvreq_t *req = (isc__nm_uvreq_t *)uvreq->data; isc_nmsocket_t *sock = NULL; + struct sockaddr_storage ss; + isc_nmhandle_t *handle = NULL; sock = uv_handle_get_data((uv_handle_t *)uvreq->handle); REQUIRE(VALID_UVREQ(req)); - if (status == 0) { - isc_result_t result; - struct sockaddr_storage ss; - isc_nmhandle_t *handle = NULL; - - sock = uv_handle_get_data((uv_handle_t *)uvreq->handle); - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); - uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss, - &(int){ sizeof(ss) }); - result = isc_sockaddr_fromsockaddr(&sock->peer, - (struct sockaddr *)&ss); - RUNTIME_CHECK(result == ISC_R_SUCCESS); - - handle = isc__nmhandle_get(sock, NULL, NULL); - req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg); - - isc__nm_uvreq_put(&req, sock); - - /* - * The sock is now attached to the handle. - */ - isc__nmsocket_detach(&sock); - - /* - * If the connect callback wants to hold on to the handle, - * it needs to attach to it. - */ - isc_nmhandle_unref(handle); - } else { - /* - * TODO: - * Handle the connect error properly and free the socket. - */ + if (status != 0) { req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg); isc__nm_uvreq_put(&req, sock); + return; } + + sock = uv_handle_get_data((uv_handle_t *)uvreq->handle); + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); + uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss, + &(int){ sizeof(ss) }); + result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss); + RUNTIME_CHECK(result == ISC_R_SUCCESS); + + handle = isc__nmhandle_get(sock, NULL, NULL); + req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg); + + isc__nm_uvreq_put(&req, sock); + + atomic_init(&sock->client, true); + + /* + * The sock is now attached to the handle. + */ + isc__nmsocket_detach(&sock); + + /* + * If the connect callback wants to hold on to the handle, + * it needs to attach to it. + */ + isc_nmhandle_unref(handle); } isc_result_t @@ -201,6 +198,8 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, isc_result_t result = ISC_R_SUCCESS; REQUIRE(VALID_NM(mgr)); + REQUIRE(local != NULL); + REQUIRE(peer != NULL); nsock = isc_mem_get(mgr->mctx, sizeof(*nsock)); isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local); @@ -211,6 +210,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, req->cb.connect = cb; req->cbarg = cbarg; req->peer = peer->addr; + req->local = local->addr; ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect); ievent->sock = nsock; @@ -604,7 +604,7 @@ readtimeout_cb(uv_timer_t *handle) { isc_quota_detach(&sock->quota); } if (sock->rcb.recv != NULL) { - sock->rcb.recv(sock->tcphandle, ISC_R_TIMEDOUT, NULL, + sock->rcb.recv(sock->statichandle, ISC_R_TIMEDOUT, NULL, sock->rcbarg); isc__nmsocket_clearcb(sock); } @@ -766,8 +766,8 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { .length = nread }; if (sock->rcb.recv != NULL) { - sock->rcb.recv(sock->tcphandle, ISC_R_SUCCESS, ®ion, - sock->rcbarg); + sock->rcb.recv(sock->statichandle, ISC_R_SUCCESS, + ®ion, sock->rcbarg); } sock->read_timeout = (atomic_load(&sock->keepalive) @@ -792,7 +792,8 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { */ if (sock->rcb.recv != NULL) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]); - sock->rcb.recv(sock->tcphandle, ISC_R_EOF, NULL, sock->rcbarg); + sock->rcb.recv(sock->statichandle, ISC_R_EOF, NULL, + sock->rcbarg); isc__nmsocket_clearcb(sock); } @@ -1125,24 +1126,27 @@ void isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); - if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL && + if (sock->type == isc_nm_tcpsocket && sock->statichandle != NULL && sock->rcb.recv != NULL) { - sock->rcb.recv(sock->tcphandle, ISC_R_CANCELED, NULL, + sock->rcb.recv(sock->statichandle, ISC_R_CANCELED, NULL, sock->rcbarg); isc__nmsocket_clearcb(sock); } } void -isc__nm_tcp_cancelread(isc_nmsocket_t *sock) { - REQUIRE(VALID_NMSOCK(sock)); +isc__nm_tcp_cancelread(isc_nmhandle_t *handle) { + isc_nmsocket_t *sock = NULL; - if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL && - sock->rcb.recv != NULL) - { - sock->rcb.recv(sock->tcphandle, ISC_R_CANCELED, NULL, - sock->rcbarg); + REQUIRE(VALID_NMHANDLE(handle)); + + sock = handle->sock; + + REQUIRE(sock->type == isc_nm_tcpsocket); + + if (atomic_load(&sock->client) && sock->rcb.recv != NULL) { + sock->rcb.recv(handle, ISC_R_EOF, NULL, sock->rcbarg); isc__nmsocket_clearcb(sock); } } diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 512f88be03..107cde97b6 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -82,7 +82,9 @@ alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) { static void timer_close_cb(uv_handle_t *handle) { isc_nmsocket_t *sock = (isc_nmsocket_t *)uv_handle_get_data(handle); - INSIST(VALID_NMSOCK(sock)); + + REQUIRE(VALID_NMSOCK(sock)); + atomic_store(&sock->closed, true); tcpdns_close_direct(sock); } @@ -94,7 +96,8 @@ dnstcp_readtimeout(uv_timer_t *timer) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); - /* Close the TCP connection, it's closing should fire 'our' closing */ + + /* Close the TCP connection; its closure should fire ours. */ isc_nmhandle_unref(sock->outerhandle); sock->outerhandle = NULL; } @@ -187,8 +190,15 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) { */ len = dnslen(dnssock->buf); if (len <= dnssock->buf_len - 2) { - isc_nmhandle_t *dnshandle = isc__nmhandle_get(dnssock, NULL, - NULL); + isc_nmhandle_t *dnshandle; + if (atomic_load(&dnssock->client) && + dnssock->statichandle != NULL) { + dnshandle = dnssock->statichandle; + isc_nmhandle_ref(dnshandle); + } else { + dnshandle = isc__nmhandle_get(dnssock, NULL, NULL); + } + isc_nmsocket_t *listener = dnssock->listener; if (listener != NULL && listener->rcb.recv != NULL) { @@ -197,6 +207,20 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) { &(isc_region_t){ .base = dnssock->buf + 2, .length = len }, listener->rcbarg); + } else if (dnssock->rcb.recv != NULL) { + isc_nm_recv_cb_t cb = dnssock->rcb.recv; + void *cbarg = dnssock->rcbarg; + + /* + * We need to clear the read callback *before* + * calling it, because it might make another + * call to isc_nm_read() and set up a new callback. + */ + isc__nmsocket_clearcb(dnssock); + cb(dnshandle, ISC_R_SUCCESS, + &(isc_region_t){ .base = dnssock->buf + 2, + .length = len }, + cbarg); } len += 2; @@ -227,11 +251,12 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult, REQUIRE(VALID_NMSOCK(dnssock)); REQUIRE(VALID_NMHANDLE(handle)); - REQUIRE(dnssock->tid == isc_nm_tid()); if (region == NULL || eresult != ISC_R_SUCCESS) { /* Connection closed */ - isc_nmhandle_unref(handle); + if (eresult != ISC_R_CANCELED) { + isc_nmhandle_unref(handle); + } dnssock->result = eresult; if (dnssock->self != NULL) { isc__nmsocket_detach(&dnssock->self); @@ -277,11 +302,14 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult, uv_timer_stop(&dnssock->timer); } - if (atomic_load(&dnssock->sequential)) { + if (atomic_load(&dnssock->sequential) || + dnssock->rcb.recv == NULL) { /* - * We're in sequential mode and we processed - * one packet, so we're done until the next read - * completes. + * Two reasons we might want to pause here: + * - If we're in sequential mode and we've received + * a whole packet, so we're done until it's been + * processed; + * - If we no longer have a read callback. */ isc_nm_pauseread(dnssock->outerhandle); done = true; @@ -314,7 +342,6 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, void *cbarg, isc_nm_accept_cb_t accept_cb, void *accept_cbarg, size_t extrahandlesize, int backlog, isc_quota_t *quota, isc_nmsocket_t **sockp) { - /* A 'wrapper' socket object with outer set to true TCP socket */ isc_nmsocket_t *dnslistensock = isc_mem_get(mgr->mctx, sizeof(*dnslistensock)); isc_result_t result; @@ -328,7 +355,11 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, dnslistensock->accept_cbarg = accept_cbarg; dnslistensock->extrahandlesize = extrahandlesize; - /* We set dnslistensock->outer to a true listening socket */ + /* + * dnslistensock will be a DNS 'wrapper' around a connected + * stream. We set dnslistensock->outer to a socket listening + * for a TCP connection. + */ result = isc_nm_listentcp(mgr, iface, dnslisten_acceptcb, dnslistensock, extrahandlesize, backlog, quota, &dnslistensock->outer); @@ -495,8 +526,7 @@ isc__nm_async_tcpdnssend(isc__networker_t *worker, isc__netievent_t *ev0) { r.base = (unsigned char *)req->uvbuf.base; r.length = req->uvbuf.len; - result = isc__nm_tcp_send(sock->outerhandle, &r, tcpdnssend_cb, - req); + result = isc_nm_send(sock->outerhandle, &r, tcpdnssend_cb, req); } if (result != ISC_R_SUCCESS) { @@ -538,8 +568,8 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region, r.base = (unsigned char *)uvreq->uvbuf.base; r.length = uvreq->uvbuf.len; - return (isc__nm_tcp_send(sock->outerhandle, &r, tcpdnssend_cb, - uvreq)); + return (isc_nm_send(sock->outerhandle, &r, tcpdnssend_cb, + uvreq)); } else { isc__netievent_tcpdnssend_t *ievent = NULL; diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index ff981f920f..9a4013f666 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -80,7 +80,7 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, csock->rcb.recv = cb; csock->rcbarg = cbarg; csock->fd = socket(family, SOCK_DGRAM, 0); - INSIST(csock->fd >= 0); + RUNTIME_CHECK(csock->fd >= 0); /* * This is SO_REUSE**** hell: @@ -223,7 +223,7 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) { } static void -udp_close_cb(uv_handle_t *handle) { +udp_stop_cb(uv_handle_t *handle) { isc_nmsocket_t *sock = uv_handle_get_data(handle); atomic_store(&sock->closed, true); @@ -236,7 +236,7 @@ stop_udp_child(isc_nmsocket_t *sock) { REQUIRE(sock->tid == isc_nm_tid()); uv_udp_recv_stop(&sock->uv_handle.udp); - uv_close((uv_handle_t *)&sock->uv_handle.udp, udp_close_cb); + uv_close((uv_handle_t *)&sock->uv_handle.udp, udp_stop_cb); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]); @@ -395,7 +395,11 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, result = isc_sockaddr_fromsockaddr(&sockaddr, addr); RUNTIME_CHECK(result == ISC_R_SUCCESS); - nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL); + if (!atomic_load(&sock->connected)) { + nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL); + } else { + nmhandle = sock->statichandle; + } region.base = (unsigned char *)buf->base; region.length = nrecv; @@ -425,13 +429,13 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, isc_result_t isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, void *cbarg) { - isc_nmsocket_t *psock = NULL, *rsock = NULL; isc_nmsocket_t *sock = handle->sock; + isc_nmsocket_t *psock = NULL, *rsock = sock; isc_sockaddr_t *peer = &handle->peer; isc__netievent_udpsend_t *ievent = NULL; isc__nm_uvreq_t *uvreq = NULL; - int ntid; uint32_t maxudp = atomic_load(&sock->mgr->maxudp); + int ntid; /* * We're simulating a firewall blocking UDP packets bigger than @@ -446,12 +450,12 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, return (ISC_R_SUCCESS); } - if (sock->type == isc_nm_udpsocket) { + if (sock->type == isc_nm_udpsocket && !atomic_load(&sock->client)) { INSIST(sock->parent != NULL); psock = sock->parent; } else if (sock->type == isc_nm_udplistener) { psock = sock; - } else { + } else if (!atomic_load(&sock->client)) { INSIST(0); ISC_UNREACHABLE(); } @@ -467,13 +471,16 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, */ if (isc__nm_in_netthread()) { ntid = isc_nm_tid(); - } else if (sock->type == isc_nm_udpsocket) { + } else if (sock->type == isc_nm_udpsocket && + !atomic_load(&sock->client)) { ntid = sock->tid; } else { ntid = (int)isc_random_uniform(sock->nchildren); } - rsock = &psock->children[ntid]; + if (psock != NULL) { + rsock = &psock->children[ntid]; + } uvreq = isc__nm_uvreq_get(sock->mgr, sock); uvreq->uvbuf.base = (char *)region->base; @@ -553,6 +560,7 @@ udp_send_cb(uv_udp_send_t *req, int status) { static isc_result_t udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, isc_sockaddr_t *peer) { + const struct sockaddr *sa = NULL; int rv; REQUIRE(sock->tid == isc_nm_tid()); @@ -561,9 +569,11 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, if (!isc__nmsocket_active(sock)) { return (ISC_R_CANCELED); } + isc_nmhandle_ref(req->handle); + sa = atomic_load(&sock->connected) ? NULL : &peer->type.sa; rv = uv_udp_send(&req->uv_req.udp_send, &sock->uv_handle.udp, - &req->uvbuf, 1, &peer->type.sa, udp_send_cb); + &req->uvbuf, 1, sa, udp_send_cb); if (rv < 0) { isc__nm_incstats(req->sock->mgr, req->sock->statsindex[STATID_SENDFAIL]);