diff --git a/bin/delv/delv.c b/bin/delv/delv.c index 357bdfd1ad..e42af92dd7 100644 --- a/bin/delv/delv.c +++ b/bin/delv/delv.c @@ -25,6 +25,7 @@ #include #include +#include #include #include #include @@ -2168,7 +2169,8 @@ run_server(void *arg) { ns_client_request, ifp, accept_cb, ifp, 10, NULL, NULL, &ifp->tcplistensocket)); ifp->flags |= NS_INTERFACEFLAG_LISTENING; - isc_job_run(loopmgr, sendquery, ifp->tcplistensocket); + isc_async_run(isc_loop_current(loopmgr), sendquery, + ifp->tcplistensocket); return; diff --git a/bin/dig/nslookup.c b/bin/dig/nslookup.c index 2d5f0b890d..1012a99bd0 100644 --- a/bin/dig/nslookup.c +++ b/bin/dig/nslookup.c @@ -16,11 +16,11 @@ #include #include +#include #include #include #include #include -#include #include #include #include @@ -889,12 +889,13 @@ start_next_command(void); static void process_next_command(void *arg __attribute__((__unused__))) { + isc_loop_t *loop = isc_loop_main(loopmgr); if (cmdline == NULL) { in_use = false; } else { do_next_command(cmdline); if (ISC_LIST_HEAD(lookup_list) != NULL) { - isc_job_run(loopmgr, run_loop, NULL); + isc_async_run(loop, run_loop, NULL); return; } } diff --git a/bin/dnssec/dnssec-signzone.c b/bin/dnssec/dnssec-signzone.c index 7c9ab5c870..a8bb578532 100644 --- a/bin/dnssec/dnssec-signzone.c +++ b/bin/dnssec/dnssec-signzone.c @@ -33,6 +33,7 @@ #include #include +#include #include #include #include @@ -41,7 +42,6 @@ #include #include #include -#include #include #include #include @@ -1667,7 +1667,7 @@ assignwork(void *arg) { lock_and_dumpnode(dns_fixedname_name(&fname), node); dns_db_detachnode(gdb, &node); - isc_job_run(loopmgr, assignwork, NULL); + isc_async_run(isc_loop_current(loopmgr), assignwork, NULL); } /*% diff --git a/bin/nsupdate/nsupdate.c b/bin/nsupdate/nsupdate.c index e93b8abc2b..46eb03d423 100644 --- a/bin/nsupdate/nsupdate.c +++ b/bin/nsupdate/nsupdate.c @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -28,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -2424,7 +2424,7 @@ static void done_update(void) { ddebug("done_update()"); - isc_job_run(loopmgr, getinput, NULL); + isc_async_run(isc_loop_main(loopmgr), getinput, NULL); } static void diff --git a/lib/dns/include/dns/validator.h b/lib/dns/include/dns/validator.h index b1c743e6f5..19eae8bb5a 100644 --- a/lib/dns/include/dns/validator.h +++ b/lib/dns/include/dns/validator.h @@ -74,7 +74,7 @@ struct dns_validator { unsigned int magic; dns_view_t *view; - isc_loopmgr_t *loopmgr; + isc_loop_t *loop; uint32_t tid; isc_refcount_t references; @@ -160,7 +160,7 @@ isc_result_t dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type, dns_rdataset_t *rdataset, dns_rdataset_t *sigrdataset, dns_message_t *message, unsigned int options, - isc_loopmgr_t *loop, isc_job_cb cb, void *arg, + isc_loop_t *loop, isc_job_cb cb, void *arg, dns_validator_t **validatorp); /*%< * Start a DNSSEC validation. diff --git a/lib/dns/resolver.c b/lib/dns/resolver.c index 10ba6e807d..e40c2b33c5 100644 --- a/lib/dns/resolver.c +++ b/lib/dns/resolver.c @@ -977,7 +977,7 @@ valcreate(fetchctx_t *fctx, dns_message_t *message, dns_adbaddrinfo_t *addrinfo, result = dns_validator_create( fctx->res->view, name, type, rdataset, sigrdataset, message, - valoptions, fctx->res->loopmgr, validated, valarg, &validator); + valoptions, fctx->loop, validated, valarg, &validator); RUNTIME_CHECK(result == ISC_R_SUCCESS); inc_stats(fctx->res, dns_resstatscounter_val); if ((valoptions & DNS_VALIDATOR_DEFER) == 0) { @@ -10420,7 +10420,7 @@ dns_resolver_createfetch(dns_resolver_t *res, const dns_name_t *name, if (new_fctx) { fetchctx_ref(fctx); - isc_job_run(res->loopmgr, (isc_job_cb)fctx_start, fctx); + isc_async_run(fctx->loop, (isc_job_cb)fctx_start, fctx); } unlock: diff --git a/lib/dns/validator.c b/lib/dns/validator.c index 6e2efe1474..08a5395ef2 100644 --- a/lib/dns/validator.c +++ b/lib/dns/validator.c @@ -227,7 +227,7 @@ validator_done(dns_validator_t *val, isc_result_t result) { val->result = result; dns_validator_ref(val); - isc_job_run(val->loopmgr, validator_done_cb, val); + isc_async_run(val->loop, validator_done_cb, val); } /*% @@ -950,8 +950,8 @@ create_fetch(dns_validator_t *val, dns_name_t *name, dns_rdatatype_t type, dns_validator_ref(val); return (dns_resolver_createfetch( val->view->resolver, name, type, NULL, NULL, NULL, NULL, 0, - fopts, 0, NULL, isc_loop_current(val->loopmgr), callback, val, - &val->frdataset, &val->fsigrdataset, &val->fetch)); + fopts, 0, NULL, val->loop, callback, val, &val->frdataset, + &val->fsigrdataset, &val->fetch)); } /*% @@ -981,7 +981,7 @@ create_validator(dns_validator_t *val, dns_name_t *name, dns_rdatatype_t type, validator_logcreate(val, name, type, caller, "validator"); result = dns_validator_create(val->view, name, type, rdataset, sig, - NULL, vopts, val->loopmgr, cb, val, + NULL, vopts, val->loop, cb, val, &val->subvalidator); if (result == ISC_R_SUCCESS) { dns_validator_attach(val, &val->subvalidator->parent); @@ -2978,7 +2978,7 @@ isc_result_t dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type, dns_rdataset_t *rdataset, dns_rdataset_t *sigrdataset, dns_message_t *message, unsigned int options, - isc_loopmgr_t *loopmgr, isc_job_cb cb, void *arg, + isc_loop_t *loop, isc_job_cb cb, void *arg, dns_validator_t **validatorp) { isc_result_t result = ISC_R_FAILURE; dns_validator_t *val = NULL; @@ -2997,7 +2997,7 @@ dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type, .type = type, .options = options, .link = ISC_LINK_INITIALIZER, - .loopmgr = loopmgr, + .loop = loop, .cb = cb, .arg = arg }; @@ -3024,7 +3024,7 @@ dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type, if ((options & DNS_VALIDATOR_DEFER) == 0) { dns_validator_ref(val); - isc_job_run(val->loopmgr, validator_start, val); + isc_async_run(val->loop, validator_start, val); } *validatorp = val; @@ -3050,7 +3050,7 @@ dns_validator_send(dns_validator_t *validator) { validator->options &= ~DNS_VALIDATOR_DEFER; dns_validator_ref(validator); - isc_job_run(validator->loopmgr, validator_start, validator); + isc_async_run(validator->loop, validator_start, validator); } void diff --git a/lib/isc/Makefile.am b/lib/isc/Makefile.am index e822b73452..9963c8cc60 100644 --- a/lib/isc/Makefile.am +++ b/lib/isc/Makefile.am @@ -115,6 +115,7 @@ libisc_la_SOURCES = \ ascii.c \ assertions.c \ async.c \ + async_p.h \ backtrace.c \ base32.c \ base64.c \ diff --git a/lib/isc/async.c b/lib/isc/async.c index 8eab9dba72..326d3a0803 100644 --- a/lib/isc/async.c +++ b/lib/isc/async.c @@ -34,28 +34,62 @@ #include #include +#include "async_p.h" #include "job_p.h" #include "loop_p.h" void isc_async_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { - int r; - isc_job_t *job = NULL; - REQUIRE(VALID_LOOP(loop)); REQUIRE(cb != NULL); - job = isc__job_new(loop, cb, cbarg); + isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job)); + *job = (isc_job_t){ + .link = ISC_LINK_INITIALIZER, + .cb = cb, + .cbarg = cbarg, + }; /* * Now send the half-initialized job to the loop queue. - * - * The ISC_ASTACK_PUSH is counterintuitive here, but uv_idle - * drains its queue backwards, so if there's more than one event - * to be processed then they need to be in reverse order. */ - ISC_ASTACK_PUSH(loop->queue_jobs, job, link); + ISC_ASTACK_PUSH(loop->async_jobs, job, link); - r = uv_async_send(&loop->queue_trigger); + int r = uv_async_send(&loop->async_trigger); UV_RUNTIME_CHECK(uv_async_send, r); } + +void +isc__async_cb(uv_async_t *handle) { + isc_loop_t *loop = uv_handle_get_data(handle); + + REQUIRE(VALID_LOOP(loop)); + + ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->async_jobs); + ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER; + + isc_job_t *job = ISC_STACK_POP(drain, link); + isc_job_t *next = NULL; + while (job != NULL) { + ISC_LIST_PREPEND(jobs, job, link); + + job = ISC_STACK_POP(drain, link); + } + + for (job = ISC_LIST_HEAD(jobs), + next = (job ? ISC_LIST_NEXT(job, link) : NULL); + job != NULL; + job = next, next = (job ? ISC_LIST_NEXT(job, link) : NULL)) + { + job->cb(job->cbarg); + + isc_mem_put(loop->mctx, job, sizeof(*job)); + } +} + +void +isc__async_close(uv_handle_t *handle) { + isc_loop_t *loop = uv_handle_get_data(handle); + + isc__async_cb(&loop->async_trigger); +} diff --git a/lib/isc/async_p.h b/lib/isc/async_p.h new file mode 100644 index 0000000000..0d854cd0ae --- /dev/null +++ b/lib/isc/async_p.h @@ -0,0 +1,29 @@ +/* + * Copyright (C) Internet Systems Consortium, Inc. ("ISC") + * + * SPDX-License-Identifier: MPL-2.0 + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, you can obtain one at https://mozilla.org/MPL/2.0/. + * + * See the COPYRIGHT file distributed with this work for additional + * information regarding copyright ownership. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +typedef ISC_ASTACK(isc_job_t) isc_asyncstack_t; + +void +isc__async_cb(uv_async_t *handle); + +void +isc__async_close(uv_handle_t *handle); diff --git a/lib/isc/include/isc/async.h b/lib/isc/include/isc/async.h index a413f2792a..ea36a150dd 100644 --- a/lib/isc/include/isc/async.h +++ b/lib/isc/include/isc/async.h @@ -15,7 +15,7 @@ * \brief The isc_async unit provides a way to schedule jobs on any isc * event loop (isc_loop unit) * - * The unit is built around the uv_async_t primitive and locked list with + * The unit is built around the uv_async_t primitive and lock-free stack with * isc_job_cb. Jobs are first scheduled onto the locked list, then the * uv_async_send() is called and the uv_async_t callback processes the enqueued * jobs are scheduled to be run on the isc event loop. diff --git a/lib/isc/include/isc/job.h b/lib/isc/include/isc/job.h index 19ab75d724..8eeb60d0f4 100644 --- a/lib/isc/include/isc/job.h +++ b/lib/isc/include/isc/job.h @@ -30,23 +30,31 @@ #include typedef void (*isc_job_cb)(void *); +typedef struct isc_job isc_job_t; + +struct isc_job { + isc_job_cb cb; + void *cbarg; + ISC_LINK(isc_job_t) link; +}; + +#define ISC_JOB_INITIALIZER \ + { \ + .link = ISC_LINK_INITIALIZER \ + } ISC_LANG_BEGINDECLS void -isc_job_run(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg); +isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg); /*%< * Schedule the job callback 'cb' to be run on the currently * running event loop. * - * Note: Because of the design of uv_idle_start(), if more than one - * job is posted at once, the jobs will be pushed onto a stack and - * executed in last-in-first-out order. To post events that are - * executed in order posted, use isc_async_run() instead. - * * Requires: * - *\li 'loopmgr' is the active loop manager. + *\li 'loop' is the current loop + *\li 'job' is initialized *\li 'cb' is a callback function, must be non-NULL *\li 'cbarg' is passed to the 'cb' as the only argument, may be NULL */ diff --git a/lib/isc/include/isc/loop.h b/lib/isc/include/isc/loop.h index 2a85702c2a..05440375a0 100644 --- a/lib/isc/include/isc/loop.h +++ b/lib/isc/include/isc/loop.h @@ -15,6 +15,7 @@ #include +#include #include #include #include diff --git a/lib/isc/include/isc/types.h b/lib/isc/include/isc/types.h index e8c08043bf..0c0b609bc1 100644 --- a/lib/isc/include/isc/types.h +++ b/lib/isc/include/isc/types.h @@ -48,19 +48,18 @@ typedef struct isc_httpdurl isc_httpdurl_t; /*%< HTTP URL */ typedef void(isc_httpdondestroy_t)(void *); /*%< Callback on destroying httpd */ typedef struct isc_interface isc_interface_t; /*%< Interface */ typedef struct isc_interfaceiter isc_interfaceiter_t; /*%< Interface Iterator */ -typedef struct isc_job isc_job_t; -typedef struct isc_lex isc_lex_t; /*%< Lex */ -typedef struct isc_log isc_log_t; /*%< Log */ -typedef struct isc_logcategory isc_logcategory_t; /*%< Log Category */ -typedef struct isc_logconfig isc_logconfig_t; /*%< Log Configuration */ -typedef struct isc_logmodule isc_logmodule_t; /*%< Log Module */ -typedef struct isc_loop isc_loop_t; /*%< Event loop */ -typedef struct isc_loopmgr isc_loopmgr_t; /*%< Event loop manager */ -typedef struct isc_mem isc_mem_t; /*%< Memory */ -typedef struct isc_mempool isc_mempool_t; /*%< Memory Pool */ -typedef struct isc_netaddr isc_netaddr_t; /*%< Net Address */ -typedef struct isc_netprefix isc_netprefix_t; /*%< Net Prefix */ -typedef struct isc_nm isc_nm_t; /*%< Network manager */ +typedef struct isc_lex isc_lex_t; /*%< Lex */ +typedef struct isc_log isc_log_t; /*%< Log */ +typedef struct isc_logcategory isc_logcategory_t; /*%< Log Category */ +typedef struct isc_logconfig isc_logconfig_t; /*%< Log Configuration */ +typedef struct isc_logmodule isc_logmodule_t; /*%< Log Module */ +typedef struct isc_loop isc_loop_t; /*%< Event loop */ +typedef struct isc_loopmgr isc_loopmgr_t; /*%< Event loop manager */ +typedef struct isc_mem isc_mem_t; /*%< Memory */ +typedef struct isc_mempool isc_mempool_t; /*%< Memory Pool */ +typedef struct isc_netaddr isc_netaddr_t; /*%< Net Address */ +typedef struct isc_netprefix isc_netprefix_t; /*%< Net Prefix */ +typedef struct isc_nm isc_nm_t; /*%< Network manager */ typedef struct isc_nmsocket isc_nmsocket_t; /*%< Network manager socket */ typedef struct isc_nmhandle isc_nmhandle_t; /*%< Network manager handle */ typedef struct isc_portset isc_portset_t; /*%< Port Set */ diff --git a/lib/isc/job.c b/lib/isc/job.c index 6bd588a63f..9f64df73e8 100644 --- a/lib/isc/job.c +++ b/lib/isc/job.c @@ -36,100 +36,51 @@ #include "job_p.h" #include "loop_p.h" -#define JOB_MAGIC ISC_MAGIC('J', 'O', 'B', ' ') -#define VALID_JOB(t) ISC_MAGIC_VALID(t, JOB_MAGIC) - -/* - * Private: static - */ - -static void -isc__job_close_cb(uv_handle_t *handle) { - isc_job_t *job = uv_handle_get_data(handle); - isc_loop_t *loop = job->loop; - - REQUIRE(loop == isc_loop_current(job->loop->loopmgr)); - - isc_mem_put(loop->mctx, job, sizeof(*job)); - - isc_loop_detach(&loop); -} - -static void -isc__job_destroy(isc_job_t *job) { - REQUIRE(VALID_JOB(job)); - REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr)); - - job->magic = 0; - - uv_close(&job->idle, isc__job_close_cb); -} - -static void -isc__job_cb(uv_idle_t *idle) { - isc_job_t *job = uv_handle_get_data(idle); - int r; - - REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr)); - - job->cb(job->cbarg); - - r = uv_idle_stop(idle); - UV_RUNTIME_CHECK(uv_idle_stop, r); - - isc__job_destroy(job); -} - /* * Public: #include */ void -isc_job_run(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg) { - isc_loop_t *loop = isc_loop_current(loopmgr); - isc_job_t *job = isc__job_new(loop, cb, cbarg); - isc__job_init(loop, job); - isc__job_run(job); +isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg) { + if (ISC_LIST_EMPTY(loop->run_jobs)) { + uv_idle_start(&loop->run_trigger, isc__job_cb); + } + + job->cb = cb; + job->cbarg = cbarg; + + ISC_LIST_APPEND(loop->run_jobs, job, link); } /* * Protected: #include */ -isc_job_t * -isc__job_new(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { - isc_job_t *job = NULL; +void +isc__job_cb(uv_idle_t *handle) { + isc_loop_t *loop = uv_handle_get_data(handle); + ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER; - REQUIRE(VALID_LOOP(loop)); - REQUIRE(cb != NULL); + ISC_LIST_MOVE(jobs, loop->run_jobs); - job = isc_mem_get(loop->mctx, sizeof(*job)); - *job = (isc_job_t){ - .magic = JOB_MAGIC, - .cb = cb, - .cbarg = cbarg, - .link = ISC_LINK_INITIALIZER, - }; + isc_job_t *job, *next; + for (job = ISC_LIST_HEAD(jobs), + next = (job != NULL) ? ISC_LIST_NEXT(job, link) : NULL; + job != NULL; + job = next, next = job ? ISC_LIST_NEXT(job, link) : NULL) + { + ISC_LIST_UNLINK(jobs, job, link); + job->cb(job->cbarg); + } - isc_loop_attach(loop, &job->loop); - - return (job); + if (ISC_LIST_EMPTY(loop->run_jobs)) { + uv_idle_stop(&loop->run_trigger); + } } void -isc__job_init(isc_loop_t *loop, isc_job_t *job) { - int r = uv_idle_init(&loop->loop, &job->idle); - UV_RUNTIME_CHECK(uv_idle_init, r); - uv_handle_set_data(&job->idle, job); -} - -void -isc__job_run(isc_job_t *job) { - int r; - - REQUIRE(VALID_JOB(job)); - REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr)); - - r = uv_idle_start(&job->idle, isc__job_cb); - UV_RUNTIME_CHECK(uv_idle_start, r); +isc__job_close(uv_handle_t *handle) { + isc_loop_t *loop = uv_handle_get_data(handle); + + isc__job_cb(&loop->run_trigger); } diff --git a/lib/isc/job_p.h b/lib/isc/job_p.h index b021975cb5..89fa078077 100644 --- a/lib/isc/job_p.h +++ b/lib/isc/job_p.h @@ -15,12 +15,12 @@ #include #include +#include -isc_job_t * -isc__job_new(isc_loop_t *loop, isc_job_cb cb, void *cbarg); +typedef ISC_LIST(isc_job_t) isc_joblist_t; void -isc__job_init(isc_loop_t *loop, isc_job_t *job); +isc__job_cb(uv_idle_t *handle); void -isc__job_run(isc_job_t *job); +isc__job_close(uv_handle_t *handle); diff --git a/lib/isc/loop.c b/lib/isc/loop.c index 61d28c3ba1..65250c14af 100644 --- a/lib/isc/loop.c +++ b/lib/isc/loop.c @@ -38,6 +38,7 @@ #include #include +#include "async_p.h" #include "job_p.h" #include "loop_p.h" @@ -140,8 +141,10 @@ static void destroy_cb(uv_async_t *handle) { isc_loop_t *loop = uv_handle_get_data(handle); + /* Again, the first close callback here is called last */ + uv_close(&loop->async_trigger, isc__async_close); + uv_close(&loop->run_trigger, isc__job_close); uv_close(&loop->destroy_trigger, NULL); - uv_close(&loop->queue_trigger, NULL); uv_close(&loop->pause_trigger, NULL); uv_close(&loop->wakeup_trigger, NULL); uv_close(&loop->quiescent, NULL); @@ -175,29 +178,14 @@ shutdown_cb(uv_async_t *handle) { isc_job_t *prev = ISC_LIST_PREV(job, link); ISC_LIST_UNLINK(loop->teardown_jobs, job, link); - isc__job_run(job); + job->cb(job->cbarg); + + isc_mem_put(loop->mctx, job, sizeof(*job)); job = prev; } } -static void -queue_cb(uv_async_t *handle) { - isc_loop_t *loop = uv_handle_get_data(handle); - - REQUIRE(VALID_LOOP(loop)); - - ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->queue_jobs); - isc_job_t *job = ISC_STACK_POP(drain, link); - - while (job != NULL) { - isc__job_init(loop, job); - isc__job_run(job); - - job = ISC_STACK_POP(drain, link); - } -} - static void wakeup_cb(uv_async_t *handle) { /* we only woke up to make the loop take a spin */ @@ -209,7 +197,8 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) { *loop = (isc_loop_t){ .tid = tid, .loopmgr = loopmgr, - .queue_jobs = ISC_ASTACK_INITIALIZER, + .async_jobs = ISC_ASTACK_INITIALIZER, + .run_jobs = ISC_LIST_INITIALIZER, .setup_jobs = ISC_LIST_INITIALIZER, .teardown_jobs = ISC_LIST_INITIALIZER, }; @@ -225,9 +214,13 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) { UV_RUNTIME_CHECK(uv_async_init, r); uv_handle_set_data(&loop->shutdown_trigger, loop); - r = uv_async_init(&loop->loop, &loop->queue_trigger, queue_cb); + r = uv_async_init(&loop->loop, &loop->async_trigger, isc__async_cb); UV_RUNTIME_CHECK(uv_async_init, r); - uv_handle_set_data(&loop->queue_trigger, loop); + uv_handle_set_data(&loop->async_trigger, loop); + + r = uv_idle_init(&loop->loop, &loop->run_trigger); + UV_RUNTIME_CHECK(uv_idle_init, r); + uv_handle_set_data(&loop->run_trigger, loop); r = uv_async_init(&loop->loop, &loop->destroy_trigger, destroy_cb); UV_RUNTIME_CHECK(uv_async_init, r); @@ -251,25 +244,31 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) { } static void -loop_run(isc_loop_t *loop) { - int r; - isc_job_t *job; +setup_jobs_cb(void *arg) { + isc_loop_t *loop = arg; + isc_job_t *job = ISC_LIST_HEAD(loop->setup_jobs); - job = ISC_LIST_HEAD(loop->setup_jobs); while (job != NULL) { isc_job_t *next = ISC_LIST_NEXT(job, link); ISC_LIST_UNLINK(loop->setup_jobs, job, link); - isc__job_run(job); + job->cb(job->cbarg); + + isc_mem_put(loop->mctx, job, sizeof(*job)); job = next; } +} - r = uv_prepare_start(&loop->quiescent, isc__qsbr_quiescent_cb); +static void +loop_run(isc_loop_t *loop) { + int r = uv_prepare_start(&loop->quiescent, isc__qsbr_quiescent_cb); UV_RUNTIME_CHECK(uv_prepare_start, r); isc_barrier_wait(&loop->loopmgr->starting); + isc_async_run(loop, setup_jobs_cb, loop); + r = uv_run(&loop->loop, UV_RUN_DEFAULT); UV_RUNTIME_CHECK(uv_run, r); @@ -281,7 +280,8 @@ loop_close(isc_loop_t *loop) { int r = uv_loop_close(&loop->loop); UV_RUNTIME_CHECK(uv_loop_close, r); - INSIST(ISC_ASTACK_EMPTY(loop->queue_jobs)); + INSIST(ISC_ASTACK_EMPTY(loop->async_jobs)); + INSIST(ISC_LIST_EMPTY(loop->run_jobs)); loop->magic = 0; @@ -382,53 +382,41 @@ isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops, isc_loopmgr_t **loopmgrp) { isc_job_t * isc_loop_setup(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { - isc_loopmgr_t *loopmgr = NULL; - isc_job_t *job = NULL; - REQUIRE(VALID_LOOP(loop)); REQUIRE(cb != NULL); - loopmgr = loop->loopmgr; + isc_loopmgr_t *loopmgr = loop->loopmgr; + isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job)); + *job = (isc_job_t){ + .cb = cb, + .cbarg = cbarg, + .link = ISC_LINK_INITIALIZER, + }; REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) || atomic_load(&loopmgr->paused)); - job = isc__job_new(loop, cb, cbarg); - isc__job_init(loop, job); - - /* - * The ISC_LIST_PREPEND is counterintuitive here, but actually, the - * uv_idle_start() puts the item on the HEAD of the internal list, so we - * want to store items here in reverse order, so on the uv_loop, they - * are scheduled in the correct order - */ - ISC_LIST_PREPEND(loop->setup_jobs, job, link); + ISC_LIST_APPEND(loop->setup_jobs, job, link); return (job); } isc_job_t * isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { - isc_loopmgr_t *loopmgr = NULL; - isc_job_t *job = NULL; - REQUIRE(VALID_LOOP(loop)); - loopmgr = loop->loopmgr; + isc_loopmgr_t *loopmgr = loop->loopmgr; + isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job)); + *job = (isc_job_t){ + .cb = cb, + .cbarg = cbarg, + .link = ISC_LINK_INITIALIZER, + }; REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) || atomic_load(&loopmgr->paused)); - job = isc__job_new(loop, cb, cbarg); - isc__job_init(loop, job); - - /* - * The ISC_LIST_PREPEND is counterintuitive here, but actually, the - * uv_idle_start() puts the item on the HEAD of the internal list, so we - * want to store items here in reverse order, so on the uv_loop, they - * are scheduled in the correct order - */ - ISC_LIST_PREPEND(loop->teardown_jobs, job, link); + ISC_LIST_APPEND(loop->teardown_jobs, job, link); return (job); } diff --git a/lib/isc/loop_p.h b/lib/isc/loop_p.h index 667f851097..8810a36619 100644 --- a/lib/isc/loop_p.h +++ b/lib/isc/loop_p.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -30,15 +31,15 @@ #include #include +#include "async_p.h" +#include "job_p.h" + /* * Per-thread loop */ #define LOOP_MAGIC ISC_MAGIC('L', 'O', 'O', 'P') #define VALID_LOOP(t) ISC_MAGIC_VALID(t, LOOP_MAGIC) -typedef ISC_LIST(isc_job_t) isc_joblist_t; -typedef ISC_ASTACK(isc_job_t) isc_jobstack_t; - struct isc_loop { int magic; isc_refcount_t references; @@ -56,8 +57,12 @@ struct isc_loop { bool shuttingdown; /* Async queue */ - uv_async_t queue_trigger; - isc_jobstack_t queue_jobs; + uv_async_t async_trigger; + isc_asyncstack_t async_jobs; + + /* Jobs queue */ + uv_idle_t run_trigger; + isc_joblist_t run_jobs; /* Pause */ uv_async_t pause_trigger; @@ -130,15 +135,6 @@ struct isc_signal { #define JOB_MAGIC ISC_MAGIC('J', 'O', 'B', ' ') #define VALID_JOB(t) ISC_MAGIC_VALID(t, JOB_MAGIC) -struct isc_job { - int magic; - uv_idle_t idle; - isc_loop_t *loop; - isc_job_cb cb; - void *cbarg; - ISC_LINK(isc_job_t) link; -}; - /* * Work to be offloaded to an external thread. */ diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 023fbec40b..1fdcec7528 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -252,7 +252,6 @@ typedef union { isc_nm_recv_cb_t recv; isc_nm_cb_t send; isc_nm_cb_t connect; - isc_nm_accept_cb_t accept; } isc__nm_cb_t; /* @@ -278,7 +277,6 @@ struct isc__nm_uvreq { isc_nm_timer_t *timer; /* TCP write timer */ int connect_tries; /* connect retries */ isc_result_t result; - uv_idle_t idle; union { uv_handle_t handle; @@ -293,6 +291,8 @@ struct isc__nm_uvreq { } uv_req; ISC_LINK(isc__nm_uvreq_t) link; ISC_LINK(isc__nm_uvreq_t) active_link; + + isc_job_t job; }; /* diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index aede4e92a8..9ba0874675 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -120,9 +120,6 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG); static void nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle); -static void -uvreq_free(uv_handle_t *handle); - /*%< * Issue a 'handle closed' callback on the socket. */ @@ -1021,8 +1018,8 @@ nmhandle_destroy(isc_nmhandle_t *handle) { * call it now asynchronously. */ if (sock->closehandle_cb != NULL) { - isc_job_run(sock->worker->netmgr->loopmgr, - isc__nm_closehandle_job, sock); + isc_async_run(sock->worker->loop, isc__nm_closehandle_job, + sock); } else { isc___nmsocket_detach(&sock FLARG_PASS); } @@ -1600,14 +1597,11 @@ isc___nm_uvreq_get(isc_nmsocket_t *sock FLARG) { .connect_tries = 3, .link = ISC_LINK_INITIALIZER, .active_link = ISC_LINK_INITIALIZER, + .job = ISC_JOB_INITIALIZER, .magic = UVREQ_MAGIC, }; uv_handle_set_data(&req->uv_req.handle, req); - int r = uv_idle_init(&worker->loop->loop, &req->idle); - UV_RUNTIME_CHECK(uv_idle_init, r); - uv_handle_set_data(&req->idle, req); - isc___nmsocket_attach(sock, &req->sock FLARG_PASS); ISC_LIST_APPEND(sock->active_uvreqs, req, active_link); @@ -1615,16 +1609,6 @@ isc___nm_uvreq_get(isc_nmsocket_t *sock FLARG) { return (req); } -static void -uvreq_free(uv_handle_t *handle) { - isc__nm_uvreq_t *req = uv_handle_get_data(handle); - isc_nmsocket_t *sock = req->sock; - - isc_mempool_put(sock->worker->uvreq_pool, req); - - isc___nmsocket_detach(&sock FLARG_PASS); -} - void isc___nm_uvreq_put(isc__nm_uvreq_t **reqp FLARG) { REQUIRE(reqp != NULL && VALID_UVREQ(*reqp)); @@ -1644,7 +1628,9 @@ isc___nm_uvreq_put(isc__nm_uvreq_t **reqp FLARG) { isc__nmhandle_detach(&handle FLARG_PASS); } - uv_close(&req->idle, uvreq_free); + isc_mempool_put(sock->worker->uvreq_pool, req); + + isc___nmsocket_detach(&sock FLARG_PASS); } void @@ -1838,12 +1824,10 @@ isc__nmsocket_barrier_init(isc_nmsocket_t *listener) { } static void -isc___nm_connectcb(uv_idle_t *handle) { - isc__nm_uvreq_t *uvreq = uv_handle_get_data(handle); +isc___nm_connectcb(void *arg) { + isc__nm_uvreq_t *uvreq = arg; uvreq->cb.connect(uvreq->handle, uvreq->result, uvreq->cbarg); - - uv_idle_stop(handle); isc__nm_uvreq_put(&uvreq); } @@ -1855,28 +1839,25 @@ isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, REQUIRE(VALID_NMHANDLE(uvreq->handle)); REQUIRE(uvreq->cb.connect != NULL); + uvreq->result = eresult; + if (!async) { - uvreq->cb.connect(uvreq->handle, eresult, uvreq->cbarg); - isc__nm_uvreq_put(&uvreq); + isc___nm_connectcb(uvreq); return; } - uvreq->result = eresult; - - uv_idle_start(&uvreq->idle, isc___nm_connectcb); + isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_connectcb, uvreq); } static void -isc___nm_readcb(uv_idle_t *handle) { - isc__nm_uvreq_t *uvreq = uv_handle_get_data(handle); +isc___nm_readcb(void *arg) { + isc__nm_uvreq_t *uvreq = arg; isc_region_t region; region.base = (unsigned char *)uvreq->uvbuf.base; region.length = uvreq->uvbuf.len; - uvreq->cb.recv(uvreq->handle, uvreq->result, ®ion, uvreq->cbarg); - uv_idle_stop(handle); isc__nm_uvreq_put(&uvreq); } @@ -1887,29 +1868,21 @@ isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, REQUIRE(VALID_UVREQ(uvreq)); REQUIRE(VALID_NMHANDLE(uvreq->handle)); + uvreq->result = eresult; + if (!async) { - isc_region_t region; - - region.base = (unsigned char *)uvreq->uvbuf.base; - region.length = uvreq->uvbuf.len; - - uvreq->cb.recv(uvreq->handle, eresult, ®ion, uvreq->cbarg); - isc__nm_uvreq_put(&uvreq); + isc___nm_readcb(uvreq); return; } - uvreq->result = eresult; - - uv_idle_start(&uvreq->idle, isc___nm_readcb); + isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_readcb, uvreq); } static void -isc___nm_sendcb(uv_idle_t *handle) { - isc__nm_uvreq_t *uvreq = uv_handle_get_data(handle); +isc___nm_sendcb(void *arg) { + isc__nm_uvreq_t *uvreq = arg; uvreq->cb.send(uvreq->handle, uvreq->result, uvreq->cbarg); - - uv_idle_stop(handle); isc__nm_uvreq_put(&uvreq); } @@ -1919,15 +1892,15 @@ isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_UVREQ(uvreq)); REQUIRE(VALID_NMHANDLE(uvreq->handle)); + uvreq->result = eresult; if (!async) { - uvreq->cb.send(uvreq->handle, uvreq->result, uvreq->cbarg); - isc__nm_uvreq_put(&uvreq); + isc___nm_sendcb(uvreq); return; } - uv_idle_start(&uvreq->idle, isc___nm_sendcb); + isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_sendcb, uvreq); } static void diff --git a/tests/isc/doh_test.c b/tests/isc/doh_test.c index 9bdf18107e..2c770ef0fc 100644 --- a/tests/isc/doh_test.c +++ b/tests/isc/doh_test.c @@ -702,7 +702,8 @@ doh_receive_send_reply_cb(isc_nmhandle_t *handle, isc_result_t eresult, assert_true(eresult == ISC_R_SUCCESS); } - isc_job_run(loopmgr, doh_connect_thread, connect_nm); + isc_async_run(isc_loop_current(loopmgr), doh_connect_thread, + connect_nm); } if (sends <= 0) { isc_loopmgr_shutdown(loopmgr); diff --git a/tests/isc/job_test.c b/tests/isc/job_test.c index 2a8c049bed..9c3a9fff4a 100644 --- a/tests/isc/job_test.c +++ b/tests/isc/job_test.c @@ -39,37 +39,55 @@ static atomic_uint executed; #define MAX_EXECUTED 1000000 +struct test_arg { + isc_job_t job; + union { + int n; + void *ptr; + } arg; +}; + static void shutdown_cb(void *arg) { - UNUSED(arg); + struct test_arg *ta = arg; + + isc_mem_put(mctx, ta, sizeof(*ta)); isc_loopmgr_shutdown(loopmgr); } static void -job_cb(void *arg __attribute__((__unused__))) { +job_cb(void *arg) { + struct test_arg *ta = arg; unsigned int n = atomic_fetch_add(&executed, 1); if (n <= MAX_EXECUTED) { atomic_fetch_add(&scheduled, 1); - isc_job_run(loopmgr, job_cb, loopmgr); + isc_job_run(isc_loop_current(loopmgr), &ta->job, job_cb, ta); } else { - isc_job_run(loopmgr, shutdown_cb, loopmgr); + isc_job_run(isc_loop_current(loopmgr), &ta->job, shutdown_cb, + ta); } } static void -job_run_cb(void *arg __attribute__((__unused__))) { +job_run_cb(void *arg) { + struct test_arg *ta = arg; atomic_fetch_add(&scheduled, 1); - isc_job_run(loopmgr, job_cb, loopmgr); + if (arg == NULL) { + ta = isc_mem_get(mctx, sizeof(*ta)); + *ta = (struct test_arg){ .job = ISC_JOB_INITIALIZER }; + } + + isc_job_run(isc_loop_current(loopmgr), &ta->job, job_cb, ta); } ISC_RUN_TEST_IMPL(isc_job_run) { atomic_init(&scheduled, 0); atomic_init(&executed, 0); - isc_loopmgr_setup(loopmgr, job_run_cb, loopmgr); + isc_loopmgr_setup(loopmgr, job_run_cb, NULL); isc_loopmgr_run(loopmgr); @@ -77,12 +95,18 @@ ISC_RUN_TEST_IMPL(isc_job_run) { } static char string[32] = ""; -int n1 = 1, n2 = 2, n3 = 3, n4 = 4, n5 = 5; +struct test_arg n1 = { .job = ISC_JOB_INITIALIZER, .arg.n = 1 }; +struct test_arg n2 = { .job = ISC_JOB_INITIALIZER, .arg.n = 2 }; +struct test_arg n3 = { .job = ISC_JOB_INITIALIZER, .arg.n = 3 }; +struct test_arg n4 = { .job = ISC_JOB_INITIALIZER, .arg.n = 4 }; +struct test_arg n5 = { .job = ISC_JOB_INITIALIZER, .arg.n = 5 }; static void append(void *arg) { + struct test_arg *ta = arg; + char value[32]; - sprintf(value, "%d", *(int *)arg); + sprintf(value, "%d", ta->arg.n); strlcat(string, value, 10); } @@ -90,12 +114,12 @@ static void job_multiple(void *arg) { UNUSED(arg); - /* These will be processed in reverse order */ - isc_job_run(loopmgr, append, &n1); - isc_job_run(loopmgr, append, &n2); - isc_job_run(loopmgr, append, &n3); - isc_job_run(loopmgr, append, &n4); - isc_job_run(loopmgr, append, &n5); + /* These will be processed in normal order */ + isc_job_run(mainloop, &n1.job, append, &n1); + isc_job_run(mainloop, &n2.job, append, &n2); + isc_job_run(mainloop, &n3.job, append, &n3); + isc_job_run(mainloop, &n4.job, append, &n4); + isc_job_run(mainloop, &n5.job, append, &n5); isc_loopmgr_shutdown(loopmgr); } @@ -103,7 +127,7 @@ ISC_RUN_TEST_IMPL(isc_job_multiple) { string[0] = '\0'; isc_loop_setup(isc_loop_main(loopmgr), job_multiple, loopmgr); isc_loopmgr_run(loopmgr); - assert_string_equal(string, "54321"); + assert_string_equal(string, "12345"); } ISC_TEST_LIST_START diff --git a/tests/isc/loop_test.c b/tests/isc/loop_test.c index 27f118eb5c..d863309c51 100644 --- a/tests/isc/loop_test.c +++ b/tests/isc/loop_test.c @@ -66,11 +66,12 @@ ISC_RUN_TEST_IMPL(isc_loopmgr) { static void runjob(void *arg __attribute__((__unused__))) { - if (isc_tid() == 0) { - isc_job_run(loopmgr, shutdown_loopmgr, loopmgr); - } + isc_async_run(isc_loop_current(loopmgr), count, loopmgr); - isc_job_run(loopmgr, count, loopmgr); + if (isc_tid() == 0) { + isc_async_run(isc_loop_current(loopmgr), shutdown_loopmgr, + loopmgr); + } } ISC_RUN_TEST_IMPL(isc_loopmgr_runjob) { diff --git a/tests/isc/netmgr_common.c b/tests/isc/netmgr_common.c index ce8aff3b65..ad44777918 100644 --- a/tests/isc/netmgr_common.c +++ b/tests/isc/netmgr_common.c @@ -368,9 +368,10 @@ connect_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { if (have_expected_cconnects(atomic_fetch_add(&cconnects, 1) + 1)) { do_cconnects_shutdown(loopmgr); } else if (do_send) { - isc_job_run(loopmgr, stream_recv_send_connect, - (cbarg == NULL ? get_stream_connect_function() - : (stream_connect_function)cbarg)); + isc_async_run(isc_loop_current(loopmgr), + stream_recv_send_connect, + (cbarg == NULL ? get_stream_connect_function() + : (stream_connect_function)cbarg)); } isc_refcount_increment0(&active_creads); diff --git a/tests/isc/udp_test.c b/tests/isc/udp_test.c index c086bbd126..54d4489a8a 100644 --- a/tests/isc/udp_test.c +++ b/tests/isc/udp_test.c @@ -540,7 +540,8 @@ ISC_TEARDOWN_TEST_IMPL(udp_shutdown_connect) { ISC_LOOP_TEST_IMPL(udp_shutdown_connect) { isc_loopmgr_shutdown(loopmgr); isc_refcount_increment0(&active_cconnects); - isc_job_run(loopmgr, udp_connect_udpconnect, netmgr); + isc_async_run(isc_loop_current(loopmgr), udp_connect_udpconnect, + netmgr); } static void @@ -864,7 +865,8 @@ udp__connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { { do_cconnects_shutdown(loopmgr); } else if (do_send) { - isc_job_run(loopmgr, udp__connect, cbarg); + isc_async_run(isc_loop_current(loopmgr), udp__connect, + cbarg); } isc_refcount_increment0(&active_creads);