diff --git a/lib/isc/Makefile.am b/lib/isc/Makefile.am index 8428ee771b..77eda51858 100644 --- a/lib/isc/Makefile.am +++ b/lib/isc/Makefile.am @@ -31,6 +31,7 @@ libisc_la_HEADERS = \ include/isc/hash.h \ include/isc/hashmap.h \ include/isc/heap.h \ + include/isc/helper.h \ include/isc/hex.h \ include/isc/histo.h \ include/isc/hmac.h \ @@ -135,6 +136,7 @@ libisc_la_SOURCES = \ hash.c \ hashmap.c \ heap.c \ + helper.c \ hex.c \ histo.c \ hmac.c \ diff --git a/lib/isc/helper.c b/lib/isc/helper.c new file mode 100644 index 0000000000..f5a83cc833 --- /dev/null +++ b/lib/isc/helper.c @@ -0,0 +1,68 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * SPDX-License-Identifier: MPL-2.0 + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, you can obtain one at https://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "async_p.h" +#include "job_p.h" +#include "loop_p.h" + +void +isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { + REQUIRE(VALID_LOOP(loop)); + REQUIRE(cb != NULL); + + isc_loop_t *helper = &loop->loopmgr->helpers[loop->tid]; + + isc_job_t *job = isc_mem_get(helper->mctx, sizeof(*job)); + *job = (isc_job_t){ + .cb = cb, + .cbarg = cbarg, + }; + + cds_wfcq_node_init(&job->wfcq_node); + + /* + * cds_wfcq_enqueue() is non-blocking and enqueues the job to async + * queue. + * + * The function returns 'false' in case the queue was empty - in such + * case we need to trigger the async callback. + */ + if (!cds_wfcq_enqueue(&helper->async_jobs.head, + &helper->async_jobs.tail, &job->wfcq_node)) + { + int r = uv_async_send(&helper->async_trigger); + UV_RUNTIME_CHECK(uv_async_send, r); + } +} diff --git a/lib/isc/include/isc/helper.h b/lib/isc/include/isc/helper.h new file mode 100644 index 0000000000..fa13f0baf2 --- /dev/null +++ b/lib/isc/include/isc/helper.h @@ -0,0 +1,45 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * SPDX-License-Identifier: MPL-2.0 + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, you can obtain one at https://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +/*! \file isc/helper.h */ + +#pragma once + +#include + +#include +#include +#include +#include +#include + +ISC_LANG_BEGINDECLS + +void +isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg); +/*%< + * Schedule the job callback 'cb' to be run on the 'loop' event loop. + * + * Requires: + * + *\li 'loop' is a valid isc event loop + *\li 'cb' is a callback function, must be non-NULL + *\li 'cbarg' is passed to the 'cb' as the only argument, may be NULL + */ + +#define isc_helper_current(cb, cbarg) isc_async_run(isc_loop(), cb, cbarg) +/*%< + * Helper macro to run the job on the current loop + */ + +ISC_LANG_ENDDECLS diff --git a/lib/isc/loop.c b/lib/isc/loop.c index 8f2fcdf8e8..fb18356973 100644 --- a/lib/isc/loop.c +++ b/lib/isc/loop.c @@ -186,7 +186,8 @@ shutdown_cb(uv_async_t *handle) { } static void -loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) { +loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid, + const char *kind) { *loop = (isc_loop_t){ .tid = tid, .loopmgr = loopmgr, @@ -225,7 +226,7 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) { uv_handle_set_data(&loop->quiescent, loop); char name[16]; - snprintf(name, sizeof(name), "loop-%08" PRIx32, tid); + snprintf(name, sizeof(name), "%s-%08" PRIx32, kind, tid); isc_mem_create(&loop->mctx); isc_mem_setname(loop->mctx, name); @@ -249,6 +250,16 @@ quiescent_cb(uv_prepare_t *handle) { #endif } +static void +helper_close(isc_loop_t *loop) { + int r = uv_loop_close(&loop->loop); + UV_RUNTIME_CHECK(uv_loop_close, r); + + INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail)); + + isc_mem_detach(&loop->mctx); +} + static void loop_close(isc_loop_t *loop) { int r = uv_loop_close(&loop->loop); @@ -262,9 +273,32 @@ loop_close(isc_loop_t *loop) { isc_mem_detach(&loop->mctx); } +static void * +helper_thread(void *arg) { + isc_loop_t *helper = (isc_loop_t *)arg; + + int r = uv_prepare_start(&helper->quiescent, quiescent_cb); + UV_RUNTIME_CHECK(uv_prepare_start, r); + + isc_barrier_wait(&helper->loopmgr->starting); + + r = uv_run(&helper->loop, UV_RUN_DEFAULT); + UV_RUNTIME_CHECK(uv_run, r); + + /* Invalidate the helper early */ + helper->magic = 0; + + isc_barrier_wait(&helper->loopmgr->stopping); + + return (NULL); +} + static void * loop_thread(void *arg) { isc_loop_t *loop = (isc_loop_t *)arg; + isc_loopmgr_t *loopmgr = loop->loopmgr; + isc_loop_t *helper = &loopmgr->helpers[loop->tid]; + char name[32]; /* Initialize the thread_local variables*/ REQUIRE(isc__loop_local == NULL || isc__loop_local == loop); @@ -272,10 +306,15 @@ loop_thread(void *arg) { isc__tid_init(loop->tid); + /* Start the helper thread */ + isc_thread_create(helper_thread, helper, &helper->thread); + snprintf(name, sizeof(name), "isc-helper-%04" PRIu32, loop->tid); + isc_thread_setname(helper->thread, name); + int r = uv_prepare_start(&loop->quiescent, quiescent_cb); UV_RUNTIME_CHECK(uv_prepare_start, r); - isc_barrier_wait(&loop->loopmgr->starting); + isc_barrier_wait(&loopmgr->starting); enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( &loop->async_jobs.head, &loop->async_jobs.tail, @@ -293,7 +332,11 @@ loop_thread(void *arg) { /* Invalidate the loop early */ loop->magic = 0; - isc_barrier_wait(&loop->loopmgr->stopping); + /* Shutdown the helper thread */ + r = uv_async_send(&helper->shutdown_trigger); + UV_RUNTIME_CHECK(uv_async_send, r); + + isc_barrier_wait(&loopmgr->stopping); return (NULL); } @@ -342,16 +385,24 @@ isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops, isc_loopmgr_t **loopmgrp) { isc_mem_attach(mctx, &loopmgr->mctx); - isc_barrier_init(&loopmgr->pausing, loopmgr->nloops); - isc_barrier_init(&loopmgr->resuming, loopmgr->nloops); - isc_barrier_init(&loopmgr->starting, loopmgr->nloops); - isc_barrier_init(&loopmgr->stopping, loopmgr->nloops); + /* We need to double the number for loops and helpers */ + isc_barrier_init(&loopmgr->pausing, loopmgr->nloops * 2); + isc_barrier_init(&loopmgr->resuming, loopmgr->nloops * 2); + isc_barrier_init(&loopmgr->starting, loopmgr->nloops * 2); + isc_barrier_init(&loopmgr->stopping, loopmgr->nloops * 2); loopmgr->loops = isc_mem_cget(loopmgr->mctx, loopmgr->nloops, sizeof(loopmgr->loops[0])); for (size_t i = 0; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->loops[i]; - loop_init(loop, loopmgr, i); + loop_init(loop, loopmgr, i, "loop"); + } + + loopmgr->helpers = isc_mem_cget(loopmgr->mctx, loopmgr->nloops, + sizeof(loopmgr->helpers[0])); + for (size_t i = 0; i < loopmgr->nloops; i++) { + isc_loop_t *loop = &loopmgr->helpers[i]; + loop_init(loop, loopmgr, i, "helper"); } loopmgr->sigint = isc_signal_new(loopmgr, isc__loopmgr_signal, loopmgr, @@ -465,6 +516,7 @@ isc_loopmgr_run(isc_loopmgr_t *loopmgr) { void isc_loopmgr_pause(isc_loopmgr_t *loopmgr) { REQUIRE(VALID_LOOPMGR(loopmgr)); + REQUIRE(isc_tid() != ISC_TID_UNKNOWN); if (isc_log_wouldlog(ISC_LOG_DEBUG(1))) { isc_log_write(ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_OTHER, @@ -472,6 +524,13 @@ isc_loopmgr_pause(isc_loopmgr_t *loopmgr) { "loop exclusive mode: starting"); } + for (size_t i = 0; i < loopmgr->nloops; i++) { + isc_loop_t *helper = &loopmgr->helpers[i]; + + int r = uv_async_send(&helper->pause_trigger); + UV_RUNTIME_CHECK(uv_async_send, r); + } + for (size_t i = 0; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->loops[i]; @@ -526,6 +585,12 @@ isc_loopmgr_destroy(isc_loopmgr_t **loopmgrp) { RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->running, &(bool){ true }, false)); + /* Wait for all helpers to finish */ + for (size_t i = 0; i < loopmgr->nloops; i++) { + isc_loop_t *helper = &loopmgr->helpers[i]; + isc_thread_join(helper->thread, NULL); + } + /* First wait for all loops to finish */ for (size_t i = 1; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->loops[i]; @@ -534,6 +599,13 @@ isc_loopmgr_destroy(isc_loopmgr_t **loopmgrp) { loopmgr->magic = 0; + for (size_t i = 0; i < loopmgr->nloops; i++) { + isc_loop_t *helper = &loopmgr->helpers[i]; + helper_close(helper); + } + isc_mem_cput(loopmgr->mctx, loopmgr->helpers, loopmgr->nloops, + sizeof(loopmgr->helpers[0])); + for (size_t i = 0; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->loops[i]; loop_close(loop); diff --git a/lib/isc/loop_p.h b/lib/isc/loop_p.h index 185406d0d7..a822004e88 100644 --- a/lib/isc/loop_p.h +++ b/lib/isc/loop_p.h @@ -110,6 +110,7 @@ struct isc_loopmgr { /* per-thread objects */ isc_loop_t *loops; + isc_loop_t *helpers; }; /*