From b540722bc3e0fb645c8074bd77e9bb67bd88b40a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Sur=C3=BD?= Date: Fri, 9 Apr 2021 11:31:19 +0200 Subject: [PATCH] Refactor taskmgr to run on top of netmgr This commit changes the taskmgr to run the individual tasks on the netmgr internal workers. While an effort has been put into keeping the taskmgr interface intact, a couple of changes have been made: * The taskmgr has no concept of universal privileged mode - rather the tasks are either privileged or unprivileged (normal). The privileged tasks are run as a first thing when the netmgr is unpaused. There are now four different queues in the netmgr: 1. priority queue - netievents on the priority queue are run even when the taskmgr enters exclusive mode and netmgr is paused. This is needed to properly start listening on the interfaces, free resources and resume. 2. privileged task queue - only privileged tasks are queued here and this is the first queue that gets processed when network manager is unpaused using isc_nm_resume(). All netmgr workers need to clean the privileged task queue before they all proceed to normal operation. Both task queues are processed when the workers are finished. 3. task queue - only (traditional) tasks are scheduled here and this queue along with privileged task queues are processed when the netmgr workers are finishing. This is needed to process the task shutdown events. 4. normal queue - this is the queue with netmgr events, e.g. reading, sending, callbacks and pretty much everything is processed here. * The isc_taskmgr_create() now requires an initialized netmgr (isc_nm_t) object. * The isc_nm_destroy() function now waits for an indefinite time, but it will print out the active objects when in tracing mode (-DNETMGR_TRACE=1 and -DNETMGR_TRACE_VERBOSE=1), the netmgr has been made a little bit more asynchronous and it might take a longer time to shut down all the active networking connections. 
* Previously, the isc_nm_stoplistening() was a synchronous operation. This has been changed and the isc_nm_stoplistening() just schedules the child sockets to stop listening and exits. This was needed to prevent a deadlock as the (traditional) tasks are now executed on the netmgr threads. * The socket selection logic in isc__nm_udp_send() was flawed, but fortunately, it was broken, so we never hit the problem where we created uvreq_t on a socket from nmhandle_t, but then a different socket could be picked up and then we were trying to run the send callback on a socket that had a different threadid than the currently running one. --- bin/delv/delv.c | 8 +- bin/dig/dighost.c | 2 +- bin/dnssec/dnssec-signzone.c | 6 +- bin/named/main.c | 8 +- bin/named/server.c | 18 +- bin/nsupdate/nsupdate.c | 8 +- bin/rndc/rndc.c | 2 +- bin/tests/system/.gitignore | 1 + bin/tests/system/pipelined/pipequeries.c | 9 +- bin/tests/system/resolve.c | 144 ++-- bin/tests/system/tkey/keycreate.c | 9 +- bin/tests/system/tkey/keydelete.c | 10 +- bin/tools/mdig.c | 9 +- doc/dev/style.md | 2 +- lib/dns/tests/dnstest.c | 7 +- lib/dns/zone.c | 37 +- lib/isc/Makefile.am | 1 - lib/isc/app.c | 5 +- lib/isc/include/isc/netmgr.h | 13 + lib/isc/include/isc/task.h | 59 +- lib/isc/include/isc/taskpool.h | 17 +- lib/isc/netmgr/netmgr-int.h | 62 +- lib/isc/netmgr/netmgr.c | 276 +++++-- lib/isc/netmgr/tcp.c | 37 +- lib/isc/netmgr/tcpdns.c | 37 +- lib/isc/netmgr/tlsdns.c | 38 +- lib/isc/netmgr/udp.c | 104 ++- lib/isc/task.c | 895 +++++------------------ lib/isc/task_p.h | 26 - lib/isc/taskpool.c | 29 +- lib/isc/tests/isctest.c | 8 +- lib/isc/tests/task_test.c | 84 ++- lib/isc/tests/taskpool_test.c | 37 +- lib/isc/tests/timer_test.c | 71 +- lib/isc/timer.c | 12 +- lib/isc/win32/libisc.def.in | 9 +- lib/ns/tests/nstest.c | 7 +- util/copyrights | 1 - 38 files changed, 844 insertions(+), 1264 deletions(-) delete mode 100644 lib/isc/task_p.h diff --git a/bin/delv/delv.c index 79addc0254..fc974853c4 
100644 --- a/bin/delv/delv.c +++ b/bin/delv/delv.c @@ -36,6 +36,7 @@ #include #include #include +#include #ifdef WIN32 #include #endif /* ifdef WIN32 */ @@ -1736,6 +1737,7 @@ main(int argc, char *argv[]) { dns_namelist_t namelist; unsigned int resopt; isc_appctx_t *actx = NULL; + isc_nm_t *netmgr = NULL; isc_taskmgr_t *taskmgr = NULL; isc_socketmgr_t *socketmgr = NULL; isc_timermgr_t *timermgr = NULL; @@ -1759,7 +1761,8 @@ main(int argc, char *argv[]) { isc_mem_create(&mctx); CHECK(isc_appctx_create(mctx, &actx)); - CHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr)); + netmgr = isc_nm_start(mctx, 1); + CHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr)); CHECK(isc_socketmgr_create(mctx, &socketmgr)); CHECK(isc_timermgr_create(mctx, &timermgr)); @@ -1867,6 +1870,9 @@ cleanup: if (taskmgr != NULL) { isc_taskmgr_destroy(&taskmgr); } + if (netmgr != NULL) { + isc_nm_destroy(&netmgr); + } if (timermgr != NULL) { isc_timermgr_destroy(&timermgr); } diff --git a/bin/dig/dighost.c b/bin/dig/dighost.c index 4d2aee207e..9473219c2a 100644 --- a/bin/dig/dighost.c +++ b/bin/dig/dighost.c @@ -1362,7 +1362,7 @@ setup_libs(void) { netmgr = isc_nm_start(mctx, 1); - result = isc_taskmgr_create(mctx, 1, 0, netmgr, &taskmgr); + result = isc_taskmgr_create(mctx, 0, netmgr, &taskmgr); check_result(result, "isc_taskmgr_create"); result = isc_task_create(taskmgr, 0, &global_task); diff --git a/bin/dnssec/dnssec-signzone.c b/bin/dnssec/dnssec-signzone.c index d97daa78f7..3a88e861e3 100644 --- a/bin/dnssec/dnssec-signzone.c +++ b/bin/dnssec/dnssec-signzone.c @@ -144,6 +144,7 @@ static unsigned int nsigned = 0, nretained = 0, ndropped = 0; static unsigned int nverified = 0, nverifyfailed = 0; static const char *directory = NULL, *dsdir = NULL; static isc_mutex_t namelock, statslock; +static isc_nm_t *netmgr = NULL; static isc_taskmgr_t *taskmgr = NULL; static dns_db_t *gdb; /* The database */ static dns_dbversion_t *gversion; /* The database version */ @@ -3953,7 +3954,9 @@ main(int argc, 
char *argv[]) { print_time(outfp); print_version(outfp); - result = isc_taskmgr_create(mctx, ntasks, 0, NULL, &taskmgr); + netmgr = isc_nm_start(mctx, ntasks); + + result = isc_taskmgr_create(mctx, 0, netmgr, &taskmgr); if (result != ISC_R_SUCCESS) { fatal("failed to create task manager: %s", isc_result_totext(result)); @@ -4009,6 +4012,7 @@ main(int argc, char *argv[]) { isc_task_detach(&tasks[i]); } isc_taskmgr_destroy(&taskmgr); + isc_nm_destroy(&netmgr); isc_mem_put(mctx, tasks, ntasks * sizeof(isc_task_t *)); postsign(); TIME_NOW(&sign_finish); diff --git a/bin/named/main.c b/bin/named/main.c index c22cefc1b2..a6c6c3bd37 100644 --- a/bin/named/main.c +++ b/bin/named/main.c @@ -960,7 +960,7 @@ create_managers(void) { return (ISC_R_UNEXPECTED); } - result = isc_taskmgr_create(named_g_mctx, named_g_cpus, 0, named_g_nm, + result = isc_taskmgr_create(named_g_mctx, 0, named_g_nm, &named_g_taskmgr); if (result != ISC_R_SUCCESS) { UNEXPECTED_ERROR(__FILE__, __LINE__, @@ -1011,13 +1011,9 @@ destroy_managers(void) { * isc_taskmgr_destroy() will block until all tasks have exited. */ isc_taskmgr_destroy(&named_g_taskmgr); + isc_nm_destroy(&named_g_nm); isc_timermgr_destroy(&named_g_timermgr); isc_socketmgr_destroy(&named_g_socketmgr); - - /* - * At this point is safe to destroy the netmgr. - */ - isc_nm_destroy(&named_g_nm); } static void diff --git a/bin/named/server.c b/bin/named/server.c index 8ec8b4343a..6f050af36d 100644 --- a/bin/named/server.c +++ b/bin/named/server.c @@ -9866,7 +9866,7 @@ view_loaded(void *arg) { } static isc_result_t -load_zones(named_server_t *server, bool init, bool reconfig) { +load_zones(named_server_t *server, bool reconfig) { isc_result_t result; dns_view_t *view; ns_zoneload_t *zl; @@ -9921,16 +9921,6 @@ cleanup: if (isc_refcount_decrement(&zl->refs) == 1) { isc_refcount_destroy(&zl->refs); isc_mem_put(server->mctx, zl, sizeof(*zl)); - } else if (init) { - /* - * Place the task manager into privileged mode. 
This - * ensures that after we leave task-exclusive mode, no - * other tasks will be able to run except for the ones - * that are loading zones. (This should only be done during - * the initial server setup; it isn't necessary during - * a reload.) - */ - isc_taskmgr_setprivilegedmode(named_g_taskmgr); } isc_task_endexclusive(server->task); @@ -9998,7 +9988,7 @@ run_server(isc_task_t *task, isc_event_t *event) { CHECKFATAL(load_configuration(named_g_conffile, server, true), "loading configuration"); - CHECKFATAL(load_zones(server, true, false), "loading zones"); + CHECKFATAL(load_zones(server, false), "loading zones"); #ifdef ENABLE_AFL named_g_run_done = true; #endif /* ifdef ENABLE_AFL */ @@ -10511,7 +10501,7 @@ reload(named_server_t *server) { CHECK(loadconfig(server)); - result = load_zones(server, false, false); + result = load_zones(server, false); if (result == ISC_R_SUCCESS) { isc_log_write(named_g_lctx, NAMED_LOGCATEGORY_GENERAL, NAMED_LOGMODULE_SERVER, ISC_LOG_INFO, @@ -10880,7 +10870,7 @@ named_server_reconfigcommand(named_server_t *server) { CHECK(loadconfig(server)); - result = load_zones(server, false, true); + result = load_zones(server, true); if (result == ISC_R_SUCCESS) { isc_log_write(named_g_lctx, NAMED_LOGCATEGORY_GENERAL, NAMED_LOGMODULE_SERVER, ISC_LOG_INFO, diff --git a/bin/nsupdate/nsupdate.c b/bin/nsupdate/nsupdate.c index 73cb363fe3..3b505b9694 100644 --- a/bin/nsupdate/nsupdate.c +++ b/bin/nsupdate/nsupdate.c @@ -125,6 +125,7 @@ static bool usegsstsig = false; static bool use_win2k_gsstsig = false; static bool tried_other_gsstsig = false; static bool local_only = false; +static isc_nm_t *netmgr = NULL; static isc_taskmgr_t *taskmgr = NULL; static isc_task_t *global_task = NULL; static isc_event_t *global_event = NULL; @@ -927,7 +928,9 @@ setup_system(void) { result = isc_timermgr_create(gmctx, &timermgr); check_result(result, "dns_timermgr_create"); - result = isc_taskmgr_create(gmctx, 1, 0, NULL, &taskmgr); + netmgr = 
isc_nm_start(gmctx, 1); + + result = isc_taskmgr_create(gmctx, 0, netmgr, &taskmgr); check_result(result, "isc_taskmgr_create"); result = isc_task_create(taskmgr, 0, &global_task); @@ -3311,6 +3314,9 @@ cleanup(void) { ddebug("Shutting down task manager"); isc_taskmgr_destroy(&taskmgr); + ddebug("Shutting down network manager"); + isc_nm_destroy(&netmgr); + ddebug("Destroying event"); isc_event_free(&global_event); diff --git a/bin/rndc/rndc.c b/bin/rndc/rndc.c index 47c862e24f..624910a02d 100644 --- a/bin/rndc/rndc.c +++ b/bin/rndc/rndc.c @@ -1032,7 +1032,7 @@ main(int argc, char **argv) { isc_mem_create(&rndc_mctx); netmgr = isc_nm_start(rndc_mctx, 1); DO("create task manager", - isc_taskmgr_create(rndc_mctx, 1, 0, netmgr, &taskmgr)); + isc_taskmgr_create(rndc_mctx, 0, netmgr, &taskmgr)); DO("create task", isc_task_create(taskmgr, 0, &rndc_task)); isc_log_create(rndc_mctx, &log, &logconfig); isc_log_setcontext(log); diff --git a/bin/tests/system/.gitignore b/bin/tests/system/.gitignore index d8ac7e9268..82dbeafec9 100644 --- a/bin/tests/system/.gitignore +++ b/bin/tests/system/.gitignore @@ -13,6 +13,7 @@ named.run parallel.mk /*.log /*.trs +/resolve /run.sh /run.log /start.sh diff --git a/bin/tests/system/pipelined/pipequeries.c b/bin/tests/system/pipelined/pipequeries.c index 38335a148e..02f312ee2f 100644 --- a/bin/tests/system/pipelined/pipequeries.c +++ b/bin/tests/system/pipelined/pipequeries.c @@ -206,7 +206,8 @@ main(int argc, char *argv[]) { isc_result_t result; isc_log_t *lctx; isc_logconfig_t *lcfg; - isc_taskmgr_t *taskmgr; + isc_nm_t *netmgr = NULL; + isc_taskmgr_t *taskmgr = NULL; isc_task_t *task; isc_timermgr_t *timermgr; isc_socketmgr_t *socketmgr; @@ -276,8 +277,9 @@ main(int argc, char *argv[]) { RUNCHECK(dst_lib_init(mctx, NULL)); - taskmgr = NULL; - RUNCHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr)); + netmgr = isc_nm_start(mctx, 1); + + RUNCHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr)); task = NULL; 
RUNCHECK(isc_task_create(taskmgr, 0, &task)); timermgr = NULL; @@ -322,6 +324,7 @@ main(int argc, char *argv[]) { isc_task_shutdown(task); isc_task_detach(&task); isc_taskmgr_destroy(&taskmgr); + isc_nm_destroy(&netmgr); dst_lib_destroy(); diff --git a/bin/tests/system/resolve.c b/bin/tests/system/resolve.c index 88f95986c3..b2fc5225a8 100644 --- a/bin/tests/system/resolve.c +++ b/bin/tests/system/resolve.c @@ -53,6 +53,84 @@ #include +/* + * Global contexts + */ + +isc_mem_t *ctxs_mctx = NULL; +isc_appctx_t *ctxs_actx = NULL; +isc_nm_t *ctxs_netmgr = NULL; +isc_taskmgr_t *ctxs_taskmgr = NULL; +isc_socketmgr_t *ctxs_socketmgr = NULL; +isc_timermgr_t *ctxs_timermgr = NULL; + +static void +ctxs_destroy(void) { + if (ctxs_netmgr != NULL) { + isc_nm_closedown(ctxs_netmgr); + } + + if (ctxs_taskmgr != NULL) { + isc_taskmgr_destroy(&ctxs_taskmgr); + } + + if (ctxs_netmgr != NULL) { + isc_nm_destroy(&ctxs_netmgr); + } + + if (ctxs_timermgr != NULL) { + isc_timermgr_destroy(&ctxs_timermgr); + } + + if (ctxs_socketmgr != NULL) { + isc_socketmgr_destroy(&ctxs_socketmgr); + } + + if (ctxs_actx != NULL) { + isc_appctx_destroy(&ctxs_actx); + } + + if (ctxs_mctx != NULL) { + isc_mem_destroy(&ctxs_mctx); + } +} + +static isc_result_t +ctxs_init(void) { + isc_result_t result; + + isc_mem_create(&ctxs_mctx); + + result = isc_appctx_create(ctxs_mctx, &ctxs_actx); + if (result != ISC_R_SUCCESS) { + goto fail; + } + + ctxs_netmgr = isc_nm_start(ctxs_mctx, 1); + + result = isc_taskmgr_create(ctxs_mctx, 0, ctxs_netmgr, &ctxs_taskmgr); + if (result != ISC_R_SUCCESS) { + goto fail; + } + + result = isc_socketmgr_create(ctxs_mctx, &ctxs_socketmgr); + if (result != ISC_R_SUCCESS) { + goto fail; + } + + result = isc_timermgr_create(ctxs_mctx, &ctxs_timermgr); + if (result != ISC_R_SUCCESS) { + goto fail; + } + + return (ISC_R_SUCCESS); + +fail: + ctxs_destroy(); + + return (result); +} + static char *algname; static isc_result_t @@ -93,8 +171,7 @@ usage(void) { } static void 
-set_key(dns_client_t *client, char *keynamestr, char *keystr, bool is_sep, - isc_mem_t **mctxp) { +set_key(dns_client_t *client, char *keynamestr, char *keystr, bool is_sep) { isc_result_t result; dns_fixedname_t fkeyname; unsigned int namelen; @@ -109,8 +186,6 @@ set_key(dns_client_t *client, char *keynamestr, char *keystr, bool is_sep, isc_region_t r; dns_secalg_t alg; - isc_mem_create(mctxp); - if (algname != NULL) { tr.base = algname; tr.length = strlen(algname); @@ -176,7 +251,6 @@ addserver(dns_client_t *client, const char *addrstr, const char *port, isc_sockaddr_t sa; isc_sockaddrlist_t servers; isc_result_t result; - unsigned int namelen; isc_buffer_t b; dns_fixedname_t fname; dns_name_t *name = NULL; @@ -201,7 +275,7 @@ addserver(dns_client_t *client, const char *addrstr, const char *port, ISC_LIST_APPEND(servers, &sa, link); if (name_space != NULL) { - namelen = strlen(name_space); + unsigned int namelen = strlen(name_space); isc_buffer_constinit(&b, name_space, namelen); isc_buffer_add(&b, namelen); name = dns_fixedname_initname(&fname); @@ -240,15 +314,9 @@ main(int argc, char *argv[]) { dns_rdatatype_t type = dns_rdatatype_a; dns_rdataset_t *rdataset; dns_namelist_t namelist; - isc_mem_t *keymctx = NULL; unsigned int clientopt, resopt = 0; bool is_sep = false; const char *port = "53"; - isc_mem_t *mctx = NULL; - isc_appctx_t *actx = NULL; - isc_taskmgr_t *taskmgr = NULL; - isc_socketmgr_t *socketmgr = NULL; - isc_timermgr_t *timermgr = NULL; struct in_addr in4; struct in6_addr in6; isc_sockaddr_t a4, a6; @@ -361,32 +429,15 @@ main(int argc, char *argv[]) { exit(1); } - isc_mem_create(&mctx); - - result = isc_appctx_create(mctx, &actx); - if (result != ISC_R_SUCCESS) { - goto cleanup; - } - result = isc_app_ctxstart(actx); - if (result != ISC_R_SUCCESS) { - goto cleanup; - } - result = isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr); - if (result != ISC_R_SUCCESS) { - goto cleanup; - } - result = isc_socketmgr_create(mctx, &socketmgr); - if (result != 
ISC_R_SUCCESS) { - goto cleanup; - } - result = isc_timermgr_create(mctx, &timermgr); + result = ctxs_init(); if (result != ISC_R_SUCCESS) { goto cleanup; } clientopt = 0; - result = dns_client_create(mctx, actx, taskmgr, socketmgr, timermgr, - clientopt, &client, addr4, addr6); + result = dns_client_create(ctxs_mctx, ctxs_actx, ctxs_taskmgr, + ctxs_socketmgr, ctxs_timermgr, clientopt, + &client, addr4, addr6); if (result != ISC_R_SUCCESS) { fprintf(stderr, "dns_client_create failed: %u, %s\n", result, isc_result_totext(result)); @@ -398,7 +449,8 @@ main(int argc, char *argv[]) { irs_resconf_t *resconf = NULL; isc_sockaddrlist_t *nameservers; - result = irs_resconf_load(mctx, "/etc/resolv.conf", &resconf); + result = irs_resconf_load(ctxs_mctx, "/etc/resolv.conf", + &resconf); if (result != ISC_R_SUCCESS && result != ISC_R_FILENOTFOUND) { fprintf(stderr, "irs_resconf_load failed: %u\n", result); @@ -430,7 +482,7 @@ main(int argc, char *argv[]) { "while key name is provided\n"); exit(1); } - set_key(client, keynamestr, keystr, is_sep, &keymctx); + set_key(client, keynamestr, keystr, is_sep); } /* Construct qname */ @@ -470,25 +522,11 @@ main(int argc, char *argv[]) { /* Cleanup */ cleanup: - dns_client_destroy(&client); + if (client != NULL) { + dns_client_destroy(&client); + } - if (taskmgr != NULL) { - isc_taskmgr_destroy(&taskmgr); - } - if (timermgr != NULL) { - isc_timermgr_destroy(&timermgr); - } - if (socketmgr != NULL) { - isc_socketmgr_destroy(&socketmgr); - } - if (actx != NULL) { - isc_appctx_destroy(&actx); - } - isc_mem_detach(&mctx); - - if (keynamestr != NULL) { - isc_mem_destroy(&keymctx); - } + ctxs_destroy(); dns_lib_shutdown(); return (0); diff --git a/bin/tests/system/tkey/keycreate.c b/bin/tests/system/tkey/keycreate.c index e2ecc74761..51ed1212dd 100644 --- a/bin/tests/system/tkey/keycreate.c +++ b/bin/tests/system/tkey/keycreate.c @@ -192,7 +192,8 @@ sendquery(isc_task_t *task, isc_event_t *event) { int main(int argc, char *argv[]) { char 
*ourkeyname; - isc_taskmgr_t *taskmgr; + isc_nm_t *netmgr = NULL; + isc_taskmgr_t *taskmgr = NULL; isc_timermgr_t *timermgr; isc_socketmgr_t *socketmgr; isc_socket_t *sock; @@ -234,8 +235,9 @@ main(int argc, char *argv[]) { RUNCHECK(dst_lib_init(mctx, NULL)); - taskmgr = NULL; - RUNCHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr)); + netmgr = isc_nm_start(mctx, 1); + + RUNCHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr)); task = NULL; RUNCHECK(isc_task_create(taskmgr, 0, &task)); timermgr = NULL; @@ -292,6 +294,7 @@ main(int argc, char *argv[]) { isc_task_shutdown(task); isc_task_detach(&task); isc_taskmgr_destroy(&taskmgr); + isc_nm_destroy(&netmgr); isc_socket_detach(&sock); isc_socketmgr_destroy(&socketmgr); isc_timermgr_destroy(&timermgr); diff --git a/bin/tests/system/tkey/keydelete.c b/bin/tests/system/tkey/keydelete.c index 2fb173bd39..cc92df049c 100644 --- a/bin/tests/system/tkey/keydelete.c +++ b/bin/tests/system/tkey/keydelete.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -135,7 +136,8 @@ sendquery(isc_task_t *task, isc_event_t *event) { int main(int argc, char **argv) { char *keyname; - isc_taskmgr_t *taskmgr; + isc_nm_t *netmgr; + isc_taskmgr_t *taskmgr = NULL; isc_timermgr_t *timermgr; isc_socketmgr_t *socketmgr; isc_socket_t *sock; @@ -177,8 +179,9 @@ main(int argc, char **argv) { RUNCHECK(dst_lib_init(mctx, NULL)); - taskmgr = NULL; - RUNCHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr)); + netmgr = isc_nm_start(mctx, 1); + + RUNCHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr)); task = NULL; RUNCHECK(isc_task_create(taskmgr, 0, &task)); timermgr = NULL; @@ -235,6 +238,7 @@ main(int argc, char **argv) { isc_task_shutdown(task); isc_task_detach(&task); isc_taskmgr_destroy(&taskmgr); + isc_nm_destroy(&netmgr); isc_socket_detach(&sock); isc_socketmgr_destroy(&socketmgr); isc_timermgr_destroy(&timermgr); diff --git a/bin/tools/mdig.c b/bin/tools/mdig.c index d48b3e995b..b35292f51e 100644 --- 
a/bin/tools/mdig.c +++ b/bin/tools/mdig.c @@ -2083,7 +2083,8 @@ main(int argc, char *argv[]) { isc_sockaddr_t bind_any; isc_log_t *lctx; isc_logconfig_t *lcfg; - isc_taskmgr_t *taskmgr; + isc_nm_t *netmgr = NULL; + isc_taskmgr_t *taskmgr = NULL; isc_task_t *task; isc_timermgr_t *timermgr; isc_socketmgr_t *socketmgr; @@ -2144,8 +2145,9 @@ main(int argc, char *argv[]) { fatal("can't choose between IPv4 and IPv6"); } - taskmgr = NULL; - RUNCHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr)); + netmgr = isc_nm_start(mctx, 1); + + RUNCHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr)); task = NULL; RUNCHECK(isc_task_create(taskmgr, 0, &task)); timermgr = NULL; @@ -2225,6 +2227,7 @@ main(int argc, char *argv[]) { isc_task_shutdown(task); isc_task_detach(&task); isc_taskmgr_destroy(&taskmgr); + isc_nm_destroy(&netmgr); dst_lib_destroy(); diff --git a/doc/dev/style.md b/doc/dev/style.md index a4fddefb6e..50b767da27 100644 --- a/doc/dev/style.md +++ b/doc/dev/style.md @@ -136,7 +136,7 @@ format. Private header files, describing interfaces that are for internal use within a library but not for public use, are kept in the source tree at the same level as their related C files, and often have `"_p"` in their names, -e.g. `lib/isc/task_p.h`. +e.g. `lib/isc/mem_p.h`. Header files that define modules should have a structure like the following. 
Note that `` MUST be included by any public header diff --git a/lib/dns/tests/dnstest.c b/lib/dns/tests/dnstest.c index a913b56a24..9562846dd2 100644 --- a/lib/dns/tests/dnstest.c +++ b/lib/dns/tests/dnstest.c @@ -63,6 +63,7 @@ isc_mem_t *dt_mctx = NULL; isc_log_t *lctx = NULL; +isc_nm_t *netmgr = NULL; isc_taskmgr_t *taskmgr = NULL; isc_task_t *maintask = NULL; isc_timermgr_t *timermgr = NULL; @@ -100,6 +101,9 @@ cleanup_managers(void) { if (taskmgr != NULL) { isc_taskmgr_destroy(&taskmgr); } + if (netmgr != NULL) { + isc_nm_destroy(&netmgr); + } if (timermgr != NULL) { isc_timermgr_destroy(&timermgr); } @@ -113,7 +117,8 @@ create_managers(void) { isc_result_t result; ncpus = isc_os_ncpus(); - CHECK(isc_taskmgr_create(dt_mctx, ncpus, 0, NULL, &taskmgr)); + netmgr = isc_nm_start(dt_mctx, ncpus); + CHECK(isc_taskmgr_create(dt_mctx, 0, netmgr, &taskmgr)); CHECK(isc_timermgr_create(dt_mctx, &timermgr)); CHECK(isc_socketmgr_create(dt_mctx, &socketmgr)); CHECK(isc_task_create(taskmgr, 0, &maintask)); diff --git a/lib/dns/zone.c b/lib/dns/zone.c index 935b7b53c6..261c6242a7 100644 --- a/lib/dns/zone.c +++ b/lib/dns/zone.c @@ -18385,41 +18385,34 @@ dns_zonemgr_setsize(dns_zonemgr_t *zmgr, int num_zones) { /* Create or resize the zone task pools. */ if (zmgr->zonetasks == NULL) { result = isc_taskpool_create(zmgr->taskmgr, zmgr->mctx, ntasks, - 2, &pool); + 2, false, &pool); } else { - result = isc_taskpool_expand(&zmgr->zonetasks, ntasks, &pool); + result = isc_taskpool_expand(&zmgr->zonetasks, ntasks, false, + &pool); } if (result == ISC_R_SUCCESS) { zmgr->zonetasks = pool; } - pool = NULL; - if (zmgr->loadtasks == NULL) { - result = isc_taskpool_create(zmgr->taskmgr, zmgr->mctx, ntasks, - 2, &pool); - } else { - result = isc_taskpool_expand(&zmgr->loadtasks, ntasks, &pool); - } - - if (result == ISC_R_SUCCESS) { - zmgr->loadtasks = pool; - } - /* * We always set all tasks in the zone-load task pool to * privileged. 
This prevents other tasks in the system from * running while the server task manager is in privileged * mode. - * - * NOTE: If we start using task privileges for any other - * part of the system than zone tasks, then this will need to be - * revisted. In that case we'd want to turn on privileges for - * zone tasks only when we were loading, and turn them off the - * rest of the time. For now, however, it's okay to just - * set it and forget it. */ - isc_taskpool_setprivilege(zmgr->loadtasks, true); + pool = NULL; + if (zmgr->loadtasks == NULL) { + result = isc_taskpool_create(zmgr->taskmgr, zmgr->mctx, ntasks, + 2, true, &pool); + } else { + result = isc_taskpool_expand(&zmgr->loadtasks, ntasks, true, + &pool); + } + + if (result == ISC_R_SUCCESS) { + zmgr->loadtasks = pool; + } /* Create or resize the zone memory context pool. */ if (zmgr->mctxpool == NULL) { diff --git a/lib/isc/Makefile.am b/lib/isc/Makefile.am index 71712754c1..22212d9e38 100644 --- a/lib/isc/Makefile.am +++ b/lib/isc/Makefile.am @@ -224,7 +224,6 @@ libisc_la_SOURCES = \ fsaccess_common_p.h \ lib_p.h \ mem_p.h \ - task_p.h \ tls_p.h libisc_la_CPPFLAGS = \ diff --git a/lib/isc/app.c b/lib/isc/app.c index 4c30af6d70..04f4712434 100644 --- a/lib/isc/app.c +++ b/lib/isc/app.c @@ -516,11 +516,10 @@ isc_appctx_create(isc_mem_t *mctx, isc_appctx_t **ctxp) { REQUIRE(ctxp != NULL && *ctxp == NULL); ctx = isc_mem_get(mctx, sizeof(*ctx)); + *ctx = (isc_appctx_t){ .magic = 0 }; - ctx->magic = APPCTX_MAGIC; - - ctx->mctx = NULL; isc_mem_attach(mctx, &ctx->mctx); + ctx->magic = APPCTX_MAGIC; *ctxp = ctx; diff --git a/lib/isc/include/isc/netmgr.h b/lib/isc/include/isc/netmgr.h index a89436345f..5ecb92d022 100644 --- a/lib/isc/include/isc/netmgr.h +++ b/lib/isc/include/isc/netmgr.h @@ -522,3 +522,16 @@ isc_nm_listenhttp(isc_nm_t *mgr, isc_nmiface_t *iface, int backlog, isc_result_t isc_nm_http_endpoint(isc_nmsocket_t *sock, const char *uri, isc_nm_recv_cb_t cb, void *cbarg, size_t extrahandlesize); + +void 
+isc_nm_task_enqueue(isc_nm_t *mgr, isc_task_t *task, int threadid); +/*%< + * Enqueue the 'task' onto the netmgr ievents queue. + * + * Requires: + * \li 'mgr' is a valid netmgr object + * \li 'task' is a valid task + * \li 'threadid' is either the preferred netmgr tid or -1, in which case + * tid will be picked randomly. The threadid is capped (by modulo) to + * maximum number of 'workers' as specifed in isc_nm_start() + */ diff --git a/lib/isc/include/isc/task.h b/lib/isc/include/isc/task.h index a78cec1a09..29e7c99544 100644 --- a/lib/isc/include/isc/task.h +++ b/lib/isc/include/isc/task.h @@ -102,12 +102,11 @@ typedef enum { isc_result_t isc_task_create(isc_taskmgr_t *manager, unsigned int quantum, isc_task_t **taskp); - isc_result_t isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum, isc_task_t **taskp, int threadid); /*%< - * Create a task. + * Create a task, optionally bound to a particular threadid. * * Notes: * @@ -138,6 +137,23 @@ isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum, *\li #ISC_R_SHUTTINGDOWN */ +isc_result_t +isc_task_run(isc_task_t *task); +/*%< + * Run all the queued events for the 'task', returning + * when the queue is empty or the number of events executed + * exceeds the 'quantum' specified when the task was created. + * + * Requires: + * + *\li 'task' is a valid task. + * + * Returns: + * + *\li #ISC_R_SUCCESS + *\li #ISC_R_QUOTA + */ + void isc_task_attach(isc_task_t *source, isc_task_t **targetp); /*%< @@ -611,20 +627,13 @@ isc_task_privilege(isc_task_t *task); *****/ isc_result_t -isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, - unsigned int default_quantum, isc_nm_t *nm, +isc_taskmgr_create(isc_mem_t *mctx, unsigned int default_quantum, isc_nm_t *nm, isc_taskmgr_t **managerp); /*%< * Create a new task manager. * * Notes: * - *\li 'workers' in the number of worker threads to create. In general, - * the value should be close to the number of processors in the system. 
- * The 'workers' value is advisory only. An attempt will be made to - * create 'workers' threads, but if at least one thread creation - * succeeds, isc_taskmgr_create() may return ISC_R_SUCCESS. - * *\li If 'default_quantum' is non-zero, then it will be used as the default * quantum value when tasks are created. If zero, then an implementation * defined default quantum will be used. @@ -636,8 +645,6 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, * *\li 'mctx' is a valid memory context. * - *\li workers > 0 - * *\li managerp != NULL && *managerp == NULL * * Ensures: @@ -651,34 +658,14 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, *\li #ISC_R_NOMEMORY *\li #ISC_R_NOTHREADS No threads could be created. *\li #ISC_R_UNEXPECTED An unexpected error occurred. - *\li #ISC_R_SHUTTINGDOWN The non-threaded, shared, task + *\li #ISC_R_SHUTTINGDOWN The non-threaded, shared, task * manager shutting down. */ void -isc_taskmgr_setprivilegedmode(isc_taskmgr_t *manager); - -isc_taskmgrmode_t -isc_taskmgr_mode(isc_taskmgr_t *manager); -/*%< - * Set/get the current operating mode of the task manager. Valid modes are: - * - *\li isc_taskmgrmode_normal - *\li isc_taskmgrmode_privileged - * - * In privileged execution mode, only tasks that have had the "privilege" - * flag set via isc_task_setprivilege() can be executed. When all such - * tasks are complete, the manager automatically returns to normal mode - * and proceeds with running non-privileged ready tasks. This means it is - * necessary to have at least one privileged task waiting on the ready - * queue *before* setting the manager into privileged execution mode, - * which in turn means the task which calls this function should be in - * task-exclusive mode when it does so. - * - * Requires: - * - *\li 'manager' is a valid task manager. 
- */ +isc_taskmgr_attach(isc_taskmgr_t *, isc_taskmgr_t **); +void +isc_taskmgr_detach(isc_taskmgr_t *); void isc_taskmgr_destroy(isc_taskmgr_t **managerp); diff --git a/lib/isc/include/isc/taskpool.h b/lib/isc/include/isc/taskpool.h index 5a468bf8e3..94bd0c7873 100644 --- a/lib/isc/include/isc/taskpool.h +++ b/lib/isc/include/isc/taskpool.h @@ -51,7 +51,7 @@ typedef struct isc_taskpool isc_taskpool_t; isc_result_t isc_taskpool_create(isc_taskmgr_t *tmgr, isc_mem_t *mctx, unsigned int ntasks, - unsigned int quantum, isc_taskpool_t **poolp); + unsigned int quantum, bool priv, isc_taskpool_t **poolp); /*%< * Create a task pool of "ntasks" tasks, each with quantum * "quantum". @@ -90,7 +90,7 @@ isc_taskpool_size(isc_taskpool_t *pool); */ isc_result_t -isc_taskpool_expand(isc_taskpool_t **sourcep, unsigned int size, +isc_taskpool_expand(isc_taskpool_t **sourcep, unsigned int size, bool priv, isc_taskpool_t **targetp); /*%< @@ -131,19 +131,6 @@ isc_taskpool_destroy(isc_taskpool_t **poolp); * \li '*poolp' is a valid task pool. */ -void -isc_taskpool_setprivilege(isc_taskpool_t *pool, bool priv); -/*%< - * Set the privilege flag on all tasks in 'pool' to 'priv'. If 'priv' is - * true, then when the task manager is set into privileged mode, only - * tasks wihin this pool will be able to execute. (Note: It is important - * to turn the pool tasks' privilege back off before the last task finishes - * executing.) - * - * Requires: - * \li 'pool' is a valid task pool. 
- */ - ISC_LANG_ENDDECLS #endif /* ISC_TASKPOOL_H */ diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index ed285f8367..521ed58d54 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -41,6 +41,9 @@ #define ISC_NETMGR_TID_UNKNOWN -1 +/* Must be different from ISC_NETMGR_TID_UNKNOWN */ +#define ISC_NETMGR_NON_INTERLOCKED -2 + #define ISC_NETMGR_TLSBUF_SIZE 65536 #if !defined(WIN32) @@ -174,6 +177,8 @@ typedef struct isc__networker { bool finished; isc_thread_t thread; isc_queue_t *ievents; /* incoming async events */ + isc_queue_t *ievents_priv; /* privileged async tasks */ + isc_queue_t *ievents_task; /* async tasks */ isc_queue_t *ievents_prio; /* priority async events * used for listening etc. * can be processed while @@ -236,27 +241,27 @@ struct isc_nmiface { typedef enum isc__netievent_type { netievent_udpconnect, + netievent_udpclose, netievent_udpsend, netievent_udpread, netievent_udpstop, netievent_udpcancel, - netievent_udpclose, netievent_tcpconnect, + netievent_tcpclose, netievent_tcpsend, netievent_tcpstartread, netievent_tcppauseread, netievent_tcpaccept, netievent_tcpstop, netievent_tcpcancel, - netievent_tcpclose, netievent_tcpdnsaccept, netievent_tcpdnsconnect, + netievent_tcpdnsclose, netievent_tcpdnssend, netievent_tcpdnsread, netievent_tcpdnscancel, - netievent_tcpdnsclose, netievent_tcpdnsstop, netievent_tlsclose, @@ -268,19 +273,18 @@ typedef enum isc__netievent_type { netievent_tlsdnsaccept, netievent_tlsdnsconnect, + netievent_tlsdnsclose, netievent_tlsdnssend, netievent_tlsdnsread, netievent_tlsdnscancel, - netievent_tlsdnsclose, netievent_tlsdnsstop, netievent_tlsdnscycle, netievent_tlsdnsshutdown, + netievent_httpclose, netievent_httpstop, netievent_httpsend, - netievent_httpclose, - netievent_close, netievent_shutdown, netievent_stop, netievent_pause, @@ -289,6 +293,9 @@ typedef enum isc__netievent_type { netievent_readcb, netievent_sendcb, + netievent_task, + netievent_privilegedtask, + 
netievent_prio = 0xff, /* event type values higher than this * will be treated as high-priority * events, which can be processed @@ -300,6 +307,8 @@ typedef enum isc__netievent_type { netievent_tlsdnslisten, netievent_resume, netievent_detach, + netievent_close, + } isc__netievent_type; typedef union { @@ -556,6 +565,36 @@ typedef struct isc__netievent__socket_quota { isc__nm_put_netievent(nm, ievent); \ } +typedef struct isc__netievent__task { + isc__netievent_type type; + isc_task_t *task; +} isc__netievent__task_t; + +#define NETIEVENT_TASK_TYPE(type) \ + typedef isc__netievent__task_t isc__netievent_##type##_t; + +#define NETIEVENT_TASK_DECL(type) \ + isc__netievent_##type##_t *isc__nm_get_netievent_##type( \ + isc_nm_t *nm, isc_task_t *task); \ + void isc__nm_put_netievent_##type(isc_nm_t *nm, \ + isc__netievent_##type##_t *ievent); + +#define NETIEVENT_TASK_DEF(type) \ + isc__netievent_##type##_t *isc__nm_get_netievent_##type( \ + isc_nm_t *nm, isc_task_t *task) { \ + isc__netievent_##type##_t *ievent = \ + isc__nm_get_netievent(nm, netievent_##type); \ + ievent->task = task; \ + \ + return (ievent); \ + } \ + \ + void isc__nm_put_netievent_##type(isc_nm_t *nm, \ + isc__netievent_##type##_t *ievent) { \ + ievent->task = NULL; \ + isc__nm_put_netievent(nm, ievent); \ + } + typedef struct isc__netievent_udpsend { NETIEVENT__SOCKET; isc_sockaddr_t peer; @@ -617,6 +656,7 @@ struct isc_nm { uint32_t nworkers; isc_mutex_t lock; isc_condition_t wkstatecond; + isc_condition_t wkpausecond; isc__networker_t *workers; isc_stats_t *stats; @@ -631,6 +671,8 @@ struct isc_nm { uint_fast32_t workers_paused; atomic_uint_fast32_t maxudp; + atomic_bool paused; + /* * Active connections are being closed and new connections are * no longer allowed. @@ -643,7 +685,7 @@ struct isc_nm { * or pause, or we'll deadlock. We have to either re-enqueue our * event or wait for the other one to finish if we want to pause. 
*/ - atomic_bool interlocked; + atomic_int interlocked; /* * Timeout values for TCP connections, corresponding to @@ -1783,6 +1825,9 @@ NETIEVENT_TYPE(resume); NETIEVENT_TYPE(shutdown); NETIEVENT_TYPE(stop); +NETIEVENT_TASK_TYPE(task); +NETIEVENT_TASK_TYPE(privilegedtask); + /* Now declared the helper functions */ NETIEVENT_SOCKET_DECL(close); @@ -1846,6 +1891,9 @@ NETIEVENT_DECL(resume); NETIEVENT_DECL(shutdown); NETIEVENT_DECL(stop); +NETIEVENT_TASK_DECL(task); +NETIEVENT_TASK_DECL(privilegedtask); + void isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result); void diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 34828e0c2b..c2891f1a63 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -145,10 +146,12 @@ static bool process_queue(isc__networker_t *worker, isc_queue_t *queue); static bool process_priority_queue(isc__networker_t *worker); -static bool -process_normal_queue(isc__networker_t *worker); static void -process_queues(isc__networker_t *worker); +process_privilege_queue(isc__networker_t *worker); +static void +process_tasks_queue(isc__networker_t *worker); +static void +process_normal_queue(isc__networker_t *worker); static void isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0); @@ -217,6 +220,8 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) { isc_nm_t *mgr = NULL; char name[32]; + REQUIRE(workers > 0); + #ifdef WIN32 isc__nm_winsock_initialize(); #endif /* WIN32 */ @@ -227,9 +232,10 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) { isc_mem_attach(mctx, &mgr->mctx); isc_mutex_init(&mgr->lock); isc_condition_init(&mgr->wkstatecond); + isc_condition_init(&mgr->wkpausecond); isc_refcount_init(&mgr->references, 1); atomic_init(&mgr->maxudp, 0); - atomic_init(&mgr->interlocked, false); + atomic_init(&mgr->interlocked, ISC_NETMGR_NON_INTERLOCKED); #ifdef NETMGR_TRACE ISC_LIST_INIT(mgr->active_sockets); @@ 
-280,6 +286,8 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) { isc_condition_init(&worker->cond); worker->ievents = isc_queue_new(mgr->mctx, 128); + worker->ievents_priv = isc_queue_new(mgr->mctx, 128); + worker->ievents_task = isc_queue_new(mgr->mctx, 128); worker->ievents_prio = isc_queue_new(mgr->mctx, 128); worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE); worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE); @@ -339,6 +347,11 @@ nm_destroy(isc_nm_t **mgr0) { isc_mempool_put(mgr->evpool, ievent); } + INSIST(isc_queue_dequeue(worker->ievents_priv) == + (uintptr_t)NULL); + INSIST(isc_queue_dequeue(worker->ievents_task) == + (uintptr_t)NULL); + while ((ievent = (isc__netievent_t *)isc_queue_dequeue( worker->ievents_prio)) != NULL) { @@ -349,6 +362,8 @@ nm_destroy(isc_nm_t **mgr0) { INSIST(r == 0); isc_queue_destroy(worker->ievents); + isc_queue_destroy(worker->ievents_priv); + isc_queue_destroy(worker->ievents_task); isc_queue_destroy(worker->ievents_prio); isc_mutex_destroy(&worker->lock); isc_condition_destroy(&worker->cond); @@ -365,6 +380,7 @@ nm_destroy(isc_nm_t **mgr0) { } isc_condition_destroy(&mgr->wkstatecond); + isc_condition_destroy(&mgr->wkpausecond); isc_mutex_destroy(&mgr->lock); isc_mempool_destroy(&mgr->evpool); @@ -385,42 +401,58 @@ nm_destroy(isc_nm_t **mgr0) { void isc_nm_pause(isc_nm_t *mgr) { REQUIRE(VALID_NM(mgr)); - REQUIRE(!isc__nm_in_netthread()); + uint_fast32_t pausing = 0; + REQUIRE(!atomic_load(&mgr->paused)); isc__nm_acquire_interlocked_force(mgr); for (size_t i = 0; i < mgr->nworkers; i++) { isc__networker_t *worker = &mgr->workers[i]; - isc__netievent_resume_t *event = - isc__nm_get_netievent_pause(mgr); - isc__nm_enqueue_ievent(worker, (isc__netievent_t *)event); + if (i != (size_t)isc_nm_tid()) { + isc__netievent_resume_t *event = + isc__nm_get_netievent_pause(mgr); + pausing++; + isc__nm_enqueue_ievent(worker, + (isc__netievent_t *)event); + } else { + isc__nm_async_pause(worker, NULL); + } } 
LOCK(&mgr->lock); - while (mgr->workers_paused != mgr->workers_running) { + while (mgr->workers_paused != pausing) { WAIT(&mgr->wkstatecond, &mgr->lock); } + REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ false }, + true)); UNLOCK(&mgr->lock); } void isc_nm_resume(isc_nm_t *mgr) { REQUIRE(VALID_NM(mgr)); - REQUIRE(!isc__nm_in_netthread()); + REQUIRE(atomic_load(&mgr->paused)); for (size_t i = 0; i < mgr->nworkers; i++) { isc__networker_t *worker = &mgr->workers[i]; - isc__netievent_resume_t *event = - isc__nm_get_netievent_resume(mgr); - isc__nm_enqueue_ievent(worker, (isc__netievent_t *)event); + if (i != (size_t)isc_nm_tid()) { + isc__netievent_resume_t *event = + isc__nm_get_netievent_resume(mgr); + isc__nm_enqueue_ievent(worker, + (isc__netievent_t *)event); + } else { + isc__nm_async_resume(worker, NULL); + } } LOCK(&mgr->lock); while (mgr->workers_paused != 0) { WAIT(&mgr->wkstatecond, &mgr->lock); } + REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ true }, + false)); + BROADCAST(&mgr->wkpausecond); UNLOCK(&mgr->lock); - isc__nm_drop_interlocked(mgr); } @@ -465,7 +497,6 @@ void isc_nm_destroy(isc_nm_t **mgr0) { isc_nm_t *mgr = NULL; int counter = 0; - uint_fast32_t references; REQUIRE(mgr0 != NULL); REQUIRE(VALID_NM(*mgr0)); @@ -480,9 +511,7 @@ isc_nm_destroy(isc_nm_t **mgr0) { /* * Wait for the manager to be dereferenced elsewhere. 
*/ - while ((references = isc_refcount_current(&mgr->references)) > 1 && - counter++ < 1000) - { + while (isc_refcount_current(&mgr->references) > 1 && counter++ < 1000) { #ifdef WIN32 _sleep(10); #else /* ifdef WIN32 */ @@ -491,10 +520,23 @@ isc_nm_destroy(isc_nm_t **mgr0) { } #ifdef NETMGR_TRACE - isc__nm_dump_active(mgr); + if (isc_refcount_current(&mgr->references) > 1) { + isc__nm_dump_active(mgr); + INSIST(0); + ISC_UNREACHABLE(); + } #endif - INSIST(references == 1); + /* + * Now just patiently wait + */ + while (isc_refcount_current(&mgr->references) > 1) { +#ifdef WIN32 + _sleep(10); +#else /* ifdef WIN32 */ + usleep(10000); +#endif /* ifdef WIN32 */ + } /* * Detach final reference. @@ -545,7 +587,30 @@ isc_nm_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle, /* * nm_thread is a single worker thread, that runs uv_run event loop * until asked to stop. + * + * There are four queues for asynchronous events: + * + * 1. priority queue - netievents on the priority queue are run even when + * the taskmgr enters exclusive mode and the netmgr is paused. This + * is needed to properly start listening on the interfaces, free + * resources on shutdown, or resume from a pause. + * + * 2. privileged task queue - only privileged tasks are queued here and + * this is the first queue that gets processed when network manager + * is unpaused using isc_nm_resume(). All netmgr workers need to + * clean the privileged task queue before they all proceed to normal + * operation. Both task queues are processed when the workers are + * shutting down. + * + * 3. task queue - only (traditional) tasks are scheduled here, and this + * queue and the privileged task queue are both processed when the + * netmgr workers are finishing. This is needed to process the task + * shutdown events. + * + * 4. normal queue - this is the queue with netmgr events, e.g. reading, + * sending, callbacks, etc. 
*/ + static isc_threadresult_t nm_thread(isc_threadarg_t worker0) { isc__networker_t *worker = (isc__networker_t *)worker0; @@ -555,18 +620,26 @@ nm_thread(isc_threadarg_t worker0) { isc_thread_setaffinity(isc__nm_tid_v); while (true) { + /* + * uv_run() runs async_cb() in a loop, which processes + * all four event queues until a "pause" or "stop" event + * is encountered. On pause, we process only priority and + * privileged events until resuming. + */ int r = uv_run(&worker->loop, UV_RUN_DEFAULT); - /* There's always the async handle until we are done */ INSIST(r > 0 || worker->finished); if (worker->paused) { - LOCK(&worker->lock); - /* We need to lock the worker first otherwise - * isc_nm_resume() might slip in before WAIT() in the - * while loop starts and the signal never gets delivered - * and we are forever stuck in the paused loop. - */ + INSIST(atomic_load(&mgr->interlocked) != isc_nm_tid()); + /* + * We need to lock the worker first; otherwise + * isc_nm_resume() might slip in before WAIT() in + * the while loop starts, then the signal never + * gets delivered and we are stuck forever in the + * paused loop. + */ + LOCK(&worker->lock); LOCK(&mgr->lock); mgr->workers_paused++; SIGNAL(&mgr->wkstatecond); @@ -574,15 +647,28 @@ nm_thread(isc_threadarg_t worker0) { while (worker->paused) { WAIT(&worker->cond, &worker->lock); + UNLOCK(&worker->lock); (void)process_priority_queue(worker); + LOCK(&worker->lock); } LOCK(&mgr->lock); mgr->workers_paused--; SIGNAL(&mgr->wkstatecond); UNLOCK(&mgr->lock); - UNLOCK(&worker->lock); + + /* + * All workers must run the privileged event + * queue before we resume from pause. + */ + process_privilege_queue(worker); + + LOCK(&mgr->lock); + while (atomic_load(&mgr->paused)) { + WAIT(&mgr->wkpausecond, &mgr->lock); + } + UNLOCK(&mgr->lock); } if (r == 0) { @@ -593,11 +679,24 @@ nm_thread(isc_threadarg_t worker0) { INSIST(!worker->finished); /* - * Empty the async queue. + * We've fully resumed from pause. 
Drain the normal + * asynchronous event queues before resuming the uv_run() + * loop. (This is not strictly necessary, it just ensures + * that all pending events are processed before another + * pause can slip in.) */ - process_queues(worker); + process_tasks_queue(worker); + process_normal_queue(worker); } + /* + * We are shutting down. Process the task queues + * (they may include shutdown events) but do not process + * the netmgr event queue. + */ + process_privilege_queue(worker); + process_tasks_queue(worker); + LOCK(&mgr->lock); mgr->workers_running--; SIGNAL(&mgr->wkstatecond); @@ -607,15 +706,26 @@ nm_thread(isc_threadarg_t worker0) { } /* - * async_cb is a universal callback for 'async' events sent to event loop. + * async_cb() is a universal callback for 'async' events sent to event loop. * It's the only way to safely pass data to the libuv event loop. We use a - * single async event and a lockless queue of 'isc__netievent_t' structures - * passed from other threads. + * single async event and a set of lockless queues of 'isc__netievent_t' + * structures passed from other threads. */ static void async_cb(uv_async_t *handle) { isc__networker_t *worker = (isc__networker_t *)handle->loop->data; - process_queues(worker); + + /* + * process_priority_queue() returns false when pausing or stopping, + * so we don't want to process the other queues in that case. 
+ */ + if (!process_priority_queue(worker)) { + return; + } + + process_privilege_queue(worker); + process_tasks_queue(worker); + process_normal_queue(worker); } static void @@ -624,13 +734,13 @@ isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0) { worker->finished = true; /* Close the async handler */ uv_close((uv_handle_t *)&worker->async, NULL); - /* uv_stop(&worker->loop); */ } static void isc__nm_async_pause(isc__networker_t *worker, isc__netievent_t *ev0) { UNUSED(ev0); REQUIRE(worker->paused == false); + worker->paused = true; uv_stop(&worker->loop); } @@ -639,25 +749,78 @@ static void isc__nm_async_resume(isc__networker_t *worker, isc__netievent_t *ev0) { UNUSED(ev0); REQUIRE(worker->paused == true); + worker->paused = false; } +void +isc_nm_task_enqueue(isc_nm_t *nm, isc_task_t *task, int threadid) { + isc__netievent_t *event = NULL; + int tid; + isc__networker_t *worker = NULL; + + if (threadid == -1) { + tid = (int)isc_random_uniform(nm->nworkers); + } else { + tid = threadid % nm->nworkers; + } + + worker = &nm->workers[tid]; + + if (isc_task_privilege(task)) { + event = (isc__netievent_t *) + isc__nm_get_netievent_privilegedtask(nm, task); + } else { + event = (isc__netievent_t *)isc__nm_get_netievent_task(nm, + task); + } + + isc__nm_enqueue_ievent(worker, event); +} + +#define isc__nm_async_privilegedtask(worker, ev0) \ + isc__nm_async_task(worker, ev0) + +static void +isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_task_t *ievent = (isc__netievent_task_t *)ev0; + isc_result_t result; + + UNUSED(worker); + + result = isc_task_run(ievent->task); + + switch (result) { + case ISC_R_QUOTA: + isc_nm_task_enqueue(worker->mgr, (isc_task_t *)ievent->task, + isc_nm_tid()); + return; + case ISC_R_SUCCESS: + return; + default: + INSIST(0); + ISC_UNREACHABLE(); + } +} + static bool process_priority_queue(isc__networker_t *worker) { return (process_queue(worker, worker->ievents_prio)); } -static bool 
-process_normal_queue(isc__networker_t *worker) { - return (process_queue(worker, worker->ievents)); +static void +process_privilege_queue(isc__networker_t *worker) { + (void)process_queue(worker, worker->ievents_priv); } static void -process_queues(isc__networker_t *worker) { - if (!process_priority_queue(worker)) { - return; - } - (void)process_normal_queue(worker); +process_tasks_queue(isc__networker_t *worker) { + (void)process_queue(worker, worker->ievents_task); +} + +static void +process_normal_queue(isc__networker_t *worker) { + (void)process_queue(worker, worker->ievents); } /* @@ -690,6 +853,9 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) { /* Don't process more ievents when we are stopping */ NETIEVENT_CASE_NOMORE(stop); + NETIEVENT_CASE(privilegedtask); + NETIEVENT_CASE(task); + NETIEVENT_CASE(udpconnect); NETIEVENT_CASE(udplisten); NETIEVENT_CASE(udpstop); @@ -749,7 +915,6 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) { NETIEVENT_CASE(shutdown); NETIEVENT_CASE(resume); NETIEVENT_CASE_NOMORE(pause); - default: INSIST(0); ISC_UNREACHABLE(); @@ -843,6 +1008,9 @@ NETIEVENT_DEF(resume); NETIEVENT_DEF(shutdown); NETIEVENT_DEF(stop); +NETIEVENT_TASK_DEF(task); +NETIEVENT_TASK_DEF(privilegedtask); + void isc__nm_maybe_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) { @@ -869,6 +1037,10 @@ isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) { isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event); SIGNAL(&worker->cond); UNLOCK(&worker->lock); + } else if (event->type == netievent_privilegedtask) { + isc_queue_enqueue(worker->ievents_priv, (uintptr_t)event); + } else if (event->type == netievent_task) { + isc_queue_enqueue(worker->ievents_task, (uintptr_t)event); } else { isc_queue_enqueue(worker->ievents, (uintptr_t)event); } @@ -2537,8 +2709,9 @@ isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ev0) { bool isc__nm_acquire_interlocked(isc_nm_t 
*mgr) { LOCK(&mgr->lock); - bool success = atomic_compare_exchange_strong(&mgr->interlocked, - &(bool){ false }, true); + bool success = atomic_compare_exchange_strong( + &mgr->interlocked, &(int){ ISC_NETMGR_NON_INTERLOCKED }, + isc_nm_tid()); UNLOCK(&mgr->lock); return (success); } @@ -2546,9 +2719,9 @@ isc__nm_acquire_interlocked(isc_nm_t *mgr) { void isc__nm_drop_interlocked(isc_nm_t *mgr) { LOCK(&mgr->lock); - bool success = atomic_compare_exchange_strong(&mgr->interlocked, - &(bool){ true }, false); - INSIST(success); + int tid = atomic_exchange(&mgr->interlocked, + ISC_NETMGR_NON_INTERLOCKED); + INSIST(tid != ISC_NETMGR_NON_INTERLOCKED); BROADCAST(&mgr->wkstatecond); UNLOCK(&mgr->lock); } @@ -2556,8 +2729,9 @@ isc__nm_drop_interlocked(isc_nm_t *mgr) { void isc__nm_acquire_interlocked_force(isc_nm_t *mgr) { LOCK(&mgr->lock); - while (!atomic_compare_exchange_strong(&mgr->interlocked, - &(bool){ false }, true)) + while (!atomic_compare_exchange_strong( + &mgr->interlocked, &(int){ ISC_NETMGR_NON_INTERLOCKED }, + isc_nm_tid())) { WAIT(&mgr->wkstatecond, &mgr->lock); } diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index e4d7182999..d86e1866e5 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -433,8 +433,8 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, REQUIRE(csock->fd >= 0); ievent = isc__nm_get_netievent_tcplisten(mgr, csock); - isc__nm_enqueue_ievent(&mgr->workers[i], - (isc__netievent_t *)ievent); + isc__nm_maybe_enqueue_ievent(&mgr->workers[i], + (isc__netievent_t *)ievent); } #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) @@ -651,15 +651,7 @@ isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) { return; } - /* - * If network manager is interlocked, re-enqueue the event for later. 
- */ - if (!isc__nm_acquire_interlocked(sock->mgr)) { - enqueue_stoplistening(sock); - } else { - stop_tcp_parent(sock); - isc__nm_drop_interlocked(sock->mgr); - } + stop_tcp_parent(sock); } static void @@ -1200,6 +1192,8 @@ timer_close_cb(uv_handle_t *handle) { static void stop_tcp_child(isc_nmsocket_t *sock) { + bool last_child = false; + REQUIRE(sock->type == isc_nm_tcpsocket); REQUIRE(sock->tid == isc_nm_tid()); @@ -1212,8 +1206,13 @@ stop_tcp_child(isc_nmsocket_t *sock) { LOCK(&sock->parent->lock); sock->parent->rchildren -= 1; + last_child = (sock->parent->rchildren == 0); UNLOCK(&sock->parent->lock); - BROADCAST(&sock->parent->cond); + + if (last_child) { + atomic_store(&sock->parent->closed, true); + isc__nmsocket_prep_destroy(sock->parent); + } } static void @@ -1228,24 +1227,10 @@ stop_tcp_parent(isc_nmsocket_t *sock) { atomic_store(&csock->active, false); - if (csock->tid == isc_nm_tid()) { - stop_tcp_child(csock); - continue; - } - ievent = isc__nm_get_netievent_tcpstop(sock->mgr, csock); isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid], (isc__netievent_t *)ievent); } - - LOCK(&sock->lock); - while (sock->rchildren > 0) { - WAIT(&sock->cond, &sock->lock); - } - atomic_store(&sock->closed, true); - UNLOCK(&sock->lock); - - isc__nmsocket_prep_destroy(sock); } static void diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 7b2efa419e..f8005bb4d9 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -407,8 +407,8 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, REQUIRE(csock->fd >= 0); ievent = isc__nm_get_netievent_tcpdnslisten(mgr, csock); - isc__nm_enqueue_ievent(&mgr->workers[i], - (isc__netievent_t *)ievent); + isc__nm_maybe_enqueue_ievent(&mgr->workers[i], + (isc__netievent_t *)ievent); } #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) @@ -626,15 +626,7 @@ isc__nm_async_tcpdnsstop(isc__networker_t *worker, isc__netievent_t *ev0) { return; } - /* - * If network manager is interlocked, re-enqueue the event 
for later. - */ - if (!isc__nm_acquire_interlocked(sock->mgr)) { - enqueue_stoplistening(sock); - } else { - stop_tcpdns_parent(sock); - isc__nm_drop_interlocked(sock->mgr); - } + stop_tcpdns_parent(sock); } void @@ -1230,6 +1222,8 @@ timer_close_cb(uv_handle_t *timer) { static void stop_tcpdns_child(isc_nmsocket_t *sock) { + bool last_child = false; + REQUIRE(sock->type == isc_nm_tcpdnssocket); REQUIRE(sock->tid == isc_nm_tid()); @@ -1242,8 +1236,13 @@ stop_tcpdns_child(isc_nmsocket_t *sock) { LOCK(&sock->parent->lock); sock->parent->rchildren -= 1; + last_child = (sock->parent->rchildren == 0); UNLOCK(&sock->parent->lock); - BROADCAST(&sock->parent->cond); + + if (last_child) { + atomic_store(&sock->parent->closed, true); + isc__nmsocket_prep_destroy(sock->parent); + } } static void @@ -1258,24 +1257,10 @@ stop_tcpdns_parent(isc_nmsocket_t *sock) { atomic_store(&csock->active, false); - if (csock->tid == isc_nm_tid()) { - stop_tcpdns_child(csock); - continue; - } - ievent = isc__nm_get_netievent_tcpdnsstop(sock->mgr, csock); isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid], (isc__netievent_t *)ievent); } - - LOCK(&sock->lock); - while (sock->rchildren > 0) { - WAIT(&sock->cond, &sock->lock); - } - atomic_store(&sock->closed, true); - UNLOCK(&sock->lock); - - isc__nmsocket_prep_destroy(sock); } static void diff --git a/lib/isc/netmgr/tlsdns.c b/lib/isc/netmgr/tlsdns.c index 7b80dcc43a..ff83fa4624 100644 --- a/lib/isc/netmgr/tlsdns.c +++ b/lib/isc/netmgr/tlsdns.c @@ -475,8 +475,8 @@ isc_nm_listentlsdns(isc_nm_t *mgr, isc_nmiface_t *iface, REQUIRE(csock->fd >= 0); ievent = isc__nm_get_netievent_tlsdnslisten(mgr, csock); - isc__nm_enqueue_ievent(&mgr->workers[i], - (isc__netievent_t *)ievent); + isc__nm_maybe_enqueue_ievent(&mgr->workers[i], + (isc__netievent_t *)ievent); } #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) @@ -770,16 +770,7 @@ isc__nm_async_tlsdnsstop(isc__networker_t *worker, isc__netievent_t *ev0) { return; } - /* - * If network manager is 
interlocked, re-enqueue the event for - * later. - */ - if (!isc__nm_acquire_interlocked(sock->mgr)) { - enqueue_stoplistening(sock); - } else { - stop_tlsdns_parent(sock); - isc__nm_drop_interlocked(sock->mgr); - } + stop_tlsdns_parent(sock); } void @@ -1777,6 +1768,8 @@ timer_close_cb(uv_handle_t *handle) { static void stop_tlsdns_child(isc_nmsocket_t *sock) { + bool last_child = false; + REQUIRE(sock->type == isc_nm_tlsdnssocket); REQUIRE(sock->tid == isc_nm_tid()); @@ -1789,8 +1782,13 @@ stop_tlsdns_child(isc_nmsocket_t *sock) { LOCK(&sock->parent->lock); sock->parent->rchildren -= 1; + last_child = (sock->parent->rchildren == 0); UNLOCK(&sock->parent->lock); - BROADCAST(&sock->parent->cond); + + if (last_child) { + atomic_store(&sock->parent->closed, true); + isc__nmsocket_prep_destroy(sock->parent); + } } static void @@ -1806,24 +1804,10 @@ stop_tlsdns_parent(isc_nmsocket_t *sock) { atomic_store(&csock->active, false); - if (csock->tid == isc_nm_tid()) { - stop_tlsdns_child(csock); - continue; - } - ievent = isc__nm_get_netievent_tlsdnsstop(sock->mgr, csock); isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid], (isc__netievent_t *)ievent); } - - LOCK(&sock->lock); - while (sock->rchildren > 0) { - WAIT(&sock->cond, &sock->lock); - } - atomic_store(&sock->closed, true); - UNLOCK(&sock->lock); - - isc__nmsocket_prep_destroy(sock); } static void diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index b7bbe6598e..eb57c954be 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -140,8 +140,8 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, REQUIRE(csock->fd >= 0); ievent = isc__nm_get_netievent_udplisten(mgr, csock); - isc__nm_enqueue_ievent(&mgr->workers[i], - (isc__netievent_t *)ievent); + isc__nm_maybe_enqueue_ievent(&mgr->workers[i], + (isc__netievent_t *)ievent); } #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) @@ -324,12 +324,7 @@ isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0) { /* * If 
network manager is paused, re-enqueue the event for later. */ - if (!isc__nm_acquire_interlocked(sock->mgr)) { - enqueue_stoplistening(sock); - } else { - stop_udp_parent(sock); - isc__nm_drop_interlocked(sock->mgr); - } + stop_udp_parent(sock); } /* @@ -435,20 +430,13 @@ void isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region, isc_nm_cb_t cb, void *cbarg) { isc_nmsocket_t *sock = handle->sock; - isc_nmsocket_t *psock = NULL, *rsock = sock; + isc_nmsocket_t *rsock = NULL; isc_sockaddr_t *peer = &handle->peer; isc__nm_uvreq_t *uvreq = NULL; uint32_t maxudp = atomic_load(&sock->mgr->maxudp); int ntid; - uvreq = isc__nm_uvreq_get(sock->mgr, sock); - uvreq->uvbuf.base = (char *)region->base; - uvreq->uvbuf.len = region->length; - - isc_nmhandle_attach(handle, &uvreq->handle); - - uvreq->cb.send = cb; - uvreq->cbarg = cbarg; + INSIST(sock->type == isc_nm_udpsocket); /* * We're simulating a firewall blocking UDP packets bigger than @@ -459,41 +447,45 @@ isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region, * we need to do so here. */ if (maxudp != 0 && region->length > maxudp) { - isc__nm_uvreq_put(&uvreq, sock); - isc_nmhandle_detach(&handle); /* FIXME? */ + isc_nmhandle_detach(&handle); return; } - if (sock->type == isc_nm_udpsocket && !atomic_load(&sock->client)) { - INSIST(sock->parent != NULL); - psock = sock->parent; - } else if (sock->type == isc_nm_udplistener) { - psock = sock; - } else if (!atomic_load(&sock->client)) { - INSIST(0); - ISC_UNREACHABLE(); - } - - /* - * If we're in the network thread, we can send directly. If the - * handle is associated with a UDP socket, we can reuse its - * thread (assuming CPU affinity). Otherwise, pick a thread at - * random. 
- */ - if (isc__nm_in_netthread()) { - ntid = isc_nm_tid(); - } else if (sock->type == isc_nm_udpsocket && - !atomic_load(&sock->client)) { - ntid = sock->tid; + if (atomic_load(&sock->client)) { + /* + * When we are sending from the client socket, we directly use + * the socket provided. + */ + rsock = sock; + goto send; } else { - ntid = (int)isc_random_uniform(sock->nchildren); + /* + * When we are sending from the server socket, we either use the + * socket associated with the network thread we are in, or we + * use the thread from the socket associated with the handle. + */ + INSIST(sock->parent != NULL); + + if (isc__nm_in_netthread()) { + ntid = isc_nm_tid(); + } else { + ntid = sock->tid; + } + rsock = &sock->parent->children[ntid]; } - if (psock != NULL) { - rsock = &psock->children[ntid]; - } +send: + uvreq = isc__nm_uvreq_get(rsock->mgr, rsock); + uvreq->uvbuf.base = (char *)region->base; + uvreq->uvbuf.len = region->length; + + isc_nmhandle_attach(handle, &uvreq->handle); + + uvreq->cb.send = cb; + uvreq->cbarg = cbarg; if (isc_nm_tid() == rsock->tid) { + REQUIRE(rsock->tid == isc_nm_tid()); isc__netievent_udpsend_t ievent = { .sock = rsock, .req = uvreq, .peer = *peer }; @@ -544,6 +536,7 @@ udp_send_cb(uv_udp_send_t *req, int status) { REQUIRE(VALID_UVREQ(uvreq)); REQUIRE(VALID_NMHANDLE(uvreq->handle)); + REQUIRE(sock->tid == isc_nm_tid()); if (status < 0) { result = isc__nm_uverr2result(status); @@ -976,6 +969,8 @@ stop_udp_child(isc_nmsocket_t *sock) { REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(sock->tid == isc_nm_tid()); + bool last_child = false; + if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, true)) { return; @@ -985,8 +980,13 @@ stop_udp_child(isc_nmsocket_t *sock) { LOCK(&sock->parent->lock); sock->parent->rchildren -= 1; + last_child = (sock->parent->rchildren == 0); UNLOCK(&sock->parent->lock); - BROADCAST(&sock->parent->cond); + + if (last_child) { + atomic_store(&sock->parent->closed, true); + 
isc__nmsocket_prep_destroy(sock->parent); + } } static void @@ -1001,24 +1001,10 @@ stop_udp_parent(isc_nmsocket_t *sock) { atomic_store(&csock->active, false); - if (csock->tid == isc_nm_tid()) { - stop_udp_child(csock); - continue; - } - ievent = isc__nm_get_netievent_udpstop(sock->mgr, csock); isc__nm_enqueue_ievent(&sock->mgr->workers[i], (isc__netievent_t *)ievent); } - - LOCK(&sock->lock); - while (sock->rchildren > 0) { - WAIT(&sock->cond, &sock->lock); - } - atomic_store(&sock->closed, true); - UNLOCK(&sock->lock); - - isc__nmsocket_prep_destroy(sock); } static void diff --git a/lib/isc/task.c b/lib/isc/task.c index b5dccb7b28..b8996a7f0e 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -61,14 +61,11 @@ */ #ifdef ISC_TASK_TRACE -#define XTRACE(m) \ - fprintf(stderr, "task %p thread %" PRIuPTR ": %s\n", task, \ - isc_thread_self(), (m)) -#define XTTRACE(t, m) \ - fprintf(stderr, "task %p thread %" PRIuPTR ": %s\n", (t), \ - isc_thread_self(), (m)) -#define XTHREADTRACE(m) \ - fprintf(stderr, "thread %" PRIuPTR ": %s\n", isc_thread_self(), (m)) +#define XTRACE(m) \ + fprintf(stderr, "task %p thread %zu: %s\n", task, isc_tid_v, (m)) +#define XTTRACE(t, m) \ + fprintf(stderr, "task %p thread %zu: %s\n", (t), isc_tid_v, (m)) +#define XTHREADTRACE(m) fprintf(stderr, "thread %zu: %s\n", isc_tid_v, (m)) #else /* ifdef ISC_TASK_TRACE */ #define XTRACE(m) #define XTTRACE(t, m) @@ -97,13 +94,12 @@ static const char *statenames[] = { #define TASK_MAGIC ISC_MAGIC('T', 'A', 'S', 'K') #define VALID_TASK(t) ISC_MAGIC_VALID(t, TASK_MAGIC) -typedef struct isc__taskqueue isc__taskqueue_t; - struct isc_task { /* Not locked. */ unsigned int magic; isc_taskmgr_t *manager; isc_mutex_t lock; + int threadid; /* Locked by task lock. */ task_state_t state; int pause_cnt; @@ -116,14 +112,11 @@ struct isc_task { isc_time_t tnow; char name[16]; void *tag; - unsigned int threadid; bool bound; /* Protected by atomics */ atomic_uint_fast32_t flags; /* Locked by task manager lock. 
*/ LINK(isc_task_t) link; - LINK(isc_task_t) ready_link; - LINK(isc_task_t) ready_priority_link; }; #define TASK_F_SHUTTINGDOWN 0x01 @@ -140,39 +133,20 @@ struct isc_task { #define TASK_MANAGER_MAGIC ISC_MAGIC('T', 'S', 'K', 'M') #define VALID_MANAGER(m) ISC_MAGIC_VALID(m, TASK_MANAGER_MAGIC) -typedef ISC_LIST(isc_task_t) isc__tasklist_t; - -struct isc__taskqueue { - /* Everything locked by lock */ - isc_mutex_t lock; - isc__tasklist_t ready_tasks; - isc__tasklist_t ready_priority_tasks; - isc_condition_t work_available; - isc_thread_t thread; - unsigned int threadid; - isc_taskmgr_t *manager; -}; - struct isc_taskmgr { /* Not locked. */ unsigned int magic; + isc_refcount_t references; isc_mem_t *mctx; isc_mutex_t lock; - isc_mutex_t halt_lock; - isc_condition_t halt_cond; - unsigned int workers; atomic_uint_fast32_t tasks_running; atomic_uint_fast32_t tasks_ready; - atomic_uint_fast32_t curq; atomic_uint_fast32_t tasks_count; - isc__taskqueue_t *queues; isc_nm_t *nm; /* Locked by task manager lock. */ unsigned int default_quantum; LIST(isc_task_t) tasks; - atomic_uint_fast32_t mode; - atomic_bool pause_req; atomic_bool exclusive_req; atomic_bool exiting; @@ -188,11 +162,6 @@ struct isc_taskmgr { isc_task_t *excl; }; -void -isc__taskmgr_pause(isc_taskmgr_t *manager0); -void -isc__taskmgr_resume(isc_taskmgr_t *manager0); - #define DEFAULT_DEFAULT_QUANTUM 25 #define FINISHED(m) \ (atomic_load_relaxed(&((m)->exiting)) && \ @@ -210,34 +179,15 @@ void isc_taskmgr_setexcltask(isc_taskmgr_t *mgr, isc_task_t *task); isc_result_t isc_taskmgr_excltask(isc_taskmgr_t *mgr, isc_task_t **taskp); -static inline bool -empty_readyq(isc_taskmgr_t *manager, int c); - -static inline isc_task_t * -pop_readyq(isc_taskmgr_t *manager, int c); - -static inline void -push_readyq(isc_taskmgr_t *manager, isc_task_t *task, int c); - -static inline void -wake_all_queues(isc_taskmgr_t *manager); /*** *** Tasks. 
***/ -static inline void -wake_all_queues(isc_taskmgr_t *manager) { - for (unsigned int i = 0; i < manager->workers; i++) { - LOCK(&manager->queues[i].lock); - BROADCAST(&manager->queues[i].work_available); - UNLOCK(&manager->queues[i].lock); - } -} - static void task_finished(isc_task_t *task) { isc_taskmgr_t *manager = task->manager; + isc_mem_t *mctx = manager->mctx; REQUIRE(EMPTY(task->events)); REQUIRE(task->nevents == 0); REQUIRE(EMPTY(task->on_shutdown)); @@ -251,39 +201,35 @@ task_finished(isc_task_t *task) { UNLINK(manager->tasks, task, link); atomic_fetch_sub(&manager->tasks_count, 1); UNLOCK(&manager->lock); - if (FINISHED(manager)) { - /* - * All tasks have completed and the - * task manager is exiting. Wake up - * any idle worker threads so they - * can exit. - */ - wake_all_queues(manager); - } + isc_mutex_destroy(&task->lock); task->magic = 0; - isc_mem_put(manager->mctx, task, sizeof(*task)); + isc_mem_put(mctx, task, sizeof(*task)); + + isc_taskmgr_detach(manager); } isc_result_t -isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum, +isc_task_create(isc_taskmgr_t *manager, unsigned int quantum, isc_task_t **taskp) { - return (isc_task_create_bound(manager0, quantum, taskp, -1)); + return (isc_task_create_bound(manager, quantum, taskp, -1)); } isc_result_t -isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum, +isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum, isc_task_t **taskp, int threadid) { - isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0; isc_task_t *task; bool exiting; REQUIRE(VALID_MANAGER(manager)); REQUIRE(taskp != NULL && *taskp == NULL); - task = isc_mem_get(manager->mctx, sizeof(*task)); XTRACE("isc_task_create"); - task->manager = manager; + + task = isc_mem_get(manager->mctx, sizeof(*task)); + *task = (isc_task_t){ 0 }; + + isc_taskmgr_attach(manager, &task->manager); if (threadid == -1) { /* @@ -292,14 +238,14 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum, * 
randomly or specified by isc_task_sendto. */ task->bound = false; - task->threadid = 0; + task->threadid = -1; } else { /* * Task is pinned to a queue, it'll always be run * by a specific thread. */ task->bound = true; - task->threadid = threadid % manager->workers; + task->threadid = threadid; } isc_mutex_init(&task->lock); @@ -317,8 +263,6 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum, memset(task->name, 0, sizeof(task->name)); task->tag = NULL; INIT_LINK(task, link); - INIT_LINK(task, ready_link); - INIT_LINK(task, ready_priority_link); exiting = false; LOCK(&manager->lock); @@ -405,18 +349,10 @@ task_shutdown(isc_task_t *task) { static inline void task_ready(isc_task_t *task) { isc_taskmgr_t *manager = task->manager; - bool has_privilege = isc_task_privilege(task); - REQUIRE(VALID_MANAGER(manager)); XTRACE("task_ready"); - LOCK(&manager->queues[task->threadid].lock); - push_readyq(manager, task, task->threadid); - if (atomic_load(&manager->mode) == isc_taskmgrmode_normal || - has_privilege) { - SIGNAL(&manager->queues[task->threadid].work_available); - } - UNLOCK(&manager->queues[task->threadid].lock); + isc_nm_task_enqueue(manager->nm, task, task->threadid); } static inline bool @@ -491,6 +427,12 @@ task_send(isc_task_t *task, isc_event_t **eventp, int c) { XTRACE("task_send"); + if (task->bound) { + c = task->threadid; + } else if (c < 0) { + c = -1; + } + if (task->state == task_state_idle) { was_idle = true; task->threadid = c; @@ -534,14 +476,6 @@ isc_task_sendto(isc_task_t *task, isc_event_t **eventp, int c) { * some processing is deferred until after the lock is released. */ LOCK(&task->lock); - /* If task is bound ignore provided cpu. 
*/ - if (task->bound) { - c = task->threadid; - } else if (c < 0) { - c = atomic_fetch_add_explicit(&task->manager->curq, 1, - memory_order_relaxed); - } - c %= task->manager->workers; was_idle = task_send(task, eventp, c); UNLOCK(&task->lock); @@ -581,13 +515,6 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) { XTRACE("isc_task_sendanddetach"); LOCK(&task->lock); - if (task->bound) { - c = task->threadid; - } else if (c < 0) { - c = atomic_fetch_add_explicit(&task->manager->curq, 1, - memory_order_relaxed); - } - c %= task->manager->workers; idle1 = task_send(task, eventp, c); idle2 = task_detach(task); UNLOCK(&task->lock); @@ -747,8 +674,7 @@ isc_task_unsendrange(isc_task_t *task, void *sender, isc_eventtype_t first, XTRACE("isc_task_unsendrange"); - return (dequeue_events((isc_task_t *)task, sender, first, last, tag, - events, false)); + return (dequeue_events(task, sender, first, last, tag, events, false)); } unsigned int @@ -760,8 +686,7 @@ isc_task_unsend(isc_task_t *task, void *sender, isc_eventtype_t type, void *tag, XTRACE("isc_task_unsend"); - return (dequeue_events((isc_task_t *)task, sender, type, type, tag, - events, false)); + return (dequeue_events(task, sender, type, type, tag, events, false)); } isc_result_t @@ -881,499 +806,184 @@ isc_task_getcurrenttimex(isc_task_t *task, isc_time_t *t) { *** Task Manager. ***/ -/* - * Return true if the current ready list for the manager, which is - * either ready_tasks or the ready_priority_tasks, depending on whether - * the manager is currently in normal or privileged execution mode. - * - * Caller must hold the task manager lock. 
- */ -static inline bool -empty_readyq(isc_taskmgr_t *manager, int c) { - isc__tasklist_t queue; +static isc_result_t +task_run(isc_task_t *task) { + unsigned int dispatch_count = 0; + bool finished = false; + isc_event_t *event = NULL; + isc_result_t result = ISC_R_SUCCESS; - if (atomic_load_relaxed(&manager->mode) == isc_taskmgrmode_normal) { - queue = manager->queues[c].ready_tasks; - } else { - queue = manager->queues[c].ready_priority_tasks; - } - return (EMPTY(queue)); -} - -/* - * Dequeue and return a pointer to the first task on the current ready - * list for the manager. - * If the task is privileged, dequeue it from the other ready list - * as well. - * - * Caller must hold the task manager lock. - */ -static inline isc_task_t * -pop_readyq(isc_taskmgr_t *manager, int c) { - isc_task_t *task; - - if (atomic_load_relaxed(&manager->mode) == isc_taskmgrmode_normal) { - task = HEAD(manager->queues[c].ready_tasks); - } else { - task = HEAD(manager->queues[c].ready_priority_tasks); - } - - if (task != NULL) { - DEQUEUE(manager->queues[c].ready_tasks, task, ready_link); - if (ISC_LINK_LINKED(task, ready_priority_link)) { - DEQUEUE(manager->queues[c].ready_priority_tasks, task, - ready_priority_link); - } - } - - return (task); -} - -/* - * Push 'task' onto the ready_tasks queue. If 'task' has the privilege - * flag set, then also push it onto the ready_priority_tasks queue. - * - * Caller must hold the task queue lock. 
- */ -static inline void -push_readyq(isc_taskmgr_t *manager, isc_task_t *task, int c) { - if (ISC_LINK_LINKED(task, ready_link)) { - return; - } - ENQUEUE(manager->queues[c].ready_tasks, task, ready_link); - if (TASK_PRIVILEGED(task)) { - ENQUEUE(manager->queues[c].ready_priority_tasks, task, - ready_priority_link); - } - atomic_fetch_add_explicit(&manager->tasks_ready, 1, - memory_order_acquire); -} - -static void -dispatch(isc_taskmgr_t *manager, unsigned int threadid) { - isc_task_t *task; - - REQUIRE(VALID_MANAGER(manager)); - - /* Wait for everything to initialize */ - LOCK(&manager->lock); - UNLOCK(&manager->lock); + REQUIRE(VALID_TASK(task)); + LOCK(&task->lock); /* - * Again we're trying to hold the lock for as short a time as possible - * and to do as little locking and unlocking as possible. - * - * In both while loops, the appropriate lock must be held before the - * while body starts. Code which acquired the lock at the top of - * the loop would be more readable, but would result in a lot of - * extra locking. Compare: - * - * Straightforward: - * - * LOCK(); - * ... - * UNLOCK(); - * while (expression) { - * LOCK(); - * ... - * UNLOCK(); - * - * Unlocked part here... - * - * LOCK(); - * ... - * UNLOCK(); - * } - * - * Note how if the loop continues we unlock and then immediately lock. - * For N iterations of the loop, this code does 2N+1 locks and 2N+1 - * unlocks. Also note that the lock is not held when the while - * condition is tested, which may or may not be important, depending - * on the expression. - * - * As written: - * - * LOCK(); - * while (expression) { - * ... - * UNLOCK(); - * - * Unlocked part here... - * - * LOCK(); - * ... - * } - * UNLOCK(); - * - * For N iterations of the loop, this code does N+1 locks and N+1 - * unlocks. The while expression is always protected by the lock. 
+ * It is possible because that we have a paused task in the queue - it + * might have been paused in the meantime and we never hold both queue + * and task lock to avoid deadlocks, just bail then. */ - LOCK(&manager->queues[threadid].lock); + if (task->state != task_state_ready) { + UNLOCK(&task->lock); + return (ISC_R_SUCCESS); + } - while (!FINISHED(manager)) { - /* - * For reasons similar to those given in the comment in - * isc_task_send() above, it is safe for us to dequeue - * the task while only holding the manager lock, and then - * change the task to running state while only holding the - * task lock. - * - * If a pause has been requested, don't do any work - * until it's been released. - */ - while ((empty_readyq(manager, threadid) && - !atomic_load_relaxed(&manager->pause_req) && - !atomic_load_relaxed(&manager->exclusive_req)) && - !FINISHED(manager)) - { - XTHREADTRACE("wait"); - XTHREADTRACE(atomic_load_relaxed(&manager->pause_req) - ? "paused" - : "notpaused"); - XTHREADTRACE( - atomic_load_relaxed(&manager->exclusive_req) - ? 
"excreq" - : "notexcreq"); - WAIT(&manager->queues[threadid].work_available, - &manager->queues[threadid].lock); - XTHREADTRACE("awake"); - } - XTHREADTRACE("working"); + INSIST(task->state == task_state_ready); + task->state = task_state_running; + XTRACE("running"); + XTRACE(task->name); + TIME_NOW(&task->tnow); + task->now = isc_time_seconds(&task->tnow); - if (atomic_load_relaxed(&manager->pause_req) || - atomic_load_relaxed(&manager->exclusive_req)) - { - UNLOCK(&manager->queues[threadid].lock); - XTHREADTRACE("halting"); + while (true) { + if (!EMPTY(task->events)) { + event = HEAD(task->events); + DEQUEUE(task->events, event, ev_link); + task->nevents--; /* - * Switching to exclusive mode is done as a - * 2-phase-lock, checking if we have to switch is - * done without any locks on pause_req and - * exclusive_req to save time - the worst - * thing that can happen is that we'll launch one - * task more and exclusive task will be postponed a - * bit. - * - * Broadcasting on halt_cond seems suboptimal, but - * exclusive tasks are rare enough that we don't - * care. + * Execute the event action. */ - LOCK(&manager->halt_lock); - manager->halted++; - BROADCAST(&manager->halt_cond); - while (atomic_load_relaxed(&manager->pause_req) || - atomic_load_relaxed(&manager->exclusive_req)) - { - WAIT(&manager->halt_cond, &manager->halt_lock); - } - manager->halted--; - SIGNAL(&manager->halt_cond); - UNLOCK(&manager->halt_lock); - - LOCK(&manager->queues[threadid].lock); - /* Restart the loop after */ - continue; - } - - task = pop_readyq(manager, threadid); - if (task != NULL) { - unsigned int dispatch_count = 0; - bool done = false; - bool requeue = false; - bool finished = false; - isc_event_t *event; - - INSIST(VALID_TASK(task)); - - /* - * Note we only unlock the queue lock if we actually - * have a task to do. We must reacquire the queue - * lock before exiting the 'if (task != NULL)' block. 
- */ - UNLOCK(&manager->queues[threadid].lock); - RUNTIME_CHECK(atomic_fetch_sub_explicit( - &manager->tasks_ready, 1, - memory_order_release) > 0); - atomic_fetch_add_explicit(&manager->tasks_running, 1, - memory_order_acquire); - - LOCK(&task->lock); - /* - * It is possible because that we have a paused task - * in the queue - it might have been paused in the - * meantime and we never hold both queue and task lock - * to avoid deadlocks, just bail then. - */ - if (task->state != task_state_ready) { - UNLOCK(&task->lock); - LOCK(&manager->queues[threadid].lock); - continue; - } - INSIST(task->state == task_state_ready); - task->state = task_state_running; - XTRACE("running"); + XTRACE("execute action"); XTRACE(task->name); - TIME_NOW(&task->tnow); - task->now = isc_time_seconds(&task->tnow); - do { - if (!EMPTY(task->events)) { - event = HEAD(task->events); - DEQUEUE(task->events, event, ev_link); - task->nevents--; + if (event->ev_action != NULL) { + UNLOCK(&task->lock); + (event->ev_action)(task, event); + LOCK(&task->lock); + } + XTRACE("execution complete"); + dispatch_count++; + } - /* - * Execute the event action. - */ - XTRACE("execute action"); - XTRACE(task->name); - if (event->ev_action != NULL) { - UNLOCK(&task->lock); - (event->ev_action)( - (isc_task_t *)task, - event); - LOCK(&task->lock); - } - XTRACE("execution complete"); - dispatch_count++; - } + if (isc_refcount_current(&task->references) == 0 && + EMPTY(task->events) && !TASK_SHUTTINGDOWN(task)) + { + /* + * There are no references and no pending events for + * this task, which means it will not become runnable + * again via an external action (such as sending an + * event or detaching). + * + * We initiate shutdown to prevent it from becoming a + * zombie. + * + * We do this here instead of in the "if + * EMPTY(task->events)" block below because: + * + * If we post no shutdown events, we want the task + * to finish. 
+ * + * If we did post shutdown events, will still want + * the task's quantum to be applied. + */ + INSIST(!task_shutdown(task)); + } - if (isc_refcount_current(&task->references) == - 0 && - EMPTY(task->events) && - !TASK_SHUTTINGDOWN(task)) - { - bool was_idle; - - /* - * There are no references and no - * pending events for this task, - * which means it will not become - * runnable again via an external - * action (such as sending an event - * or detaching). - * - * We initiate shutdown to prevent - * it from becoming a zombie. - * - * We do this here instead of in - * the "if EMPTY(task->events)" block - * below because: - * - * If we post no shutdown events, - * we want the task to finish. - * - * If we did post shutdown events, - * will still want the task's - * quantum to be applied. - */ - was_idle = task_shutdown(task); - INSIST(!was_idle); - } - - if (EMPTY(task->events)) { - /* - * Nothing else to do for this task - * right now. - */ - XTRACE("empty"); - if (isc_refcount_current( - &task->references) == 0 && - TASK_SHUTTINGDOWN(task)) { - /* - * The task is done. - */ - XTRACE("done"); - finished = true; - task->state = task_state_done; - } else { - if (task->state == - task_state_running) { - task->state = - task_state_idle; - } else if (task->state == - task_state_pausing) { - task->state = - task_state_paused; - } - } - done = true; + if (EMPTY(task->events)) { + /* + * Nothing else to do for this task right now. + */ + XTRACE("empty"); + if (isc_refcount_current(&task->references) == 0 && + TASK_SHUTTINGDOWN(task)) { + /* + * The task is done. + */ + XTRACE("done"); + finished = true; + task->state = task_state_done; + } else { + if (task->state == task_state_running) { + XTRACE("idling"); + task->state = task_state_idle; } else if (task->state == task_state_pausing) { - /* - * We got a pause request on this task, - * stop working on it and switch the - * state to paused. 
- */ XTRACE("pausing"); task->state = task_state_paused; - done = true; - } else if (dispatch_count >= task->quantum) { - /* - * Our quantum has expired, but - * there is more work to be done. - * We'll requeue it to the ready - * queue later. - * - * We don't check quantum until - * dispatching at least one event, - * so the minimum quantum is one. - */ - XTRACE("quantum"); - task->state = task_state_ready; - requeue = true; - done = true; } - } while (!done); - UNLOCK(&task->lock); - - if (finished) { - task_finished(task); } - - RUNTIME_CHECK(atomic_fetch_sub_explicit( - &manager->tasks_running, 1, - memory_order_release) > 0); - LOCK(&manager->queues[threadid].lock); - if (requeue) { - /* - * We know we're awake, so we don't have - * to wakeup any sleeping threads if the - * ready queue is empty before we requeue. - * - * A possible optimization if the queue is - * empty is to 'goto' the 'if (task != NULL)' - * block, avoiding the ENQUEUE of the task - * and the subsequent immediate DEQUEUE - * (since it is the only executable task). - * We don't do this because then we'd be - * skipping the exit_requested check. The - * cost of ENQUEUE is low anyway, especially - * when you consider that we'd have to do - * an extra EMPTY check to see if we could - * do the optimization. If the ready queue - * were usually nonempty, the 'optimization' - * might even hurt rather than help. - */ - push_readyq(manager, task, threadid); - } - } - - /* - * If we are in privileged execution mode and there are no - * tasks remaining on the current ready queue, then - * we're stuck. Automatically drop privileges at that - * point and continue with the regular ready queue. 
- */ - if (atomic_load_relaxed(&manager->mode) != - isc_taskmgrmode_normal && - atomic_load_explicit(&manager->tasks_running, - memory_order_acquire) == 0) - { - UNLOCK(&manager->queues[threadid].lock); - LOCK(&manager->lock); + break; + } else if (task->state == task_state_pausing) { /* - * Check once again, under lock. Mode can only - * change from privileged to normal anyway, and - * if we enter this loop twice at the same time - * we'll end up in a deadlock over queue locks. - * + * We got a pause request on this task, stop working on + * it and switch the state to paused. */ - if (atomic_load(&manager->mode) != - isc_taskmgrmode_normal && - atomic_load_explicit(&manager->tasks_running, - memory_order_acquire) == 0) - { - bool empty = true; - unsigned int i; - for (i = 0; i < manager->workers && empty; i++) - { - LOCK(&manager->queues[i].lock); - empty &= empty_readyq(manager, i); - UNLOCK(&manager->queues[i].lock); - } - if (empty) { - atomic_store(&manager->mode, - isc_taskmgrmode_normal); - wake_all_queues(manager); - } - } - UNLOCK(&manager->lock); - LOCK(&manager->queues[threadid].lock); + XTRACE("pausing"); + task->state = task_state_paused; + break; + } else if (dispatch_count >= task->quantum) { + /* + * Our quantum has expired, but there is more work to be + * done. We'll requeue it to the ready queue later. + * + * We don't check quantum until dispatching at least one + * event, so the minimum quantum is one. + */ + XTRACE("quantum"); + task->state = task_state_ready; + result = ISC_R_QUOTA; + break; } } - UNLOCK(&manager->queues[threadid].lock); - /* - * There might be other dispatchers waiting on empty tasks, - * wake them up. 
- */ - wake_all_queues(manager); + UNLOCK(&task->lock); + + if (finished) { + task_finished(task); + } + + return (result); } -static isc_threadresult_t -#ifdef _WIN32 - WINAPI -#endif /* ifdef _WIN32 */ - run(void *queuep) { - isc__taskqueue_t *tq = queuep; - isc_taskmgr_t *manager = tq->manager; - int threadid = tq->threadid; - isc_thread_setaffinity(threadid); - - XTHREADTRACE("starting"); - - dispatch(manager, threadid); - - XTHREADTRACE("exiting"); - -#ifdef OPENSSL_LEAKS - ERR_remove_state(0); -#endif /* ifdef OPENSSL_LEAKS */ - - return ((isc_threadresult_t)0); +isc_result_t +isc_task_run(isc_task_t *task) { + return (task_run(task)); } static void manager_free(isc_taskmgr_t *manager) { - for (unsigned int i = 0; i < manager->workers; i++) { - isc_mutex_destroy(&manager->queues[i].lock); - isc_condition_destroy(&manager->queues[i].work_available); - } + isc_refcount_destroy(&manager->references); + isc_nm_detach(&manager->nm); + isc_mutex_destroy(&manager->lock); isc_mutex_destroy(&manager->excl_lock); - isc_mutex_destroy(&manager->halt_lock); - isc_condition_destroy(&manager->halt_cond); - isc_mem_put(manager->mctx, manager->queues, - manager->workers * sizeof(isc__taskqueue_t)); manager->magic = 0; isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager)); } +void +isc_taskmgr_attach(isc_taskmgr_t *source, isc_taskmgr_t **targetp) { + REQUIRE(VALID_MANAGER(source)); + REQUIRE(targetp != NULL && *targetp == NULL); + + isc_refcount_increment(&source->references); + + *targetp = source; +} + +void +isc_taskmgr_detach(isc_taskmgr_t *manager) { + REQUIRE(VALID_MANAGER(manager)); + + if (isc_refcount_decrement(&manager->references) == 1) { + manager_free(manager); + } +} + isc_result_t -isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, - unsigned int default_quantum, isc_nm_t *nm, +isc_taskmgr_create(isc_mem_t *mctx, unsigned int default_quantum, isc_nm_t *nm, isc_taskmgr_t **managerp) { - unsigned int i; isc_taskmgr_t *manager; /* * Create a new 
task manager. */ - REQUIRE(workers > 0); REQUIRE(managerp != NULL && *managerp == NULL); + REQUIRE(nm != NULL); manager = isc_mem_get(mctx, sizeof(*manager)); *manager = (isc_taskmgr_t){ .magic = TASK_MANAGER_MAGIC }; - atomic_store(&manager->mode, isc_taskmgrmode_normal); isc_mutex_init(&manager->lock); isc_mutex_init(&manager->excl_lock); - isc_mutex_init(&manager->halt_lock); - isc_condition_init(&manager->halt_cond); - - manager->workers = workers; - if (default_quantum == 0) { default_quantum = DEFAULT_DEFAULT_QUANTUM; } @@ -1384,42 +994,17 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, } INIT_LIST(manager->tasks); - atomic_store(&manager->tasks_count, 0); - manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t)); - RUNTIME_CHECK(manager->queues != NULL); - + atomic_init(&manager->tasks_count, 0); atomic_init(&manager->tasks_running, 0); atomic_init(&manager->tasks_ready, 0); - atomic_init(&manager->curq, 0); atomic_init(&manager->exiting, false); atomic_store_relaxed(&manager->exclusive_req, false); - atomic_store_relaxed(&manager->pause_req, false); isc_mem_attach(mctx, &manager->mctx); - LOCK(&manager->lock); - /* - * Start workers. 
- */ - for (i = 0; i < workers; i++) { - INIT_LIST(manager->queues[i].ready_tasks); - INIT_LIST(manager->queues[i].ready_priority_tasks); - isc_mutex_init(&manager->queues[i].lock); - isc_condition_init(&manager->queues[i].work_available); + isc_refcount_init(&manager->references, 1); - manager->queues[i].manager = manager; - manager->queues[i].threadid = i; - isc_thread_create(run, &manager->queues[i], - &manager->queues[i].thread); - char name[21]; - snprintf(name, sizeof(name), "isc-worker%04u", i); - isc_thread_setname(manager->queues[i].thread, name); - } - UNLOCK(&manager->lock); - - isc_thread_setconcurrency(workers); - - *managerp = (isc_taskmgr_t *)manager; + *managerp = manager; return (ISC_R_SUCCESS); } @@ -1428,15 +1013,13 @@ void isc_taskmgr_destroy(isc_taskmgr_t **managerp) { isc_taskmgr_t *manager; isc_task_t *task; - unsigned int i; - bool exiting; /* * Destroy '*managerp'. */ REQUIRE(managerp != NULL); - manager = (isc_taskmgr_t *)*managerp; + manager = *managerp; REQUIRE(VALID_MANAGER(manager)); XTHREADTRACE("isc_taskmgr_destroy"); @@ -1471,117 +1054,39 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { /* * Make sure we only get called once. */ - exiting = false; - - INSIST(!!atomic_compare_exchange_strong(&manager->exiting, &exiting, - true)); - - /* - * If privileged mode was on, turn it off. - */ - atomic_store(&manager->mode, isc_taskmgrmode_normal); + INSIST(atomic_compare_exchange_strong(&manager->exiting, + &(bool){ false }, true)); /* * Post shutdown event(s) to every task (if they haven't already been - * posted). To make things easier post idle tasks to worker 0. + * posted). */ - LOCK(&manager->queues[0].lock); for (task = HEAD(manager->tasks); task != NULL; task = NEXT(task, link)) { LOCK(&task->lock); if (task_shutdown(task)) { task->threadid = 0; - push_readyq(manager, task, 0); + task_ready(task); } UNLOCK(&task->lock); } - UNLOCK(&manager->queues[0].lock); - /* - * Wake up any sleeping workers. 
This ensures we get work done if - * there's work left to do, and if there are already no tasks left - * it will cause the workers to see manager->exiting. - */ - wake_all_queues(manager); UNLOCK(&manager->lock); - /* - * Wait for all the worker threads to exit. - */ - for (i = 0; i < manager->workers; i++) { - isc_thread_join(manager->queues[i].thread, NULL); - } - - /* - * Detach from the network manager if it was set. - */ - if (manager->nm != NULL) { - isc_nm_detach(&manager->nm); - } - - manager_free(manager); + isc_taskmgr_detach(manager); *managerp = NULL; } -void -isc_taskmgr_setprivilegedmode(isc_taskmgr_t *manager0) { - isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0; - - atomic_store(&manager->mode, isc_taskmgrmode_privileged); -} - -isc_taskmgrmode_t -isc_taskmgr_mode(isc_taskmgr_t *manager0) { - isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0; - return (atomic_load(&manager->mode)); -} - -void -isc__taskmgr_pause(isc_taskmgr_t *manager0) { - isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0; - - LOCK(&manager->halt_lock); - while (atomic_load_relaxed(&manager->exclusive_req) || - atomic_load_relaxed(&manager->pause_req)) - { - UNLOCK(&manager->halt_lock); - /* This is ugly but pause is used EXCLUSIVELY in tests */ - isc_thread_yield(); - LOCK(&manager->halt_lock); - } - - atomic_store_relaxed(&manager->pause_req, true); - while (manager->halted < manager->workers) { - wake_all_queues(manager); - WAIT(&manager->halt_cond, &manager->halt_lock); - } - UNLOCK(&manager->halt_lock); -} - -void -isc__taskmgr_resume(isc_taskmgr_t *manager0) { - isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0; - LOCK(&manager->halt_lock); - if (atomic_load(&manager->pause_req)) { - atomic_store(&manager->pause_req, false); - while (manager->halted > 0) { - BROADCAST(&manager->halt_cond); - WAIT(&manager->halt_cond, &manager->halt_lock); - } - } - UNLOCK(&manager->halt_lock); -} - void isc_taskmgr_setexcltask(isc_taskmgr_t *mgr, isc_task_t *task) { 
REQUIRE(VALID_MANAGER(mgr)); REQUIRE(VALID_TASK(task)); LOCK(&mgr->excl_lock); if (mgr->excl != NULL) { - isc_task_detach((isc_task_t **)&mgr->excl); + isc_task_detach(&mgr->excl); } - isc_task_attach(task, (isc_task_t **)&mgr->excl); + isc_task_attach(task, &mgr->excl); UNLOCK(&mgr->excl_lock); } @@ -1594,7 +1099,7 @@ isc_taskmgr_excltask(isc_taskmgr_t *mgr, isc_task_t **taskp) { LOCK(&mgr->excl_lock); if (mgr->excl != NULL) { - isc_task_attach((isc_task_t *)mgr->excl, taskp); + isc_task_attach(mgr->excl, taskp); } else { result = ISC_R_NOTFOUND; } @@ -1619,24 +1124,14 @@ isc_task_beginexclusive(isc_task_t *task) { task->manager->excl == NULL)); UNLOCK(&manager->excl_lock); - if (atomic_load_relaxed(&manager->exclusive_req) || - atomic_load_relaxed(&manager->pause_req)) + if (!atomic_compare_exchange_strong(&manager->exclusive_req, + &(bool){ false }, true)) { return (ISC_R_LOCKBUSY); } - LOCK(&manager->halt_lock); - INSIST(!atomic_load_relaxed(&manager->exclusive_req) && - !atomic_load_relaxed(&manager->pause_req)); - atomic_store_relaxed(&manager->exclusive_req, true); - while (manager->halted + 1 < manager->workers) { - wake_all_queues(manager); - WAIT(&manager->halt_cond, &manager->halt_lock); - } - UNLOCK(&manager->halt_lock); - if (manager->nm != NULL) { - isc_nm_pause(manager->nm); - } + isc_nm_pause(manager->nm); + return (ISC_R_SUCCESS); } @@ -1648,17 +1143,9 @@ isc_task_endexclusive(isc_task_t *task) { REQUIRE(task->state == task_state_running); manager = task->manager; - if (manager->nm != NULL) { - isc_nm_resume(manager->nm); - } - LOCK(&manager->halt_lock); - REQUIRE(atomic_load_relaxed(&manager->exclusive_req)); - atomic_store_relaxed(&manager->exclusive_req, false); - while (manager->halted > 0) { - BROADCAST(&manager->halt_cond); - WAIT(&manager->halt_cond, &manager->halt_lock); - } - UNLOCK(&manager->halt_lock); + isc_nm_resume(manager->nm); + REQUIRE(atomic_compare_exchange_strong(&manager->exclusive_req, + &(bool){ true }, false)); } void @@ 
-1669,7 +1156,7 @@ isc_task_pause(isc_task_t *task) { task->pause_cnt++; if (task->pause_cnt > 1) { /* - * Someone already paused this thread, just increase + * Someone already paused this task, just increase * the number of pausing clients. */ UNLOCK(&task->lock); @@ -1723,7 +1210,6 @@ isc_task_unpause(isc_task_t *task) { void isc_task_setprivilege(isc_task_t *task, bool priv) { REQUIRE(VALID_TASK(task)); - isc_taskmgr_t *manager = task->manager; uint_fast32_t oldflags, newflags; oldflags = atomic_load_acquire(&task->flags); @@ -1738,16 +1224,6 @@ isc_task_setprivilege(isc_task_t *task, bool priv) { } } while (!atomic_compare_exchange_weak_acq_rel(&task->flags, &oldflags, newflags)); - - LOCK(&manager->queues[task->threadid].lock); - if (priv && ISC_LINK_LINKED(task, ready_link)) { - ENQUEUE(manager->queues[task->threadid].ready_priority_tasks, - task, ready_priority_link); - } else if (!priv && ISC_LINK_LINKED(task, ready_priority_link)) { - DEQUEUE(manager->queues[task->threadid].ready_priority_tasks, - task, ready_priority_link); - } - UNLOCK(&manager->queues[task->threadid].lock); } bool @@ -1758,8 +1234,7 @@ isc_task_privilege(isc_task_t *task) { } bool -isc_task_exiting(isc_task_t *t) { - isc_task_t *task = (isc_task_t *)t; +isc_task_exiting(isc_task_t *task) { REQUIRE(VALID_TASK(task)); return (TASK_SHUTTINGDOWN(task)); @@ -1789,10 +1264,6 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr, void *writer0) { TRY0(xmlTextWriterWriteString(writer, ISC_XMLCHAR "threaded")); TRY0(xmlTextWriterEndElement(writer)); /* type */ - TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "worker-threads")); - TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->workers)); - TRY0(xmlTextWriterEndElement(writer)); /* worker-threads */ - TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "default-quantum")); TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->default_quantum)); @@ -1898,10 +1369,6 @@ isc_taskmgr_renderjson(isc_taskmgr_t *mgr, void *tasks0) { CHECKMEM(obj); 
json_object_object_add(tasks, "thread-model", obj); - obj = json_object_new_int(mgr->workers); - CHECKMEM(obj); - json_object_object_add(tasks, "worker-threads", obj); - obj = json_object_new_int(mgr->default_quantum); CHECKMEM(obj); json_object_object_add(tasks, "default-quantum", obj); diff --git a/lib/isc/task_p.h b/lib/isc/task_p.h deleted file mode 100644 index ae8f095733..0000000000 --- a/lib/isc/task_p.h +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (C) Internet Systems Consortium, Inc. ("ISC") - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, you can obtain one at https://mozilla.org/MPL/2.0/. - * - * See the COPYRIGHT file distributed with this work for additional - * information regarding copyright ownership. - */ - -#ifndef ISC_TASK_P_H -#define ISC_TASK_P_H - -/*! \file */ - -/*% - * These functions allow unit tests to manipulate the processing - * of the task queue. They are not intended as part of the public API. 
- */ -void -isc__taskmgr_pause(isc_taskmgr_t *taskmgr); -void -isc__taskmgr_resume(isc_taskmgr_t *taskmgr); - -#endif /* ISC_TASK_P_H */ diff --git a/lib/isc/taskpool.c b/lib/isc/taskpool.c index ca794c8a49..81652d7d3a 100644 --- a/lib/isc/taskpool.c +++ b/lib/isc/taskpool.c @@ -57,10 +57,9 @@ alloc_pool(isc_taskmgr_t *tmgr, isc_mem_t *mctx, unsigned int ntasks, isc_result_t isc_taskpool_create(isc_taskmgr_t *tmgr, isc_mem_t *mctx, unsigned int ntasks, - unsigned int quantum, isc_taskpool_t **poolp) { + unsigned int quantum, bool priv, isc_taskpool_t **poolp) { unsigned int i; isc_taskpool_t *pool = NULL; - isc_result_t result; INSIST(ntasks > 0); @@ -69,11 +68,13 @@ isc_taskpool_create(isc_taskmgr_t *tmgr, isc_mem_t *mctx, unsigned int ntasks, /* Create the tasks */ for (i = 0; i < ntasks; i++) { - result = isc_task_create(tmgr, quantum, &pool->tasks[i]); + isc_result_t result = isc_task_create_bound(tmgr, quantum, + &pool->tasks[i], i); if (result != ISC_R_SUCCESS) { isc_taskpool_destroy(&pool); return (result); } + isc_task_setprivilege(pool->tasks[i], priv); isc_task_setname(pool->tasks[i], "taskpool", NULL); } @@ -93,9 +94,8 @@ isc_taskpool_size(isc_taskpool_t *pool) { } isc_result_t -isc_taskpool_expand(isc_taskpool_t **sourcep, unsigned int size, +isc_taskpool_expand(isc_taskpool_t **sourcep, unsigned int size, bool priv, isc_taskpool_t **targetp) { - isc_result_t result; isc_taskpool_t *pool; REQUIRE(sourcep != NULL && *sourcep != NULL); @@ -119,13 +119,15 @@ isc_taskpool_expand(isc_taskpool_t **sourcep, unsigned int size, /* Create new tasks */ for (i = pool->ntasks; i < size; i++) { - result = isc_task_create(pool->tmgr, pool->quantum, - &newpool->tasks[i]); + isc_result_t result = + isc_task_create_bound(pool->tmgr, pool->quantum, + &newpool->tasks[i], i); if (result != ISC_R_SUCCESS) { *sourcep = pool; isc_taskpool_destroy(&newpool); return (result); } + isc_task_setprivilege(newpool->tasks[i], priv); isc_task_setname(newpool->tasks[i], "taskpool", 
NULL); } @@ -151,16 +153,3 @@ isc_taskpool_destroy(isc_taskpool_t **poolp) { pool->ntasks * sizeof(isc_task_t *)); isc_mem_putanddetach(&pool->mctx, pool, sizeof(*pool)); } - -void -isc_taskpool_setprivilege(isc_taskpool_t *pool, bool priv) { - unsigned int i; - - REQUIRE(pool != NULL); - - for (i = 0; i < pool->ntasks; i++) { - if (pool->tasks[i] != NULL) { - isc_task_setprivilege(pool->tasks[i], priv); - } - } -} diff --git a/lib/isc/tests/isctest.c b/lib/isc/tests/isctest.c index 0bd220c2fd..d86007903b 100644 --- a/lib/isc/tests/isctest.c +++ b/lib/isc/tests/isctest.c @@ -64,12 +64,12 @@ cleanup_managers(void) { if (taskmgr != NULL) { isc_taskmgr_destroy(&taskmgr); } + if (netmgr != NULL) { + isc_nm_destroy(&netmgr); + } if (timermgr != NULL) { isc_timermgr_destroy(&timermgr); } - if (netmgr != NULL) { - isc_nm_detach(&netmgr); - } } static isc_result_t @@ -89,7 +89,7 @@ create_managers(unsigned int workers) { isc_hp_init(6 * workers); netmgr = isc_nm_start(test_mctx, workers); - CHECK(isc_taskmgr_create(test_mctx, workers, 0, netmgr, &taskmgr)); + CHECK(isc_taskmgr_create(test_mctx, 0, netmgr, &taskmgr)); CHECK(isc_task_create(taskmgr, 0, &maintask)); isc_taskmgr_setexcltask(taskmgr, maintask); diff --git a/lib/isc/tests/task_test.c b/lib/isc/tests/task_test.c index 0aad028fb1..f6b40f203d 100644 --- a/lib/isc/tests/task_test.c +++ b/lib/isc/tests/task_test.c @@ -37,7 +37,6 @@ #include #include -#include "../task_p.h" #include "isctest.h" /* Set to true (or use -v option) for verbose output */ @@ -120,6 +119,8 @@ set(isc_task_t *task, isc_event_t *event) { atomic_store(value, atomic_fetch_add(&counter, 1)); } +#include + static void set_and_drop(isc_task_t *task, isc_event_t *event) { atomic_int_fast32_t *value = (atomic_int_fast32_t *)event->ev_arg; @@ -128,8 +129,7 @@ set_and_drop(isc_task_t *task, isc_event_t *event) { isc_event_free(&event); LOCK(&lock); - atomic_store(value, (int)isc_taskmgr_mode(taskmgr)); - atomic_fetch_add(&counter, 1); + 
atomic_store(value, atomic_fetch_add(&counter, 1)); UNLOCK(&lock); } @@ -204,17 +204,17 @@ privileged_events(void **state) { UNUSED(state); atomic_init(&counter, 1); - atomic_init(&a, 0); - atomic_init(&b, 0); - atomic_init(&c, 0); - atomic_init(&d, 0); - atomic_init(&e, 0); + atomic_init(&a, -1); + atomic_init(&b, -1); + atomic_init(&c, -1); + atomic_init(&d, -1); + atomic_init(&e, -1); /* - * Pause the task manager so we can fill up the work queue - * without things happening while we do it. + * Pause the net/task manager so we can fill up the work + * queue without things happening while we do it. */ - isc__taskmgr_pause(taskmgr); + isc_nm_pause(netmgr); result = isc_task_create(taskmgr, 0, &task1); assert_int_equal(result, ISC_R_SUCCESS); @@ -233,7 +233,7 @@ privileged_events(void **state) { &a, sizeof(isc_event_t)); assert_non_null(event); - assert_int_equal(atomic_load(&a), 0); + assert_int_equal(atomic_load(&a), -1); isc_task_send(task1, &event); /* Second event: not privileged */ @@ -241,7 +241,7 @@ privileged_events(void **state) { &b, sizeof(isc_event_t)); assert_non_null(event); - assert_int_equal(atomic_load(&b), 0); + assert_int_equal(atomic_load(&b), -1); isc_task_send(task2, &event); /* Third event: privileged */ @@ -249,7 +249,7 @@ privileged_events(void **state) { &c, sizeof(isc_event_t)); assert_non_null(event); - assert_int_equal(atomic_load(&c), 0); + assert_int_equal(atomic_load(&c), -1); isc_task_send(task1, &event); /* Fourth event: privileged */ @@ -257,7 +257,7 @@ privileged_events(void **state) { &d, sizeof(isc_event_t)); assert_non_null(event); - assert_int_equal(atomic_load(&d), 0); + assert_int_equal(atomic_load(&d), -1); isc_task_send(task1, &event); /* Fifth event: not privileged */ @@ -265,19 +265,15 @@ privileged_events(void **state) { &e, sizeof(isc_event_t)); assert_non_null(event); - assert_int_equal(atomic_load(&e), 0); + assert_int_equal(atomic_load(&e), -1); isc_task_send(task2, &event); - 
assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal); - isc_taskmgr_setprivilegedmode(taskmgr); - assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_privileged); - - isc__taskmgr_resume(taskmgr); + isc_nm_resume(netmgr); /* We're waiting for *all* variables to be set */ - while ((atomic_load(&a) == 0 || atomic_load(&b) == 0 || - atomic_load(&c) == 0 || atomic_load(&d) == 0 || - atomic_load(&e) == 0) && + while ((atomic_load(&a) < 0 || atomic_load(&b) < 0 || + atomic_load(&c) < 0 || atomic_load(&d) < 0 || + atomic_load(&e) < 0) && i++ < 5000) { isc_test_nap(1000); @@ -293,16 +289,14 @@ privileged_events(void **state) { assert_true(atomic_load(&d) <= 3); /* ...and the non-privileged tasks that set b and e, last */ - assert_true(atomic_load(&b) >= 4); - assert_true(atomic_load(&e) >= 4); + assert_true(atomic_load(&b) > 3); + assert_true(atomic_load(&e) > 3); assert_int_equal(atomic_load(&counter), 6); isc_task_setprivilege(task1, false); assert_false(isc_task_privilege(task1)); - assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal); - isc_task_destroy(&task1); assert_null(task1); isc_task_destroy(&task2); @@ -331,10 +325,10 @@ privilege_drop(void **state) { atomic_init(&e, -1); /* - * Pause the task manager so we can fill up the work queue + * Pause the net/task manager so we can fill up the work queue * without things happening while we do it. */ - isc__taskmgr_pause(taskmgr); + isc_nm_pause(netmgr); result = isc_task_create(taskmgr, 0, &task1); assert_int_equal(result, ISC_R_SUCCESS); @@ -388,11 +382,7 @@ privilege_drop(void **state) { assert_int_equal(atomic_load(&e), -1); isc_task_send(task2, &event); - assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal); - isc_taskmgr_setprivilegedmode(taskmgr); - assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_privileged); - - isc__taskmgr_resume(taskmgr); + isc_nm_resume(netmgr); /* We're waiting for all variables to be set. 
*/ while ((atomic_load(&a) == -1 || atomic_load(&b) == -1 || @@ -407,19 +397,17 @@ privilege_drop(void **state) { * We need to check that all privilege mode events were fired * in privileged mode, and non privileged in non-privileged. */ - assert_true(atomic_load(&a) == isc_taskmgrmode_privileged || - atomic_load(&c) == isc_taskmgrmode_privileged || - atomic_load(&d) == isc_taskmgrmode_privileged); + assert_true(atomic_load(&a) <= 3); + assert_true(atomic_load(&c) <= 3); + assert_true(atomic_load(&d) <= 3); /* ...and neither of the non-privileged tasks did... */ - assert_true(atomic_load(&b) == isc_taskmgrmode_normal || - atomic_load(&e) == isc_taskmgrmode_normal); + assert_true(atomic_load(&b) > 3); + assert_true(atomic_load(&e) > 3); /* ...but all five of them did run. */ assert_int_equal(atomic_load(&counter), 6); - assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal); - isc_task_destroy(&task1); assert_null(task1); isc_task_destroy(&task2); @@ -695,6 +683,7 @@ exclusive_cb(isc_task_t *task, isc_event_t *event) { if (atomic_load(&done)) { isc_mem_put(event->ev_destroy_arg, event->ev_arg, sizeof(int)); isc_event_free(&event); + atomic_fetch_sub(&counter, 1); } else { isc_task_send(task, &event); } @@ -708,6 +697,8 @@ task_exclusive(void **state) { UNUSED(state); + atomic_init(&counter, 0); + for (i = 0; i < 10; i++) { isc_event_t *event = NULL; int *v; @@ -732,11 +723,16 @@ task_exclusive(void **state) { assert_non_null(event); isc_task_send(tasks[i], &event); + atomic_fetch_add(&counter, 1); } for (i = 0; i < 10; i++) { isc_task_detach(&tasks[i]); } + + while (atomic_load(&counter) > 0) { + isc_test_nap(1000); + } } /* @@ -805,7 +801,8 @@ manytasks(void **state) { isc_mem_debugging = ISC_MEM_DEBUGRECORD; isc_mem_create(&mctx); - result = isc_taskmgr_create(mctx, 4, 0, NULL, &taskmgr); + netmgr = isc_nm_start(mctx, 4); + result = isc_taskmgr_create(mctx, 0, netmgr, &taskmgr); assert_int_equal(result, ISC_R_SUCCESS); atomic_init(&done, false); @@ 
-822,6 +819,7 @@ manytasks(void **state) { UNLOCK(&lock); isc_taskmgr_destroy(&taskmgr); + isc_nm_destroy(&netmgr); isc_mem_destroy(&mctx); isc_condition_destroy(&cv); isc_mutex_destroy(&lock); diff --git a/lib/isc/tests/taskpool_test.c b/lib/isc/tests/taskpool_test.c index a22b0511a0..9878599721 100644 --- a/lib/isc/tests/taskpool_test.c +++ b/lib/isc/tests/taskpool_test.c @@ -28,6 +28,9 @@ #include "isctest.h" +#define TASK_MAGIC ISC_MAGIC('T', 'A', 'S', 'K') +#define VALID_TASK(t) ISC_MAGIC_VALID(t, TASK_MAGIC) + static int _setup(void **state) { isc_result_t result; @@ -57,7 +60,7 @@ create_pool(void **state) { UNUSED(state); - result = isc_taskpool_create(taskmgr, test_mctx, 8, 2, &pool); + result = isc_taskpool_create(taskmgr, test_mctx, 8, 2, false, &pool); assert_int_equal(result, ISC_R_SUCCESS); assert_int_equal(isc_taskpool_size(pool), 8); @@ -73,13 +76,13 @@ expand_pool(void **state) { UNUSED(state); - result = isc_taskpool_create(taskmgr, test_mctx, 10, 2, &pool1); + result = isc_taskpool_create(taskmgr, test_mctx, 10, 2, false, &pool1); assert_int_equal(result, ISC_R_SUCCESS); assert_int_equal(isc_taskpool_size(pool1), 10); /* resizing to a smaller size should have no effect */ hold = pool1; - result = isc_taskpool_expand(&pool1, 5, &pool2); + result = isc_taskpool_expand(&pool1, 5, false, &pool2); assert_int_equal(result, ISC_R_SUCCESS); assert_int_equal(isc_taskpool_size(pool2), 10); assert_ptr_equal(pool2, hold); @@ -89,7 +92,7 @@ expand_pool(void **state) { /* resizing to the same size should have no effect */ hold = pool1; - result = isc_taskpool_expand(&pool1, 10, &pool2); + result = isc_taskpool_expand(&pool1, 10, false, &pool2); assert_int_equal(result, ISC_R_SUCCESS); assert_int_equal(isc_taskpool_size(pool2), 10); assert_ptr_equal(pool2, hold); @@ -99,7 +102,7 @@ expand_pool(void **state) { /* resizing to larger size should make a new pool */ hold = pool1; - result = isc_taskpool_expand(&pool1, 20, &pool2); + result = 
isc_taskpool_expand(&pool1, 20, false, &pool2); assert_int_equal(result, ISC_R_SUCCESS); assert_int_equal(isc_taskpool_size(pool2), 20); assert_ptr_not_equal(pool2, hold); @@ -118,19 +121,19 @@ get_tasks(void **state) { UNUSED(state); - result = isc_taskpool_create(taskmgr, test_mctx, 2, 2, &pool); + result = isc_taskpool_create(taskmgr, test_mctx, 2, 2, false, &pool); assert_int_equal(result, ISC_R_SUCCESS); assert_int_equal(isc_taskpool_size(pool), 2); /* two tasks in pool; make sure we can access them more than twice */ isc_taskpool_gettask(pool, &task1); - assert_non_null(task1); + assert_true(VALID_TASK(task1)); isc_taskpool_gettask(pool, &task2); - assert_non_null(task2); + assert_true(VALID_TASK(task2)); isc_taskpool_gettask(pool, &task3); - assert_non_null(task3); + assert_true(VALID_TASK(task3)); isc_task_destroy(&task1); isc_task_destroy(&task2); @@ -149,30 +152,22 @@ set_privilege(void **state) { UNUSED(state); - result = isc_taskpool_create(taskmgr, test_mctx, 2, 2, &pool); + result = isc_taskpool_create(taskmgr, test_mctx, 2, 2, true, &pool); assert_int_equal(result, ISC_R_SUCCESS); assert_int_equal(isc_taskpool_size(pool), 2); - isc_taskpool_setprivilege(pool, true); - isc_taskpool_gettask(pool, &task1); isc_taskpool_gettask(pool, &task2); isc_taskpool_gettask(pool, &task3); - assert_non_null(task1); - assert_non_null(task2); - assert_non_null(task3); + assert_true(VALID_TASK(task1)); + assert_true(VALID_TASK(task2)); + assert_true(VALID_TASK(task3)); assert_true(isc_task_privilege(task1)); assert_true(isc_task_privilege(task2)); assert_true(isc_task_privilege(task3)); - isc_taskpool_setprivilege(pool, false); - - assert_false(isc_task_privilege(task1)); - assert_false(isc_task_privilege(task2)); - assert_false(isc_task_privilege(task3)); - isc_task_destroy(&task1); isc_task_destroy(&task2); isc_task_destroy(&task3); diff --git a/lib/isc/tests/timer_test.c b/lib/isc/tests/timer_test.c index 4d02ff3b74..21b5679de2 100644 --- 
a/lib/isc/tests/timer_test.c +++ b/lib/isc/tests/timer_test.c @@ -435,8 +435,8 @@ reset(void **state) { setup_test(isc_timertype_ticker, &expires, &interval, test_reset); } -static int startflag; -static int shutdownflag; +static atomic_bool startflag; +static atomic_bool shutdownflag; static isc_timer_t *tickertimer = NULL; static isc_timer_t *oncetimer = NULL; static isc_task_t *task1 = NULL; @@ -447,23 +447,6 @@ static isc_task_t *task2 = NULL; * in its queue, until signaled by task2. */ -static void -start_event(isc_task_t *task, isc_event_t *event) { - UNUSED(task); - - if (verbose) { - print_message("# start_event\n"); - } - - LOCK(&mx); - while (!startflag) { - (void)isc_condition_wait(&cv, &mx); - } - UNLOCK(&mx); - - isc_event_free(&event); -} - static void tick_event(isc_task_t *task, isc_event_t *event) { isc_result_t result; @@ -472,6 +455,14 @@ tick_event(isc_task_t *task, isc_event_t *event) { UNUSED(task); + if (!atomic_load(&startflag)) { + if (verbose) { + print_message("# tick_event %d\n", -1); + } + isc_event_free(&event); + return; + } + int tick = atomic_fetch_add(&eventcnt, 1); if (verbose) { print_message("# tick_event %d\n", tick); @@ -496,8 +487,6 @@ tick_event(isc_task_t *task, isc_event_t *event) { static void once_event(isc_task_t *task, isc_event_t *event) { - isc_result_t result; - if (verbose) { print_message("# once_event\n"); } @@ -505,12 +494,7 @@ once_event(isc_task_t *task, isc_event_t *event) { /* * Allow task1 to start processing events. 
*/ - LOCK(&mx); - startflag = 1; - - result = isc_condition_broadcast(&cv); - subthread_assert_result_equal(result, ISC_R_SUCCESS); - UNLOCK(&mx); + atomic_store(&startflag, true); isc_event_free(&event); isc_task_shutdown(task); @@ -518,8 +502,6 @@ once_event(isc_task_t *task, isc_event_t *event) { static void shutdown_purge(isc_task_t *task, isc_event_t *event) { - isc_result_t result; - UNUSED(task); UNUSED(event); @@ -530,12 +512,7 @@ shutdown_purge(isc_task_t *task, isc_event_t *event) { /* * Signal shutdown processing complete. */ - LOCK(&mx); - shutdownflag = 1; - - result = isc_condition_signal(&cv); - subthread_assert_result_equal(result, ISC_R_SUCCESS); - UNLOCK(&mx); + atomic_store(&shutdownflag, 1); isc_event_free(&event); } @@ -544,22 +521,17 @@ shutdown_purge(isc_task_t *task, isc_event_t *event) { static void purge(void **state) { isc_result_t result; - isc_event_t *event = NULL; isc_time_t expires; isc_interval_t interval; UNUSED(state); - startflag = 0; - shutdownflag = 0; + atomic_init(&startflag, 0); + atomic_init(&shutdownflag, 0); atomic_init(&eventcnt, 0); seconds = 1; nanoseconds = 0; - isc_mutex_init(&mx); - - isc_condition_init(&cv); - result = isc_task_create(taskmgr, 0, &task1); assert_int_equal(result, ISC_R_SUCCESS); @@ -569,13 +541,6 @@ purge(void **state) { result = isc_task_create(taskmgr, 0, &task2); assert_int_equal(result, ISC_R_SUCCESS); - LOCK(&mx); - - event = isc_event_allocate(test_mctx, (void *)1, (isc_eventtype_t)1, - start_event, NULL, sizeof(*event)); - assert_non_null(event); - isc_task_send(task1, &event); - isc_time_settoepoch(&expires); isc_interval_set(&interval, seconds, 0); @@ -600,13 +565,10 @@ purge(void **state) { /* * Wait for shutdown processing to complete. 
*/ - while (!shutdownflag) { - result = isc_condition_wait(&cv, &mx); - assert_int_equal(result, ISC_R_SUCCESS); + while (!atomic_load(&shutdownflag)) { + isc_test_nap(1000); } - UNLOCK(&mx); - assert_int_equal(atomic_load(&errcnt), ISC_R_SUCCESS); assert_int_equal(atomic_load(&eventcnt), 1); @@ -615,7 +577,6 @@ purge(void **state) { isc_timer_detach(&oncetimer); isc_task_destroy(&task1); isc_task_destroy(&task2); - isc_mutex_destroy(&mx); } int diff --git a/lib/isc/timer.c b/lib/isc/timer.c index 2798491d85..af49801660 100644 --- a/lib/isc/timer.c +++ b/lib/isc/timer.c @@ -94,14 +94,13 @@ struct isc_timermgr { }; void -isc_timermgr_poke(isc_timermgr_t *manager0); +isc_timermgr_poke(isc_timermgr_t *manager); static inline isc_result_t schedule(isc_timer_t *timer, isc_time_t *now, bool signal_ok) { isc_result_t result; isc_timermgr_t *manager; isc_time_t due; - int cmp; /*! * Note: the caller must ensure locking. @@ -145,7 +144,7 @@ schedule(isc_timer_t *timer, isc_time_t *now, bool signal_ok) { /* * Already scheduled. */ - cmp = isc_time_compare(&due, &timer->due); + int cmp = isc_time_compare(&due, &timer->due); timer->due = due; switch (cmp) { case -1: @@ -187,7 +186,6 @@ schedule(isc_timer_t *timer, isc_time_t *now, bool signal_ok) { static inline void deschedule(isc_timer_t *timer) { - bool need_wakeup = false; isc_timermgr_t *manager; /* @@ -196,6 +194,7 @@ deschedule(isc_timer_t *timer) { manager = timer->manager; if (timer->index > 0) { + bool need_wakeup = false; if (timer->index == 1) { need_wakeup = true; } @@ -223,6 +222,7 @@ destroy(isc_timer_t *timer) { (void)isc_task_purgerange(timer->task, timer, ISC_TIMEREVENT_FIRSTEVENT, ISC_TIMEREVENT_LASTEVENT, NULL); deschedule(timer); + UNLINK(manager->timers, timer, link); UNLOCK(&manager->lock); @@ -467,10 +467,6 @@ isc_timer_touch(isc_timer_t *timer) { void isc_timer_attach(isc_timer_t *timer, isc_timer_t **timerp) { - /* - * Attach *timerp to timer. 
- */ - REQUIRE(VALID_TIMER(timer)); REQUIRE(timerp != NULL && *timerp == NULL); isc_refcount_increment(&timer->references); diff --git a/lib/isc/win32/libisc.def.in b/lib/isc/win32/libisc.def.in index 6889fb4f0e..1404a8316c 100644 --- a/lib/isc/win32/libisc.def.in +++ b/lib/isc/win32/libisc.def.in @@ -104,10 +104,10 @@ isc_socketmgr_setreserved isc_socketmgr_setstats isc_task_getname isc_task_gettag +isc_task_run isc_task_unsendrange -isc_taskmgr_mode -isc__taskmgr_pause -isc__taskmgr_resume +isc_taskmgr_attach +isc_taskmgr_detach isc_aes128_crypt isc_aes192_crypt isc_aes256_crypt @@ -654,7 +654,6 @@ isc_task_unsend isc_taskmgr_create isc_taskmgr_destroy isc_taskmgr_excltask -isc_taskmgr_mode @IF NOTYET isc_taskmgr_renderjson @END NOTYET @@ -662,12 +661,10 @@ isc_taskmgr_renderjson isc_taskmgr_renderxml @END LIBXML2 isc_taskmgr_setexcltask -isc_taskmgr_setprivilegedmode isc_taskpool_create isc_taskpool_destroy isc_taskpool_expand isc_taskpool_gettask -isc_taskpool_setprivilege isc_taskpool_size isc_thread_create isc_thread_join diff --git a/lib/ns/tests/nstest.c b/lib/ns/tests/nstest.c index 5595a22c57..0f309f01a8 100644 --- a/lib/ns/tests/nstest.c +++ b/lib/ns/tests/nstest.c @@ -51,6 +51,7 @@ isc_mem_t *mctx = NULL; isc_log_t *lctx = NULL; +isc_nm_t *netmgr = NULL; isc_taskmgr_t *taskmgr = NULL; isc_task_t *maintask = NULL; isc_timermgr_t *timermgr = NULL; @@ -213,6 +214,9 @@ cleanup_managers(void) { if (taskmgr != NULL) { isc_taskmgr_destroy(&taskmgr); } + if (netmgr != NULL) { + isc_nm_destroy(&netmgr); + } if (timermgr != NULL) { isc_timermgr_destroy(&timermgr); } @@ -237,7 +241,8 @@ create_managers(void) { isc_event_t *event = NULL; ncpus = isc_os_ncpus(); - CHECK(isc_taskmgr_create(mctx, ncpus, 0, NULL, &taskmgr)); + netmgr = isc_nm_start(mctx, ncpus); + CHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr)); CHECK(isc_task_create(taskmgr, 0, &maintask)); isc_taskmgr_setexcltask(taskmgr, maintask); CHECK(isc_task_onshutdown(maintask, shutdown_managers, NULL)); 
diff --git a/util/copyrights b/util/copyrights index c69312fe83..e1c7d591a7 100644 --- a/util/copyrights +++ b/util/copyrights @@ -1981,7 +1981,6 @@ ./lib/isc/string.c C 1999,2000,2001,2003,2004,2005,2006,2007,2011,2012,2014,2015,2016,2018,2019,2020,2021 ./lib/isc/symtab.c C 1996,1997,1998,1999,2000,2001,2004,2005,2007,2011,2012,2013,2016,2018,2019,2020,2021 ./lib/isc/task.c C 1998,1999,2000,2001,2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,2020,2021 -./lib/isc/task_p.h C 2018,2019,2020,2021 ./lib/isc/taskpool.c C 1999,2000,2001,2004,2005,2007,2011,2012,2013,2016,2018,2019,2020,2021 ./lib/isc/tests/aes_test.c C 2014,2016,2018,2019,2020,2021 ./lib/isc/tests/buffer_test.c C 2014,2015,2016,2017,2018,2019,2020,2021