/* * Copyright (C) Internet Systems Consortium, Inc. ("ISC") * * SPDX-License-Identifier: MPL-2.0 * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, you can obtain one at https://mozilla.org/MPL/2.0/. * * See the COPYRIGHT file distributed with this work for additional * information regarding copyright ownership. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "async_p.h" #include "job_p.h" #include "loop_p.h" #include "thread_p.h" /** * Private */ thread_local isc_loop_t *isc__loop_local = NULL; isc_loopmgr_t *isc__loopmgr = NULL; static void ignore_signal(int sig, void (*handler)(int)) { struct sigaction sa = { .sa_handler = handler }; if (sigfillset(&sa.sa_mask) != 0 || sigaction(sig, &sa, NULL) < 0) { FATAL_SYSERROR(errno, "ignore_signal(%d)", sig); } } void isc_loopmgr_shutdown(void) { isc_loopmgr_t *loopmgr = isc__loopmgr; if (!atomic_compare_exchange_strong(&loopmgr->shuttingdown, &(bool){ false }, true)) { return; } for (size_t i = 0; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->loops[i]; int r; r = uv_async_send(&loop->shutdown_trigger); UV_RUNTIME_CHECK(uv_async_send, r); } } static void isc__loopmgr_signal(void *arg ISC_ATTR_UNUSED, int signum) { switch (signum) { case SIGINT: case SIGTERM: isc_loopmgr_shutdown(); break; default: UNREACHABLE(); } } static void pause_loop(isc_loop_t *loop) { isc_loopmgr_t *loopmgr = isc__loopmgr; rcu_thread_offline(); loop->paused = true; (void)isc_barrier_wait(&loopmgr->pausing); } static void resume_loop(isc_loop_t *loop) { isc_loopmgr_t *loopmgr = isc__loopmgr; (void)isc_barrier_wait(&loopmgr->resuming); loop->paused = false; rcu_thread_online(); } static void pauseresume_cb(uv_async_t *handle) { isc_loop_t *loop = uv_handle_get_data(handle); pause_loop(loop); resume_loop(loop); } #define XX(uc, lc) \ case UV_##uc: \ fprintf(stderr, "%s, %s: dangling %p: %p.type = %s\n", \ __func__, (char *)arg, handle->loop, handle, #lc); \ break; static void loop_walk_cb(uv_handle_t *handle, void *arg) { if (uv_is_closing(handle)) { return; } switch (handle->type) { UV_HANDLE_TYPE_MAP(XX) default: fprintf(stderr, "%s, %s: dangling %p: %p.type = %s\n", __func__, (char *)arg, &handle->loop, handle, "unknown"); } } static void shutdown_trigger_close_cb(uv_handle_t *handle) { isc_loop_t *loop = uv_handle_get_data(handle); isc_loop_detach(&loop); } static void destroy_cb(uv_async_t *handle) { isc_loop_t *loop = uv_handle_get_data(handle); /* Again, the first close callback here is called last */ uv_close(&loop->async_trigger, isc__async_close); uv_close(&loop->run_trigger, isc__job_close); uv_close(&loop->destroy_trigger, NULL); uv_close(&loop->pause_trigger, NULL); uv_close(&loop->quiescent, NULL); uv_walk(&loop->loop, loop_walk_cb, (char *)"destroy_cb"); } static void shutdown_cb(uv_async_t *handle) { isc_loop_t *loop = uv_handle_get_data(handle); isc_loopmgr_t *loopmgr = isc__loopmgr; /* Make sure, we can't be called again */ uv_close(&loop->shutdown_trigger, shutdown_trigger_close_cb); /* Mark this loop as shutting down */ loop->shuttingdown = true; if (DEFAULT_LOOP(loopmgr) == CURRENT_LOOP(loopmgr)) { /* Stop the signal handlers */ isc_signal_stop(loopmgr->sigterm); isc_signal_stop(loopmgr->sigint); /* Free the signal handlers */ isc_signal_destroy(&loopmgr->sigterm); isc_signal_destroy(&loopmgr->sigint); } enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( &loop->async_jobs.head, &loop->async_jobs.tail, &loop->teardown_jobs.head, &loop->teardown_jobs.tail); INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK); int r = uv_async_send(&loop->async_trigger); UV_RUNTIME_CHECK(uv_async_send, r); } static void loop_init(isc_loop_t *loop, isc_tid_t tid, const char *kind) { *loop = (isc_loop_t){ .tid = tid, .run_jobs = ISC_LIST_INITIALIZER, }; __cds_wfcq_init(&loop->async_jobs.head, &loop->async_jobs.tail); __cds_wfcq_init(&loop->setup_jobs.head, &loop->setup_jobs.tail); __cds_wfcq_init(&loop->teardown_jobs.head, &loop->teardown_jobs.tail); int r = uv_loop_init(&loop->loop); UV_RUNTIME_CHECK(uv_loop_init, r); r = uv_async_init(&loop->loop, &loop->pause_trigger, pauseresume_cb); UV_RUNTIME_CHECK(uv_async_init, r); uv_handle_set_data(&loop->pause_trigger, loop); r = uv_async_init(&loop->loop, &loop->shutdown_trigger, shutdown_cb); UV_RUNTIME_CHECK(uv_async_init, r); uv_handle_set_data(&loop->shutdown_trigger, loop); r = uv_async_init(&loop->loop, &loop->async_trigger, isc__async_cb); UV_RUNTIME_CHECK(uv_async_init, r); uv_handle_set_data(&loop->async_trigger, loop); r = uv_idle_init(&loop->loop, &loop->run_trigger); UV_RUNTIME_CHECK(uv_idle_init, r); uv_handle_set_data(&loop->run_trigger, loop); r = uv_async_init(&loop->loop, &loop->destroy_trigger, destroy_cb); UV_RUNTIME_CHECK(uv_async_init, r); uv_handle_set_data(&loop->destroy_trigger, loop); r = uv_prepare_init(&loop->loop, &loop->quiescent); UV_RUNTIME_CHECK(uv_prepare_init, r); uv_handle_set_data(&loop->quiescent, loop); isc_mem_create(kind, &loop->mctx); isc_refcount_init(&loop->references, 1); loop->magic = LOOP_MAGIC; } static void quiescent_cb(uv_prepare_t *handle) { UNUSED(handle); #if defined(RCU_QSBR) /* safe memory reclamation */ rcu_quiescent_state(); /* mark the thread offline when polling */ rcu_thread_offline(); #else INSIST(!rcu_read_ongoing()); #endif } static void helper_close(isc_loop_t *loop) { int r = uv_loop_close(&loop->loop); UV_RUNTIME_CHECK(uv_loop_close, r); INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail)); isc_mem_detach(&loop->mctx); } static void loop_close(isc_loop_t *loop) { int r = uv_loop_close(&loop->loop); UV_RUNTIME_CHECK(uv_loop_close, r); INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail)); INSIST(ISC_LIST_EMPTY(loop->run_jobs)); loop->magic = 0; isc_mem_detach(&loop->mctx); } static void * helper_thread(void *arg) { isc_loop_t *helper = (isc_loop_t *)arg; int r = uv_prepare_start(&helper->quiescent, quiescent_cb); UV_RUNTIME_CHECK(uv_prepare_start, r); isc_barrier_wait(&isc__loopmgr->starting); r = uv_run(&helper->loop, UV_RUN_DEFAULT); UV_RUNTIME_CHECK(uv_run, r); /* Invalidate the helper early */ helper->magic = 0; isc_barrier_wait(&isc__loopmgr->stopping); return NULL; } static void * loop_thread(void *arg) { isc_loop_t *loop = (isc_loop_t *)arg; isc_loopmgr_t *loopmgr = isc__loopmgr; isc_loop_t *helper = &loopmgr->helpers[loop->tid]; char name[32]; /* Initialize the thread_local variables*/ REQUIRE(isc__loop_local == NULL || isc__loop_local == loop); isc__loop_local = loop; isc__tid_init(loop->tid); /* Start the helper thread */ isc_thread_create(helper_thread, helper, &helper->thread); snprintf(name, sizeof(name), "isc-helper-%04" PRItid, loop->tid); isc_thread_setname(helper->thread, name); int r = uv_prepare_start(&loop->quiescent, quiescent_cb); UV_RUNTIME_CHECK(uv_prepare_start, r); isc_barrier_wait(&loopmgr->starting); enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( &loop->async_jobs.head, &loop->async_jobs.tail, &loop->setup_jobs.head, &loop->setup_jobs.tail); INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK); r = uv_async_send(&loop->async_trigger); UV_RUNTIME_CHECK(uv_async_send, r); r = uv_run(&loop->loop, UV_RUN_DEFAULT); UV_RUNTIME_CHECK(uv_run, r); isc__loop_local = NULL; /* Invalidate the loop early */ loop->magic = 0; /* Shutdown the helper thread */ r = uv_async_send(&helper->shutdown_trigger); UV_RUNTIME_CHECK(uv_async_send, r); isc_barrier_wait(&loopmgr->stopping); return NULL; } /** * Public */ static void threadpool_initialize(uint32_t workers) { char buf[11]; int r = uv_os_getenv("UV_THREADPOOL_SIZE", buf, &(size_t){ sizeof(buf) }); if (r == UV_ENOENT) { snprintf(buf, sizeof(buf), "%" PRIu32, workers); uv_os_setenv("UV_THREADPOOL_SIZE", buf); } } static void loop_destroy(isc_loop_t *loop) { int r = uv_async_send(&loop->destroy_trigger); UV_RUNTIME_CHECK(uv_async_send, r); } #if ISC_LOOP_TRACE ISC_REFCOUNT_TRACE_IMPL(isc_loop, loop_destroy) #else ISC_REFCOUNT_IMPL(isc_loop, loop_destroy); #endif void isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops) { REQUIRE(isc__loopmgr == NULL); REQUIRE(nloops > 0); isc_loopmgr_t *loopmgr = NULL; threadpool_initialize(nloops); isc__tid_initcount(nloops); loopmgr = isc_mem_get(mctx, sizeof(*loopmgr)); *loopmgr = (isc_loopmgr_t){ .nloops = nloops, .magic = LOOPMGR_MAGIC, }; isc_mem_attach(mctx, &loopmgr->mctx); /* We need to double the number for loops and helpers */ isc_barrier_init(&loopmgr->pausing, loopmgr->nloops * 2); isc_barrier_init(&loopmgr->resuming, loopmgr->nloops * 2); isc_barrier_init(&loopmgr->starting, loopmgr->nloops * 2); isc_barrier_init(&loopmgr->stopping, loopmgr->nloops * 2); loopmgr->loops = isc_mem_cget(loopmgr->mctx, loopmgr->nloops, sizeof(loopmgr->loops[0])); for (size_t i = 0; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->loops[i]; loop_init(loop, i, "loop"); } loopmgr->helpers = isc_mem_cget(loopmgr->mctx, loopmgr->nloops, sizeof(loopmgr->helpers[0])); for (size_t i = 0; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->helpers[i]; loop_init(loop, i, "helper"); } isc__loopmgr = loopmgr; loopmgr->sigint = isc_signal_new(isc__loopmgr_signal, loopmgr, SIGINT); loopmgr->sigterm = isc_signal_new(isc__loopmgr_signal, loopmgr, SIGTERM); isc_signal_start(loopmgr->sigint); isc_signal_start(loopmgr->sigterm); } isc_job_t * isc_loop_setup(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { REQUIRE(VALID_LOOP(loop)); REQUIRE(cb != NULL); isc_loopmgr_t *loopmgr = isc__loopmgr; isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job)); *job = (isc_job_t){ .cb = cb, .cbarg = cbarg, }; cds_wfcq_node_init(&job->wfcq_node); REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) || atomic_load(&loopmgr->paused)); cds_wfcq_enqueue(&loop->setup_jobs.head, &loop->setup_jobs.tail, &job->wfcq_node); return job; } isc_job_t * isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { REQUIRE(VALID_LOOP(loop)); isc_loopmgr_t *loopmgr = isc__loopmgr; isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job)); *job = (isc_job_t){ .cb = cb, .cbarg = cbarg, }; cds_wfcq_node_init(&job->wfcq_node); REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) || atomic_load(&loopmgr->paused)); cds_wfcq_enqueue(&loop->teardown_jobs.head, &loop->teardown_jobs.tail, &job->wfcq_node); return job; } void isc_loopmgr_setup(isc_job_cb cb, void *cbarg) { REQUIRE(VALID_LOOPMGR(isc__loopmgr)); REQUIRE(!atomic_load(&isc__loopmgr->running) || atomic_load(&isc__loopmgr->paused)); for (size_t i = 0; i < isc__loopmgr->nloops; i++) { isc_loop_t *loop = &isc__loopmgr->loops[i]; (void)isc_loop_setup(loop, cb, cbarg); } } void isc_loopmgr_teardown(isc_job_cb cb, void *cbarg) { REQUIRE(VALID_LOOPMGR(isc__loopmgr)); REQUIRE(!atomic_load(&isc__loopmgr->running) || atomic_load(&isc__loopmgr->paused)); for (size_t i = 0; i < isc__loopmgr->nloops; i++) { isc_loop_t *loop = &isc__loopmgr->loops[i]; (void)isc_loop_teardown(loop, cb, cbarg); } } void isc_loopmgr_run(void) { REQUIRE(VALID_LOOPMGR(isc__loopmgr)); RUNTIME_CHECK(atomic_compare_exchange_strong(&isc__loopmgr->running, &(bool){ false }, true)); /* * Always ignore SIGPIPE. */ ignore_signal(SIGPIPE, SIG_IGN); isc__thread_initialize(); /* * The thread 0 is this one. */ for (size_t i = 1; i < isc__loopmgr->nloops; i++) { char name[32]; isc_loop_t *loop = &isc__loopmgr->loops[i]; isc_thread_create(loop_thread, loop, &loop->thread); snprintf(name, sizeof(name), "isc-loop-%04zu", i); isc_thread_setname(loop->thread, name); } isc_thread_main(loop_thread, &isc__loopmgr->loops[0]); } void isc_loopmgr_pause(void) { REQUIRE(VALID_LOOPMGR(isc__loopmgr)); REQUIRE(isc_tid() != ISC_TID_UNKNOWN); if (isc_log_wouldlog(ISC_LOG_DEBUG(1))) { isc_log_write(ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), "loop exclusive mode: starting"); } for (size_t i = 0; i < isc__loopmgr->nloops; i++) { isc_loop_t *helper = &isc__loopmgr->helpers[i]; int r = uv_async_send(&helper->pause_trigger); UV_RUNTIME_CHECK(uv_async_send, r); } for (size_t i = 0; i < isc__loopmgr->nloops; i++) { isc_loop_t *loop = &isc__loopmgr->loops[i]; /* Skip current loop */ if (i == (size_t)isc_tid()) { continue; } int r = uv_async_send(&loop->pause_trigger); UV_RUNTIME_CHECK(uv_async_send, r); } RUNTIME_CHECK(atomic_compare_exchange_strong(&isc__loopmgr->paused, &(bool){ false }, true)); pause_loop(CURRENT_LOOP(isc__loopmgr)); if (isc_log_wouldlog(ISC_LOG_DEBUG(1))) { isc_log_write(ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), "loop exclusive mode: started"); } } void isc_loopmgr_resume(void) { REQUIRE(VALID_LOOPMGR(isc__loopmgr)); if (isc_log_wouldlog(ISC_LOG_DEBUG(1))) { isc_log_write(ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), "loop exclusive mode: ending"); } RUNTIME_CHECK(atomic_compare_exchange_strong(&isc__loopmgr->paused, &(bool){ true }, false)); resume_loop(CURRENT_LOOP(isc__loopmgr)); if (isc_log_wouldlog(ISC_LOG_DEBUG(1))) { isc_log_write(ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_OTHER, ISC_LOG_DEBUG(1), "loop exclusive mode: ended"); } } bool isc_loopmgr_paused(void) { REQUIRE(VALID_LOOPMGR(isc__loopmgr)); return atomic_load(&isc__loopmgr->paused); } void isc_loopmgr_destroy(void) { isc_loopmgr_t *loopmgr = isc__loopmgr; RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->running, &(bool){ true }, false)); isc__loopmgr = NULL; /* Wait for all helpers to finish */ for (size_t i = 0; i < loopmgr->nloops; i++) { isc_loop_t *helper = &loopmgr->helpers[i]; isc_thread_join(helper->thread, NULL); } /* First wait for all loops to finish */ for (size_t i = 1; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->loops[i]; isc_thread_join(loop->thread, NULL); } loopmgr->magic = 0; for (size_t i = 0; i < loopmgr->nloops; i++) { isc_loop_t *helper = &loopmgr->helpers[i]; helper_close(helper); } isc_mem_cput(loopmgr->mctx, loopmgr->helpers, loopmgr->nloops, sizeof(loopmgr->helpers[0])); for (size_t i = 0; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->loops[i]; loop_close(loop); } isc_mem_cput(loopmgr->mctx, loopmgr->loops, loopmgr->nloops, sizeof(loopmgr->loops[0])); isc_barrier_destroy(&loopmgr->starting); isc_barrier_destroy(&loopmgr->stopping); isc_barrier_destroy(&loopmgr->resuming); isc_barrier_destroy(&loopmgr->pausing); isc_mem_putanddetach(&loopmgr->mctx, loopmgr, sizeof(*loopmgr)); isc__thread_shutdown(); } uint32_t isc_loopmgr_nloops(void) { REQUIRE(VALID_LOOPMGR(isc__loopmgr)); return isc__loopmgr->nloops; } isc_mem_t * isc_loop_getmctx(isc_loop_t *loop) { REQUIRE(VALID_LOOP(loop)); return loop->mctx; } isc_loop_t * isc_loop_main(void) { REQUIRE(VALID_LOOPMGR(isc__loopmgr)); return DEFAULT_LOOP(isc__loopmgr); } isc_loop_t * isc_loop_get(isc_tid_t tid) { REQUIRE(VALID_LOOPMGR(isc__loopmgr)); REQUIRE((uint32_t)tid < isc__loopmgr->nloops); return LOOP(isc__loopmgr, tid); } void isc_loopmgr_blocking(void) { REQUIRE(VALID_LOOPMGR(isc__loopmgr)); isc_signal_stop(isc__loopmgr->sigterm); isc_signal_stop(isc__loopmgr->sigint); } void isc_loopmgr_nonblocking(void) { REQUIRE(VALID_LOOPMGR(isc__loopmgr)); isc_signal_start(isc__loopmgr->sigint); isc_signal_start(isc__loopmgr->sigterm); } isc_time_t isc_loop_now(isc_loop_t *loop) { REQUIRE(VALID_LOOP(loop)); uint64_t msec = uv_now(&loop->loop); isc_time_t t = { .seconds = msec / MS_PER_SEC, .nanoseconds = (msec % MS_PER_SEC) * NS_PER_MS, }; return t; } bool isc_loop_shuttingdown(isc_loop_t *loop) { REQUIRE(VALID_LOOP(loop)); REQUIRE(loop->tid == isc_tid()); return loop->shuttingdown; } isc_loop_t * isc_loop_helper(isc_loop_t *loop) { REQUIRE(VALID_LOOP(loop)); return &isc__loopmgr->helpers[loop->tid]; }