mirror of
https://gitlab.isc.org/isc-projects/bind9
synced 2025-08-31 14:35:26 +00:00
place a limit on pipelined queries that can be processed simultaneously
when the TCPDNS_CLIENTS_PER_CONN limit has been exceeded for a TCP DNS connection, switch to sequential mode to ensure that memory cannot be exhausted by too many simultaneous queries.
This commit is contained in:
@@ -95,7 +95,7 @@ isc_nmhandle_getdata(isc_nmhandle_t *handle);
|
|||||||
void *
|
void *
|
||||||
isc_nmhandle_getextra(isc_nmhandle_t *handle);
|
isc_nmhandle_getextra(isc_nmhandle_t *handle);
|
||||||
|
|
||||||
typedef void (*isc_nm_opaquecb)(void *arg);
|
typedef void (*isc_nm_opaquecb_t)(void *arg);
|
||||||
|
|
||||||
bool
|
bool
|
||||||
isc_nmhandle_is_stream(isc_nmhandle_t *handle);
|
isc_nmhandle_is_stream(isc_nmhandle_t *handle);
|
||||||
@@ -109,7 +109,7 @@ isc_nmhandle_is_stream(isc_nmhandle_t *handle);
|
|||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg,
|
isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg,
|
||||||
isc_nm_opaquecb doreset, isc_nm_opaquecb dofree);
|
isc_nm_opaquecb_t doreset, isc_nm_opaquecb_t dofree);
|
||||||
|
|
||||||
isc_sockaddr_t
|
isc_sockaddr_t
|
||||||
isc_nmhandle_peeraddr(isc_nmhandle_t *handle);
|
isc_nmhandle_peeraddr(isc_nmhandle_t *handle);
|
||||||
@@ -273,7 +273,17 @@ isc_nm_tcpdns_sequential(isc_nmhandle_t *handle);
|
|||||||
* Disable pipelining on this connection. Each DNS packet
|
* Disable pipelining on this connection. Each DNS packet
|
||||||
* will be only processed after the previous completes.
|
* will be only processed after the previous completes.
|
||||||
*
|
*
|
||||||
* This cannot be reversed once set for a given connection
|
* The socket must be unpaused after the query is processed.
|
||||||
|
* This is done the response is sent, or if we're dropping the
|
||||||
|
* query, it will be done when a handle is fully dereferenced
|
||||||
|
* by calling the socket's closehandle_cb callback.
|
||||||
|
*
|
||||||
|
* Note: This can only be run while a message is being processed;
|
||||||
|
* if it is run before any messages are read, no messages will
|
||||||
|
* be read.
|
||||||
|
*
|
||||||
|
* Also note: once this has been set, it cannot be reversed for a
|
||||||
|
* given connection.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
void
|
void
|
||||||
|
@@ -89,8 +89,8 @@ struct isc_nmhandle {
|
|||||||
|
|
||||||
isc_sockaddr_t peer;
|
isc_sockaddr_t peer;
|
||||||
isc_sockaddr_t local;
|
isc_sockaddr_t local;
|
||||||
isc_nm_opaquecb doreset; /* reset extra callback, external */
|
isc_nm_opaquecb_t doreset; /* reset extra callback, external */
|
||||||
isc_nm_opaquecb dofree; /* free extra callback, external */
|
isc_nm_opaquecb_t dofree; /* free extra callback, external */
|
||||||
void * opaque;
|
void * opaque;
|
||||||
char extra[];
|
char extra[];
|
||||||
};
|
};
|
||||||
@@ -312,15 +312,28 @@ struct isc_nmsocket {
|
|||||||
isc_refcount_t references;
|
isc_refcount_t references;
|
||||||
|
|
||||||
/*%
|
/*%
|
||||||
* TCPDNS socket is not pipelining.
|
* TCPDNS socket has been set not to pipeliine.
|
||||||
*/
|
*/
|
||||||
atomic_bool sequential;
|
atomic_bool sequential;
|
||||||
|
|
||||||
|
/*%
|
||||||
|
* TCPDNS socket has exceeded the maximum number of
|
||||||
|
* simultaneous requests per connecton, so will be temporarily
|
||||||
|
* restricted from pipelining.
|
||||||
|
*/
|
||||||
|
atomic_bool overlimit;
|
||||||
|
|
||||||
/*%
|
/*%
|
||||||
* TCPDNS socket in sequential mode is currently processing a packet,
|
* TCPDNS socket in sequential mode is currently processing a packet,
|
||||||
* we need to wait until it finishes.
|
* we need to wait until it finishes.
|
||||||
*/
|
*/
|
||||||
atomic_bool processing;
|
atomic_bool processing;
|
||||||
|
|
||||||
|
/*%
|
||||||
|
* A TCP socket has had isc_nm_pauseread() called.
|
||||||
|
*/
|
||||||
|
atomic_bool readpaused;
|
||||||
|
|
||||||
/*%
|
/*%
|
||||||
* 'spare' handles for that can be reused to avoid allocations,
|
* 'spare' handles for that can be reused to avoid allocations,
|
||||||
* for UDP.
|
* for UDP.
|
||||||
@@ -334,24 +347,26 @@ struct isc_nmsocket {
|
|||||||
|
|
||||||
/*%
|
/*%
|
||||||
* List of active handles.
|
* List of active handles.
|
||||||
* ah_size - size of ah_frees and ah_handles
|
* ah - current position in 'ah_frees'; this represents the
|
||||||
* ah_cpos - current position in ah_frees;
|
* current number of active handles;
|
||||||
* ah_handles - array of *handles.
|
* ah_size - size of the 'ah_frees' and 'ah_handles' arrays
|
||||||
|
* ah_handles - array pointers to active handles
|
||||||
|
*
|
||||||
* Adding a handle
|
* Adding a handle
|
||||||
* - if ah_cpos == ah_size, realloc
|
* - if ah == ah_size, reallocate
|
||||||
* - x = ah_frees[ah_cpos]
|
* - x = ah_frees[ah]
|
||||||
* - ah_frees[ah_cpos++] = 0;
|
* - ah_frees[ah++] = 0;
|
||||||
* - ah_handles[x] = handle
|
* - ah_handles[x] = handle
|
||||||
* - x must be stored with the handle!
|
* - x must be stored with the handle!
|
||||||
* Removing a handle:
|
* Removing a handle:
|
||||||
* - ah_frees[--ah_cpos] = x
|
* - ah_frees[--ah] = x
|
||||||
* - ah_handles[x] = NULL;
|
* - ah_handles[x] = NULL;
|
||||||
*
|
*
|
||||||
* XXXWPK for now this is locked with socket->lock, but we might want
|
* XXXWPK for now this is locked with socket->lock, but we
|
||||||
* to change it to something lockless
|
* might want to change it to something lockless
|
||||||
*/
|
*/
|
||||||
|
size_t ah;
|
||||||
size_t ah_size;
|
size_t ah_size;
|
||||||
size_t ah_cpos;
|
|
||||||
size_t *ah_frees;
|
size_t *ah_frees;
|
||||||
isc_nmhandle_t **ah_handles;
|
isc_nmhandle_t **ah_handles;
|
||||||
|
|
||||||
@@ -360,6 +375,13 @@ struct isc_nmsocket {
|
|||||||
size_t buf_len;
|
size_t buf_len;
|
||||||
unsigned char *buf;
|
unsigned char *buf;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This function will be called with handle->sock
|
||||||
|
* as the argument whenever a handle's references drop
|
||||||
|
* to zero, after its reset callback has been called.
|
||||||
|
*/
|
||||||
|
isc_nm_opaquecb_t closehandle_cb;
|
||||||
|
|
||||||
isc__nm_readcb_t rcb;
|
isc__nm_readcb_t rcb;
|
||||||
void *rcbarg;
|
void *rcbarg;
|
||||||
};
|
};
|
||||||
|
@@ -529,7 +529,7 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (sock->buf != NULL) {
|
if (sock->buf != NULL) {
|
||||||
isc_mem_put(sock->mgr->mctx, sock->buf, sock->buf_size);
|
isc_mem_free(sock->mgr->mctx, sock->buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sock->quota != NULL) {
|
if (sock->quota != NULL) {
|
||||||
@@ -580,11 +580,11 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock) {
|
|||||||
* accept destruction.
|
* accept destruction.
|
||||||
*/
|
*/
|
||||||
LOCK(&sock->lock);
|
LOCK(&sock->lock);
|
||||||
active_handles += sock->ah_cpos;
|
active_handles += sock->ah;
|
||||||
if (sock->children != NULL) {
|
if (sock->children != NULL) {
|
||||||
for (int i = 0; i < sock->nchildren; i++) {
|
for (int i = 0; i < sock->nchildren; i++) {
|
||||||
LOCK(&sock->children[i].lock);
|
LOCK(&sock->children[i].lock);
|
||||||
active_handles += sock->children[i].ah_cpos;
|
active_handles += sock->children[i].ah;
|
||||||
UNLOCK(&sock->children[i].lock);
|
UNLOCK(&sock->children[i].lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -701,7 +701,12 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr,
|
|||||||
isc_mutex_init(&sock->lock);
|
isc_mutex_init(&sock->lock);
|
||||||
isc_condition_init(&sock->cond);
|
isc_condition_init(&sock->cond);
|
||||||
isc_refcount_init(&sock->references, 1);
|
isc_refcount_init(&sock->references, 1);
|
||||||
|
|
||||||
atomic_init(&sock->active, true);
|
atomic_init(&sock->active, true);
|
||||||
|
atomic_init(&sock->sequential, false);
|
||||||
|
atomic_init(&sock->overlimit, false);
|
||||||
|
atomic_init(&sock->processing, false);
|
||||||
|
atomic_init(&sock->readpaused, false);
|
||||||
|
|
||||||
sock->magic = NMSOCK_MAGIC;
|
sock->magic = NMSOCK_MAGIC;
|
||||||
}
|
}
|
||||||
@@ -729,14 +734,15 @@ isc__nm_free_uvbuf(isc_nmsocket_t *sock, const uv_buf_t *buf) {
|
|||||||
isc__networker_t *worker = NULL;
|
isc__networker_t *worker = NULL;
|
||||||
|
|
||||||
REQUIRE(VALID_NMSOCK(sock));
|
REQUIRE(VALID_NMSOCK(sock));
|
||||||
|
if (buf->base == NULL) {
|
||||||
|
/* Empty buffer: might happen in case of error. */
|
||||||
|
return;
|
||||||
|
}
|
||||||
worker = &sock->mgr->workers[sock->tid];
|
worker = &sock->mgr->workers[sock->tid];
|
||||||
|
|
||||||
REQUIRE(worker->udprecvbuf_inuse);
|
REQUIRE(worker->udprecvbuf_inuse);
|
||||||
REQUIRE(buf->base == worker->udprecvbuf);
|
REQUIRE(buf->base == worker->udprecvbuf);
|
||||||
|
|
||||||
UNUSED(buf);
|
|
||||||
|
|
||||||
worker->udprecvbuf_inuse = false;
|
worker->udprecvbuf_inuse = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -791,7 +797,7 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
|
|||||||
|
|
||||||
LOCK(&sock->lock);
|
LOCK(&sock->lock);
|
||||||
/* We need to add this handle to the list of active handles */
|
/* We need to add this handle to the list of active handles */
|
||||||
if (sock->ah_cpos == sock->ah_size) {
|
if (sock->ah == sock->ah_size) {
|
||||||
sock->ah_frees =
|
sock->ah_frees =
|
||||||
isc_mem_reallocate(sock->mgr->mctx, sock->ah_frees,
|
isc_mem_reallocate(sock->mgr->mctx, sock->ah_frees,
|
||||||
sock->ah_size * 2 *
|
sock->ah_size * 2 *
|
||||||
@@ -810,7 +816,7 @@ isc__nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
|
|||||||
sock->ah_size *= 2;
|
sock->ah_size *= 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
pos = sock->ah_frees[sock->ah_cpos++];
|
pos = sock->ah_frees[sock->ah++];
|
||||||
INSIST(sock->ah_handles[pos] == NULL);
|
INSIST(sock->ah_handles[pos] == NULL);
|
||||||
sock->ah_handles[pos] = handle;
|
sock->ah_handles[pos] = handle;
|
||||||
handle->ah_pos = pos;
|
handle->ah_pos = pos;
|
||||||
@@ -847,7 +853,7 @@ static void
|
|||||||
nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
|
nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
|
||||||
size_t extra = sock->extrahandlesize;
|
size_t extra = sock->extrahandlesize;
|
||||||
|
|
||||||
if (handle->dofree) {
|
if (handle->dofree != NULL) {
|
||||||
handle->dofree(handle->opaque);
|
handle->dofree(handle->opaque);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -881,9 +887,9 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) {
|
|||||||
LOCK(&sock->lock);
|
LOCK(&sock->lock);
|
||||||
INSIST(sock->ah_handles[handle->ah_pos] == handle);
|
INSIST(sock->ah_handles[handle->ah_pos] == handle);
|
||||||
INSIST(sock->ah_size > handle->ah_pos);
|
INSIST(sock->ah_size > handle->ah_pos);
|
||||||
INSIST(sock->ah_cpos > 0);
|
INSIST(sock->ah > 0);
|
||||||
sock->ah_handles[handle->ah_pos] = NULL;
|
sock->ah_handles[handle->ah_pos] = NULL;
|
||||||
sock->ah_frees[--sock->ah_cpos] = handle->ah_pos;
|
sock->ah_frees[--sock->ah] = handle->ah_pos;
|
||||||
handle->ah_pos = 0;
|
handle->ah_pos = 0;
|
||||||
|
|
||||||
if (atomic_load(&sock->active)) {
|
if (atomic_load(&sock->active)) {
|
||||||
@@ -892,11 +898,20 @@ isc_nmhandle_unref(isc_nmhandle_t *handle) {
|
|||||||
}
|
}
|
||||||
UNLOCK(&sock->lock);
|
UNLOCK(&sock->lock);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Handle is closed. If the socket has a callback
|
||||||
|
* configured for that (e.g., to perform cleanup after
|
||||||
|
* request processing), call it now.
|
||||||
|
*/
|
||||||
|
if (sock->closehandle_cb != NULL) {
|
||||||
|
sock->closehandle_cb(sock);
|
||||||
|
}
|
||||||
|
|
||||||
if (!reuse) {
|
if (!reuse) {
|
||||||
nmhandle_free(sock, handle);
|
nmhandle_free(sock, handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sock->ah_cpos == 0 &&
|
if (sock->ah == 0 &&
|
||||||
!atomic_load(&sock->active) &&
|
!atomic_load(&sock->active) &&
|
||||||
!atomic_load(&sock->destroying))
|
!atomic_load(&sock->destroying))
|
||||||
{
|
{
|
||||||
@@ -914,7 +929,7 @@ isc_nmhandle_getdata(isc_nmhandle_t *handle) {
|
|||||||
|
|
||||||
void
|
void
|
||||||
isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg,
|
isc_nmhandle_setdata(isc_nmhandle_t *handle, void *arg,
|
||||||
isc_nm_opaquecb doreset, isc_nm_opaquecb dofree)
|
isc_nm_opaquecb_t doreset, isc_nm_opaquecb_t dofree)
|
||||||
{
|
{
|
||||||
REQUIRE(VALID_NMHANDLE(handle));
|
REQUIRE(VALID_NMHANDLE(handle));
|
||||||
|
|
||||||
|
@@ -280,6 +280,12 @@ isc_result_t
|
|||||||
isc_nm_pauseread(isc_nmsocket_t *sock) {
|
isc_nm_pauseread(isc_nmsocket_t *sock) {
|
||||||
REQUIRE(VALID_NMSOCK(sock));
|
REQUIRE(VALID_NMSOCK(sock));
|
||||||
|
|
||||||
|
if (atomic_load(&sock->readpaused)) {
|
||||||
|
return (ISC_R_SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic_store(&sock->readpaused, true);
|
||||||
|
|
||||||
if (sock->tid == isc_nm_tid()) {
|
if (sock->tid == isc_nm_tid()) {
|
||||||
int r = uv_read_stop(&sock->uv_handle.stream);
|
int r = uv_read_stop(&sock->uv_handle.stream);
|
||||||
INSIST(r == 0);
|
INSIST(r == 0);
|
||||||
@@ -312,6 +318,12 @@ isc_nm_resumeread(isc_nmsocket_t *sock) {
|
|||||||
REQUIRE(VALID_NMSOCK(sock));
|
REQUIRE(VALID_NMSOCK(sock));
|
||||||
REQUIRE(sock->rcb.recv != NULL);
|
REQUIRE(sock->rcb.recv != NULL);
|
||||||
|
|
||||||
|
if (!atomic_load(&sock->readpaused)) {
|
||||||
|
return (ISC_R_SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic_store(&sock->readpaused, false);
|
||||||
|
|
||||||
if (sock->tid == isc_nm_tid()) {
|
if (sock->tid == isc_nm_tid()) {
|
||||||
int r = uv_read_start(&sock->uv_handle.stream,
|
int r = uv_read_start(&sock->uv_handle.stream,
|
||||||
isc__nm_alloc_cb, read_cb);
|
isc__nm_alloc_cb, read_cb);
|
||||||
|
@@ -28,9 +28,20 @@
|
|||||||
|
|
||||||
#include "netmgr-int.h"
|
#include "netmgr-int.h"
|
||||||
|
|
||||||
|
#define TCPDNS_CLIENTS_PER_CONN 23
|
||||||
|
/*%<
|
||||||
|
*
|
||||||
|
* Maximum number of simultaneous handles in flight supported for a single
|
||||||
|
* connected TCPDNS socket. This value was chosen arbitrarily, and may be
|
||||||
|
* changed in the future.
|
||||||
|
*/
|
||||||
|
|
||||||
static void
|
static void
|
||||||
dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg);
|
dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg);
|
||||||
|
|
||||||
|
static void
|
||||||
|
resume_processing(void *arg);
|
||||||
|
|
||||||
static inline size_t
|
static inline size_t
|
||||||
dnslen(unsigned char* base) {
|
dnslen(unsigned char* base) {
|
||||||
return ((base[0] << 8) + (base[1]));
|
return ((base[0] << 8) + (base[1]));
|
||||||
@@ -45,7 +56,7 @@ alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
|
|||||||
if (sock->buf == NULL) {
|
if (sock->buf == NULL) {
|
||||||
/* We don't have the buffer at all */
|
/* We don't have the buffer at all */
|
||||||
size_t alloc_len = len < NM_REG_BUF ? NM_REG_BUF : NM_BIG_BUF;
|
size_t alloc_len = len < NM_REG_BUF ? NM_REG_BUF : NM_BIG_BUF;
|
||||||
sock->buf = isc_mem_get(sock->mgr->mctx, alloc_len);
|
sock->buf = isc_mem_allocate(sock->mgr->mctx, alloc_len);
|
||||||
sock->buf_size = alloc_len;
|
sock->buf_size = alloc_len;
|
||||||
} else {
|
} else {
|
||||||
/* We have the buffer but it's too small */
|
/* We have the buffer but it's too small */
|
||||||
@@ -55,7 +66,6 @@ alloc_dnsbuf(isc_nmsocket_t *sock, size_t len) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Accept callback for TCP-DNS connection
|
* Accept callback for TCP-DNS connection
|
||||||
*/
|
*/
|
||||||
@@ -84,10 +94,79 @@ dnslisten_acceptcb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
|
|||||||
isc_nmsocket_attach(handle->sock, &dnssock->outer);
|
isc_nmsocket_attach(handle->sock, &dnssock->outer);
|
||||||
dnssock->peer = handle->sock->peer;
|
dnssock->peer = handle->sock->peer;
|
||||||
dnssock->iface = handle->sock->iface;
|
dnssock->iface = handle->sock->iface;
|
||||||
|
dnssock->closehandle_cb = resume_processing;
|
||||||
|
|
||||||
isc_nm_read(handle, dnslisten_readcb, dnssock);
|
isc_nm_read(handle, dnslisten_readcb, dnssock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool
|
||||||
|
connection_limit(isc_nmsocket_t *sock) {
|
||||||
|
int ah;
|
||||||
|
|
||||||
|
REQUIRE(sock->type == isc_nm_tcpdnssocket && sock->outer != NULL);
|
||||||
|
|
||||||
|
if (atomic_load(&sock->sequential)) {
|
||||||
|
/*
|
||||||
|
* We're already non-pipelining, so there's
|
||||||
|
* no need to check per-connection limits.
|
||||||
|
*/
|
||||||
|
return (false);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOCK(&sock->lock);
|
||||||
|
ah = sock->ah;
|
||||||
|
UNLOCK(&sock->lock);
|
||||||
|
|
||||||
|
if (ah >= TCPDNS_CLIENTS_PER_CONN) {
|
||||||
|
atomic_store(&sock->overlimit, true);
|
||||||
|
isc_nm_pauseread(sock->outer);
|
||||||
|
return (true);
|
||||||
|
}
|
||||||
|
|
||||||
|
return (false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Process all complete packets out of incoming buffer */
|
||||||
|
static void
|
||||||
|
processbuffer(isc_nmsocket_t *dnssock) {
|
||||||
|
REQUIRE(VALID_NMSOCK(dnssock));
|
||||||
|
|
||||||
|
/* While we have a complete packet in the buffer */
|
||||||
|
while (dnssock->buf_len > 2 &&
|
||||||
|
dnslen(dnssock->buf) <= dnssock->buf_len - 2 &&
|
||||||
|
!connection_limit(dnssock))
|
||||||
|
{
|
||||||
|
isc_nmhandle_t *dnshandle = NULL;
|
||||||
|
isc_region_t r2 = {
|
||||||
|
.base = dnssock->buf + 2,
|
||||||
|
.length = dnslen(dnssock->buf)
|
||||||
|
};
|
||||||
|
size_t len;
|
||||||
|
|
||||||
|
dnshandle = isc__nmhandle_get(dnssock, NULL, NULL);
|
||||||
|
atomic_store(&dnssock->processing, true);
|
||||||
|
dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If the recv callback wants to hold on to the
|
||||||
|
* handle, it needs to attach to it.
|
||||||
|
*/
|
||||||
|
isc_nmhandle_unref(dnshandle);
|
||||||
|
|
||||||
|
len = dnslen(dnssock->buf) + 2;
|
||||||
|
dnssock->buf_len -= len;
|
||||||
|
if (len > 0) {
|
||||||
|
memmove(dnssock->buf, dnssock->buf + len,
|
||||||
|
dnssock->buf_len);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Check here to make sure we do the processing at least once */
|
||||||
|
if (atomic_load(&dnssock->processing)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We've got a read on our underlying socket, need to check if we have
|
* We've got a read on our underlying socket, need to check if we have
|
||||||
* a complete DNS packet and, if so - call the callback
|
* a complete DNS packet and, if so - call the callback
|
||||||
@@ -118,6 +197,18 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
|
|||||||
/*
|
/*
|
||||||
* We have something in the buffer, we need to glue it.
|
* We have something in the buffer, we need to glue it.
|
||||||
*/
|
*/
|
||||||
|
if (dnssock->buf_len > 0) {
|
||||||
|
if (dnssock->buf_len == 1) {
|
||||||
|
/* Make sure we have the length */
|
||||||
|
dnssock->buf[1] = base[0];
|
||||||
|
dnssock->buf_len = 2;
|
||||||
|
base++;
|
||||||
|
len--;
|
||||||
|
}
|
||||||
|
|
||||||
|
processbuffer(dnssock);
|
||||||
|
}
|
||||||
|
|
||||||
if (dnssock->buf_len > 0) {
|
if (dnssock->buf_len > 0) {
|
||||||
size_t plen;
|
size_t plen;
|
||||||
|
|
||||||
@@ -132,8 +223,19 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
|
|||||||
/* At this point we definitely have 2 bytes there. */
|
/* At this point we definitely have 2 bytes there. */
|
||||||
plen = ISC_MIN(len, (dnslen(dnssock->buf) + 2 -
|
plen = ISC_MIN(len, (dnslen(dnssock->buf) + 2 -
|
||||||
dnssock->buf_len));
|
dnssock->buf_len));
|
||||||
if (plen > dnssock->buf_size) {
|
|
||||||
alloc_dnsbuf(dnssock, plen);
|
if (dnssock->buf_len + plen > NM_BIG_BUF) {
|
||||||
|
/*
|
||||||
|
* XXX: continuing to read will overrun the
|
||||||
|
* socket buffer. We may need to force the
|
||||||
|
* connection to close so the client will have
|
||||||
|
* to open a new one.
|
||||||
|
*/
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dnssock->buf_len + plen > dnssock->buf_size) {
|
||||||
|
alloc_dnsbuf(dnssock, dnssock->buf_len + plen);
|
||||||
}
|
}
|
||||||
|
|
||||||
memmove(dnssock->buf + dnssock->buf_len, base, plen);
|
memmove(dnssock->buf + dnssock->buf_len, base, plen);
|
||||||
@@ -142,12 +244,15 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
|
|||||||
len -= plen;
|
len -= plen;
|
||||||
|
|
||||||
/* Do we have a complete packet in the buffer? */
|
/* Do we have a complete packet in the buffer? */
|
||||||
if (dnslen(dnssock->buf) == dnssock->buf_len - 2) {
|
if (dnslen(dnssock->buf) >= dnssock->buf_len - 2 &&
|
||||||
|
!connection_limit(dnssock))
|
||||||
|
{
|
||||||
isc_nmhandle_t *dnshandle = NULL;
|
isc_nmhandle_t *dnshandle = NULL;
|
||||||
isc_region_t r2 = {
|
isc_region_t r2 = {
|
||||||
.base = dnssock->buf + 2,
|
.base = dnssock->buf + 2,
|
||||||
.length = dnslen(dnssock->buf)
|
.length = dnslen(dnssock->buf)
|
||||||
};
|
};
|
||||||
|
|
||||||
dnshandle = isc__nmhandle_get(dnssock, NULL, &local);
|
dnshandle = isc__nmhandle_get(dnssock, NULL, &local);
|
||||||
atomic_store(&dnssock->processing, true);
|
atomic_store(&dnssock->processing, true);
|
||||||
dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
|
dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
|
||||||
@@ -165,11 +270,12 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
|
|||||||
* At this point we've processed whatever was previously in the
|
* At this point we've processed whatever was previously in the
|
||||||
* socket buffer. If there are more messages to be found in what
|
* socket buffer. If there are more messages to be found in what
|
||||||
* we've read, and if we're either pipelining or not processing
|
* we've read, and if we're either pipelining or not processing
|
||||||
* anything else, then we can process those messages now.
|
* anything else currently, then we can process those messages now.
|
||||||
*/
|
*/
|
||||||
while (len >= 2 && dnslen(base) <= len - 2 &&
|
while (len >= 2 && dnslen(base) <= len - 2 &&
|
||||||
!(atomic_load(&dnssock->sequential) &&
|
(!atomic_load(&dnssock->sequential) ||
|
||||||
atomic_load(&dnssock->processing)))
|
!atomic_load(&dnssock->processing)) &&
|
||||||
|
!connection_limit(dnssock))
|
||||||
{
|
{
|
||||||
isc_nmhandle_t *dnshandle = NULL;
|
isc_nmhandle_t *dnshandle = NULL;
|
||||||
isc_region_t r2 = {
|
isc_region_t r2 = {
|
||||||
@@ -206,46 +312,6 @@ dnslisten_readcb(isc_nmhandle_t *handle, isc_region_t *region, void *arg) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Process all complete packets out of incoming buffer */
|
|
||||||
static void
|
|
||||||
processbuffer(isc_nmsocket_t *dnssock) {
|
|
||||||
REQUIRE(VALID_NMSOCK(dnssock));
|
|
||||||
|
|
||||||
/* While we have a complete packet in the buffer */
|
|
||||||
while (dnssock->buf_len > 2 &&
|
|
||||||
dnslen(dnssock->buf) <= dnssock->buf_len - 2)
|
|
||||||
{
|
|
||||||
isc_nmhandle_t *dnshandle = NULL;
|
|
||||||
isc_region_t r2 = {
|
|
||||||
.base = dnssock->buf + 2,
|
|
||||||
.length = dnslen(dnssock->buf)
|
|
||||||
};
|
|
||||||
size_t len;
|
|
||||||
|
|
||||||
dnshandle = isc__nmhandle_get(dnssock, NULL, NULL);
|
|
||||||
atomic_store(&dnssock->processing, true);
|
|
||||||
dnssock->rcb.recv(dnshandle, &r2, dnssock->rcbarg);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If the recv callback wants to hold on to the
|
|
||||||
* handle, it needs to attach to it.
|
|
||||||
*/
|
|
||||||
isc_nmhandle_unref(dnshandle);
|
|
||||||
|
|
||||||
len = dnslen(dnssock->buf) + 2;
|
|
||||||
dnssock->buf_len -= len;
|
|
||||||
if (len > 0) {
|
|
||||||
memmove(dnssock->buf, dnssock->buf + len,
|
|
||||||
dnssock->buf_len);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Check here to make sure we do the processing at least once */
|
|
||||||
if (atomic_load(&dnssock->processing)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* isc_nm_listentcpdns listens for connections and accepts
|
* isc_nm_listentcpdns listens for connections and accepts
|
||||||
* them immediately, then calls the cb for each incoming DNS packet
|
* them immediately, then calls the cb for each incoming DNS packet
|
||||||
@@ -306,13 +372,11 @@ isc_nm_tcpdns_sequential(isc_nmhandle_t *handle) {
|
|||||||
|
|
||||||
/*
|
/*
|
||||||
* We don't want pipelining on this connection. That means
|
* We don't want pipelining on this connection. That means
|
||||||
* that we can launch query processing only when the previous
|
* that we need to pause after reading each request, and
|
||||||
* one returned.
|
* resume only after the request has been processed. This
|
||||||
*
|
* is done in resume_processing(), which is the socket's
|
||||||
* The socket MUST be unpaused after the query is processed.
|
* closehandle_cb callback, called whenever a handle
|
||||||
* This is done by isc_nm_resumeread() in tcpdnssend_cb() below.
|
* is released.
|
||||||
*
|
|
||||||
* XXX: The callback is not currently executed in failure cases!
|
|
||||||
*/
|
*/
|
||||||
isc_nm_pauseread(handle->sock->outer);
|
isc_nm_pauseread(handle->sock->outer);
|
||||||
atomic_store(&handle->sock->sequential, true);
|
atomic_store(&handle->sock->sequential, true);
|
||||||
@@ -327,6 +391,28 @@ typedef struct tcpsend {
|
|||||||
void *cbarg;
|
void *cbarg;
|
||||||
} tcpsend_t;
|
} tcpsend_t;
|
||||||
|
|
||||||
|
static void
|
||||||
|
resume_processing(void *arg) {
|
||||||
|
isc_nmsocket_t *sock = (isc_nmsocket_t *) arg;
|
||||||
|
|
||||||
|
REQUIRE(VALID_NMSOCK(sock));
|
||||||
|
|
||||||
|
if (sock->type != isc_nm_tcpdnssocket || sock->outer == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we're in sequential mode or over the
|
||||||
|
* clients-per-connection limit, the sock can
|
||||||
|
* resume reading now.
|
||||||
|
*/
|
||||||
|
if (atomic_load(&sock->overlimit) || atomic_load(&sock->sequential)) {
|
||||||
|
atomic_store(&sock->overlimit, false);
|
||||||
|
atomic_store(&sock->processing, false);
|
||||||
|
isc_nm_resumeread(sock->outer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
|
tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
|
||||||
tcpsend_t *ts = (tcpsend_t *) cbarg;
|
tcpsend_t *ts = (tcpsend_t *) cbarg;
|
||||||
@@ -337,11 +423,14 @@ tcpdnssend_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
|
|||||||
isc_mem_put(ts->mctx, ts->region.base, ts->region.length);
|
isc_mem_put(ts->mctx, ts->region.base, ts->region.length);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The response was sent, if we're in sequential mode resume
|
* The response was sent; if we're in sequential or overlimit
|
||||||
* processing.
|
* mode, resume processing now.
|
||||||
*/
|
*/
|
||||||
if (atomic_load(&ts->orighandle->sock->sequential)) {
|
if (atomic_load(&ts->orighandle->sock->sequential) ||
|
||||||
|
atomic_load(&ts->orighandle->sock->overlimit))
|
||||||
|
{
|
||||||
atomic_store(&ts->orighandle->sock->processing, false);
|
atomic_store(&ts->orighandle->sock->processing, false);
|
||||||
|
atomic_store(&ts->orighandle->sock->overlimit, false);
|
||||||
processbuffer(ts->orighandle->sock);
|
processbuffer(ts->orighandle->sock);
|
||||||
isc_nm_resumeread(handle->sock);
|
isc_nm_resumeread(handle->sock);
|
||||||
}
|
}
|
||||||
|
@@ -2473,7 +2473,6 @@ ns_clientmgr_destroy(ns_clientmgr_t **managerp) {
|
|||||||
|
|
||||||
MTRACE("destroy");
|
MTRACE("destroy");
|
||||||
|
|
||||||
/* XXXWPK TODO we need to pause netmgr here */
|
|
||||||
/*
|
/*
|
||||||
* Check for success because we may already be task-exclusive
|
* Check for success because we may already be task-exclusive
|
||||||
* at this point. Only if we succeed at obtaining an exclusive
|
* at this point. Only if we succeed at obtaining an exclusive
|
||||||
|
Reference in New Issue
Block a user