2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-09-04 16:45:24 +00:00

Merge branch 'each-rndc-netmgr-pt1' into 'main'

client-side TCP

Closes #1958

See merge request isc-projects/bind9!3723
This commit is contained in:
Evan Hunt
2020-06-19 22:19:10 +00:00
11 changed files with 361 additions and 123 deletions

View File

@@ -1,3 +1,6 @@
5442. [func] Add support for outgoing TCP connections in netmgr.
[GL #1958]
5441. [placeholder] 5441. [placeholder]
5440. [placeholder] 5440. [placeholder]

View File

@@ -57,26 +57,13 @@ isc_nm_closedown(isc_nm_t *mgr);
int int
isc_nm_tid(void); isc_nm_tid(void);
/*
* isc_nm_freehandle frees a handle, releasing resources
*/
void void
isc_nm_freehandle(isc_nmhandle_t *handle); isc_nmsocket_close(isc_nmsocket_t **sockp);
void
isc_nmsocket_attach(isc_nmsocket_t *sock, isc_nmsocket_t **target);
/*%< /*%<
* isc_nmsocket_attach attaches to a socket, increasing refcount * isc_nmsocket_close() detaches a listening socket that was
*/ * created by isc_nm_listenudp(), isc_nm_listentcp(), or
* isc_nm_listentcpdns(). Once there are no remaining child
void * sockets with active handles, the socket will be closed.
isc_nmsocket_close(isc_nmsocket_t *sock);
void
isc_nmsocket_detach(isc_nmsocket_t **socketp);
/*%<
* isc_nmsocket_detach detaches from socket, decreasing refcount
* and possibly destroying the socket if it's no longer referenced.
*/ */
void void
@@ -135,26 +122,27 @@ isc_nmhandle_netmgr(isc_nmhandle_t *handle);
* Return a pointer to the netmgr object for the given handle. * Return a pointer to the netmgr object for the given handle.
*/ */
typedef void (*isc_nm_recv_cb_t)(isc_nmhandle_t *handle, isc_region_t *region, typedef void (*isc_nm_recv_cb_t)(isc_nmhandle_t *handle, isc_result_t eresult,
void *cbarg); isc_region_t *region, void *cbarg);
/*%< /*%<
* Callback function to be used when receiving a packet. * Callback function to be used when receiving a packet.
* *
* 'handle' the handle that can be used to send back the answer. * 'handle' the handle that can be used to send back the answer.
* 'region' contains the received data. It will be freed after * 'eresult' the result of the event.
* return by caller. * 'region' contains the received data, if any. It will be freed
* after return by caller.
* 'cbarg' the callback argument passed to isc_nm_listenudp(), * 'cbarg' the callback argument passed to isc_nm_listenudp(),
* isc_nm_listentcpdns(), or isc_nm_read(). * isc_nm_listentcpdns(), or isc_nm_read().
*/ */
typedef void (*isc_nm_cb_t)(isc_nmhandle_t *handle, isc_result_t result, typedef void (*isc_nm_cb_t)(isc_nmhandle_t *handle, isc_result_t eresult,
void *cbarg); void *cbarg);
/*%< /*%<
* Callback function for other network completion events (send, connect, * Callback function for other network completion events (send, connect,
* accept). * accept).
* *
* 'handle' the handle on which the event took place. * 'handle' the handle on which the event took place.
* 'result' the result of the event. * 'eresult' the result of the event.
* 'cbarg' the callback argument passed to isc_nm_send(), * 'cbarg' the callback argument passed to isc_nm_send(),
* isc_nm_tcp_connect(), or isc_nm_listentcp() * isc_nm_tcp_connect(), or isc_nm_listentcp()
*/ */
@@ -205,6 +193,17 @@ isc_nm_pauseread(isc_nmsocket_t *sock);
* Pause reading on this socket, while still remembering the callback. * Pause reading on this socket, while still remembering the callback.
*/ */
void
isc_nm_cancelread(isc_nmhandle_t *handle);
/*%<
* Cancel reading on a connected socket. Calls the read/recv callback on
* active handles with a result code of ISC_R_CANCELED.
*
* Requires:
* \li 'sock' is a valid netmgr socket
* \li ...for which a read/recv callback has been defined.
*/
isc_result_t isc_result_t
isc_nm_resumeread(isc_nmsocket_t *sock); isc_nm_resumeread(isc_nmsocket_t *sock);
/*%< /*%<
@@ -251,6 +250,21 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb,
* prepended with a two-byte length field, and handles buffering. * prepended with a two-byte length field, and handles buffering.
*/ */
isc_result_t
isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize);
/*%<
* Create a socket using netmgr 'mgr', bind it to the address 'local',
* and connect it to the address 'peer'.
*
* When the connection is complete, call 'cb' with argument 'cbarg'.
* Allocate 'extrahandlesize' additional bytes along with the handle to use
* for an associated object.
*
* The connected socket can only be accessed via the handle passed to
* 'cb'.
*/
isc_result_t isc_result_t
isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
void *cbarg, isc_nm_cb_t accept_cb, void *accept_cbarg, void *cbarg, isc_nm_cb_t accept_cb, void *accept_cbarg,

View File

@@ -96,8 +96,7 @@ struct isc_nmhandle {
* the socket. * the socket.
*/ */
isc_nmsocket_t *sock; isc_nmsocket_t *sock;
size_t ah_pos; /* Position in the socket's size_t ah_pos; /* Position in the socket's 'active handles' array */
* 'active handles' array */
/* /*
* The handle is 'inflight' if netmgr is not currently processing * The handle is 'inflight' if netmgr is not currently processing
@@ -141,6 +140,7 @@ typedef enum isc__netievent_type {
netievent_closecb, netievent_closecb,
netievent_shutdown, netievent_shutdown,
netievent_stop, netievent_stop,
netievent_prio = 0xff, /* event type values higher than this netievent_prio = 0xff, /* event type values higher than this
* will be treated as high-priority * will be treated as high-priority
* events, which can be processed * events, which can be processed
@@ -371,6 +371,8 @@ struct isc_nmsocket {
isc_nmsocket_t *parent; isc_nmsocket_t *parent;
/*% Listener socket this connection was accepted on */ /*% Listener socket this connection was accepted on */
isc_nmsocket_t *listener; isc_nmsocket_t *listener;
/*% Self, for self-contained unreferenced sockets (tcpdns) */
isc_nmsocket_t *self;
/*% /*%
* quota is the TCP client, attached when a TCP connection * quota is the TCP client, attached when a TCP connection
@@ -405,6 +407,7 @@ struct isc_nmsocket {
int nchildren; int nchildren;
isc_nmiface_t *iface; isc_nmiface_t *iface;
isc_nmhandle_t *tcphandle; isc_nmhandle_t *tcphandle;
isc_nmhandle_t *outerhandle;
/*% Extra data allocated at the end of each isc_nmhandle_t */ /*% Extra data allocated at the end of each isc_nmhandle_t */
size_t extrahandlesize; size_t extrahandlesize;
@@ -440,6 +443,8 @@ struct isc_nmsocket {
atomic_bool closed; atomic_bool closed;
atomic_bool listening; atomic_bool listening;
atomic_bool listen_error; atomic_bool listen_error;
atomic_bool connected;
atomic_bool connect_error;
isc_refcount_t references; isc_refcount_t references;
/*% /*%
@@ -589,6 +594,9 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
* *
* If 'local' is not NULL, set the handle's local address to 'local', * If 'local' is not NULL, set the handle's local address to 'local',
* otherwise set it to 'sock->iface->addr'. * otherwise set it to 'sock->iface->addr'.
*
* 'sock' will be attached to 'handle->sock'. The caller may need
* to detach the socket afterward.
*/ */
isc__nm_uvreq_t * isc__nm_uvreq_t *
@@ -615,6 +623,19 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type,
* and its interface to 'iface'. * and its interface to 'iface'.
*/ */
void
isc__nmsocket_attach(isc_nmsocket_t *sock, isc_nmsocket_t **target);
/*%<
* Attach to a socket, increasing refcount
*/
void
isc__nmsocket_detach(isc_nmsocket_t **socketp);
/*%<
* Detach from socket, decreasing refcount and possibly destroying the
* socket if it's no longer referenced.
*/
void void
isc__nmsocket_prep_destroy(isc_nmsocket_t *sock); isc__nmsocket_prep_destroy(isc_nmsocket_t *sock);
/*%< /*%<
@@ -694,7 +715,14 @@ isc__nm_tcp_resumeread(isc_nmsocket_t *sock);
void void
isc__nm_tcp_shutdown(isc_nmsocket_t *sock); isc__nm_tcp_shutdown(isc_nmsocket_t *sock);
/*%< /*%<
* Called on shutdown to close and clean up a listening TCP socket. * Called during the shutdown process to close and clean up connected
* sockets.
*/
void
isc__nm_tcp_cancelread(isc_nmsocket_t *sock);
/*%<
* Stop reading on a connected socket.
*/ */
void void

View File

@@ -417,9 +417,9 @@ isc_nm_destroy(isc_nm_t **mgr0) {
isc_nm_pause(mgr); isc_nm_pause(mgr);
isc_nm_resume(mgr); isc_nm_resume(mgr);
#ifdef WIN32 #ifdef WIN32
_sleep(1000); _sleep(10);
#else /* ifdef WIN32 */ #else /* ifdef WIN32 */
usleep(1000000); usleep(10000);
#endif /* ifdef WIN32 */ #endif /* ifdef WIN32 */
} }
@@ -686,7 +686,7 @@ isc__nmsocket_active(isc_nmsocket_t *sock) {
} }
void void
isc_nmsocket_attach(isc_nmsocket_t *sock, isc_nmsocket_t **target) { isc__nmsocket_attach(isc_nmsocket_t *sock, isc_nmsocket_t **target) {
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
REQUIRE(target != NULL && *target == NULL); REQUIRE(target != NULL && *target == NULL);
@@ -736,9 +736,15 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) {
isc__nm_decstats(sock->mgr, sock->statsindex[STATID_ACTIVE]); isc__nm_decstats(sock->mgr, sock->statsindex[STATID_ACTIVE]);
} }
if (sock->tcphandle != NULL) { sock->tcphandle = NULL;
isc_nmhandle_unref(sock->tcphandle);
sock->tcphandle = NULL; if (sock->outerhandle != NULL) {
isc_nmhandle_unref(sock->outerhandle);
sock->outerhandle = NULL;
}
if (sock->outer != NULL) {
isc__nmsocket_detach(&sock->outer);
} }
while ((handle = isc_astack_pop(sock->inactivehandles)) != NULL) { while ((handle = isc_astack_pop(sock->inactivehandles)) != NULL) {
@@ -878,7 +884,7 @@ isc__nmsocket_prep_destroy(isc_nmsocket_t *sock) {
} }
void void
isc_nmsocket_detach(isc_nmsocket_t **sockp) { isc__nmsocket_detach(isc_nmsocket_t **sockp) {
REQUIRE(sockp != NULL && *sockp != NULL); REQUIRE(sockp != NULL && *sockp != NULL);
REQUIRE(VALID_NMSOCK(*sockp)); REQUIRE(VALID_NMSOCK(*sockp));
@@ -901,6 +907,17 @@ isc_nmsocket_detach(isc_nmsocket_t **sockp) {
} }
} }
void
isc_nmsocket_close(isc_nmsocket_t **sockp) {
REQUIRE(sockp != NULL);
REQUIRE(VALID_NMSOCK(*sockp));
REQUIRE((*sockp)->type == isc_nm_udplistener ||
(*sockp)->type == isc_nm_tcplistener ||
(*sockp)->type == isc_nm_tcpdnslistener);
isc__nmsocket_detach(sockp);
}
void void
isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type, isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type,
isc_nmiface_t *iface) { isc_nmiface_t *iface) {
@@ -1039,7 +1056,8 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
isc_refcount_increment0(&handle->references); isc_refcount_increment0(&handle->references);
} }
handle->sock = sock; isc__nmsocket_attach(sock, &handle->sock);
if (peer != NULL) { if (peer != NULL) {
memcpy(&handle->peer, peer, sizeof(isc_sockaddr_t)); memcpy(&handle->peer, peer, sizeof(isc_sockaddr_t));
} else { } else {
@@ -1122,6 +1140,9 @@ nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
static void static void
nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
size_t handlenum;
bool reuse = false;
/* /*
* We do all of this under lock to avoid races with socket * We do all of this under lock to avoid races with socket
* destruction. We have to do this now, because at this point the * destruction. We have to do this now, because at this point the
@@ -1134,10 +1155,9 @@ nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
INSIST(atomic_load(&sock->ah) > 0); INSIST(atomic_load(&sock->ah) > 0);
sock->ah_handles[handle->ah_pos] = NULL; sock->ah_handles[handle->ah_pos] = NULL;
size_t handlenum = atomic_fetch_sub(&sock->ah, 1) - 1; handlenum = atomic_fetch_sub(&sock->ah, 1) - 1;
sock->ah_frees[handlenum] = handle->ah_pos; sock->ah_frees[handlenum] = handle->ah_pos;
handle->ah_pos = 0; handle->ah_pos = 0;
bool reuse = false;
if (atomic_load(&sock->active)) { if (atomic_load(&sock->active)) {
reuse = isc_astack_trypush(sock->inactivehandles, handle); reuse = isc_astack_trypush(sock->inactivehandles, handle);
} }
@@ -1149,7 +1169,7 @@ nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
void void
isc_nmhandle_unref(isc_nmhandle_t *handle) { isc_nmhandle_unref(isc_nmhandle_t *handle) {
isc_nmsocket_t *sock = NULL, *tmp = NULL; isc_nmsocket_t *sock = NULL;
REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMHANDLE(handle));
@@ -1166,12 +1186,6 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) {
handle->doreset(handle->opaque); handle->doreset(handle->opaque);
} }
/*
* Temporarily reference the socket to ensure that it can't
* be deleted by another thread while we're deactivating the
* handle.
*/
isc_nmsocket_attach(sock, &tmp);
nmhandle_deactivate(sock, handle); nmhandle_deactivate(sock, handle);
/* /*
@@ -1189,13 +1203,13 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) {
* The socket will be finally detached by the closecb * The socket will be finally detached by the closecb
* event handler. * event handler.
*/ */
isc_nmsocket_attach(sock, &event->sock); isc__nmsocket_attach(sock, &event->sock);
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)event); (isc__netievent_t *)event);
} }
} }
isc_nmsocket_detach(&tmp); isc__nmsocket_detach(&sock);
} }
void * void *
@@ -1262,7 +1276,7 @@ isc__nm_uvreq_get(isc_nm_t *mgr, isc_nmsocket_t *sock) {
*req = (isc__nm_uvreq_t){ .magic = 0 }; *req = (isc__nm_uvreq_t){ .magic = 0 };
req->uv_req.req.data = req; req->uv_req.req.data = req;
isc_nmsocket_attach(sock, &req->sock); isc__nmsocket_attach(sock, &req->sock);
req->magic = UVREQ_MAGIC; req->magic = UVREQ_MAGIC;
return (req); return (req);
@@ -1299,7 +1313,7 @@ isc__nm_uvreq_put(isc__nm_uvreq_t **req0, isc_nmsocket_t *sock) {
isc_nmhandle_unref(handle); isc_nmhandle_unref(handle);
} }
isc_nmsocket_detach(&sock); isc__nmsocket_detach(&sock);
} }
isc_result_t isc_result_t
@@ -1334,9 +1348,24 @@ isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
} }
} }
void
isc_nm_cancelread(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle));
switch (handle->sock->type) {
case isc_nm_tcpsocket:
isc__nm_tcp_cancelread(handle->sock);
break;
default:
INSIST(0);
ISC_UNREACHABLE();
}
}
isc_result_t isc_result_t
isc_nm_pauseread(isc_nmsocket_t *sock) { isc_nm_pauseread(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
switch (sock->type) { switch (sock->type) {
case isc_nm_tcpsocket: case isc_nm_tcpsocket:
return (isc__nm_tcp_pauseread(sock)); return (isc__nm_tcp_pauseread(sock));
@@ -1349,6 +1378,7 @@ isc_nm_pauseread(isc_nmsocket_t *sock) {
isc_result_t isc_result_t
isc_nm_resumeread(isc_nmsocket_t *sock) { isc_nm_resumeread(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
switch (sock->type) { switch (sock->type) {
case isc_nm_tcpsocket: case isc_nm_tcpsocket:
return (isc__nm_tcp_resumeread(sock)); return (isc__nm_tcp_resumeread(sock));
@@ -1361,6 +1391,7 @@ isc_nm_resumeread(isc_nmsocket_t *sock) {
void void
isc_nm_stoplistening(isc_nmsocket_t *sock) { isc_nm_stoplistening(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
switch (sock->type) { switch (sock->type) {
case isc_nm_udplistener: case isc_nm_udplistener:
isc__nm_udp_stoplistening(sock); isc__nm_udp_stoplistening(sock);
@@ -1388,7 +1419,7 @@ isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0) {
UNUSED(worker); UNUSED(worker);
ievent->sock->closehandle_cb(ievent->sock); ievent->sock->closehandle_cb(ievent->sock);
isc_nmsocket_detach(&ievent->sock); isc__nmsocket_detach(&ievent->sock);
} }
static void static void

View File

@@ -88,6 +88,10 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
if (r != 0) { if (r != 0) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
/* Socket was never opened; no need for tcp_close_direct() */
atomic_store(&sock->closed, true);
sock->result = isc__nm_uverr2result(r);
atomic_store(&sock->connect_error, true);
return (r); return (r);
} }
@@ -96,13 +100,23 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
if (r != 0) { if (r != 0) {
isc__nm_incstats(sock->mgr, isc__nm_incstats(sock->mgr,
sock->statsindex[STATID_BINDFAIL]); sock->statsindex[STATID_BINDFAIL]);
sock->result = isc__nm_uverr2result(r);
atomic_store(&sock->connect_error, true);
tcp_close_direct(sock); tcp_close_direct(sock);
return (r); return (r);
} }
} }
uv_handle_set_data(&sock->uv_handle.handle, sock); uv_handle_set_data(&sock->uv_handle.handle, sock);
r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp, r = uv_tcp_connect(&req->uv_req.connect, &sock->uv_handle.tcp,
&req->peer.type.sa, tcp_connect_cb); &req->peer.type.sa, tcp_connect_cb);
if (r != 0) {
isc__nm_incstats(sock->mgr,
sock->statsindex[STATID_CONNECTFAIL]);
sock->result = isc__nm_uverr2result(r);
atomic_store(&sock->connect_error, true);
tcp_close_direct(sock);
}
return (r); return (r);
} }
@@ -114,29 +128,38 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__nm_uvreq_t *req = ievent->req; isc__nm_uvreq_t *req = ievent->req;
int r; int r;
REQUIRE(sock->type == isc_nm_tcpsocket); UNUSED(worker);
REQUIRE(worker->id == ievent->req->sock->mgr->workers[isc_nm_tid()].id);
r = tcp_connect_direct(sock, req); r = tcp_connect_direct(sock, req);
if (r != 0) { if (r != 0) {
/* We need to issue callbacks ourselves */ /* We need to issue callbacks ourselves */
tcp_connect_cb(&req->uv_req.connect, r); tcp_connect_cb(&req->uv_req.connect, r);
goto done;
} }
atomic_store(&sock->connected, true);
done:
LOCK(&sock->lock);
SIGNAL(&sock->cond);
UNLOCK(&sock->lock);
} }
static void static void
tcp_connect_cb(uv_connect_t *uvreq, int status) { tcp_connect_cb(uv_connect_t *uvreq, int status) {
isc__nm_uvreq_t *req = (isc__nm_uvreq_t *)uvreq->data; isc__nm_uvreq_t *req = (isc__nm_uvreq_t *)uvreq->data;
isc_nmsocket_t *sock = NULL; isc_nmsocket_t *sock = NULL;
sock = uv_handle_get_data((uv_handle_t *)uvreq->handle); sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
REQUIRE(VALID_UVREQ(req)); REQUIRE(VALID_UVREQ(req));
if (status == 0) { if (status == 0) {
isc_result_t result; isc_result_t result;
isc_nmhandle_t *handle = NULL;
struct sockaddr_storage ss; struct sockaddr_storage ss;
isc_nmhandle_t *handle = NULL;
sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss, uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
&(int){ sizeof(ss) }); &(int){ sizeof(ss) });
@@ -146,17 +169,77 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
handle = isc__nmhandle_get(sock, NULL, NULL); handle = isc__nmhandle_get(sock, NULL, NULL);
req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg); req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg);
isc__nm_uvreq_put(&req, sock);
/*
* The sock is now attached to the handle.
*/
isc__nmsocket_detach(&sock);
/*
* If the connect callback wants to hold on to the handle,
* it needs to attach to it.
*/
isc_nmhandle_unref(handle);
} else { } else {
/* /*
* TODO: * TODO:
* Handle the connect error properly and free the socket. * Handle the connect error properly and free the socket.
*/ */
isc__nm_incstats(sock->mgr,
sock->statsindex[STATID_CONNECTFAIL]);
req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg); req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg);
isc__nm_uvreq_put(&req, sock);
}
}
isc_result_t
isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
isc_nm_cb_t cb, void *cbarg, size_t extrahandlesize) {
isc_nmsocket_t *nsock = NULL;
isc__netievent_tcpconnect_t *ievent = NULL;
isc__nm_uvreq_t *req = NULL;
REQUIRE(VALID_NM(mgr));
nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local);
nsock->extrahandlesize = extrahandlesize;
nsock->result = ISC_R_SUCCESS;
req = isc__nm_uvreq_get(mgr, nsock);
req->cb.connect = cb;
req->cbarg = cbarg;
req->peer = peer->addr;
ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect);
ievent->sock = nsock;
ievent->req = req;
if (isc__nm_in_netthread()) {
nsock->tid = isc_nm_tid();
isc__nm_async_tcpconnect(&mgr->workers[nsock->tid],
(isc__netievent_t *)ievent);
isc__nm_put_ievent(mgr, ievent);
} else {
nsock->tid = isc_random_uniform(mgr->nworkers);
isc__nm_enqueue_ievent(&mgr->workers[nsock->tid],
(isc__netievent_t *)ievent);
LOCK(&nsock->lock);
while (!atomic_load(&nsock->connected) &&
!atomic_load(&nsock->connect_error)) {
WAIT(&nsock->cond, &nsock->lock);
}
UNLOCK(&nsock->lock);
} }
isc__nm_uvreq_put(&req, sock); if (nsock->result != ISC_R_SUCCESS) {
isc_result_t result = nsock->result;
isc__nmsocket_detach(&nsock);
return (result);
}
return (ISC_R_SUCCESS);
} }
isc_result_t isc_result_t
@@ -170,8 +253,8 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb,
nsock = isc_mem_get(mgr->mctx, sizeof(*nsock)); nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener, iface); isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener, iface);
nsock->rcb.accept = cb; nsock->accept_cb.accept = cb;
nsock->rcbarg = cbarg; nsock->accept_cbarg = cbarg;
nsock->extrahandlesize = extrahandlesize; nsock->extrahandlesize = extrahandlesize;
nsock->backlog = backlog; nsock->backlog = backlog;
nsock->result = ISC_R_SUCCESS; nsock->result = ISC_R_SUCCESS;
@@ -209,7 +292,7 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb,
return (ISC_R_SUCCESS); return (ISC_R_SUCCESS);
} else { } else {
isc_result_t result = nsock->result; isc_result_t result = nsock->result;
isc_nmsocket_detach(&nsock); isc__nmsocket_detach(&nsock);
return (result); return (result);
} }
} }
@@ -232,8 +315,8 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp);
if (r != 0) { if (r != 0) {
/* It was never opened */
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
/* The socket was never opened, so no need for uv_close() */
atomic_store(&sock->closed, true); atomic_store(&sock->closed, true);
sock->result = isc__nm_uverr2result(r); sock->result = isc__nm_uverr2result(r);
atomic_store(&sock->listen_error, true); atomic_store(&sock->listen_error, true);
@@ -379,14 +462,24 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
goto error; goto error;
} }
isc_nmsocket_attach(ssock, &csock->server); isc__nmsocket_attach(ssock, &csock->server);
handle = isc__nmhandle_get(csock, NULL, &local); handle = isc__nmhandle_get(csock, NULL, &local);
INSIST(ssock->rcb.accept != NULL); INSIST(ssock->accept_cb.accept != NULL);
csock->read_timeout = ssock->mgr->init; csock->read_timeout = ssock->mgr->init;
ssock->rcb.accept(handle, ISC_R_SUCCESS, ssock->rcbarg); ssock->accept_cb.accept(handle, ISC_R_SUCCESS, ssock->accept_cbarg);
isc_nmsocket_detach(&csock);
/*
* csock is now attached to the handle.
*/
isc__nmsocket_detach(&csock);
/*
* If the accept callback wants to hold on to the handle,
* it needs to attach to it.
*/
isc_nmhandle_unref(handle);
return; return;
error: error:
@@ -405,7 +498,7 @@ error:
/* /*
* Detach the socket properly to make sure uv_close() is called. * Detach the socket properly to make sure uv_close() is called.
*/ */
isc_nmsocket_detach(&csock); isc__nmsocket_detach(&csock);
} }
void void
@@ -416,7 +509,7 @@ isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) {
REQUIRE(!isc__nm_in_netthread()); REQUIRE(!isc__nm_in_netthread());
ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstop); ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpstop);
isc_nmsocket_attach(sock, &ievent->sock); isc__nmsocket_attach(sock, &ievent->sock);
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)ievent); (isc__netievent_t *)ievent);
} }
@@ -462,7 +555,7 @@ tcp_listenclose_cb(uv_handle_t *handle) {
sock->pquota = NULL; sock->pquota = NULL;
UNLOCK(&sock->lock); UNLOCK(&sock->lock);
isc_nmsocket_detach(&sock); isc__nmsocket_detach(&sock);
} }
static void static void
@@ -488,7 +581,10 @@ readtimeout_cb(uv_timer_t *handle) {
if (sock->quota) { if (sock->quota) {
isc_quota_detach(&sock->quota); isc_quota_detach(&sock->quota);
} }
sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg); if (sock->rcb.recv != NULL) {
sock->rcb.recv(sock->tcphandle, ISC_R_TIMEDOUT, NULL,
sock->rcbarg);
}
} }
isc_result_t isc_result_t
@@ -621,7 +717,8 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
.length = nread }; .length = nread };
if (sock->rcb.recv != NULL) { if (sock->rcb.recv != NULL) {
sock->rcb.recv(sock->tcphandle, &region, sock->rcbarg); sock->rcb.recv(sock->tcphandle, ISC_R_SUCCESS, &region,
sock->rcbarg);
} }
sock->read_timeout = (atomic_load(&sock->keepalive) sock->read_timeout = (atomic_load(&sock->keepalive)
@@ -646,7 +743,7 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
*/ */
if (sock->rcb.recv != NULL) { if (sock->rcb.recv != NULL) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg); sock->rcb.recv(sock->tcphandle, ISC_R_EOF, NULL, sock->rcbarg);
} }
/* /*
@@ -697,7 +794,7 @@ isc__nm_async_tcpaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
/* /*
* The socket was attached just before we called isc_quota_attach_cb(). * The socket was attached just before we called isc_quota_attach_cb().
*/ */
isc_nmsocket_detach(&ievent->sock); isc__nmsocket_detach(&ievent->sock);
} }
/* /*
@@ -741,7 +838,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
* we need to - but we risk a race then.) * we need to - but we risk a race then.)
*/ */
isc_nmsocket_t *tsock = NULL; isc_nmsocket_t *tsock = NULL;
isc_nmsocket_attach(ssock, &tsock); isc__nmsocket_attach(ssock, &tsock);
result = isc_quota_attach_cb(ssock->pquota, &quota, result = isc_quota_attach_cb(ssock->pquota, &quota,
&ssock->quotacb); &ssock->quotacb);
if (result == ISC_R_QUOTA) { if (result == ISC_R_QUOTA) {
@@ -754,7 +851,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) {
* We're under quota, so there's no need to wait; * We're under quota, so there's no need to wait;
* Detach the socket. * Detach the socket.
*/ */
isc_nmsocket_detach(&tsock); isc__nmsocket_detach(&tsock);
} }
isc__nm_incstats(ssock->mgr, ssock->statsindex[STATID_ACCEPT]); isc__nm_incstats(ssock->mgr, ssock->statsindex[STATID_ACCEPT]);
@@ -841,6 +938,7 @@ static void
tcp_send_cb(uv_write_t *req, int status) { tcp_send_cb(uv_write_t *req, int status) {
isc_result_t result = ISC_R_SUCCESS; isc_result_t result = ISC_R_SUCCESS;
isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data; isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data;
isc_nmsocket_t *sock = NULL;
REQUIRE(VALID_UVREQ(uvreq)); REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(VALID_NMHANDLE(uvreq->handle)); REQUIRE(VALID_NMHANDLE(uvreq->handle));
@@ -852,8 +950,10 @@ tcp_send_cb(uv_write_t *req, int status) {
} }
uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); uvreq->cb.send(uvreq->handle, result, uvreq->cbarg);
sock = uvreq->handle->sock;
isc_nmhandle_unref(uvreq->handle); isc_nmhandle_unref(uvreq->handle);
isc__nm_uvreq_put(&uvreq, uvreq->handle->sock); isc__nm_uvreq_put(&uvreq, sock);
} }
/* /*
@@ -906,6 +1006,7 @@ tcp_close_cb(uv_handle_t *uvhandle) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);
atomic_store(&sock->closed, true); atomic_store(&sock->closed, true);
atomic_store(&sock->connected, false);
isc__nmsocket_prep_destroy(sock); isc__nmsocket_prep_destroy(sock);
} }
@@ -915,7 +1016,9 @@ timer_close_cb(uv_handle_t *uvhandle) {
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
isc_nmsocket_detach(&sock->server); if (sock->server != NULL) {
isc__nmsocket_detach(&sock->server);
}
uv_close(&sock->uv_handle.handle, tcp_close_cb); uv_close(&sock->uv_handle.handle, tcp_close_cb);
} }
@@ -933,7 +1036,7 @@ tcp_close_direct(isc_nmsocket_t *sock) {
uv_close((uv_handle_t *)&sock->timer, timer_close_cb); uv_close((uv_handle_t *)&sock->timer, timer_close_cb);
} else { } else {
if (sock->server != NULL) { if (sock->server != NULL) {
isc_nmsocket_detach(&sock->server); isc__nmsocket_detach(&sock->server);
} }
uv_close(&sock->uv_handle.handle, tcp_close_cb); uv_close(&sock->uv_handle.handle, tcp_close_cb);
} }
@@ -975,6 +1078,19 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL && if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL &&
sock->rcb.recv != NULL) sock->rcb.recv != NULL)
{ {
sock->rcb.recv(sock->tcphandle, NULL, sock->rcbarg); sock->rcb.recv(sock->tcphandle, ISC_R_CANCELED, NULL,
sock->rcbarg);
}
}
void
isc__nm_tcp_cancelread(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL &&
sock->rcb.recv != NULL)
{
sock->rcb.recv(sock->tcphandle, ISC_R_CANCELED, NULL,
sock->rcbarg);
} }
} }

View File

@@ -38,7 +38,8 @@
*/ */
static void static void
dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg); dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
isc_region_t *region, void *arg);
static void static void
resume_processing(void *arg); resume_processing(void *arg);
@@ -82,7 +83,8 @@ static void
timer_close_cb(uv_handle_t *handle) { timer_close_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = (isc_nmsocket_t *)uv_handle_get_data(handle); isc_nmsocket_t *sock = (isc_nmsocket_t *)uv_handle_get_data(handle);
INSIST(VALID_NMSOCK(sock)); INSIST(VALID_NMSOCK(sock));
isc_nmsocket_detach(&sock); atomic_store(&sock->closed, true);
tcpdns_close_direct(sock);
} }
static void static void
@@ -92,7 +94,9 @@ dnstcp_readtimeout(uv_timer_t *timer) {
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->tid == isc_nm_tid());
tcpdns_close_direct(sock); /* Close the TCP connection, it's closing should fire 'our' closing */
isc_nmhandle_unref(sock->outerhandle);
sock->outerhandle = NULL;
} }
/* /*
@@ -122,8 +126,13 @@ dnslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
handle->sock->iface); handle->sock->iface);
dnssock->extrahandlesize = dnslistensock->extrahandlesize; dnssock->extrahandlesize = dnslistensock->extrahandlesize;
isc_nmsocket_attach(dnslistensock, &dnssock->listener); isc__nmsocket_attach(dnslistensock, &dnssock->listener);
isc_nmsocket_attach(handle->sock, &dnssock->outer);
isc__nmsocket_attach(dnssock, &dnssock->self);
dnssock->outerhandle = handle;
isc_nmhandle_ref(dnssock->outerhandle);
dnssock->peer = handle->sock->peer; dnssock->peer = handle->sock->peer;
dnssock->read_timeout = handle->sock->mgr->init; dnssock->read_timeout = handle->sock->mgr->init;
dnssock->tid = isc_nm_tid(); dnssock->tid = isc_nm_tid();
@@ -135,8 +144,12 @@ dnslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
dnssock->timer_initialized = true; dnssock->timer_initialized = true;
uv_timer_start(&dnssock->timer, dnstcp_readtimeout, uv_timer_start(&dnssock->timer, dnstcp_readtimeout,
dnssock->read_timeout, 0); dnssock->read_timeout, 0);
isc_nmhandle_ref(handle);
isc_nm_read(handle, dnslisten_readcb, dnssock); result = isc_nm_read(handle, dnslisten_readcb, dnssock);
if (result != ISC_R_SUCCESS) {
isc_nmhandle_unref(handle);
}
isc__nmsocket_detach(&dnssock);
} }
/* /*
@@ -175,7 +188,7 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) {
if (listener != NULL && listener->rcb.recv != NULL) { if (listener != NULL && listener->rcb.recv != NULL) {
listener->rcb.recv( listener->rcb.recv(
dnshandle, dnshandle, ISC_R_SUCCESS,
&(isc_region_t){ .base = dnssock->buf + 2, &(isc_region_t){ .base = dnssock->buf + 2,
.length = len }, .length = len },
listener->rcbarg); listener->rcbarg);
@@ -200,7 +213,8 @@ processbuffer(isc_nmsocket_t *dnssock, isc_nmhandle_t **handlep) {
* a complete DNS packet and, if so - call the callback * a complete DNS packet and, if so - call the callback
*/ */
static void static void
dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) { dnslisten_readcb(isc_nmhandle_t *handle, isc_result_t eresult,
isc_region_t *region, void *arg) {
isc_nmsocket_t *dnssock = (isc_nmsocket_t *)arg; isc_nmsocket_t *dnssock = (isc_nmsocket_t *)arg;
unsigned char *base = NULL; unsigned char *base = NULL;
bool done = false; bool done = false;
@@ -210,9 +224,13 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(dnssock->tid == isc_nm_tid()); REQUIRE(dnssock->tid == isc_nm_tid());
if (region == NULL) { if (region == NULL || eresult != ISC_R_SUCCESS) {
/* Connection closed */ /* Connection closed */
isc__nm_tcpdns_close(dnssock); isc_nmhandle_unref(handle);
dnssock->result = eresult;
if (dnssock->self != NULL) {
isc__nmsocket_detach(&dnssock->self);
}
return; return;
} }
@@ -244,7 +262,7 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
/* /*
* We have a packet: stop timeout timers * We have a packet: stop timeout timers
*/ */
atomic_store(&dnssock->outer->processing, true); atomic_store(&dnssock->outerhandle->sock->processing, true);
if (dnssock->timer_initialized) { if (dnssock->timer_initialized) {
uv_timer_stop(&dnssock->timer); uv_timer_stop(&dnssock->timer);
} }
@@ -255,7 +273,7 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
* one packet, so we're done until the next read * one packet, so we're done until the next read
* completes. * completes.
*/ */
isc_nm_pauseread(dnssock->outer); isc_nm_pauseread(dnssock->outerhandle->sock);
done = true; done = true;
} else { } else {
/* /*
@@ -267,7 +285,7 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
*/ */
if (atomic_load(&dnssock->ah) >= if (atomic_load(&dnssock->ah) >=
TCPDNS_CLIENTS_PER_CONN) { TCPDNS_CLIENTS_PER_CONN) {
isc_nm_pauseread(dnssock->outer); isc_nm_pauseread(dnssock->outerhandle->sock);
done = true; done = true;
} }
} }
@@ -310,7 +328,7 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
return (ISC_R_SUCCESS); return (ISC_R_SUCCESS);
} else { } else {
atomic_store(&dnslistensock->closed, true); atomic_store(&dnslistensock->closed, true);
isc_nmsocket_detach(&dnslistensock); isc__nmsocket_detach(&dnslistensock);
return (result); return (result);
} }
} }
@@ -326,8 +344,8 @@ isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock) {
sock->rcbarg = NULL; sock->rcbarg = NULL;
if (sock->outer != NULL) { if (sock->outer != NULL) {
isc_nm_stoplistening(sock->outer); isc__nm_tcp_stoplistening(sock->outer);
isc_nmsocket_detach(&sock->outer); isc__nmsocket_detach(&sock->outer);
} }
} }
@@ -336,7 +354,8 @@ isc_nm_tcpdns_sequential(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMHANDLE(handle));
if (handle->sock->type != isc_nm_tcpdnssocket || if (handle->sock->type != isc_nm_tcpdnssocket ||
handle->sock->outer == NULL) { handle->sock->outerhandle == NULL)
{
return; return;
} }
@@ -348,7 +367,7 @@ isc_nm_tcpdns_sequential(isc_nmhandle_t *handle) {
* closehandle_cb callback, called whenever a handle * closehandle_cb callback, called whenever a handle
* is released. * is released.
*/ */
isc_nm_pauseread(handle->sock->outer); isc_nm_pauseread(handle->sock->outerhandle->sock);
atomic_store(&handle->sock->sequential, true); atomic_store(&handle->sock->sequential, true);
} }
@@ -357,12 +376,13 @@ isc_nm_tcpdns_keepalive(isc_nmhandle_t *handle) {
REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMHANDLE(handle));
if (handle->sock->type != isc_nm_tcpdnssocket || if (handle->sock->type != isc_nm_tcpdnssocket ||
handle->sock->outer == NULL) { handle->sock->outerhandle == NULL)
{
return; return;
} }
atomic_store(&handle->sock->keepalive, true); atomic_store(&handle->sock->keepalive, true);
atomic_store(&handle->sock->outer->keepalive, true); atomic_store(&handle->sock->outerhandle->sock->keepalive, true);
} }
typedef struct tcpsend { typedef struct tcpsend {
@@ -382,13 +402,13 @@ resume_processing(void *arg) {
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->tid == isc_nm_tid());
if (sock->type != isc_nm_tcpdnssocket || sock->outer == NULL) { if (sock->type != isc_nm_tcpdnssocket || sock->outerhandle == NULL) {
return; return;
} }
if (atomic_load(&sock->ah) == 0) { if (atomic_load(&sock->ah) == 0) {
/* Nothing is active; sockets can timeout now */ /* Nothing is active; sockets can timeout now */
atomic_store(&sock->outer->processing, false); atomic_store(&sock->outerhandle->sock->processing, false);
if (sock->timer_initialized) { if (sock->timer_initialized) {
uv_timer_start(&sock->timer, dnstcp_readtimeout, uv_timer_start(&sock->timer, dnstcp_readtimeout,
sock->read_timeout, 0); sock->read_timeout, 0);
@@ -404,13 +424,14 @@ resume_processing(void *arg) {
result = processbuffer(sock, &handle); result = processbuffer(sock, &handle);
if (result == ISC_R_SUCCESS) { if (result == ISC_R_SUCCESS) {
atomic_store(&sock->outer->processing, true); atomic_store(&sock->outerhandle->sock->processing,
true);
if (sock->timer_initialized) { if (sock->timer_initialized) {
uv_timer_stop(&sock->timer); uv_timer_stop(&sock->timer);
} }
isc_nmhandle_unref(handle); isc_nmhandle_unref(handle);
} else if (sock->outer != NULL) { } else if (sock->outerhandle != NULL) {
isc_nm_resumeread(sock->outer); isc_nm_resumeread(sock->outerhandle->sock);
} }
return; return;
@@ -428,8 +449,8 @@ resume_processing(void *arg) {
/* /*
* Nothing in the buffer; resume reading. * Nothing in the buffer; resume reading.
*/ */
if (sock->outer != NULL) { if (sock->outerhandle != NULL) {
isc_nm_resumeread(sock->outer); isc_nm_resumeread(sock->outerhandle->sock);
} }
break; break;
@@ -438,7 +459,7 @@ resume_processing(void *arg) {
if (sock->timer_initialized) { if (sock->timer_initialized) {
uv_timer_stop(&sock->timer); uv_timer_stop(&sock->timer);
} }
atomic_store(&sock->outer->processing, true); atomic_store(&sock->outerhandle->sock->processing, true);
isc_nmhandle_unref(dnshandle); isc_nmhandle_unref(dnshandle);
} while (atomic_load(&sock->ah) < TCPDNS_CLIENTS_PER_CONN); } while (atomic_load(&sock->ah) < TCPDNS_CLIENTS_PER_CONN);
} }
@@ -447,13 +468,13 @@ static void
tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
tcpsend_t *ts = (tcpsend_t *)cbarg; tcpsend_t *ts = (tcpsend_t *)cbarg;
UNUSED(handle);
ts->cb(ts->orighandle, result, ts->cbarg); ts->cb(ts->orighandle, result, ts->cbarg);
isc_mem_put(ts->mctx, ts->region.base, ts->region.length); isc_mem_put(ts->mctx, ts->region.base, ts->region.length);
isc_nmhandle_unref(ts->orighandle); isc_nmhandle_unref(ts->orighandle);
isc_mem_putanddetach(&ts->mctx, ts, sizeof(*ts)); isc_mem_putanddetach(&ts->mctx, ts, sizeof(*ts));
isc_nmhandle_unref(handle);
} }
/* /*
@@ -471,7 +492,7 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region,
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpdnssocket); REQUIRE(sock->type == isc_nm_tcpdnssocket);
if (sock->outer == NULL) { if (sock->outerhandle == NULL) {
/* The socket is closed */ /* The socket is closed */
return (ISC_R_NOTCONNECTED); return (ISC_R_NOTCONNECTED);
} }
@@ -480,12 +501,13 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region,
*t = (tcpsend_t){ *t = (tcpsend_t){
.cb = cb, .cb = cb,
.cbarg = cbarg, .cbarg = cbarg,
.handle = handle->sock->outer->tcphandle, .handle = handle->sock->outerhandle,
}; };
isc_mem_attach(sock->mgr->mctx, &t->mctx); isc_mem_attach(sock->mgr->mctx, &t->mctx);
t->orighandle = handle; t->orighandle = handle;
isc_nmhandle_ref(t->orighandle); isc_nmhandle_ref(t->orighandle);
isc_nmhandle_ref(t->handle);
t->region = (isc_region_t){ .base = isc_mem_get(t->mctx, t->region = (isc_region_t){ .base = isc_mem_get(t->mctx,
region->length + 2), region->length + 2),
@@ -500,6 +522,7 @@ isc__nm_tcpdns_send(isc_nmhandle_t *handle, isc_region_t *region,
static void static void
tcpdns_close_direct(isc_nmsocket_t *sock) { tcpdns_close_direct(isc_nmsocket_t *sock) {
REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->tid == isc_nm_tid());
/* We don't need atomics here, it's all in single network thread */ /* We don't need atomics here, it's all in single network thread */
if (sock->timer_initialized) { if (sock->timer_initialized) {
/* /*
@@ -510,19 +533,23 @@ tcpdns_close_direct(isc_nmsocket_t *sock) {
sock->timer_initialized = false; sock->timer_initialized = false;
uv_timer_stop(&sock->timer); uv_timer_stop(&sock->timer);
uv_close((uv_handle_t *)&sock->timer, timer_close_cb); uv_close((uv_handle_t *)&sock->timer, timer_close_cb);
} else if (sock->self != NULL) {
isc__nmsocket_detach(&sock->self);
} else { } else {
/* /*
* At this point we're certain that there are no external * At this point we're certain that there are no external
* references, we can close everything. * references, we can close everything.
*/ */
if (sock->outer != NULL) { if (sock->outerhandle != NULL) {
sock->outer->rcb.recv = NULL; sock->outerhandle->sock->rcb.recv = NULL;
isc_nmsocket_detach(&sock->outer); isc_nmhandle_unref(sock->outerhandle);
sock->outerhandle = NULL;
} }
if (sock->listener != NULL) { if (sock->listener != NULL) {
isc_nmsocket_detach(&sock->listener); isc__nmsocket_detach(&sock->listener);
} }
atomic_store(&sock->closed, true); atomic_store(&sock->closed, true);
isc__nmsocket_prep_destroy(sock);
} }
} }

View File

@@ -152,7 +152,8 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
#endif #endif
uv_udp_init_ex(&worker->loop, &sock->uv_handle.udp, uv_init_flags); uv_udp_init_ex(&worker->loop, &sock->uv_handle.udp, uv_init_flags);
uv_handle_set_data(&sock->uv_handle.handle, NULL); uv_handle_set_data(&sock->uv_handle.handle, NULL);
isc_nmsocket_attach(sock, (isc_nmsocket_t **)&sock->uv_handle.udp.data); isc__nmsocket_attach(sock,
(isc_nmsocket_t **)&sock->uv_handle.udp.data);
r = uv_udp_open(&sock->uv_handle.udp, sock->fd); r = uv_udp_open(&sock->uv_handle.udp, sock->fd);
if (r == 0) { if (r == 0) {
@@ -186,7 +187,7 @@ udp_close_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle); isc_nmsocket_t *sock = uv_handle_get_data(handle);
atomic_store(&sock->closed, true); atomic_store(&sock->closed, true);
isc_nmsocket_detach((isc_nmsocket_t **)&sock->uv_handle.udp.data); isc__nmsocket_detach((isc_nmsocket_t **)&sock->uv_handle.udp.data);
} }
static void static void
@@ -316,12 +317,17 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
isc_result_t result; isc_result_t result;
isc_nmhandle_t *nmhandle = NULL; isc_nmhandle_t *nmhandle = NULL;
isc_sockaddr_t sockaddr; isc_sockaddr_t sockaddr;
isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); isc_nmsocket_t *sock = NULL;
isc_region_t region; isc_region_t region;
uint32_t maxudp; uint32_t maxudp;
bool free_buf = true; bool free_buf = true;
REQUIRE(VALID_NMSOCK(sock)); /*
* Even though destruction of the socket can only happen from the
* network thread that we're in, we still attach to the socket here
* to ensure it won't be destroyed by the recv callback.
*/
isc__nmsocket_attach(uv_handle_get_data((uv_handle_t *)handle), &sock);
#ifdef UV_UDP_MMSG_CHUNK #ifdef UV_UDP_MMSG_CHUNK
free_buf = ((flags & UV_UDP_MMSG_CHUNK) == 0); free_buf = ((flags & UV_UDP_MMSG_CHUNK) == 0);
@@ -337,6 +343,7 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
if (free_buf) { if (free_buf) {
isc__nm_free_uvbuf(sock, buf); isc__nm_free_uvbuf(sock, buf);
} }
isc__nmsocket_detach(&sock);
return; return;
} }
@@ -346,6 +353,7 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
*/ */
maxudp = atomic_load(&sock->mgr->maxudp); maxudp = atomic_load(&sock->mgr->maxudp);
if (maxudp != 0 && (uint32_t)nrecv > maxudp) { if (maxudp != 0 && (uint32_t)nrecv > maxudp) {
isc__nmsocket_detach(&sock);
return; return;
} }
@@ -356,11 +364,16 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
region.length = nrecv; region.length = nrecv;
INSIST(sock->rcb.recv != NULL); INSIST(sock->rcb.recv != NULL);
sock->rcb.recv(nmhandle, &region, sock->rcbarg); sock->rcb.recv(nmhandle, ISC_R_SUCCESS, &region, sock->rcbarg);
if (free_buf) { if (free_buf) {
isc__nm_free_uvbuf(sock, buf); isc__nm_free_uvbuf(sock, buf);
} }
/*
* The sock is now attached to the handle, we can detach our ref.
*/
isc__nmsocket_detach(&sock);
/* /*
* If the recv callback wants to hold on to the handle, * If the recv callback wants to hold on to the handle,
* it needs to attach to it. * it needs to attach to it.

View File

@@ -449,6 +449,7 @@ isc_nmhandle_peeraddr
isc_nmhandle_ref isc_nmhandle_ref
isc_nmhandle_setdata isc_nmhandle_setdata
isc_nmhandle_unref isc_nmhandle_unref
isc_nm_cancelread
isc_nm_closedown isc_nm_closedown
isc_nm_destroy isc_nm_destroy
isc_nm_detach isc_nm_detach
@@ -459,12 +460,13 @@ isc_nm_send
isc_nm_setstats isc_nm_setstats
isc_nm_start isc_nm_start
isc_nm_stoplistening isc_nm_stoplistening
isc_nm_tcpconnect
isc_nm_tcp_gettimeouts isc_nm_tcp_gettimeouts
isc_nm_tcp_settimeouts isc_nm_tcp_settimeouts
isc_nm_tcpdns_keepalive isc_nm_tcpdns_keepalive
isc_nm_tcpdns_sequential isc_nm_tcpdns_sequential
isc_nm_tid isc_nm_tid
isc_nmsocket_detach isc_nmsocket_close
isc__nm_acquire_interlocked isc__nm_acquire_interlocked
isc__nm_drop_interlocked isc__nm_drop_interlocked
isc__nm_acquire_interlocked_force isc__nm_acquire_interlocked_force

View File

@@ -1620,7 +1620,8 @@ ns__client_put_cb(void *client0) {
* or tcpmsg (TCP case). * or tcpmsg (TCP case).
*/ */
void void
ns__client_request(isc_nmhandle_t *handle, isc_region_t *region, void *arg) { ns__client_request(isc_nmhandle_t *handle, isc_result_t eresult,
isc_region_t *region, void *arg) {
ns_client_t *client; ns_client_t *client;
bool newclient = false; bool newclient = false;
ns_clientmgr_t *mgr; ns_clientmgr_t *mgr;
@@ -1644,6 +1645,8 @@ ns__client_request(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
#endif /* ifdef HAVE_DNSTAP */ #endif /* ifdef HAVE_DNSTAP */
ifp = (ns_interface_t *)arg; ifp = (ns_interface_t *)arg;
UNUSED(eresult);
mgr = ifp->clientmgr; mgr = ifp->clientmgr;
if (mgr == NULL) { if (mgr == NULL) {
/* The interface was shut down in the meantime, just bail */ /* The interface was shut down in the meantime, just bail */

View File

@@ -468,7 +468,8 @@ ns_client_addopt(ns_client_t *client, dns_message_t *message,
*/ */
void void
ns__client_request(isc_nmhandle_t *handle, isc_region_t *region, void *arg); ns__client_request(isc_nmhandle_t *handle, isc_result_t eresult,
isc_region_t *region, void *arg);
/*%< /*%<
* Handle client requests. * Handle client requests.

View File

@@ -553,11 +553,11 @@ void
ns_interface_shutdown(ns_interface_t *ifp) { ns_interface_shutdown(ns_interface_t *ifp) {
if (ifp->udplistensocket != NULL) { if (ifp->udplistensocket != NULL) {
isc_nm_stoplistening(ifp->udplistensocket); isc_nm_stoplistening(ifp->udplistensocket);
isc_nmsocket_detach(&ifp->udplistensocket); isc_nmsocket_close(&ifp->udplistensocket);
} }
if (ifp->tcplistensocket != NULL) { if (ifp->tcplistensocket != NULL) {
isc_nm_stoplistening(ifp->tcplistensocket); isc_nm_stoplistening(ifp->tcplistensocket);
isc_nmsocket_detach(&ifp->tcplistensocket); isc_nmsocket_close(&ifp->tcplistensocket);
} }
if (ifp->clientmgr != NULL) { if (ifp->clientmgr != NULL) {
ns_clientmgr_destroy(&ifp->clientmgr); ns_clientmgr_destroy(&ifp->clientmgr);