diff --git a/lib/isc/include/isc/netmgr.h b/lib/isc/include/isc/netmgr.h index e68bed759c..1696f3cb8c 100644 --- a/lib/isc/include/isc/netmgr.h +++ b/lib/isc/include/isc/netmgr.h @@ -95,7 +95,7 @@ isc_nmhandle_getdata(isc_nmhandle_t *handle); void * isc_nmhandle_getextra(isc_nmhandle_t *handle); -typedef void (*isc_nm_opaquecb)(void *arg); +typedef void (*isc_nm_opaquecb_t)(void *arg); bool isc_nmhandle_is_stream(isc_nmhandle_t *handle); @@ -109,7 +109,7 @@ isc_nmhandle_is_stream(isc_nmhandle_t *handle); */ void isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg, - isc_nm_opaquecb doreset, isc_nm_opaquecb dofree); + isc_nm_opaquecb_t doreset, isc_nm_opaquecb_t dofree); isc_sockaddr_t isc_nmhandle_peeraddr(isc_nmhandle_t *handle); @@ -273,7 +273,17 @@ isc_nm_tcpdns_sequential(isc_nmhandle_t *handle); * Disable pipelining on this connection. Each DNS packet * will be only processed after the previous completes. * - * This cannot be reversed once set for a given connection + * The socket must be unpaused after the query is processed. + * This is done the response is sent, or if we're dropping the + * query, it will be done when a handle is fully dereferenced + * by calling the socket's closehandle_cb callback. + * + * Note: This can only be run while a message is being processed; + * if it is run before any messages are read, no messages will + * be read. + * + * Also note: once this has been set, it cannot be reversed for a + * given connection. */ void diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index b95be9873b..c1edf1ca9b 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -89,8 +89,8 @@ struct isc_nmhandle { isc_sockaddr_t peer; isc_sockaddr_t local; - isc_nm_opaquecb doreset; /* reset extra callback, external */ - isc_nm_opaquecb dofree; /* free extra callback, external */ + isc_nm_opaquecb_t doreset; /* reset extra callback, external */ + isc_nm_opaquecb_t dofree; /* free extra callback, external */ void * opaque; char extra[]; }; @@ -312,15 +312,28 @@ struct isc_nmsocket { isc_refcount_t references; /*% - * TCPDNS socket is not pipelining. + * TCPDNS socket has been set not to pipeliine. */ atomic_bool sequential; + + /*% + * TCPDNS socket has exceeded the maximum number of + * simultaneous requests per connecton, so will be temporarily + * restricted from pipelining. + */ + atomic_bool overlimit; + /*% * TCPDNS socket in sequential mode is currently processing a packet, * we need to wait until it finishes. */ atomic_bool processing; + /*% + * A TCP socket has had isc_nm_pauseread() called. + */ + atomic_bool readpaused; + /*% * 'spare' handles for that can be reused to avoid allocations, * for UDP. @@ -334,24 +347,26 @@ struct isc_nmsocket { /*% * List of active handles. - * ah_size - size of ah_frees and ah_handles - * ah_cpos - current position in ah_frees; - * ah_handles - array of *handles. + * ah - current position in 'ah_frees'; this represents the + * current number of active handles; + * ah_size - size of the 'ah_frees' and 'ah_handles' arrays + * ah_handles - array pointers to active handles + * * Adding a handle - * - if ah_cpos == ah_size, realloc - * - x = ah_frees[ah_cpos] - * - ah_frees[ah_cpos++] = 0; + * - if ah == ah_size, reallocate + * - x = ah_frees[ah] + * - ah_frees[ah++] = 0; * - ah_handles[x] = handle * - x must be stored with the handle! * Removing a handle: - * - ah_frees[--ah_cpos] = x + * - ah_frees[--ah] = x * - ah_handles[x] = NULL; * - * XXXWPK for now this is locked with socket->lock, but we might want - * to change it to something lockless + * XXXWPK for now this is locked with socket->lock, but we + * might want to change it to something lockless */ + size_t ah; size_t ah_size; - size_t ah_cpos; size_t *ah_frees; isc_nmhandle_t **ah_handles; @@ -360,6 +375,13 @@ struct isc_nmsocket { size_t buf_len; unsigned char *buf; + /* + * This function will be called with handle->sock + * as the argument whenever a handle's references drop + * to zero, after its reset callback has been called. + */ + isc_nm_opaquecb_t closehandle_cb; + isc__nm_readcb_t rcb; void *rcbarg; }; diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 962bc1af03..da8e73d81e 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -529,7 +529,7 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) { } if (sock->buf != NULL) { - isc_mem_put(sock->mgr->mctx, sock->buf, sock->buf_size); + isc_mem_free(sock->mgr->mctx, sock->buf); } if (sock->quota != NULL) { @@ -580,11 +580,11 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) { * accept destruction. */ LOCK(&sock->lock); - active_handles += sock->ah_cpos; + active_handles += sock->ah; if (sock->children != NULL) { for (int i = 0; i < sock->nchildren; i++) { LOCK(&sock->children[i].lock); - active_handles += sock->children[i].ah_cpos; + active_handles += sock->children[i].ah; UNLOCK(&sock->children[i].lock); } } @@ -701,7 +701,12 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_mutex_init(&sock->lock); isc_condition_init(&sock->cond); isc_refcount_init(&sock->references, 1); + atomic_init(&sock->active, true); + atomic_init(&sock->sequential, false); + atomic_init(&sock->overlimit, false); + atomic_init(&sock->processing, false); + atomic_init(&sock->readpaused, false); sock->magic = NMSOCK_MAGIC; } @@ -729,14 +734,15 @@ isc__nm_free_uvbuf(isc_nmsocket_t *sock, const uv_buf_t *buf) { isc__networker_t *worker = NULL; REQUIRE(VALID_NMSOCK(sock)); - + if (buf->base == NULL) { + /* Empty buffer: might happen in case of error. */ + return; + } worker = &sock->mgr->workers[sock->tid]; REQUIRE(worker->udprecvbuf_inuse); REQUIRE(buf->base == worker->udprecvbuf); - UNUSED(buf); - worker->udprecvbuf_inuse = false; } @@ -791,7 +797,7 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer, LOCK(&sock->lock); /* We need to add this handle to the list of active handles */ - if (sock->ah_cpos == sock->ah_size) { + if (sock->ah == sock->ah_size) { sock->ah_frees = isc_mem_reallocate(sock->mgr->mctx, sock->ah_frees, sock->ah_size * 2 * @@ -810,7 +816,7 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer, sock->ah_size *= 2; } - pos = sock->ah_frees[sock->ah_cpos++]; + pos = sock->ah_frees[sock->ah++]; INSIST(sock->ah_handles[pos] == NULL); sock->ah_handles[pos] = handle; handle->ah_pos = pos; @@ -847,7 +853,7 @@ static void nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) { size_t extra = sock->extrahandlesize; - if (handle->dofree) { + if (handle->dofree != NULL) { handle->dofree(handle->opaque); } @@ -881,9 +887,9 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) { LOCK(&sock->lock); INSIST(sock->ah_handles[handle->ah_pos] == handle); INSIST(sock->ah_size > handle->ah_pos); - INSIST(sock->ah_cpos > 0); + INSIST(sock->ah > 0); sock->ah_handles[handle->ah_pos] = NULL; - sock->ah_frees[--sock->ah_cpos] = handle->ah_pos; + sock->ah_frees[--sock->ah] = handle->ah_pos; handle->ah_pos = 0; if (atomic_load(&sock->active)) { @@ -892,11 +898,20 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) { } UNLOCK(&sock->lock); + /* + * Handle is closed. If the socket has a callback + * configured for that (e.g., to perform cleanup after + * request processing), call it now. + */ + if (sock->closehandle_cb != NULL) { + sock->closehandle_cb(sock); + } + if (!reuse) { nmhandle_free(sock, handle); } - if (sock->ah_cpos == 0 && + if (sock->ah == 0 && !atomic_load(&sock->active) && !atomic_load(&sock->destroying)) { @@ -914,7 +929,7 @@ isc_nmhandle_getdata(isc_nmhandle_t *handle) { void isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg, - isc_nm_opaquecb doreset, isc_nm_opaquecb dofree) + isc_nm_opaquecb_t doreset, isc_nm_opaquecb_t dofree) { REQUIRE(VALID_NMHANDLE(handle)); diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index f0aabd28f8..59861b604f 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -280,6 +280,12 @@ isc_result_t isc_nm_pauseread(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); + if (atomic_load(&sock->readpaused)) { + return (ISC_R_SUCCESS); + } + + atomic_store(&sock->readpaused, true); + if (sock->tid == isc_nm_tid()) { int r = uv_read_stop(&sock->uv_handle.stream); INSIST(r == 0); @@ -312,6 +318,12 @@ isc_nm_resumeread(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->rcb.recv != NULL); + if (!atomic_load(&sock->readpaused)) { + return (ISC_R_SUCCESS); + } + + atomic_store(&sock->readpaused, false); + if (sock->tid == isc_nm_tid()) { int r = uv_read_start(&sock->uv_handle.stream, isc__nm_alloc_cb, read_cb); diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 0c38ecf494..8e86a39474 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -28,9 +28,20 @@ #include "netmgr-int.h" +#define TCPDNS_CLIENTS_PER_CONN 23 +/*%< + * + * Maximum number of simultaneous handles in flight supported for a single + * connected TCPDNS socket. This value was chosen arbitrarily, and may be + * changed in the future. + */ + static void dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg); +static void +resume_processing(void *arg); + static inline size_t dnslen(unsigned char* base) { return ((base[0] << 8) + (base[1])); @@ -45,7 +56,7 @@ alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) { if (sock->buf == NULL) { /* We don't have the buffer at all */ size_t alloc_len = len < NM_REG_BUF ? NM_REG_BUF : NM_BIG_BUF; - sock->buf = isc_mem_get(sock->mgr->mctx, alloc_len); + sock->buf = isc_mem_allocate(sock->mgr->mctx, alloc_len); sock->buf_size = alloc_len; } else { /* We have the buffer but it's too small */ @@ -55,7 +66,6 @@ alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) { } } - /* * Accept callback for TCP-DNS connection */ @@ -84,10 +94,79 @@ dnslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { isc_nmsocket_attach(handle->sock, &dnssock->outer); dnssock->peer = handle->sock->peer; dnssock->iface = handle->sock->iface; + dnssock->closehandle_cb = resume_processing; isc_nm_read(handle, dnslisten_readcb, dnssock); } +static bool +connection_limit(isc_nmsocket_t *sock) { + int ah; + + REQUIRE(sock->type == isc_nm_tcpdnssocket && sock->outer != NULL); + + if (atomic_load(&sock->sequential)) { + /* + * We're already non-pipelining, so there's + * no need to check per-connection limits. + */ + return (false); + } + + LOCK(&sock->lock); + ah = sock->ah; + UNLOCK(&sock->lock); + + if (ah >= TCPDNS_CLIENTS_PER_CONN) { + atomic_store(&sock->overlimit, true); + isc_nm_pauseread(sock->outer); + return (true); + } + + return (false); +} + +/* Process all complete packets out of incoming buffer */ +static void +processbuffer(isc_nmsocket_t *dnssock) { + REQUIRE(VALID_NMSOCK(dnssock)); + + /* While we have a complete packet in the buffer */ + while (dnssock->buf_len > 2 && + dnslen(dnssock->buf) <= dnssock->buf_len - 2 && + !connection_limit(dnssock)) + { + isc_nmhandle_t *dnshandle = NULL; + isc_region_t r2 = { + .base = dnssock->buf + 2, + .length = dnslen(dnssock->buf) + }; + size_t len; + + dnshandle = isc__nmhandle_get(dnssock, NULL, NULL); + atomic_store(&dnssock->processing, true); + dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg); + + /* + * If the recv callback wants to hold on to the + * handle, it needs to attach to it. + */ + isc_nmhandle_unref(dnshandle); + + len = dnslen(dnssock->buf) + 2; + dnssock->buf_len -= len; + if (len > 0) { + memmove(dnssock->buf, dnssock->buf + len, + dnssock->buf_len); + } + + /* Check here to make sure we do the processing at least once */ + if (atomic_load(&dnssock->processing)) { + return; + } + } +} + /* * We've got a read on our underlying socket, need to check if we have * a complete DNS packet and, if so - call the callback @@ -118,6 +197,18 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) { /* * We have something in the buffer, we need to glue it. */ + if (dnssock->buf_len > 0) { + if (dnssock->buf_len == 1) { + /* Make sure we have the length */ + dnssock->buf[1] = base[0]; + dnssock->buf_len = 2; + base++; + len--; + } + + processbuffer(dnssock); + } + if (dnssock->buf_len > 0) { size_t plen; @@ -132,8 +223,19 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) { /* At this point we definitely have 2 bytes there. */ plen = ISC_MIN(len, (dnslen(dnssock->buf) + 2 - dnssock->buf_len)); - if (plen > dnssock->buf_size) { - alloc_dnsbuf(dnssock, plen); + + if (dnssock->buf_len + plen > NM_BIG_BUF) { + /* + * XXX: continuing to read will overrun the + * socket buffer. We may need to force the + * connection to close so the client will have + * to open a new one. + */ + return; + } + + if (dnssock->buf_len + plen > dnssock->buf_size) { + alloc_dnsbuf(dnssock, dnssock->buf_len + plen); } memmove(dnssock->buf + dnssock->buf_len, base, plen); @@ -142,12 +244,15 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) { len -= plen; /* Do we have a complete packet in the buffer? */ - if (dnslen(dnssock->buf) == dnssock->buf_len - 2) { + if (dnslen(dnssock->buf) >= dnssock->buf_len - 2 && + !connection_limit(dnssock)) + { isc_nmhandle_t *dnshandle = NULL; isc_region_t r2 = { .base = dnssock->buf + 2, .length = dnslen(dnssock->buf) }; + dnshandle = isc__nmhandle_get(dnssock, NULL, &local); atomic_store(&dnssock->processing, true); dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg); @@ -165,11 +270,12 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) { * At this point we've processed whatever was previously in the * socket buffer. If there are more messages to be found in what * we've read, and if we're either pipelining or not processing - * anything else, then we can process those messages now. + * anything else currently, then we can process those messages now. */ while (len >= 2 && dnslen(base) <= len - 2 && - !(atomic_load(&dnssock->sequential) && - atomic_load(&dnssock->processing))) + (!atomic_load(&dnssock->sequential) || + !atomic_load(&dnssock->processing)) && + !connection_limit(dnssock)) { isc_nmhandle_t *dnshandle = NULL; isc_region_t r2 = { @@ -206,46 +312,6 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) { } } -/* Process all complete packets out of incoming buffer */ -static void -processbuffer(isc_nmsocket_t *dnssock) { - REQUIRE(VALID_NMSOCK(dnssock)); - - /* While we have a complete packet in the buffer */ - while (dnssock->buf_len > 2 && - dnslen(dnssock->buf) <= dnssock->buf_len - 2) - { - isc_nmhandle_t *dnshandle = NULL; - isc_region_t r2 = { - .base = dnssock->buf + 2, - .length = dnslen(dnssock->buf) - }; - size_t len; - - dnshandle = isc__nmhandle_get(dnssock, NULL, NULL); - atomic_store(&dnssock->processing, true); - dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg); - - /* - * If the recv callback wants to hold on to the - * handle, it needs to attach to it. - */ - isc_nmhandle_unref(dnshandle); - - len = dnslen(dnssock->buf) + 2; - dnssock->buf_len -= len; - if (len > 0) { - memmove(dnssock->buf, dnssock->buf + len, - dnssock->buf_len); - } - - /* Check here to make sure we do the processing at least once */ - if (atomic_load(&dnssock->processing)) { - return; - } - } -} - /* * isc_nm_listentcpdns listens for connections and accepts * them immediately, then calls the cb for each incoming DNS packet @@ -306,13 +372,11 @@ isc_nm_tcpdns_sequential(isc_nmhandle_t *handle) { /* * We don't want pipelining on this connection. That means - * that we can launch query processing only when the previous - * one returned. - * - * The socket MUST be unpaused after the query is processed. - * This is done by isc_nm_resumeread() in tcpdnssend_cb() below. - * - * XXX: The callback is not currently executed in failure cases! + * that we need to pause after reading each request, and + * resume only after the request has been processed. This + * is done in resume_processing(), which is the socket's + * closehandle_cb callback, called whenever a handle + * is released. */ isc_nm_pauseread(handle->sock->outer); atomic_store(&handle->sock->sequential, true); @@ -327,6 +391,28 @@ typedef struct tcpsend { void *cbarg; } tcpsend_t; +static void +resume_processing(void *arg) { + isc_nmsocket_t *sock = (isc_nmsocket_t *) arg; + + REQUIRE(VALID_NMSOCK(sock)); + + if (sock->type != isc_nm_tcpdnssocket || sock->outer == NULL) { + return; + } + + /* + * If we're in sequential mode or over the + * clients-per-connection limit, the sock can + * resume reading now. + */ + if (atomic_load(&sock->overlimit) || atomic_load(&sock->sequential)) { + atomic_store(&sock->overlimit, false); + atomic_store(&sock->processing, false); + isc_nm_resumeread(sock->outer); + } +} + static void tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { tcpsend_t *ts = (tcpsend_t *) cbarg; @@ -337,11 +423,14 @@ tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { isc_mem_put(ts->mctx, ts->region.base, ts->region.length); /* - * The response was sent, if we're in sequential mode resume - * processing. + * The response was sent; if we're in sequential or overlimit + * mode, resume processing now. */ - if (atomic_load(&ts->orighandle->sock->sequential)) { + if (atomic_load(&ts->orighandle->sock->sequential) || + atomic_load(&ts->orighandle->sock->overlimit)) + { atomic_store(&ts->orighandle->sock->processing, false); + atomic_store(&ts->orighandle->sock->overlimit, false); processbuffer(ts->orighandle->sock); isc_nm_resumeread(handle->sock); } diff --git a/lib/ns/client.c b/lib/ns/client.c index e360ba1422..189172f7e3 100644 --- a/lib/ns/client.c +++ b/lib/ns/client.c @@ -2473,7 +2473,6 @@ ns_clientmgr_destroy(ns_clientmgr_t **managerp) { MTRACE("destroy"); - /* XXXWPK TODO we need to pause netmgr here */ /* * Check for success because we may already be task-exclusive * at this point. Only if we succeed at obtaining an exclusive