
Use barriers for netmgr synchronization

The netmgr listening, stoplistening, pausing and resuming functions
now use barriers for synchronization, which makes the code much simpler.

isc/barrier.h defines isc_barrier macros as a front-end for uv_barrier
on platforms where that works, and pthread_barrier where it doesn't
(including TSAN builds).
Ondřej Surý 2021-05-05 11:51:39 +02:00 committed by Evan Hunt
parent 2eae7813b6
commit 4c8f6ebeb1
14 changed files with 589 additions and 328 deletions
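
For context, here is a minimal stand-alone sketch of the rendezvous a barrier provides, using the raw pthread API (toy code; NWORKERS and the worker function are placeholders, not netmgr names). Every waiter blocks until the last one arrives, and exactly one waiter is told that it was the last, which is what lets the diff below drop the mutex/condition bookkeeping that previously tracked "all workers are paused":

#include <pthread.h>
#include <stdio.h>

#define NWORKERS 4

static pthread_barrier_t pausing;

static void *
worker(void *arg) {
	int id = *(int *)arg;

	/* ... the worker stops draining its normal queues here ... */
	if (pthread_barrier_wait(&pausing) == PTHREAD_BARRIER_SERIAL_THREAD) {
		/* Exactly one waiter is "elected" and can safely
		 * perform one-time work on behalf of the group. */
		printf("worker %d: all %d workers paused\n", id, NWORKERS);
	}
	return (NULL);
}

int
main(void) {
	pthread_t threads[NWORKERS];
	int ids[NWORKERS];

	pthread_barrier_init(&pausing, NULL, NWORKERS);
	for (int i = 0; i < NWORKERS; i++) {
		ids[i] = i;
		pthread_create(&threads[i], NULL, worker, &ids[i]);
	}
	for (int i = 0; i < NWORKERS; i++) {
		pthread_join(threads[i], NULL);
	}
	pthread_barrier_destroy(&pausing);
	return (0);
}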

View File

@ -11,6 +11,7 @@ libisc_la_HEADERS = \
include/isc/atomic.h \
include/isc/attributes.h \
include/isc/backtrace.h \
include/isc/barrier.h \
include/isc/base32.h \
include/isc/base64.h \
include/isc/bind9.h \

View File

@ -0,0 +1,37 @@
/*
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
*
* See the COPYRIGHT file distributed with this work for additional
* information regarding copyright ownership.
*/
#pragma once
#include <isc/util.h>
#if __SANITIZE_THREAD__ && !defined(WIN32)
#include <pthread.h>
#define isc_barrier_t pthread_barrier_t
#define isc_barrier_init(barrier, count) \
pthread_barrier_init(barrier, NULL, count)
#define isc_barrier_destroy(barrier) pthread_barrier_destroy(barrier)
#define isc_barrier_wait(barrier) pthread_barrier_wait(barrier)
#else
#include <uv.h>
#define isc_barrier_t uv_barrier_t
#define isc_barrier_init(barrier, count) uv_barrier_init(barrier, count)
#define isc_barrier_destroy(barrier) uv_barrier_destroy(barrier)
#define isc_barrier_wait(barrier) uv_barrier_wait(barrier)
#endif /* __SANITIZE_THREAD__ */
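
A hedged usage sketch of the libuv branch above (toy code; the thread count and entry function are illustrative, not netmgr names): uv_barrier_wait() returns a value greater than zero in exactly one of the waiting threads, mirroring PTHREAD_BARRIER_SERIAL_THREAD, and the netmgr changes below rely on that to elect a single thread for one-time signalling.

#include <uv.h>
#include <stdio.h>

#define NTHREADS 4

static uv_barrier_t blocker;

static void
entry(void *arg) {
	(void)arg;
	if (uv_barrier_wait(&blocker) > 0) {
		/* The "serializer" thread: a safe place for
		 * one-time work after everyone has arrived. */
		printf("all %d threads arrived\n", NTHREADS);
	}
}

int
main(void) {
	uv_thread_t threads[NTHREADS];

	uv_barrier_init(&blocker, NTHREADS);
	for (int i = 0; i < NTHREADS; i++) {
		uv_thread_create(&threads[i], entry, NULL);
	}
	for (int i = 0; i < NTHREADS; i++) {
		uv_thread_join(&threads[i]);
	}
	uv_barrier_destroy(&blocker);
	return (0);
}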

View File

@ -1199,7 +1199,7 @@ isc_nm_httpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
sock->extrahandlesize = extrahandlesize;
sock->connect_timeout = timeout;
sock->result = ISC_R_DEFAULT;
sock->result = ISC_R_UNSET;
sock->connect_cb = cb;
sock->connect_cbarg = cbarg;
atomic_init(&sock->client, true);
@ -2170,7 +2170,7 @@ isc_nm_listenhttp(isc_nm_t *mgr, isc_nmiface_t *iface, int backlog,
isc__nmsocket_attach(sock, &sock->outer->h2.httpserver);
sock->nchildren = sock->outer->nchildren;
sock->result = ISC_R_DEFAULT;
sock->result = ISC_R_UNSET;
sock->tid = isc_random_uniform(sock->nchildren);
sock->fd = (uv_os_sock_t)-1;

View File

@ -19,6 +19,7 @@
#include <isc/astack.h>
#include <isc/atomic.h>
#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/magic.h>
@ -174,7 +175,6 @@ typedef struct isc__networker {
uv_async_t async; /* async channel to send
* data to this networker */
isc_mutex_t lock;
isc_condition_t cond;
bool paused;
bool finished;
isc_thread_t thread;
@ -185,6 +185,8 @@ typedef struct isc__networker {
* used for listening etc.
* can be processed while
* worker is paused */
isc_condition_t cond_prio;
isc_refcount_t references;
atomic_int_fast64_t pktcount;
char *recvbuf;
@ -671,7 +673,7 @@ struct isc_nm {
isc_mutex_t evlock;
uint_fast32_t workers_running;
uint_fast32_t workers_paused;
atomic_uint_fast32_t workers_paused;
atomic_uint_fast32_t maxudp;
atomic_bool paused;
@ -702,6 +704,9 @@ struct isc_nm {
atomic_uint_fast32_t keepalive;
atomic_uint_fast32_t advertised;
isc_barrier_t pausing;
isc_barrier_t resuming;
#ifdef NETMGR_TRACE
ISC_LIST(isc_nmsocket_t) active_sockets;
#endif
@ -836,6 +841,9 @@ struct isc_nmsocket {
/*% Self socket */
isc_nmsocket_t *self;
isc_barrier_t startlistening;
isc_barrier_t stoplistening;
/*% TLS stuff */
struct tls {
isc_tls_t *tls;
@ -930,7 +938,7 @@ struct isc_nmsocket {
/* Atomic */
/*% Number of running (e.g. listening) child sockets */
uint_fast32_t rchildren;
atomic_uint_fast32_t rchildren;
/*%
* Socket is active if it's listening, working, etc. If it's

View File

@ -15,6 +15,7 @@
#include <isc/atomic.h>
#include <isc/backtrace.h>
#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
@ -135,8 +136,12 @@ nm_thread(isc_threadarg_t worker0);
static void
async_cb(uv_async_t *handle);
static bool
process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);
static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue,
unsigned int *quantump);
static void
wait_for_priority_queue(isc__networker_t *worker);
static bool
process_priority_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
@ -146,6 +151,15 @@ process_task_queue(isc__networker_t *worker, unsigned int *quantump);
static bool
process_normal_queue(isc__networker_t *worker, unsigned int *quantump);
#define drain_priority_queue(worker) \
(void)process_priority_queue(worker, &(unsigned int){ UINT_MAX })
#define drain_privilege_queue(worker) \
(void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX })
#define drain_task_queue(worker) \
(void)process_task_queue(worker, &(unsigned int){ UINT_MAX })
#define drain_normal_queue(worker) \
(void)process_normal_queue(worker, &(unsigned int){ UINT_MAX })
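
An aside on the idiom in these macros (toy code below, not netmgr API): &(unsigned int){ UINT_MAX } is a C99 compound literal, so every drain_*() expansion hands the callee a fresh throwaway quantum that it may decrement freely, with no caller-visible counter to reset. The sketch also charges the quantum only for events actually processed, matching the reordering made in process_queue() further down.

#include <limits.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the real queue operations. */
static void *
dequeue(void) {
	return (NULL); /* toy: the queue is always empty */
}

static void
handle(void *event) {
	(void)event;
}

/* Shaped like the process_*_queue() functions: decrement the quantum
 * only after dequeuing a real event, and report whether the queue was
 * fully drained. */
static bool
process(unsigned int *quantump) {
	while (*quantump > 0) {
		void *event = dequeue();
		if (event == NULL) {
			return (true); /* fully drained */
		}
		(*quantump)--;
		handle(event);
	}
	return (false); /* quantum exhausted */
}

/* Each expansion creates a fresh unnamed unsigned int via the
 * compound literal, valid for the enclosing block. */
#define drain() (void)process(&(unsigned int){ UINT_MAX })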
static void
isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
static void
@ -229,6 +243,7 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
isc_refcount_init(&mgr->references, 1);
atomic_init(&mgr->maxudp, 0);
atomic_init(&mgr->interlocked, ISC_NETMGR_NON_INTERLOCKED);
atomic_init(&mgr->workers_paused, 0);
#ifdef NETMGR_TRACE
ISC_LIST_INIT(mgr->active_sockets);
@ -258,6 +273,9 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
isc_mempool_associatelock(mgr->evpool, &mgr->evlock);
isc_mempool_setfillcount(mgr->evpool, 32);
isc_barrier_init(&mgr->pausing, workers);
isc_barrier_init(&mgr->resuming, workers);
mgr->workers = isc_mem_get(mctx, workers * sizeof(isc__networker_t));
for (size_t i = 0; i < workers; i++) {
int r;
@ -277,12 +295,13 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
RUNTIME_CHECK(r == 0);
isc_mutex_init(&worker->lock);
isc_condition_init(&worker->cond);
worker->ievents = isc_queue_new(mgr->mctx, 128);
worker->ievents_task = isc_queue_new(mgr->mctx, 128);
worker->ievents_priv = isc_queue_new(mgr->mctx, 128);
worker->ievents_prio = isc_queue_new(mgr->mctx, 128);
isc_condition_init(&worker->cond_prio);
worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE);
@ -351,6 +370,7 @@ nm_destroy(isc_nm_t **mgr0) {
{
isc_mempool_put(mgr->evpool, ievent);
}
isc_condition_destroy(&worker->cond_prio);
r = uv_loop_close(&worker->loop);
INSIST(r == 0);
@ -360,7 +380,6 @@ nm_destroy(isc_nm_t **mgr0) {
isc_queue_destroy(worker->ievents_task);
isc_queue_destroy(worker->ievents_prio);
isc_mutex_destroy(&worker->lock);
isc_condition_destroy(&worker->cond);
isc_mem_put(mgr->mctx, worker->sendbuf,
ISC_NETMGR_SENDBUF_SIZE);
@ -373,6 +392,9 @@ nm_destroy(isc_nm_t **mgr0) {
isc_stats_detach(&mgr->stats);
}
isc_barrier_destroy(&mgr->resuming);
isc_barrier_destroy(&mgr->pausing);
isc_condition_destroy(&mgr->wkstatecond);
isc_condition_destroy(&mgr->wkpausecond);
isc_mutex_destroy(&mgr->lock);
@ -413,13 +435,18 @@ isc_nm_pause(isc_nm_t *mgr) {
}
}
if (isc__nm_in_netthread()) {
isc_barrier_wait(&mgr->pausing);
}
LOCK(&mgr->lock);
while (mgr->workers_paused != pausing) {
while (atomic_load(&mgr->workers_paused) != pausing) {
WAIT(&mgr->wkstatecond, &mgr->lock);
}
UNLOCK(&mgr->lock);
REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ false },
true));
UNLOCK(&mgr->lock);
}
void
@ -439,14 +466,19 @@ isc_nm_resume(isc_nm_t *mgr) {
}
}
if (isc__nm_in_netthread()) {
isc_barrier_wait(&mgr->resuming);
}
LOCK(&mgr->lock);
while (mgr->workers_paused != 0) {
while (atomic_load(&mgr->workers_paused) != 0) {
WAIT(&mgr->wkstatecond, &mgr->lock);
}
UNLOCK(&mgr->lock);
REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ true },
false));
BROADCAST(&mgr->wkpausecond);
UNLOCK(&mgr->lock);
isc__nm_drop_interlocked(mgr);
}
@ -617,45 +649,29 @@ nm_thread(isc_threadarg_t worker0) {
if (worker->paused) {
INSIST(atomic_load(&mgr->interlocked) != isc_nm_tid());
/*
* We need to lock the worker first; otherwise
* isc_nm_resume() might slip in before WAIT() in
* the while loop starts, then the signal never
* gets delivered and we are stuck forever in the
* paused loop.
*/
LOCK(&worker->lock);
LOCK(&mgr->lock);
mgr->workers_paused++;
SIGNAL(&mgr->wkstatecond);
UNLOCK(&mgr->lock);
while (worker->paused) {
WAIT(&worker->cond, &worker->lock);
UNLOCK(&worker->lock);
(void)process_priority_queue(
worker, &(unsigned int){ UINT_MAX });
LOCK(&worker->lock);
atomic_fetch_add(&mgr->workers_paused, 1);
if (isc_barrier_wait(&mgr->pausing) != 0) {
LOCK(&mgr->lock);
SIGNAL(&mgr->wkstatecond);
UNLOCK(&mgr->lock);
}
LOCK(&mgr->lock);
mgr->workers_paused--;
SIGNAL(&mgr->wkstatecond);
UNLOCK(&mgr->lock);
UNLOCK(&worker->lock);
while (worker->paused) {
wait_for_priority_queue(worker);
}
/*
* All workers must drain the privileged event
* queue before we resume from pause.
*/
(void)process_privilege_queue(
worker, &(unsigned int){ UINT_MAX });
drain_privilege_queue(worker);
LOCK(&mgr->lock);
while (atomic_load(&mgr->paused)) {
WAIT(&mgr->wkpausecond, &mgr->lock);
atomic_fetch_sub(&mgr->workers_paused, 1);
if (isc_barrier_wait(&mgr->resuming) != 0) {
LOCK(&mgr->lock);
SIGNAL(&mgr->wkstatecond);
UNLOCK(&mgr->lock);
}
UNLOCK(&mgr->lock);
}
if (r == 0) {
@ -671,8 +687,8 @@ nm_thread(isc_threadarg_t worker0) {
* (they may include shutdown events) but do not process
* the netmgr event queue.
*/
(void)process_privilege_queue(worker, &(unsigned int){ UINT_MAX });
(void)process_task_queue(worker, &(unsigned int){ UINT_MAX });
drain_privilege_queue(worker);
drain_task_queue(worker);
LOCK(&mgr->lock);
mgr->workers_running--;
@ -792,6 +808,34 @@ isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) {
}
}
static void
wait_for_priority_queue(isc__networker_t *worker) {
isc_queue_t *queue = worker->ievents_prio;
isc_condition_t *cond = &worker->cond_prio;
bool wait_for_work = true;
while (true) {
isc__netievent_t *ievent;
LOCK(&worker->lock);
ievent = (isc__netievent_t *)isc_queue_dequeue(queue);
if (wait_for_work) {
while (ievent == NULL) {
WAIT(cond, &worker->lock);
ievent = (isc__netievent_t *)isc_queue_dequeue(
queue);
}
}
UNLOCK(&worker->lock);
wait_for_work = false;
if (ievent == NULL) {
return;
}
(void)process_netievent(worker, ievent);
}
}
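
wait_for_priority_queue() blocks for the first event and then switches to a non-blocking drain. The same shape in isolation (toy code: a plain counter stands in for the real lock-free queue, and a mutex/condvar pair for the worker lock):

#include <pthread.h>
#include <stdbool.h>

typedef struct {
	pthread_mutex_t lock;
	pthread_cond_t cond;
	int items; /* toy stand-in for the queued events */
} toyqueue_t;

static toyqueue_t q = { PTHREAD_MUTEX_INITIALIZER,
			PTHREAD_COND_INITIALIZER, 0 };

/* Producer side: enqueue one event and wake a waiter. */
static void
toy_enqueue(void) {
	pthread_mutex_lock(&q.lock);
	q.items++;
	pthread_cond_signal(&q.cond);
	pthread_mutex_unlock(&q.lock);
}

/* Consumer side: block for the first event, then drain without
 * blocking and return once the queue is empty. */
static void
wait_then_drain(void) {
	bool wait_for_work = true;

	for (;;) {
		pthread_mutex_lock(&q.lock);
		if (wait_for_work) {
			while (q.items == 0) {
				pthread_cond_wait(&q.cond, &q.lock);
			}
		}
		if (q.items == 0) {
			/* The non-blocking pass found nothing: done. */
			pthread_mutex_unlock(&q.lock);
			return;
		}
		q.items--;
		pthread_mutex_unlock(&q.lock);

		wait_for_work = false;
		/* ... process the dequeued event outside the lock ... */
	}
}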
static bool
process_priority_queue(isc__networker_t *worker, unsigned int *quantump) {
return (process_queue(worker, worker->ievents_prio, quantump));
@ -917,13 +961,13 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue,
isc__netievent_t *ievent =
(isc__netievent_t *)isc_queue_dequeue(queue);
(*quantump)--;
if (ievent == NULL) {
/* We fully drained this queue */
return (true);
}
(*quantump)--;
if (!process_netievent(worker, ievent)) {
/* Netievent told us to stop */
return (false);
@ -1034,7 +1078,7 @@ isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
*/
LOCK(&worker->lock);
isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event);
SIGNAL(&worker->cond);
SIGNAL(&worker->cond_prio);
UNLOCK(&worker->lock);
} else if (event->type == netievent_privilegedtask) {
isc_queue_enqueue(worker->ievents_priv, (uintptr_t)event);
@ -1122,7 +1166,14 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree FLARG) {
}
/*
* This was a parent socket; free the children.
* This was a parent socket: destroy the listening
* barriers that synchronized the children.
*/
isc_barrier_destroy(&sock->startlistening);
isc_barrier_destroy(&sock->stoplistening);
/*
* Now free them.
*/
isc_mem_put(sock->mgr->mctx, sock->children,
sock->nchildren * sizeof(*sock));
@ -1169,7 +1220,6 @@ nmsocket_cleanup(isc_nmsocket_t *sock, bool dofree FLARG) {
isc_mem_free(sock->mgr->mctx, sock->ah_frees);
isc_mem_free(sock->mgr->mctx, sock->ah_handles);
isc_mutex_destroy(&sock->lock);
isc_condition_destroy(&sock->cond);
isc_condition_destroy(&sock->scond);
isc__nm_tls_cleanup_data(sock);
isc__nm_http_cleanup_data(sock);
@ -1416,7 +1466,6 @@ isc___nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type,
}
isc_mutex_init(&sock->lock);
isc_condition_init(&sock->cond);
isc_condition_init(&sock->scond);
isc_refcount_init(&sock->references, 1);

View File

@ -14,6 +14,7 @@
#include <uv.h>
#include <isc/atomic.h>
#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
@ -116,7 +117,7 @@ failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
static isc_result_t
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__networker_t *worker = NULL;
isc_result_t result = ISC_R_DEFAULT;
isc_result_t result = ISC_R_UNSET;
int r;
REQUIRE(VALID_NMSOCK(sock));
@ -302,7 +303,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
sock->extrahandlesize = extrahandlesize;
sock->connect_timeout = timeout;
sock->result = ISC_R_DEFAULT;
sock->result = ISC_R_UNSET;
sock->fd = (uv_os_sock_t)-1;
atomic_init(&sock->client, true);
@ -344,7 +345,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
(isc__netievent_t *)ievent);
}
LOCK(&sock->lock);
while (sock->result == ISC_R_DEFAULT) {
while (sock->result == ISC_R_UNSET) {
WAIT(&sock->cond, &sock->lock);
}
atomic_store(&sock->active, true);
@ -375,6 +376,39 @@ isc__nm_tcp_lb_socket(sa_family_t sa_family) {
return (sock);
}
static void
start_tcp_child(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t *sock,
uv_os_sock_t fd, int tid) {
isc__netievent_tcplisten_t *ievent = NULL;
isc_nmsocket_t *csock = &sock->children[tid];
isc__nmsocket_init(csock, mgr, isc_nm_tcpsocket, iface);
csock->parent = sock;
csock->accept_cb = sock->accept_cb;
csock->accept_cbarg = sock->accept_cbarg;
csock->extrahandlesize = sock->extrahandlesize;
csock->backlog = sock->backlog;
csock->tid = tid;
/*
* We don't attach to quota, just assign - to avoid
* increasing quota unnecessarily.
*/
csock->pquota = sock->pquota;
isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock);
#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
UNUSED(fd);
csock->fd = isc__nm_tcp_lb_socket(iface->addr.type.sa.sa_family);
#else
csock->fd = dup(fd);
#endif
REQUIRE(csock->fd >= 0);
ievent = isc__nm_get_netievent_tcplisten(mgr, csock);
isc__nm_maybe_enqueue_ievent(&mgr->workers[tid],
(isc__netievent_t *)ievent);
}
isc_result_t
isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_nm_accept_cb_t accept_cb, void *accept_cbarg,
@ -382,18 +416,15 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_nmsocket_t **sockp) {
isc_result_t result = ISC_R_SUCCESS;
isc_nmsocket_t *sock = NULL;
sa_family_t sa_family = iface->addr.type.sa.sa_family;
size_t children_size = 0;
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
uv_os_sock_t fd = -1;
#endif
REQUIRE(VALID_NM(mgr));
sock = isc_mem_get(mgr->mctx, sizeof(*sock));
isc__nmsocket_init(sock, mgr, isc_nm_tcplistener, iface);
sock->rchildren = 0;
atomic_init(&sock->rchildren, 0);
#if defined(WIN32)
sock->nchildren = 1;
#else
@ -403,42 +434,36 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
sock->children = isc_mem_get(mgr->mctx, children_size);
memset(sock->children, 0, children_size);
sock->result = ISC_R_DEFAULT;
sock->tid = isc_random_uniform(sock->nchildren);
sock->result = ISC_R_UNSET;
sock->accept_cb = accept_cb;
sock->accept_cbarg = accept_cbarg;
sock->extrahandlesize = extrahandlesize;
sock->backlog = backlog;
sock->pquota = quota;
if (isc__nm_in_netthread()) {
sock->tid = isc_nm_tid();
} else {
sock->tid = isc_random_uniform(sock->nchildren);
}
sock->fd = -1;
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
fd = isc__nm_tcp_lb_socket(sa_family);
fd = isc__nm_tcp_lb_socket(iface->addr.type.sa.sa_family);
#endif
isc_barrier_init(&sock->startlistening, sock->nchildren);
for (size_t i = 0; i < sock->nchildren; i++) {
isc__netievent_tcplisten_t *ievent = NULL;
isc_nmsocket_t *csock = &sock->children[i];
if ((int)i == isc_nm_tid()) {
continue;
}
start_tcp_child(mgr, iface, sock, fd, i);
}
isc__nmsocket_init(csock, mgr, isc_nm_tcpsocket, iface);
csock->parent = sock;
csock->accept_cb = accept_cb;
csock->accept_cbarg = accept_cbarg;
csock->extrahandlesize = extrahandlesize;
csock->backlog = backlog;
csock->tid = i;
/*
* We don't attach to quota, just assign - to avoid
* increasing quota unnecessarily.
*/
csock->pquota = quota;
isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock);
#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
csock->fd = isc__nm_tcp_lb_socket(sa_family);
#else
csock->fd = dup(fd);
#endif
REQUIRE(csock->fd >= 0);
ievent = isc__nm_get_netievent_tcplisten(mgr, csock);
isc__nm_maybe_enqueue_ievent(&mgr->workers[i],
(isc__netievent_t *)ievent);
if (isc__nm_in_netthread()) {
start_tcp_child(mgr, iface, sock, fd, isc_nm_tid());
}
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
@ -446,21 +471,21 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface,
#endif
LOCK(&sock->lock);
while (sock->rchildren != sock->nchildren) {
while (atomic_load(&sock->rchildren) != sock->nchildren) {
WAIT(&sock->cond, &sock->lock);
}
result = sock->result;
atomic_store(&sock->active, true);
BROADCAST(&sock->scond);
UNLOCK(&sock->lock);
INSIST(result != ISC_R_DEFAULT);
INSIST(result != ISC_R_UNSET);
if (result == ISC_R_SUCCESS) {
REQUIRE(sock->rchildren == sock->nchildren);
REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
*sockp = sock;
} else {
atomic_store(&sock->active, false);
isc__nm_tcp_stoplistening(sock);
isc_nm_stoplistening(sock);
isc_nmsocket_close(&sock);
}
@ -565,16 +590,14 @@ done:
sock->pquota = NULL;
}
sock->parent->rchildren += 1;
if (sock->parent->result == ISC_R_DEFAULT) {
atomic_fetch_add(&sock->parent->rchildren, 1);
if (sock->parent->result == ISC_R_UNSET) {
sock->parent->result = result;
}
SIGNAL(&sock->parent->cond);
if (!atomic_load(&sock->parent->active)) {
WAIT(&sock->parent->scond, &sock->parent->lock);
}
INSIST(atomic_load(&sock->parent->active));
UNLOCK(&sock->parent->lock);
isc_barrier_wait(&sock->parent->startlistening);
}
static void
@ -637,7 +660,15 @@ isc__nm_tcp_stoplistening(isc_nmsocket_t *sock) {
INSIST(0);
ISC_UNREACHABLE();
}
enqueue_stoplistening(sock);
if (!isc__nm_in_netthread()) {
enqueue_stoplistening(sock);
} else if (!isc__nm_acquire_interlocked(sock->mgr)) {
enqueue_stoplistening(sock);
} else {
stop_tcp_parent(sock);
isc__nm_drop_interlocked(sock->mgr);
}
}
void
@ -655,7 +686,12 @@ isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) {
return;
}
stop_tcp_parent(sock);
if (!isc__nm_acquire_interlocked(sock->mgr)) {
enqueue_stoplistening(sock);
} else {
stop_tcp_parent(sock);
isc__nm_drop_interlocked(sock->mgr);
}
}
void
@ -1194,8 +1230,6 @@ timer_close_cb(uv_handle_t *handle) {
static void
stop_tcp_child(isc_nmsocket_t *sock) {
bool last_child = false;
REQUIRE(sock->type == isc_nm_tcpsocket);
REQUIRE(sock->tid == isc_nm_tid());
@ -1206,33 +1240,40 @@ stop_tcp_child(isc_nmsocket_t *sock) {
tcp_close_direct(sock);
LOCK(&sock->parent->lock);
sock->parent->rchildren -= 1;
last_child = (sock->parent->rchildren == 0);
UNLOCK(&sock->parent->lock);
atomic_fetch_sub(&sock->parent->rchildren, 1);
if (last_child) {
atomic_store(&sock->parent->closed, true);
isc__nmsocket_prep_destroy(sock->parent);
}
isc_barrier_wait(&sock->parent->stoplistening);
}
static void
stop_tcp_parent(isc_nmsocket_t *sock) {
isc_nmsocket_t *csock = NULL;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcplistener);
isc_barrier_init(&sock->stoplistening, sock->nchildren);
for (size_t i = 0; i < sock->nchildren; i++) {
isc__netievent_tcpstop_t *ievent = NULL;
isc_nmsocket_t *csock = &sock->children[i];
csock = &sock->children[i];
REQUIRE(VALID_NMSOCK(csock));
atomic_store(&csock->active, false);
if ((int)i == isc_nm_tid()) {
/*
* We need to schedule closing the other sockets first
*/
continue;
}
ievent = isc__nm_get_netievent_tcpstop(sock->mgr, csock);
isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid],
(isc__netievent_t *)ievent);
atomic_store(&csock->active, false);
enqueue_stoplistening(csock);
}
csock = &sock->children[isc_nm_tid()];
atomic_store(&csock->active, false);
stop_tcp_child(csock);
atomic_store(&sock->closed, true);
isc__nmsocket_prep_destroy(sock);
}
static void
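
The same three-way dispatch is added to the stop-listening paths of all four transports (tcp, tcpdns, tlsdns, udp). In outline, with the intent of each branch spelled out; this paraphrases the diff above rather than introducing any new API:

if (!isc__nm_in_netthread()) {
	/* Caller is outside the event loops: hand the work
	 * to a net thread via the event queue. */
	enqueue_stoplistening(sock);
} else if (!isc__nm_acquire_interlocked(sock->mgr)) {
	/* On a net thread, but the manager is paused or another
	 * thread holds the interlock: re-enqueue and retry later. */
	enqueue_stoplistening(sock);
} else {
	/* On a net thread with the interlock held: stop the
	 * children synchronously; each child rendezvouses on the
	 * parent's stoplistening barrier before the parent
	 * finishes tearing down. */
	stop_tcp_parent(sock);
	isc__nm_drop_interlocked(sock->mgr);
}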

View File

@ -14,6 +14,7 @@
#include <uv.h>
#include <isc/atomic.h>
#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
@ -85,7 +86,7 @@ stop_tcpdns_child(isc_nmsocket_t *sock);
static isc_result_t
tcpdns_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__networker_t *worker = NULL;
isc_result_t result = ISC_R_DEFAULT;
isc_result_t result = ISC_R_UNSET;
int r;
REQUIRE(VALID_NMSOCK(sock));
@ -269,7 +270,7 @@ isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
sock->extrahandlesize = extrahandlesize;
sock->connect_timeout = timeout;
sock->result = ISC_R_DEFAULT;
sock->result = ISC_R_UNSET;
atomic_init(&sock->client, true);
req = isc__nm_uvreq_get(mgr, sock);
@ -311,7 +312,7 @@ isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
}
LOCK(&sock->lock);
while (sock->result == ISC_R_DEFAULT) {
while (sock->result == ISC_R_UNSET) {
WAIT(&sock->cond, &sock->lock);
}
atomic_store(&sock->active, true);
@ -342,6 +343,40 @@ isc__nm_tcpdns_lb_socket(sa_family_t sa_family) {
return (sock);
}
static void
start_tcpdns_child(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t *sock,
uv_os_sock_t fd, int tid) {
isc__netievent_tcpdnslisten_t *ievent = NULL;
isc_nmsocket_t *csock = &sock->children[tid];
isc__nmsocket_init(csock, mgr, isc_nm_tcpdnssocket, iface);
csock->parent = sock;
csock->accept_cb = sock->accept_cb;
csock->accept_cbarg = sock->accept_cbarg;
csock->recv_cb = sock->recv_cb;
csock->recv_cbarg = sock->recv_cbarg;
csock->extrahandlesize = sock->extrahandlesize;
csock->backlog = sock->backlog;
csock->tid = tid;
/*
* We don't attach to quota, just assign - to avoid
* increasing quota unnecessarily.
*/
csock->pquota = sock->pquota;
isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock);
#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
UNUSED(fd);
csock->fd = isc__nm_tcpdns_lb_socket(iface->addr.type.sa.sa_family);
#else
csock->fd = dup(fd);
#endif
REQUIRE(csock->fd >= 0);
ievent = isc__nm_get_netievent_tcpdnslisten(mgr, csock);
isc__nm_maybe_enqueue_ievent(&mgr->workers[tid],
(isc__netievent_t *)ievent);
}
isc_result_t
isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_nm_recv_cb_t recv_cb, void *recv_cbarg,
@ -350,18 +385,15 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_nmsocket_t **sockp) {
isc_result_t result = ISC_R_SUCCESS;
isc_nmsocket_t *sock = NULL;
sa_family_t sa_family = iface->addr.type.sa.sa_family;
size_t children_size = 0;
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
uv_os_sock_t fd = -1;
#endif
REQUIRE(VALID_NM(mgr));
sock = isc_mem_get(mgr->mctx, sizeof(*sock));
isc__nmsocket_init(sock, mgr, isc_nm_tcpdnslistener, iface);
sock->rchildren = 0;
atomic_init(&sock->rchildren, 0);
#if defined(WIN32)
sock->nchildren = 1;
#else
@ -371,44 +403,37 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface,
sock->children = isc_mem_get(mgr->mctx, children_size);
memset(sock->children, 0, children_size);
sock->result = ISC_R_DEFAULT;
sock->tid = isc_random_uniform(sock->nchildren);
sock->result = ISC_R_UNSET;
sock->accept_cb = accept_cb;
sock->accept_cbarg = accept_cbarg;
sock->recv_cb = recv_cb;
sock->recv_cbarg = recv_cbarg;
sock->extrahandlesize = extrahandlesize;
sock->backlog = backlog;
sock->pquota = quota;
if (isc__nm_in_netthread()) {
sock->tid = isc_nm_tid();
} else {
sock->tid = isc_random_uniform(sock->nchildren);
}
sock->fd = -1;
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
fd = isc__nm_tcpdns_lb_socket(sa_family);
fd = isc__nm_tcpdns_lb_socket(iface->addr.type.sa.sa_family);
#endif
isc_barrier_init(&sock->startlistening, sock->nchildren);
for (size_t i = 0; i < sock->nchildren; i++) {
isc__netievent_tcpdnslisten_t *ievent = NULL;
isc_nmsocket_t *csock = &sock->children[i];
if ((int)i == isc_nm_tid()) {
continue;
}
start_tcpdns_child(mgr, iface, sock, fd, i);
}
isc__nmsocket_init(csock, mgr, isc_nm_tcpdnssocket, iface);
csock->parent = sock;
csock->accept_cb = accept_cb;
csock->accept_cbarg = accept_cbarg;
csock->recv_cb = recv_cb;
csock->recv_cbarg = recv_cbarg;
csock->extrahandlesize = extrahandlesize;
csock->backlog = backlog;
csock->tid = i;
/*
* We don't attach to quota, just assign - to avoid
* increasing quota unnecessarily.
*/
csock->pquota = quota;
isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock);
#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
csock->fd = isc__nm_tcpdns_lb_socket(sa_family);
#else
csock->fd = dup(fd);
#endif
REQUIRE(csock->fd >= 0);
ievent = isc__nm_get_netievent_tcpdnslisten(mgr, csock);
isc__nm_maybe_enqueue_ievent(&mgr->workers[i],
(isc__netievent_t *)ievent);
if (isc__nm_in_netthread()) {
start_tcpdns_child(mgr, iface, sock, fd, isc_nm_tid());
}
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
@ -416,21 +441,21 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface,
#endif
LOCK(&sock->lock);
while (sock->rchildren != sock->nchildren) {
while (atomic_load(&sock->rchildren) != sock->nchildren) {
WAIT(&sock->cond, &sock->lock);
}
result = sock->result;
atomic_store(&sock->active, true);
BROADCAST(&sock->scond);
UNLOCK(&sock->lock);
INSIST(result != ISC_R_DEFAULT);
INSIST(result != ISC_R_UNSET);
if (result == ISC_R_SUCCESS) {
REQUIRE(sock->rchildren == sock->nchildren);
REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
*sockp = sock;
} else {
atomic_store(&sock->active, false);
isc__nm_tcpdns_stoplistening(sock);
isc_nm_stoplistening(sock);
isc_nmsocket_close(&sock);
}
@ -446,7 +471,7 @@ isc__nm_async_tcpdnslisten(isc__networker_t *worker, isc__netievent_t *ev0) {
int r;
int flags = 0;
isc_nmsocket_t *sock = NULL;
isc_result_t result = ISC_R_DEFAULT;
isc_result_t result = ISC_R_UNSET;
REQUIRE(VALID_NMSOCK(ievent->sock));
REQUIRE(ievent->sock->tid == isc_nm_tid());
@ -534,16 +559,14 @@ done:
sock->pquota = NULL;
}
sock->parent->rchildren += 1;
if (sock->parent->result == ISC_R_DEFAULT) {
atomic_fetch_add(&sock->parent->rchildren, 1);
if (sock->parent->result == ISC_R_UNSET) {
sock->parent->result = result;
}
SIGNAL(&sock->parent->cond);
if (!atomic_load(&sock->parent->active)) {
WAIT(&sock->parent->scond, &sock->parent->lock);
}
INSIST(atomic_load(&sock->parent->active));
UNLOCK(&sock->parent->lock);
isc_barrier_wait(&sock->parent->startlistening);
}
static void
@ -607,7 +630,15 @@ isc__nm_tcpdns_stoplistening(isc_nmsocket_t *sock) {
INSIST(0);
ISC_UNREACHABLE();
}
enqueue_stoplistening(sock);
if (!isc__nm_in_netthread()) {
enqueue_stoplistening(sock);
} else if (!isc__nm_acquire_interlocked(sock->mgr)) {
enqueue_stoplistening(sock);
} else {
stop_tcpdns_parent(sock);
isc__nm_drop_interlocked(sock->mgr);
}
}
void
@ -626,7 +657,15 @@ isc__nm_async_tcpdnsstop(isc__networker_t *worker, isc__netievent_t *ev0) {
return;
}
stop_tcpdns_parent(sock);
/*
* If network manager is paused, re-enqueue the event for later.
*/
if (!isc__nm_acquire_interlocked(sock->mgr)) {
enqueue_stoplistening(sock);
} else {
stop_tcpdns_parent(sock);
isc__nm_drop_interlocked(sock->mgr);
}
}
void
@ -1224,8 +1263,6 @@ timer_close_cb(uv_handle_t *timer) {
static void
stop_tcpdns_child(isc_nmsocket_t *sock) {
bool last_child = false;
REQUIRE(sock->type == isc_nm_tcpdnssocket);
REQUIRE(sock->tid == isc_nm_tid());
@ -1236,33 +1273,41 @@ stop_tcpdns_child(isc_nmsocket_t *sock) {
tcpdns_close_direct(sock);
LOCK(&sock->parent->lock);
sock->parent->rchildren -= 1;
last_child = (sock->parent->rchildren == 0);
UNLOCK(&sock->parent->lock);
atomic_fetch_sub(&sock->parent->rchildren, 1);
if (last_child) {
atomic_store(&sock->parent->closed, true);
isc__nmsocket_prep_destroy(sock->parent);
}
isc_barrier_wait(&sock->parent->stoplistening);
}
static void
stop_tcpdns_parent(isc_nmsocket_t *sock) {
isc_nmsocket_t *csock = NULL;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tcpdnslistener);
isc_barrier_init(&sock->stoplistening, sock->nchildren);
for (size_t i = 0; i < sock->nchildren; i++) {
isc__netievent_tcpdnsstop_t *ievent = NULL;
isc_nmsocket_t *csock = &sock->children[i];
csock = &sock->children[i];
REQUIRE(VALID_NMSOCK(csock));
atomic_store(&csock->active, false);
if ((int)i == isc_nm_tid()) {
/*
* We need to schedule closing the other sockets first
*/
continue;
}
ievent = isc__nm_get_netievent_tcpdnsstop(sock->mgr, csock);
isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid],
(isc__netievent_t *)ievent);
atomic_store(&csock->active, false);
enqueue_stoplistening(csock);
}
csock = &sock->children[isc_nm_tid()];
atomic_store(&csock->active, false);
stop_tcpdns_child(csock);
atomic_store(&sock->closed, true);
isc__nmsocket_prep_destroy(sock);
}
static void

View File

@ -14,6 +14,7 @@
#include <uv.h>
#include <isc/atomic.h>
#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
@ -97,7 +98,7 @@ can_log_tlsdns_quota(void) {
static isc_result_t
tlsdns_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__networker_t *worker = NULL;
isc_result_t result = ISC_R_DEFAULT;
isc_result_t result = ISC_R_UNSET;
int r;
REQUIRE(VALID_NMSOCK(sock));
@ -324,7 +325,7 @@ isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
sock->extrahandlesize = extrahandlesize;
sock->connect_timeout = timeout;
sock->result = ISC_R_DEFAULT;
sock->result = ISC_R_UNSET;
sock->tls.ctx = sslctx;
atomic_init(&sock->client, true);
atomic_init(&sock->connecting, true);
@ -364,7 +365,7 @@ isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
(isc__netievent_t *)ievent);
}
LOCK(&sock->lock);
while (sock->result == ISC_R_DEFAULT) {
while (sock->result == ISC_R_UNSET) {
WAIT(&sock->cond, &sock->lock);
}
atomic_store(&sock->active, true);
@ -407,6 +408,43 @@ isc__nm_tlsdns_lb_socket(sa_family_t sa_family) {
return (sock);
}
static void
start_tlsdns_child(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t *sock,
uv_os_sock_t fd, int tid) {
isc__netievent_tlsdnslisten_t *ievent = NULL;
isc_nmsocket_t *csock = &sock->children[tid];
isc__nmsocket_init(csock, mgr, isc_nm_tlsdnssocket, iface);
csock->parent = sock;
csock->accept_cb = sock->accept_cb;
csock->accept_cbarg = sock->accept_cbarg;
csock->recv_cb = sock->recv_cb;
csock->recv_cbarg = sock->recv_cbarg;
csock->extrahandlesize = sock->extrahandlesize;
csock->backlog = sock->backlog;
csock->tid = tid;
csock->tls.ctx = sock->tls.ctx;
/*
* We don't attach to quota, just assign - to avoid
* increasing quota unnecessarily.
*/
csock->pquota = sock->pquota;
isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock);
#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
UNUSED(fd);
csock->fd = isc__nm_tlsdns_lb_socket(iface->addr.type.sa.sa_family);
#else
csock->fd = dup(fd);
#endif
REQUIRE(csock->fd >= 0);
ievent = isc__nm_get_netievent_tlsdnslisten(mgr, csock);
isc__nm_maybe_enqueue_ievent(&mgr->workers[tid],
(isc__netievent_t *)ievent);
}
isc_result_t
isc_nm_listentlsdns(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_nm_recv_cb_t recv_cb, void *recv_cbarg,
@ -415,18 +453,15 @@ isc_nm_listentlsdns(isc_nm_t *mgr, isc_nmiface_t *iface,
isc_tlsctx_t *sslctx, isc_nmsocket_t **sockp) {
isc_result_t result = ISC_R_SUCCESS;
isc_nmsocket_t *sock = NULL;
sa_family_t sa_family = iface->addr.type.sa.sa_family;
size_t children_size = 0;
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
uv_os_sock_t fd = -1;
#endif
REQUIRE(VALID_NM(mgr));
sock = isc_mem_get(mgr->mctx, sizeof(*sock));
isc__nmsocket_init(sock, mgr, isc_nm_tlsdnslistener, iface);
sock->rchildren = 0;
atomic_init(&sock->rchildren, 0);
#if defined(WIN32)
sock->nchildren = 1;
#else
@ -436,47 +471,39 @@ isc_nm_listentlsdns(isc_nm_t *mgr, isc_nmiface_t *iface,
sock->children = isc_mem_get(mgr->mctx, children_size);
memset(sock->children, 0, children_size);
sock->result = ISC_R_DEFAULT;
sock->tid = isc_random_uniform(sock->nchildren);
sock->fd = -1;
sock->result = ISC_R_UNSET;
sock->accept_cb = accept_cb;
sock->accept_cbarg = accept_cbarg;
sock->recv_cb = recv_cb;
sock->recv_cbarg = recv_cbarg;
sock->extrahandlesize = extrahandlesize;
sock->backlog = backlog;
sock->pquota = quota;
if (isc__nm_in_netthread()) {
sock->tid = isc_nm_tid();
} else {
sock->tid = isc_random_uniform(sock->nchildren);
}
sock->tls.ctx = sslctx;
sock->fd = -1;
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
fd = isc__nm_tlsdns_lb_socket(sa_family);
fd = isc__nm_tlsdns_lb_socket(iface->addr.type.sa.sa_family);
#endif
isc_barrier_init(&sock->startlistening, sock->nchildren);
for (size_t i = 0; i < sock->nchildren; i++) {
isc__netievent_tlsdnslisten_t *ievent = NULL;
isc_nmsocket_t *csock = &sock->children[i];
if ((int)i == isc_nm_tid()) {
continue;
}
start_tlsdns_child(mgr, iface, sock, fd, i);
}
isc__nmsocket_init(csock, mgr, isc_nm_tlsdnssocket, iface);
csock->parent = sock;
csock->accept_cb = accept_cb;
csock->accept_cbarg = accept_cbarg;
csock->recv_cb = recv_cb;
csock->recv_cbarg = recv_cbarg;
csock->extrahandlesize = extrahandlesize;
csock->backlog = backlog;
csock->tid = i;
csock->tls.ctx = sslctx;
/*
* We don't attach to quota, just assign - to avoid
* increasing quota unnecessarily.
*/
csock->pquota = quota;
isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock);
#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
csock->fd = isc__nm_tlsdns_lb_socket(sa_family);
#else
csock->fd = dup(fd);
#endif
REQUIRE(csock->fd >= 0);
ievent = isc__nm_get_netievent_tlsdnslisten(mgr, csock);
isc__nm_maybe_enqueue_ievent(&mgr->workers[i],
(isc__netievent_t *)ievent);
if (isc__nm_in_netthread()) {
start_tlsdns_child(mgr, iface, sock, fd, isc_nm_tid());
}
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
@ -484,21 +511,21 @@ isc_nm_listentlsdns(isc_nm_t *mgr, isc_nmiface_t *iface,
#endif
LOCK(&sock->lock);
while (sock->rchildren != sock->nchildren) {
while (atomic_load(&sock->rchildren) != sock->nchildren) {
WAIT(&sock->cond, &sock->lock);
}
result = sock->result;
atomic_store(&sock->active, true);
BROADCAST(&sock->scond);
UNLOCK(&sock->lock);
INSIST(result != ISC_R_DEFAULT);
INSIST(result != ISC_R_UNSET);
if (result == ISC_R_SUCCESS) {
REQUIRE(sock->rchildren == sock->nchildren);
REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
*sockp = sock;
} else {
atomic_store(&sock->active, false);
isc__nm_tlsdns_stoplistening(sock);
isc_nm_stoplistening(sock);
isc_nmsocket_close(&sock);
}
@ -514,7 +541,7 @@ isc__nm_async_tlsdnslisten(isc__networker_t *worker, isc__netievent_t *ev0) {
int r;
int flags = 0;
isc_nmsocket_t *sock = NULL;
isc_result_t result = ISC_R_DEFAULT;
isc_result_t result = ISC_R_UNSET;
REQUIRE(VALID_NMSOCK(ievent->sock));
REQUIRE(ievent->sock->tid == isc_nm_tid());
@ -603,16 +630,14 @@ done:
sock->pquota = NULL;
}
sock->parent->rchildren += 1;
if (sock->parent->result == ISC_R_DEFAULT) {
atomic_fetch_add(&sock->parent->rchildren, 1);
if (sock->parent->result == ISC_R_UNSET) {
sock->parent->result = result;
}
SIGNAL(&sock->parent->cond);
if (!atomic_load(&sock->parent->active)) {
WAIT(&sock->parent->scond, &sock->parent->lock);
}
INSIST(atomic_load(&sock->parent->active));
UNLOCK(&sock->parent->lock);
isc_barrier_wait(&sock->parent->startlistening);
}
static void
@ -676,7 +701,15 @@ isc__nm_tlsdns_stoplistening(isc_nmsocket_t *sock) {
INSIST(0);
ISC_UNREACHABLE();
}
enqueue_stoplistening(sock);
if (!isc__nm_in_netthread()) {
enqueue_stoplistening(sock);
} else if (!isc__nm_acquire_interlocked(sock->mgr)) {
enqueue_stoplistening(sock);
} else {
stop_tlsdns_parent(sock);
isc__nm_drop_interlocked(sock->mgr);
}
}
static void
@ -770,7 +803,15 @@ isc__nm_async_tlsdnsstop(isc__networker_t *worker, isc__netievent_t *ev0) {
return;
}
stop_tlsdns_parent(sock);
/*
* If network manager is paused, re-enqueue the event for later.
*/
if (!isc__nm_acquire_interlocked(sock->mgr)) {
enqueue_stoplistening(sock);
} else {
stop_tlsdns_parent(sock);
isc__nm_drop_interlocked(sock->mgr);
}
}
void
@ -1770,8 +1811,6 @@ timer_close_cb(uv_handle_t *handle) {
static void
stop_tlsdns_child(isc_nmsocket_t *sock) {
bool last_child = false;
REQUIRE(sock->type == isc_nm_tlsdnssocket);
REQUIRE(sock->tid == isc_nm_tid());
@ -1782,34 +1821,42 @@ stop_tlsdns_child(isc_nmsocket_t *sock) {
tlsdns_close_direct(sock);
LOCK(&sock->parent->lock);
sock->parent->rchildren -= 1;
last_child = (sock->parent->rchildren == 0);
UNLOCK(&sock->parent->lock);
atomic_fetch_sub(&sock->parent->rchildren, 1);
if (last_child) {
atomic_store(&sock->parent->closed, true);
isc__nmsocket_prep_destroy(sock->parent);
}
isc_barrier_wait(&sock->parent->stoplistening);
}
static void
stop_tlsdns_parent(isc_nmsocket_t *sock) {
isc_nmsocket_t *csock = NULL;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_tlsdnslistener);
isc_barrier_init(&sock->stoplistening, sock->nchildren);
for (size_t i = 0; i < sock->nchildren; i++) {
isc__netievent_tlsdnsstop_t *ievent = NULL;
isc_nmsocket_t *csock = &sock->children[i];
csock = &sock->children[i];
REQUIRE(VALID_NMSOCK(csock));
atomic_store(&csock->active, false);
if ((int)i == isc_nm_tid()) {
/*
* We need to schedule closing the other sockets first
*/
continue;
}
ievent = isc__nm_get_netievent_tlsdnsstop(sock->mgr, csock);
isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid],
(isc__netievent_t *)ievent);
atomic_store(&csock->active, false);
enqueue_stoplistening(csock);
}
csock = &sock->children[isc_nm_tid()];
atomic_store(&csock->active, false);
stop_tlsdns_child(csock);
atomic_store(&sock->closed, true);
isc__nmsocket_prep_destroy(sock);
}
static void

View File

@ -621,7 +621,7 @@ isc_nm_listentls(isc_nm_t *mgr, isc_nmiface_t *iface,
tlssock->tlsstream.server_iface = *iface;
ISC_LINK_INIT(&tlssock->tlsstream.server_iface.addr, link);
tlssock->iface = &tlssock->tlsstream.server_iface;
tlssock->result = ISC_R_DEFAULT;
tlssock->result = ISC_R_UNSET;
tlssock->accept_cb = accept_cb;
tlssock->accept_cbarg = accept_cbarg;
tlssock->extrahandlesize = extrahandlesize;
@ -643,19 +643,12 @@ isc_nm_listentls(isc_nm_t *mgr, isc_nmiface_t *iface,
/* wait for listen result */
isc__nmsocket_attach(tlssock->outer, &tsock);
LOCK(&tlssock->outer->lock);
while (tlssock->outer->rchildren != tlssock->outer->nchildren) {
WAIT(&tlssock->outer->cond, &tlssock->outer->lock);
}
result = tlssock->outer->result;
tlssock->result = result;
atomic_store(&tlssock->active, true);
INSIST(tlssock->outer->tlsstream.tlslistener == NULL);
isc__nmsocket_attach(tlssock, &tlssock->outer->tlsstream.tlslistener);
BROADCAST(&tlssock->outer->scond);
UNLOCK(&tlssock->outer->lock);
isc__nmsocket_detach(&tsock);
INSIST(result != ISC_R_DEFAULT);
INSIST(result != ISC_R_UNSET);
if (result == ISC_R_SUCCESS) {
atomic_store(&tlssock->listening, true);
@ -891,7 +884,7 @@ isc_nm_tlsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
ISC_LINK_INIT(&nsock->tlsstream.local_iface.addr, link);
nsock->iface = &nsock->tlsstream.local_iface;
nsock->extrahandlesize = extrahandlesize;
nsock->result = ISC_R_DEFAULT;
nsock->result = ISC_R_UNSET;
nsock->connect_cb = cb;
nsock->connect_cbarg = cbarg;
nsock->connect_timeout = timeout;

View File

@ -13,6 +13,7 @@
#include <uv.h>
#include <isc/atomic.h>
#include <isc/barrier.h>
#include <isc/buffer.h>
#include <isc/condition.h>
#include <isc/errno.h>
@ -77,18 +78,44 @@ isc__nm_udp_lb_socket(sa_family_t sa_family) {
return (sock);
}
static void
start_udp_child(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t *sock,
uv_os_sock_t fd, int tid) {
isc_nmsocket_t *csock;
isc__netievent_udplisten_t *ievent = NULL;
csock = &sock->children[tid];
isc__nmsocket_init(csock, mgr, isc_nm_udpsocket, iface);
csock->parent = sock;
csock->iface = sock->iface;
csock->reading = true;
csock->recv_cb = sock->recv_cb;
csock->recv_cbarg = sock->recv_cbarg;
csock->extrahandlesize = sock->extrahandlesize;
csock->tid = tid;
#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
UNUSED(fd);
csock->fd = isc__nm_udp_lb_socket(iface->addr.type.sa.sa_family);
#else
csock->fd = dup(fd);
#endif
REQUIRE(csock->fd >= 0);
ievent = isc__nm_get_netievent_udplisten(mgr, csock);
isc__nm_maybe_enqueue_ievent(&mgr->workers[tid],
(isc__netievent_t *)ievent);
}
isc_result_t
isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
void *cbarg, size_t extrahandlesize, isc_nmsocket_t **sockp) {
isc_result_t result = ISC_R_SUCCESS;
isc_nmsocket_t *sock = NULL;
sa_family_t sa_family = iface->addr.type.sa.sa_family;
size_t children_size = 0;
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
uv_os_sock_t fd = -1;
#endif
REQUIRE(VALID_NM(mgr));
uv_os_sock_t fd = -1;
/*
* We are creating mgr->nworkers duplicated sockets, one
@ -97,7 +124,7 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
sock = isc_mem_get(mgr->mctx, sizeof(isc_nmsocket_t));
isc__nmsocket_init(sock, mgr, isc_nm_udplistener, iface);
sock->rchildren = 0;
atomic_init(&sock->rchildren, 0);
#if defined(WIN32)
sock->nchildren = 1;
#else
@ -111,37 +138,29 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
sock->recv_cb = cb;
sock->recv_cbarg = cbarg;
sock->extrahandlesize = extrahandlesize;
sock->result = ISC_R_DEFAULT;
sock->tid = isc_random_uniform(sock->nchildren);
sock->result = ISC_R_UNSET;
if (isc__nm_in_netthread()) {
sock->tid = isc_nm_tid();
} else {
sock->tid = isc_random_uniform(sock->nchildren);
}
sock->fd = -1;
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
fd = isc__nm_udp_lb_socket(sa_family);
fd = isc__nm_udp_lb_socket(iface->addr.type.sa.sa_family);
#endif
isc_barrier_init(&sock->startlistening, sock->nchildren);
for (size_t i = 0; i < sock->nchildren; i++) {
isc__netievent_udplisten_t *ievent = NULL;
isc_nmsocket_t *csock = &sock->children[i];
if ((int)i == isc_nm_tid()) {
continue;
}
start_udp_child(mgr, iface, sock, fd, i);
}
isc__nmsocket_init(csock, mgr, isc_nm_udpsocket, iface);
csock->parent = sock;
csock->iface = sock->iface;
csock->reading = true;
csock->recv_cb = cb;
csock->recv_cbarg = cbarg;
csock->extrahandlesize = sock->extrahandlesize;
csock->tid = i;
#if HAVE_SO_REUSEPORT_LB || defined(WIN32)
csock->fd = isc__nm_udp_lb_socket(sa_family);
#else
csock->fd = dup(fd);
#endif
REQUIRE(csock->fd >= 0);
ievent = isc__nm_get_netievent_udplisten(mgr, csock);
isc__nm_maybe_enqueue_ievent(&mgr->workers[i],
(isc__netievent_t *)ievent);
if (isc__nm_in_netthread()) {
start_udp_child(mgr, iface, sock, fd, isc_nm_tid());
}
#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32)
@ -149,21 +168,21 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
#endif
LOCK(&sock->lock);
while (sock->rchildren != sock->nchildren) {
while (atomic_load(&sock->rchildren) != sock->nchildren) {
WAIT(&sock->cond, &sock->lock);
}
result = sock->result;
atomic_store(&sock->active, true);
BROADCAST(&sock->scond);
UNLOCK(&sock->lock);
INSIST(result != ISC_R_DEFAULT);
INSIST(result != ISC_R_UNSET);
if (result == ISC_R_SUCCESS) {
REQUIRE(sock->rchildren == sock->nchildren);
REQUIRE(atomic_load(&sock->rchildren) == sock->nchildren);
*sockp = sock;
} else {
atomic_store(&sock->active, false);
isc__nm_udp_stoplistening(sock);
isc_nm_stoplistening(sock);
isc_nmsocket_close(&sock);
}
@ -181,7 +200,7 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
int r, uv_bind_flags = 0;
int uv_init_flags = 0;
sa_family_t sa_family;
isc_result_t result = ISC_R_DEFAULT;
isc_result_t result = ISC_R_UNSET;
REQUIRE(VALID_NMSOCK(ievent->sock));
REQUIRE(ievent->sock->tid == isc_nm_tid());
@ -269,16 +288,14 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
done:
result = isc__nm_uverr2result(r);
sock->parent->rchildren += 1;
if (sock->parent->result == ISC_R_DEFAULT) {
atomic_fetch_add(&sock->parent->rchildren, 1);
if (sock->parent->result == ISC_R_UNSET) {
sock->parent->result = result;
}
SIGNAL(&sock->parent->cond);
if (!atomic_load(&sock->parent->active)) {
WAIT(&sock->parent->scond, &sock->parent->lock);
}
INSIST(atomic_load(&sock->parent->active));
UNLOCK(&sock->parent->lock);
isc_barrier_wait(&sock->parent->startlistening);
}
static void
@ -300,7 +317,14 @@ isc__nm_udp_stoplistening(isc_nmsocket_t *sock) {
ISC_UNREACHABLE();
}
enqueue_stoplistening(sock);
if (!isc__nm_in_netthread()) {
enqueue_stoplistening(sock);
} else if (!isc__nm_acquire_interlocked(sock->mgr)) {
enqueue_stoplistening(sock);
} else {
stop_udp_parent(sock);
isc__nm_drop_interlocked(sock->mgr);
}
}
/*
@ -324,7 +348,12 @@ isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0) {
/*
* If network manager is paused, re-enqueue the event for later.
*/
stop_udp_parent(sock);
if (!isc__nm_acquire_interlocked(sock->mgr)) {
enqueue_stoplistening(sock);
} else {
stop_udp_parent(sock);
isc__nm_drop_interlocked(sock->mgr);
}
}
/*
@ -590,7 +619,7 @@ static isc_result_t
udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__networker_t *worker = NULL;
int uv_bind_flags = UV_UDP_REUSEADDR;
isc_result_t result = ISC_R_DEFAULT;
isc_result_t result = ISC_R_UNSET;
int tries = 3;
int r;
@ -733,7 +762,7 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
sock->read_timeout = timeout;
sock->extrahandlesize = extrahandlesize;
sock->peer = peer->addr;
sock->result = ISC_R_DEFAULT;
sock->result = ISC_R_UNSET;
atomic_init(&sock->client, true);
req = isc__nm_uvreq_get(mgr, sock);
@ -782,7 +811,7 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
(isc__netievent_t *)event);
}
LOCK(&sock->lock);
while (sock->result == ISC_R_DEFAULT) {
while (sock->result == ISC_R_UNSET) {
WAIT(&sock->cond, &sock->lock);
}
atomic_store(&sock->active, true);
@ -970,8 +999,6 @@ stop_udp_child(isc_nmsocket_t *sock) {
REQUIRE(sock->type == isc_nm_udpsocket);
REQUIRE(sock->tid == isc_nm_tid());
bool last_child = false;
if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false },
true)) {
return;
@ -979,33 +1006,41 @@ stop_udp_child(isc_nmsocket_t *sock) {
udp_close_direct(sock);
LOCK(&sock->parent->lock);
sock->parent->rchildren -= 1;
last_child = (sock->parent->rchildren == 0);
UNLOCK(&sock->parent->lock);
atomic_fetch_sub(&sock->parent->rchildren, 1);
if (last_child) {
atomic_store(&sock->parent->closed, true);
isc__nmsocket_prep_destroy(sock->parent);
}
isc_barrier_wait(&sock->parent->stoplistening);
}
static void
stop_udp_parent(isc_nmsocket_t *sock) {
isc_nmsocket_t *csock = NULL;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_udplistener);
isc_barrier_init(&sock->stoplistening, sock->nchildren);
for (size_t i = 0; i < sock->nchildren; i++) {
isc__netievent_udpstop_t *ievent = NULL;
isc_nmsocket_t *csock = &sock->children[i];
csock = &sock->children[i];
REQUIRE(VALID_NMSOCK(csock));
atomic_store(&csock->active, false);
if ((int)i == isc_nm_tid()) {
/*
* We need to schedule closing the other sockets first
*/
continue;
}
ievent = isc__nm_get_netievent_udpstop(sock->mgr, csock);
isc__nm_enqueue_ievent(&sock->mgr->workers[i],
(isc__netievent_t *)ievent);
atomic_store(&csock->active, false);
enqueue_stoplistening(csock);
}
csock = &sock->children[isc_nm_tid()];
atomic_store(&csock->active, false);
stop_udp_child(csock);
atomic_store(&sock->closed, true);
isc__nmsocket_prep_destroy(sock);
}
static void

View File

@ -1014,7 +1014,7 @@ isc__taskmgr_shutdown(isc_taskmgr_t *manager) {
REQUIRE(VALID_MANAGER(manager));
XTHREADTRACE(e "isc_taskmgr_shutdown");
XTHREADTRACE("isc_taskmgr_shutdown");
/*
* Only one non-worker thread may ever call this routine.
* If a worker thread wants to initiate shutdown of the

View File

@ -44,6 +44,9 @@
<ClInclude Include="..\include\isc\backtrace.h">
<Filter>Library Header Files</Filter>
</ClInclude>
<ClInclude Include="..\include\isc\barrier.h">
<Filter>Library Header Files</Filter>
</ClInclude>
<ClInclude Include="..\include\isc\base32.h">
<Filter>Library Header Files</Filter>
</ClInclude>

View File

@ -269,6 +269,7 @@ copy InstallFiles ..\Build\Release\
<ClInclude Include="..\include\isc\astack.h" />
<ClInclude Include="..\include\isc\atomic.h" />
<ClInclude Include="..\include\isc\backtrace.h" />
<ClInclude Include="..\include\isc\barrier.h" />
<ClInclude Include="..\include\isc\base32.h" />
<ClInclude Include="..\include\isc\base64.h" />
<ClInclude Include="..\include\isc\bind9.h" />

View File

@ -1852,6 +1852,7 @@
./lib/isc/include/isc/atomic.h C 2018,2019,2020,2021
./lib/isc/include/isc/attributes.h C 2020,2021
./lib/isc/include/isc/backtrace.h C 2009,2016,2018,2019,2020,2021
./lib/isc/include/isc/barrier.h C 2021
./lib/isc/include/isc/base32.h C 2008,2014,2016,2018,2019,2020,2021
./lib/isc/include/isc/base64.h C 1999,2000,2001,2004,2005,2006,2007,2016,2018,2019,2020,2021
./lib/isc/include/isc/bind9.h C 2009,2013,2016,2018,2019,2020,2021