2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-08-31 06:25:31 +00:00

Refactor the pausing/unpausing and finishing the nm_thread

The isc_nm_pause(), isc_nm_resume() and finishing the nm_thread() from
nm_destroy() has been refactored, so all use the netievents instead of
directly touching the worker structure members.  This allows us to
remove most of the locking as the .paused and .finished members are
always accessed from the matching nm_thread.

When shutting down the nm_thread(), instead of issuing uv_stop(), we
just shutdown the .async handler, so all uv_loop_t events are properly
finished first and uv_run() ends gracefully with no outstanding active
handles in the loop.
This commit is contained in:
Ondřej Surý
2020-09-23 21:49:46 +02:00
parent b8b3ddb02f
commit e5ab137ba3
2 changed files with 142 additions and 132 deletions

View File

@@ -130,8 +130,21 @@ static isc_threadresult_t
nm_thread(isc_threadarg_t worker0);
static void
async_cb(uv_async_t *handle);
static void
static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue);
static bool
process_priority_queue(isc__networker_t *worker);
static bool
process_normal_queue(isc__networker_t *worker);
static void
process_queues(isc__networker_t *worker);
static void
isc__nm_async_stopcb(isc__networker_t *worker, isc__netievent_t *ev0);
static void
isc__nm_async_pausecb(isc__networker_t *worker, isc__netievent_t *ev0);
static void
isc__nm_async_resumecb(isc__networker_t *worker, isc__netievent_t *ev0);
int
isc_nm_tid(void) {
@@ -155,10 +168,7 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) {
isc_mutex_init(&mgr->lock);
isc_condition_init(&mgr->wkstatecond);
isc_refcount_init(&mgr->references, 1);
atomic_init(&mgr->workers_running, 0);
atomic_init(&mgr->workers_paused, 0);
atomic_init(&mgr->maxudp, 0);
atomic_init(&mgr->paused, false);
atomic_init(&mgr->interlocked, false);
#ifdef NETMGR_TRACE
@@ -218,8 +228,7 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) {
* race - we could exit isc_nm_start, launch nm_destroy,
* and nm_thread would still not be up.
*/
atomic_fetch_add_explicit(&mgr->workers_running, 1,
memory_order_relaxed);
mgr->workers_running++;
isc_thread_create(nm_thread, &mgr->workers[i], &worker->thread);
snprintf(name, sizeof(name), "isc-net-%04zu", i);
@@ -246,17 +255,14 @@ nm_destroy(isc_nm_t **mgr0) {
mgr->magic = 0;
for (size_t i = 0; i < mgr->nworkers; i++) {
isc__netievent_t *event = NULL;
LOCK(&mgr->workers[i].lock);
mgr->workers[i].finished = true;
UNLOCK(&mgr->workers[i].lock);
event = isc__nm_get_ievent(mgr, netievent_stop);
isc__nm_enqueue_ievent(&mgr->workers[i], event);
isc__networker_t *worker = &mgr->workers[i];
isc__netievent_t *event = isc__nm_get_ievent(mgr,
netievent_stop);
isc__nm_enqueue_ievent(worker, event);
}
LOCK(&mgr->lock);
while (atomic_load(&mgr->workers_running) > 0) {
while (mgr->workers_running > 0) {
WAIT(&mgr->wkstatecond, &mgr->lock);
}
UNLOCK(&mgr->lock);
@@ -315,28 +321,17 @@ isc_nm_pause(isc_nm_t *mgr) {
REQUIRE(VALID_NM(mgr));
REQUIRE(!isc__nm_in_netthread());
atomic_store(&mgr->paused, true);
isc__nm_acquire_interlocked_force(mgr);
for (size_t i = 0; i < mgr->nworkers; i++) {
isc__netievent_t *event = NULL;
LOCK(&mgr->workers[i].lock);
mgr->workers[i].paused = true;
UNLOCK(&mgr->workers[i].lock);
/*
* We have to issue a stop, otherwise the uv_run loop will
* run indefinitely!
*/
event = isc__nm_get_ievent(mgr, netievent_stop);
isc__nm_enqueue_ievent(&mgr->workers[i], event);
isc__networker_t *worker = &mgr->workers[i];
isc__netievent_t *event = isc__nm_get_ievent(mgr,
netievent_pause);
isc__nm_enqueue_ievent(worker, event);
}
LOCK(&mgr->lock);
while (atomic_load_relaxed(&mgr->workers_paused) !=
atomic_load_relaxed(&mgr->workers_running))
{
while (mgr->workers_paused != mgr->workers_running) {
WAIT(&mgr->wkstatecond, &mgr->lock);
}
UNLOCK(&mgr->lock);
@@ -348,17 +343,19 @@ isc_nm_resume(isc_nm_t *mgr) {
REQUIRE(!isc__nm_in_netthread());
for (size_t i = 0; i < mgr->nworkers; i++) {
LOCK(&mgr->workers[i].lock);
mgr->workers[i].paused = false;
SIGNAL(&mgr->workers[i].cond);
UNLOCK(&mgr->workers[i].lock);
isc__networker_t *worker = &mgr->workers[i];
isc__netievent_t *event = isc__nm_get_ievent(mgr,
netievent_resume);
isc__nm_enqueue_ievent(worker, event);
}
isc__nm_drop_interlocked(mgr);
/*
* We're not waiting for all the workers to come back to life;
* they eventually will, we don't care.
*/
LOCK(&mgr->lock);
while (mgr->workers_paused != 0) {
WAIT(&mgr->wkstatecond, &mgr->lock);
}
UNLOCK(&mgr->lock);
isc__nm_drop_interlocked(mgr);
}
void
@@ -402,6 +399,7 @@ void
isc_nm_destroy(isc_nm_t **mgr0) {
isc_nm_t *mgr = NULL;
int counter = 0;
uint_fast32_t references;
REQUIRE(mgr0 != NULL);
REQUIRE(VALID_NM(*mgr0));
@@ -416,28 +414,17 @@ isc_nm_destroy(isc_nm_t **mgr0) {
/*
* Wait for the manager to be dereferenced elsewhere.
*/
while (isc_refcount_current(&mgr->references) > 1 && counter++ < 1000) {
/*
* Sometimes libuv gets stuck, pausing and unpausing
* netmgr goes over all events in async queue for all
* the workers, and since it's done only on shutdown it
* doesn't cost us anything.
*/
isc_nm_pause(mgr);
isc_nm_resume(mgr);
while ((references = isc_refcount_current(&mgr->references)) > 1 &&
counter++ < 1000)
{
#ifdef WIN32
_sleep(10);
#else /* ifdef WIN32 */
usleep(10000);
#endif /* ifdef WIN32 */
}
#ifdef NETMGR_TRACE
if (!ISC_LIST_EMPTY(mgr->active_sockets)) {
isc__nm_dump_active(mgr);
INSIST(ISC_LIST_EMPTY(mgr->active_sockets));
}
#endif
INSIST(counter <= 1000);
INSIST(references == 1);
/*
* Detach final reference.
@@ -492,93 +479,59 @@ isc_nm_tcp_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle,
static isc_threadresult_t
nm_thread(isc_threadarg_t worker0) {
isc__networker_t *worker = (isc__networker_t *)worker0;
isc_nm_t *mgr = worker->mgr;
isc__nm_tid_v = worker->id;
isc_thread_setaffinity(isc__nm_tid_v);
while (true) {
int r = uv_run(&worker->loop, UV_RUN_DEFAULT);
bool pausing = false;
/* There's always the async handle until we are done */
INSIST(r > 0 || worker->finished);
/*
* or there's nothing to do. In the first case - wait
* for condition. In the latter - timedwait
*/
LOCK(&worker->lock);
while (worker->paused) {
LOCK(&worker->mgr->lock);
if (!pausing) {
atomic_fetch_add_explicit(
&worker->mgr->workers_paused, 1,
memory_order_acquire);
pausing = true;
}
SIGNAL(&worker->mgr->wkstatecond);
UNLOCK(&worker->mgr->lock);
WAIT(&worker->cond, &worker->lock);
/* Process priority events */
process_queue(worker, worker->ievents_prio);
}
if (pausing) {
uint32_t wp = atomic_fetch_sub_explicit(
&worker->mgr->workers_paused, 1,
memory_order_release);
if (wp == 1) {
atomic_store(&worker->mgr->paused, false);
}
}
bool finished = worker->finished;
UNLOCK(&worker->lock);
if (finished) {
/*
* We need to launch the loop one more time
* in UV_RUN_NOWAIT mode to make sure that
* worker->async is closed, so that we can
* close the loop cleanly. We don't care
* about the callback, as in this case we can
* be certain that uv_run() will eat the event.
*
* XXX: We may need to take steps here to ensure
* that all netmgr handles are freed.
if (worker->paused) {
LOCK(&worker->lock);
/* We need to lock the worker first otherwise
* isc_nm_resume() might slip in before WAIT() in the
* while loop starts and the signal never gets delivered
* and we are forever stuck in the paused loop.
*/
uv_close((uv_handle_t *)&worker->async, NULL);
uv_run(&worker->loop, UV_RUN_NOWAIT);
break;
LOCK(&mgr->lock);
mgr->workers_paused++;
SIGNAL(&mgr->wkstatecond);
UNLOCK(&mgr->lock);
while (worker->paused) {
WAIT(&worker->cond, &worker->lock);
(void)process_priority_queue(worker);
}
LOCK(&mgr->lock);
mgr->workers_paused--;
SIGNAL(&mgr->wkstatecond);
UNLOCK(&mgr->lock);
UNLOCK(&worker->lock);
}
if (r == 0) {
/*
* XXX: uv_run() in UV_RUN_DEFAULT mode returns
* zero if there are still active uv_handles.
* This shouldn't happen, but if it does, we just
* keep checking until they're done. We nap for a
* tenth of a second on each loop so as not to burn
* CPU. (We do a conditional wait instead, but it
* seems like overkill for this case.)
*/
#ifdef WIN32
_sleep(100);
#else /* ifdef WIN32 */
usleep(100000);
#endif /* ifdef WIN32 */
INSIST(worker->finished);
break;
}
INSIST(!worker->finished);
/*
* Empty the async queue.
*/
process_queue(worker, worker->ievents_prio);
process_queue(worker, worker->ievents);
process_queues(worker);
}
LOCK(&worker->mgr->lock);
atomic_fetch_sub_explicit(&worker->mgr->workers_running, 1,
memory_order_relaxed);
SIGNAL(&worker->mgr->wkstatecond);
UNLOCK(&worker->mgr->lock);
LOCK(&mgr->lock);
mgr->workers_running--;
SIGNAL(&mgr->wkstatecond);
UNLOCK(&mgr->lock);
return ((isc_threadresult_t)0);
}
@@ -592,21 +545,64 @@ nm_thread(isc_threadarg_t worker0) {
static void
async_cb(uv_async_t *handle) {
isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
process_queue(worker, worker->ievents_prio);
process_queue(worker, worker->ievents);
process_queues(worker);
}
static void
isc__nm_async_stopcb(isc__networker_t *worker, isc__netievent_t *ev0) {
UNUSED(ev0);
worker->finished = true;
/* Close the async handler */
uv_close((uv_handle_t *)&worker->async, NULL);
/* uv_stop(&worker->loop); */
}
static void
isc__nm_async_pausecb(isc__networker_t *worker, isc__netievent_t *ev0) {
UNUSED(ev0);
REQUIRE(worker->paused == false);
worker->paused = true;
uv_stop(&worker->loop);
}
static void
isc__nm_async_resumecb(isc__networker_t *worker, isc__netievent_t *ev0) {
UNUSED(ev0);
REQUIRE(worker->paused == true);
worker->paused = false;
}
static bool
process_priority_queue(isc__networker_t *worker) {
return (process_queue(worker, worker->ievents_prio));
}
static bool
process_normal_queue(isc__networker_t *worker) {
return (process_queue(worker, worker->ievents));
}
static void
process_queues(isc__networker_t *worker) {
if (!process_priority_queue(worker)) {
return;
}
(void)process_normal_queue(worker);
}
static bool
process_queue(isc__networker_t *worker, isc_queue_t *queue) {
isc__netievent_t *ievent = NULL;
bool more = true;
while ((ievent = (isc__netievent_t *)isc_queue_dequeue(queue)) != NULL)
{
switch (ievent->type) {
case netievent_stop:
uv_stop(&worker->loop);
isc_mempool_put(worker->mgr->evpool, ievent);
return;
isc__nm_async_stopcb(worker, ievent);
/* Don't process more ievents when we are stopping */
more = false;
break;
case netievent_udplisten:
isc__nm_async_udplisten(worker, ievent);
@@ -659,13 +655,26 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
case netievent_shutdown:
isc__nm_async_shutdown(worker, ievent);
break;
case netievent_resume:
isc__nm_async_resumecb(worker, ievent);
break;
case netievent_pause:
isc__nm_async_pausecb(worker, ievent);
/* Don't process more ievents when we are pausing */
more = false;
break;
default:
INSIST(0);
ISC_UNREACHABLE();
}
isc__nm_put_ievent(worker->mgr, ievent);
if (!more) {
break;
}
}
return (more);
}
void *