mirror of
https://gitlab.isc.org/isc-projects/bind9
synced 2025-08-31 14:35:26 +00:00
Merge branch 'ondrej/enforce-thread-affinity-on-dnsstream' into 'main'
Enforce strong thread-affinity on StreamDNS sockets See merge request isc-projects/bind9!7301
This commit is contained in:
@@ -266,8 +266,6 @@ typedef enum isc__netievent_type {
|
||||
netievent_httpsend,
|
||||
netievent_httpendpoints,
|
||||
|
||||
netievent_streamdnsclose,
|
||||
netievent_streamdnssend,
|
||||
netievent_streamdnsread,
|
||||
netievent_streamdnscancel,
|
||||
|
||||
@@ -1633,16 +1631,10 @@ void
|
||||
isc__nm_streamdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb,
|
||||
void *cbarg);
|
||||
|
||||
void
|
||||
isc__nm_async_streamdnssend(isc__networker_t *worker, isc__netievent_t *ev0);
|
||||
|
||||
void
|
||||
isc__nm_streamdns_send(isc_nmhandle_t *handle, const isc_region_t *region,
|
||||
isc_nm_cb_t cb, void *cbarg);
|
||||
|
||||
void
|
||||
isc__nm_async_streamdnsclose(isc__networker_t *worker, isc__netievent_t *ev0);
|
||||
|
||||
void
|
||||
isc__nm_streamdns_close(isc_nmsocket_t *sock);
|
||||
|
||||
@@ -1854,8 +1846,6 @@ NETIEVENT_SOCKET_HANDLE_TYPE(udpcancel);
|
||||
|
||||
NETIEVENT_SOCKET_QUOTA_TYPE(tcpaccept);
|
||||
|
||||
NETIEVENT_SOCKET_TYPE(streamdnsclose);
|
||||
NETIEVENT_SOCKET_REQ_TYPE(streamdnssend);
|
||||
NETIEVENT_SOCKET_TYPE(streamdnsread);
|
||||
NETIEVENT_SOCKET_HANDLE_TYPE(streamdnscancel);
|
||||
|
||||
@@ -1900,8 +1890,6 @@ NETIEVENT_SOCKET_DECL(detach);
|
||||
|
||||
NETIEVENT_SOCKET_QUOTA_DECL(tcpaccept);
|
||||
|
||||
NETIEVENT_SOCKET_DECL(streamdnsclose);
|
||||
NETIEVENT_SOCKET_REQ_DECL(streamdnssend);
|
||||
NETIEVENT_SOCKET_DECL(streamdnsread);
|
||||
NETIEVENT_SOCKET_HANDLE_DECL(streamdnscancel);
|
||||
|
||||
|
@@ -459,8 +459,6 @@ process_netievent(void *arg) {
|
||||
NETIEVENT_CASE(httpendpoints);
|
||||
#endif
|
||||
NETIEVENT_CASE(streamdnsread);
|
||||
NETIEVENT_CASE(streamdnssend);
|
||||
NETIEVENT_CASE(streamdnsclose);
|
||||
NETIEVENT_CASE(streamdnscancel);
|
||||
|
||||
NETIEVENT_CASE(settlsctx);
|
||||
@@ -519,8 +517,6 @@ NETIEVENT_SOCKET_DEF(detach);
|
||||
|
||||
NETIEVENT_SOCKET_QUOTA_DEF(tcpaccept);
|
||||
|
||||
NETIEVENT_SOCKET_DEF(streamdnsclose);
|
||||
NETIEVENT_SOCKET_REQ_DEF(streamdnssend);
|
||||
NETIEVENT_SOCKET_DEF(streamdnsread);
|
||||
NETIEVENT_SOCKET_HANDLE_DEF(streamdnscancel);
|
||||
|
||||
|
@@ -53,7 +53,7 @@
|
||||
* 'streamdns_send_req_t' object.
|
||||
*
|
||||
* To understand how sending is done, start by looking at
|
||||
* 'isc__nm_async_streamdnssend()'. Additionally also take a look at
|
||||
* 'isc__nm_streamdns_send()'. Additionally also take a look at
|
||||
* 'streamdns_get_send_req()' and 'streamdns_put_send_req()' which are
|
||||
* responsible for send requests allocation/reuse and initialisation.
|
||||
*
|
||||
@@ -822,16 +822,15 @@ isc__nm_streamdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb,
|
||||
void *cbarg) {
|
||||
isc_nmsocket_t *sock = NULL;
|
||||
bool closing = false;
|
||||
bool worker_thread;
|
||||
|
||||
REQUIRE(VALID_NMHANDLE(handle));
|
||||
sock = handle->sock;
|
||||
REQUIRE(VALID_NMSOCK(sock));
|
||||
REQUIRE(sock->type == isc_nm_streamdnssocket);
|
||||
REQUIRE(sock->recv_handle == NULL);
|
||||
REQUIRE(sock->tid == isc_tid());
|
||||
|
||||
closing = streamdns_closing(sock);
|
||||
worker_thread = sock->tid == isc_tid();
|
||||
|
||||
sock->recv_cb = cb;
|
||||
sock->recv_cbarg = cbarg;
|
||||
@@ -843,64 +842,28 @@ isc__nm_streamdns_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb,
|
||||
* asynchronous as we just want to start reading from the
|
||||
* underlying transport.
|
||||
*/
|
||||
if (worker_thread && !closing &&
|
||||
isc_dnsstream_assembler_result(sock->streamdns.input) ==
|
||||
ISC_R_UNSET)
|
||||
if (!closing && isc_dnsstream_assembler_result(sock->streamdns.input) ==
|
||||
ISC_R_UNSET)
|
||||
{
|
||||
isc__netievent_streamdnsread_t event = { .sock = sock };
|
||||
isc__nm_async_streamdnsread(sock->worker,
|
||||
(isc__netievent_t *)&event);
|
||||
} else {
|
||||
isc__netievent_streamdnsread_t *ievent = NULL;
|
||||
/*
|
||||
* We want the read operation to be asynchronous in most cases
|
||||
* because:
|
||||
*
|
||||
* 1. A read operation might be initiated from within the read
|
||||
* callback itself.
|
||||
*
|
||||
* 2. Due to the above, we need to make the operation
|
||||
* asynchronous to keep the socket state consistent.
|
||||
*/
|
||||
ievent = isc__nm_get_netievent_streamdnsread(sock->worker,
|
||||
sock);
|
||||
isc__nm_enqueue_ievent(sock->worker,
|
||||
(isc__netievent_t *)ievent);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
isc__nm_async_streamdnssend(isc__networker_t *worker, isc__netievent_t *ev0) {
|
||||
isc__netievent_streamdnssend_t *ievent =
|
||||
(isc__netievent_streamdnssend_t *)ev0;
|
||||
isc_nmsocket_t *sock = ievent->sock;
|
||||
isc__nm_uvreq_t *req = ievent->req;
|
||||
streamdns_send_req_t *send_req;
|
||||
isc_mem_t *mctx;
|
||||
isc_region_t data = { 0 };
|
||||
|
||||
REQUIRE(VALID_UVREQ(req));
|
||||
REQUIRE(sock->tid == isc_tid());
|
||||
|
||||
UNUSED(worker);
|
||||
|
||||
ievent->req = NULL;
|
||||
|
||||
if (streamdns_closing(sock)) {
|
||||
isc__nm_failed_send_cb(sock, req, ISC_R_CANCELED, true);
|
||||
return;
|
||||
}
|
||||
|
||||
mctx = sock->worker->mctx;
|
||||
|
||||
send_req = streamdns_get_send_req(sock, mctx, req);
|
||||
data.base = (unsigned char *)req->uvbuf.base;
|
||||
data.length = req->uvbuf.len;
|
||||
isc__nm_senddns(sock->outerhandle, &data, streamdns_writecb,
|
||||
(void *)send_req);
|
||||
|
||||
isc__nm_uvreq_put(&req, sock);
|
||||
return;
|
||||
/*
|
||||
* We want the read operation to be asynchronous in most cases
|
||||
* because:
|
||||
*
|
||||
* 1. A read operation might be initiated from within the read
|
||||
* callback itself.
|
||||
*
|
||||
* 2. Due to the above, we need to make the operation
|
||||
* asynchronous to keep the socket state consistent.
|
||||
*/
|
||||
isc__netievent_streamdnsread_t *ievent =
|
||||
isc__nm_get_netievent_streamdnsread(sock->worker, sock);
|
||||
isc__nm_enqueue_ievent(sock->worker, (isc__netievent_t *)ievent);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -908,6 +871,9 @@ isc__nm_streamdns_send(isc_nmhandle_t *handle, const isc_region_t *region,
|
||||
isc_nm_cb_t cb, void *cbarg) {
|
||||
isc__nm_uvreq_t *uvreq = NULL;
|
||||
isc_nmsocket_t *sock = NULL;
|
||||
streamdns_send_req_t *send_req;
|
||||
isc_mem_t *mctx;
|
||||
isc_region_t data = { 0 };
|
||||
|
||||
REQUIRE(VALID_NMHANDLE(handle));
|
||||
REQUIRE(VALID_NMSOCK(handle->sock));
|
||||
@@ -916,6 +882,7 @@ isc__nm_streamdns_send(isc_nmhandle_t *handle, const isc_region_t *region,
|
||||
sock = handle->sock;
|
||||
|
||||
REQUIRE(sock->type == isc_nm_streamdnssocket);
|
||||
REQUIRE(sock->tid == isc_tid());
|
||||
|
||||
uvreq = isc__nm_uvreq_get(sock->worker, sock);
|
||||
isc_nmhandle_attach(handle, &uvreq->handle);
|
||||
@@ -924,23 +891,24 @@ isc__nm_streamdns_send(isc_nmhandle_t *handle, const isc_region_t *region,
|
||||
uvreq->uvbuf.base = (char *)region->base;
|
||||
uvreq->uvbuf.len = region->length;
|
||||
|
||||
if (streamdns_closing(sock)) {
|
||||
isc__nm_failed_send_cb(sock, uvreq, ISC_R_CANCELED, true);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* As when sending, we, basically, handing data to the underlying
|
||||
* transport, we can treat the operation synchronously, as the
|
||||
* transport code will take care of the asynchronicity if required.
|
||||
*/
|
||||
if (sock->tid == isc_tid()) {
|
||||
isc__netievent_streamdnssend_t event = { .sock = sock,
|
||||
.req = uvreq };
|
||||
isc__nm_async_streamdnssend(sock->worker,
|
||||
(isc__netievent_t *)&event);
|
||||
} else {
|
||||
isc__netievent_streamdnssend_t *ievent =
|
||||
isc__nm_get_netievent_streamdnssend(sock->worker, sock,
|
||||
uvreq);
|
||||
isc__nm_enqueue_ievent(sock->worker,
|
||||
(isc__netievent_t *)ievent);
|
||||
}
|
||||
mctx = sock->worker->mctx;
|
||||
send_req = streamdns_get_send_req(sock, mctx, uvreq);
|
||||
data.base = (unsigned char *)uvreq->uvbuf.base;
|
||||
data.length = uvreq->uvbuf.len;
|
||||
isc__nm_senddns(sock->outerhandle, &data, streamdns_writecb,
|
||||
(void *)send_req);
|
||||
|
||||
isc__nm_uvreq_put(&uvreq, sock);
|
||||
}
|
||||
|
||||
static void
|
||||
@@ -969,21 +937,11 @@ streamdns_close_direct(isc_nmsocket_t *sock) {
|
||||
atomic_store(&sock->active, false);
|
||||
}
|
||||
|
||||
void
|
||||
isc__nm_async_streamdnsclose(isc__networker_t *worker, isc__netievent_t *ev0) {
|
||||
isc__netievent_streamdnsclose_t *ievent =
|
||||
(isc__netievent_streamdnsclose_t *)ev0;
|
||||
isc_nmsocket_t *sock = ievent->sock;
|
||||
|
||||
UNUSED(worker);
|
||||
|
||||
streamdns_close_direct(sock);
|
||||
}
|
||||
|
||||
void
|
||||
isc__nm_streamdns_close(isc_nmsocket_t *sock) {
|
||||
REQUIRE(VALID_NMSOCK(sock));
|
||||
REQUIRE(sock->type == isc_nm_streamdnssocket);
|
||||
REQUIRE(sock->tid == isc_tid());
|
||||
|
||||
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
|
||||
true))
|
||||
@@ -991,15 +949,7 @@ isc__nm_streamdns_close(isc_nmsocket_t *sock) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (sock->tid == isc_tid()) {
|
||||
streamdns_close_direct(sock);
|
||||
} else {
|
||||
isc__netievent_streamdnsclose_t *ievent =
|
||||
isc__nm_get_netievent_streamdnsclose(sock->worker,
|
||||
sock);
|
||||
isc__nm_enqueue_ievent(sock->worker,
|
||||
(isc__netievent_t *)ievent);
|
||||
}
|
||||
streamdns_close_direct(sock);
|
||||
}
|
||||
|
||||
void
|
||||
|
Reference in New Issue
Block a user