diff --git a/lib/dns/message.c b/lib/dns/message.c index 6565cbf09d..a2f766ed15 100644 --- a/lib/dns/message.c +++ b/lib/dns/message.c @@ -21,9 +21,11 @@ #include #include +#include #include #include #include +#include #include #include #include @@ -186,6 +188,7 @@ msgblock_allocate(isc_mem_t *, unsigned int, unsigned int); * asynchronously. */ typedef struct checksig_ctx { + isc_loop_t *loop; dns_message_t *msg; dns_view_t *view; dns_message_cb_t cb; @@ -3218,21 +3221,27 @@ dns_message_dumpsig(dns_message_t *msg, char *txt1) { } #endif /* ifdef SKAN_MSG_DEBUG */ +static void +checksig_done(void *arg); + static void checksig_run(void *arg) { checksig_ctx_t *chsigctx = arg; chsigctx->result = dns_message_checksig(chsigctx->msg, chsigctx->view); + + isc_async_run(chsigctx->loop, checksig_done, chsigctx); } static void -checksig_cb(void *arg) { +checksig_done(void *arg) { checksig_ctx_t *chsigctx = arg; dns_message_t *msg = chsigctx->msg; chsigctx->cb(chsigctx->cbarg, chsigctx->result); dns_view_detach(&chsigctx->view); + isc_loop_detach(&chsigctx->loop); isc_mem_put(msg->mctx, chsigctx, sizeof(*chsigctx)); dns_message_detach(&msg); } @@ -3250,12 +3259,13 @@ dns_message_checksig_async(dns_message_t *msg, dns_view_t *view, .cb = cb, .cbarg = cbarg, .result = ISC_R_UNSET, + .loop = isc_loop_ref(loop), }; dns_message_attach(msg, &chsigctx->msg); dns_view_attach(view, &chsigctx->view); dns_message_clonebuffer(msg); - isc_work_enqueue(loop, checksig_run, checksig_cb, chsigctx); + isc_helper_run(loop, checksig_run, chsigctx); return (DNS_R_WAIT); } diff --git a/lib/dns/validator.c b/lib/dns/validator.c index 62f60c7c42..ce5d04205d 100644 --- a/lib/dns/validator.c +++ b/lib/dns/validator.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -127,6 +128,8 @@ static void validate_async_done(dns_validator_t *val, isc_result_t result); static isc_result_t validate_async_run(dns_validator_t *val, isc_job_cb cb); +static isc_result_t +validate_helper_run(dns_validator_t *val, isc_job_cb cb); static void validate_dnskey(void *arg); @@ -355,6 +358,9 @@ trynsec3: return (found); } +static void +resume_answer_with_key_done(void *arg); + static void resume_answer_with_key(void *arg) { dns_validator_t *val = arg; @@ -364,6 +370,8 @@ resume_answer_with_key(void *arg) { if (result == ISC_R_SUCCESS) { val->keyset = &val->frdataset; } + + (void)validate_async_run(val, resume_answer_with_key_done); } static void @@ -422,9 +430,8 @@ fetch_callback_dnskey(void *arg) { if (eresult == ISC_R_SUCCESS && rdataset->trust >= dns_trust_secure) { - isc_work_enqueue(val->loop, resume_answer_with_key, - resume_answer, val); - result = DNS_R_WAIT; + result = validate_helper_run(val, + resume_answer_with_key); } else { result = validate_async_run(val, resume_answer); } @@ -598,9 +605,8 @@ validator_callback_dnskey(void *arg) { * Only extract the dst key if the keyset is secure. */ if (val->frdataset.trust >= dns_trust_secure) { - isc_work_enqueue(val->loop, resume_answer_with_key, - resume_answer_with_key_done, val); - result = DNS_R_WAIT; + result = validate_helper_run(val, + resume_answer_with_key); } else { result = validate_async_run(val, resume_answer); } @@ -1184,9 +1190,8 @@ seek_dnskey(dns_validator_t *val) { dns_rdataset_disassociate(&val->fsigrdataset); } - isc_work_enqueue(val->loop, resume_answer_with_key, - resume_answer_with_key_done, val); - return (DNS_R_WAIT); + return (validate_helper_run(val, + resume_answer_with_key)); } break; @@ -1565,6 +1570,9 @@ cleanup: static void validate_answer_finish(void *arg); +static void +validate_answer_signing_key_done(void *arg); + static void validate_answer_signing_key(void *arg) { dns_validator_t *val = arg; @@ -1599,6 +1607,8 @@ validate_answer_signing_key(void *arg) { } else { INSIST(val->key == NULL); } + + (void)validate_async_run(val, validate_answer_signing_key_done); } static void @@ -1609,8 +1619,7 @@ validate_answer_signing_key_done(void *arg) { val->result = ISC_R_CANCELED; } else if (val->key != NULL) { /* Process with next key if we selected one */ - isc_work_enqueue(val->loop, validate_answer_signing_key, - validate_answer_signing_key_done, val); + (void)validate_helper_run(val, validate_answer_signing_key); return; } @@ -1672,8 +1681,7 @@ validate_answer_process(void *arg) { goto next_key; } - isc_work_enqueue(val->loop, validate_answer_signing_key, - validate_answer_signing_key_done, val); + (void)validate_helper_run(val, validate_answer_signing_key); return; next_key: @@ -1794,6 +1802,12 @@ validate_async_run(dns_validator_t *val, isc_job_cb cb) { return (DNS_R_WAIT); } +static isc_result_t +validate_helper_run(dns_validator_t *val, isc_job_cb cb) { + isc_helper_run(val->loop, cb, val); + return (DNS_R_WAIT); +} + static void validate_async_done(dns_validator_t *val, isc_result_t result) { if (result == DNS_R_NOVALIDSIG && @@ -2028,6 +2042,9 @@ validate_dnskey_dsset(dns_validator_t *val) { return (ISC_R_SUCCESS); } +static void +validate_dnskey_dsset_next_done(void *arg); + static void validate_dnskey_dsset_next(void *arg) { dns_validator_t *val = arg; @@ -2042,6 +2059,8 @@ validate_dnskey_dsset_next(void *arg) { /* continue async run */ val->result = validate_dnskey_dsset(val); } + + validate_async_run(val, validate_dnskey_dsset_next_done); } static void @@ -2064,8 +2083,7 @@ validate_dnskey_dsset_next_done(void *arg) { break; default: /* Continue validation until we have success or no more data */ - isc_work_enqueue(val->loop, validate_dnskey_dsset_next, - validate_dnskey_dsset_next_done, val); + (void)validate_helper_run(val, validate_dnskey_dsset_next); return; } @@ -2087,8 +2105,8 @@ validate_dnskey_dsset_first(dns_validator_t *val) { /* continue async run */ result = validate_dnskey_dsset(val); if (result != ISC_R_SUCCESS) { - isc_work_enqueue(val->loop, validate_dnskey_dsset_next, - validate_dnskey_dsset_next_done, val); + (void)validate_helper_run(val, + validate_dnskey_dsset_next); return; } } @@ -3384,7 +3402,7 @@ dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type, .options = options, .keytable = kt, .link = ISC_LINK_INITIALIZER, - .loop = loop, + .loop = isc_loop_ref(loop), .cb = cb, .arg = arg, .rdata = DNS_RDATA_INIT, @@ -3481,6 +3499,7 @@ destroy_validator(dns_validator_t *val) { isc_counter_detach(&val->qc); } dns_view_detach(&val->view); + isc_loop_detach(&val->loop); isc_mem_put(mctx, val, sizeof(*val)); } diff --git a/lib/isc/Makefile.am b/lib/isc/Makefile.am index 8428ee771b..77eda51858 100644 --- a/lib/isc/Makefile.am +++ b/lib/isc/Makefile.am @@ -31,6 +31,7 @@ libisc_la_HEADERS = \ include/isc/hash.h \ include/isc/hashmap.h \ include/isc/heap.h \ + include/isc/helper.h \ include/isc/hex.h \ include/isc/histo.h \ include/isc/hmac.h \ @@ -135,6 +136,7 @@ libisc_la_SOURCES = \ hash.c \ hashmap.c \ heap.c \ + helper.c \ hex.c \ histo.c \ hmac.c \ diff --git a/lib/isc/helper.c b/lib/isc/helper.c new file mode 100644 index 0000000000..f5a83cc833 --- /dev/null +++ b/lib/isc/helper.c @@ -0,0 +1,68 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * SPDX-License-Identifier: MPL-2.0 + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, you can obtain one at https://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "async_p.h" +#include "job_p.h" +#include "loop_p.h" + +void +isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { + REQUIRE(VALID_LOOP(loop)); + REQUIRE(cb != NULL); + + isc_loop_t *helper = &loop->loopmgr->helpers[loop->tid]; + + isc_job_t *job = isc_mem_get(helper->mctx, sizeof(*job)); + *job = (isc_job_t){ + .cb = cb, + .cbarg = cbarg, + }; + + cds_wfcq_node_init(&job->wfcq_node); + + /* + * cds_wfcq_enqueue() is non-blocking and enqueues the job to async + * queue. + * + * The function returns 'false' in case the queue was empty - in such + * case we need to trigger the async callback. + */ + if (!cds_wfcq_enqueue(&helper->async_jobs.head, + &helper->async_jobs.tail, &job->wfcq_node)) + { + int r = uv_async_send(&helper->async_trigger); + UV_RUNTIME_CHECK(uv_async_send, r); + } +} diff --git a/lib/isc/include/isc/helper.h b/lib/isc/include/isc/helper.h new file mode 100644 index 0000000000..fa13f0baf2 --- /dev/null +++ b/lib/isc/include/isc/helper.h @@ -0,0 +1,45 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * SPDX-License-Identifier: MPL-2.0 + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, you can obtain one at https://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +/*! \file isc/helper.h */ + +#pragma once + +#include + +#include +#include +#include +#include +#include + +ISC_LANG_BEGINDECLS + +void +isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg); +/*%< + * Schedule the job callback 'cb' to be run on the 'loop' event loop. + * + * Requires: + * + *\li 'loop' is a valid isc event loop + *\li 'cb' is a callback function, must be non-NULL + *\li 'cbarg' is passed to the 'cb' as the only argument, may be NULL + */ + +#define isc_helper_current(cb, cbarg) isc_async_run(isc_loop(), cb, cbarg) +/*%< + * Helper macro to run the job on the current loop + */ + +ISC_LANG_ENDDECLS diff --git a/lib/isc/loop.c b/lib/isc/loop.c index 8f2fcdf8e8..fb18356973 100644 --- a/lib/isc/loop.c +++ b/lib/isc/loop.c @@ -186,7 +186,8 @@ shutdown_cb(uv_async_t *handle) { } static void -loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) { +loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid, + const char *kind) { *loop = (isc_loop_t){ .tid = tid, .loopmgr = loopmgr, @@ -225,7 +226,7 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) { uv_handle_set_data(&loop->quiescent, loop); char name[16]; - snprintf(name, sizeof(name), "loop-%08" PRIx32, tid); + snprintf(name, sizeof(name), "%s-%08" PRIx32, kind, tid); isc_mem_create(&loop->mctx); isc_mem_setname(loop->mctx, name); @@ -249,6 +250,16 @@ quiescent_cb(uv_prepare_t *handle) { #endif } +static void +helper_close(isc_loop_t *loop) { + int r = uv_loop_close(&loop->loop); + UV_RUNTIME_CHECK(uv_loop_close, r); + + INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail)); + + isc_mem_detach(&loop->mctx); +} + static void loop_close(isc_loop_t *loop) { int r = uv_loop_close(&loop->loop); @@ -262,9 +273,32 @@ loop_close(isc_loop_t *loop) { isc_mem_detach(&loop->mctx); } +static void * +helper_thread(void *arg) { + isc_loop_t *helper = (isc_loop_t *)arg; + + int r = uv_prepare_start(&helper->quiescent, quiescent_cb); + UV_RUNTIME_CHECK(uv_prepare_start, r); + + isc_barrier_wait(&helper->loopmgr->starting); + + r = uv_run(&helper->loop, UV_RUN_DEFAULT); + UV_RUNTIME_CHECK(uv_run, r); + + /* Invalidate the helper early */ + helper->magic = 0; + + isc_barrier_wait(&helper->loopmgr->stopping); + + return (NULL); +} + static void * loop_thread(void *arg) { isc_loop_t *loop = (isc_loop_t *)arg; + isc_loopmgr_t *loopmgr = loop->loopmgr; + isc_loop_t *helper = &loopmgr->helpers[loop->tid]; + char name[32]; /* Initialize the thread_local variables*/ REQUIRE(isc__loop_local == NULL || isc__loop_local == loop); @@ -272,10 +306,15 @@ loop_thread(void *arg) { isc__tid_init(loop->tid); + /* Start the helper thread */ + isc_thread_create(helper_thread, helper, &helper->thread); + snprintf(name, sizeof(name), "isc-helper-%04" PRIu32, loop->tid); + isc_thread_setname(helper->thread, name); + int r = uv_prepare_start(&loop->quiescent, quiescent_cb); UV_RUNTIME_CHECK(uv_prepare_start, r); - isc_barrier_wait(&loop->loopmgr->starting); + isc_barrier_wait(&loopmgr->starting); enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( &loop->async_jobs.head, &loop->async_jobs.tail, @@ -293,7 +332,11 @@ loop_thread(void *arg) { /* Invalidate the loop early */ loop->magic = 0; - isc_barrier_wait(&loop->loopmgr->stopping); + /* Shutdown the helper thread */ + r = uv_async_send(&helper->shutdown_trigger); + UV_RUNTIME_CHECK(uv_async_send, r); + + isc_barrier_wait(&loopmgr->stopping); return (NULL); } @@ -342,16 +385,24 @@ isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops, isc_loopmgr_t **loopmgrp) { isc_mem_attach(mctx, &loopmgr->mctx); - isc_barrier_init(&loopmgr->pausing, loopmgr->nloops); - isc_barrier_init(&loopmgr->resuming, loopmgr->nloops); - isc_barrier_init(&loopmgr->starting, loopmgr->nloops); - isc_barrier_init(&loopmgr->stopping, loopmgr->nloops); + /* We need to double the number for loops and helpers */ + isc_barrier_init(&loopmgr->pausing, loopmgr->nloops * 2); + isc_barrier_init(&loopmgr->resuming, loopmgr->nloops * 2); + isc_barrier_init(&loopmgr->starting, loopmgr->nloops * 2); + isc_barrier_init(&loopmgr->stopping, loopmgr->nloops * 2); loopmgr->loops = isc_mem_cget(loopmgr->mctx, loopmgr->nloops, sizeof(loopmgr->loops[0])); for (size_t i = 0; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->loops[i]; - loop_init(loop, loopmgr, i); + loop_init(loop, loopmgr, i, "loop"); + } + + loopmgr->helpers = isc_mem_cget(loopmgr->mctx, loopmgr->nloops, + sizeof(loopmgr->helpers[0])); + for (size_t i = 0; i < loopmgr->nloops; i++) { + isc_loop_t *loop = &loopmgr->helpers[i]; + loop_init(loop, loopmgr, i, "helper"); } loopmgr->sigint = isc_signal_new(loopmgr, isc__loopmgr_signal, loopmgr, @@ -465,6 +516,7 @@ isc_loopmgr_run(isc_loopmgr_t *loopmgr) { void isc_loopmgr_pause(isc_loopmgr_t *loopmgr) { REQUIRE(VALID_LOOPMGR(loopmgr)); + REQUIRE(isc_tid() != ISC_TID_UNKNOWN); if (isc_log_wouldlog(ISC_LOG_DEBUG(1))) { isc_log_write(ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_OTHER, @@ -472,6 +524,13 @@ isc_loopmgr_pause(isc_loopmgr_t *loopmgr) { "loop exclusive mode: starting"); } + for (size_t i = 0; i < loopmgr->nloops; i++) { + isc_loop_t *helper = &loopmgr->helpers[i]; + + int r = uv_async_send(&helper->pause_trigger); + UV_RUNTIME_CHECK(uv_async_send, r); + } + for (size_t i = 0; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->loops[i]; @@ -526,6 +585,12 @@ isc_loopmgr_destroy(isc_loopmgr_t **loopmgrp) { RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->running, &(bool){ true }, false)); + /* Wait for all helpers to finish */ + for (size_t i = 0; i < loopmgr->nloops; i++) { + isc_loop_t *helper = &loopmgr->helpers[i]; + isc_thread_join(helper->thread, NULL); + } + /* First wait for all loops to finish */ for (size_t i = 1; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->loops[i]; @@ -534,6 +599,13 @@ isc_loopmgr_destroy(isc_loopmgr_t **loopmgrp) { loopmgr->magic = 0; + for (size_t i = 0; i < loopmgr->nloops; i++) { + isc_loop_t *helper = &loopmgr->helpers[i]; + helper_close(helper); + } + isc_mem_cput(loopmgr->mctx, loopmgr->helpers, loopmgr->nloops, + sizeof(loopmgr->helpers[0])); + for (size_t i = 0; i < loopmgr->nloops; i++) { isc_loop_t *loop = &loopmgr->loops[i]; loop_close(loop); diff --git a/lib/isc/loop_p.h b/lib/isc/loop_p.h index 185406d0d7..a822004e88 100644 --- a/lib/isc/loop_p.h +++ b/lib/isc/loop_p.h @@ -110,6 +110,7 @@ struct isc_loopmgr { /* per-thread objects */ isc_loop_t *loops; + isc_loop_t *helpers; }; /*