diff --git a/bin/delv/delv.c b/bin/delv/delv.c index 79addc0254..fc974853c4 100644 --- a/bin/delv/delv.c +++ b/bin/delv/delv.c @@ -36,6 +36,7 @@ #include #include #include +#include #ifdef WIN32 #include #endif /* ifdef WIN32 */ @@ -1736,6 +1737,7 @@ main(int argc, char *argv[]) { dns_namelist_t namelist; unsigned int resopt; isc_appctx_t *actx = NULL; + isc_nm_t *netmgr = NULL; isc_taskmgr_t *taskmgr = NULL; isc_socketmgr_t *socketmgr = NULL; isc_timermgr_t *timermgr = NULL; @@ -1759,7 +1761,8 @@ main(int argc, char *argv[]) { isc_mem_create(&mctx); CHECK(isc_appctx_create(mctx, &actx)); - CHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr)); + netmgr = isc_nm_start(mctx, 1); + CHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr)); CHECK(isc_socketmgr_create(mctx, &socketmgr)); CHECK(isc_timermgr_create(mctx, &timermgr)); @@ -1867,6 +1870,9 @@ cleanup: if (taskmgr != NULL) { isc_taskmgr_destroy(&taskmgr); } + if (netmgr != NULL) { + isc_nm_destroy(&netmgr); + } if (timermgr != NULL) { isc_timermgr_destroy(&timermgr); } diff --git a/bin/dig/dighost.c b/bin/dig/dighost.c index 4d2aee207e..9473219c2a 100644 --- a/bin/dig/dighost.c +++ b/bin/dig/dighost.c @@ -1362,7 +1362,7 @@ setup_libs(void) { netmgr = isc_nm_start(mctx, 1); - result = isc_taskmgr_create(mctx, 1, 0, netmgr, &taskmgr); + result = isc_taskmgr_create(mctx, 0, netmgr, &taskmgr); check_result(result, "isc_taskmgr_create"); result = isc_task_create(taskmgr, 0, &global_task); diff --git a/bin/dnssec/dnssec-signzone.c b/bin/dnssec/dnssec-signzone.c index d97daa78f7..3a88e861e3 100644 --- a/bin/dnssec/dnssec-signzone.c +++ b/bin/dnssec/dnssec-signzone.c @@ -144,6 +144,7 @@ static unsigned int nsigned = 0, nretained = 0, ndropped = 0; static unsigned int nverified = 0, nverifyfailed = 0; static const char *directory = NULL, *dsdir = NULL; static isc_mutex_t namelock, statslock; +static isc_nm_t *netmgr = NULL; static isc_taskmgr_t *taskmgr = NULL; static dns_db_t *gdb; /* The database */ static dns_dbversion_t *gversion; /* The database version */ @@ -3953,7 +3954,9 @@ main(int argc, char *argv[]) { print_time(outfp); print_version(outfp); - result = isc_taskmgr_create(mctx, ntasks, 0, NULL, &taskmgr); + netmgr = isc_nm_start(mctx, ntasks); + + result = isc_taskmgr_create(mctx, 0, netmgr, &taskmgr); if (result != ISC_R_SUCCESS) { fatal("failed to create task manager: %s", isc_result_totext(result)); @@ -4009,6 +4012,7 @@ main(int argc, char *argv[]) { isc_task_detach(&tasks[i]); } isc_taskmgr_destroy(&taskmgr); + isc_nm_destroy(&netmgr); isc_mem_put(mctx, tasks, ntasks * sizeof(isc_task_t *)); postsign(); TIME_NOW(&sign_finish); diff --git a/bin/named/main.c b/bin/named/main.c index c22cefc1b2..a6c6c3bd37 100644 --- a/bin/named/main.c +++ b/bin/named/main.c @@ -960,7 +960,7 @@ create_managers(void) { return (ISC_R_UNEXPECTED); } - result = isc_taskmgr_create(named_g_mctx, named_g_cpus, 0, named_g_nm, + result = isc_taskmgr_create(named_g_mctx, 0, named_g_nm, &named_g_taskmgr); if (result != ISC_R_SUCCESS) { UNEXPECTED_ERROR(__FILE__, __LINE__, @@ -1011,13 +1011,9 @@ destroy_managers(void) { * isc_taskmgr_destroy() will block until all tasks have exited. */ isc_taskmgr_destroy(&named_g_taskmgr); + isc_nm_destroy(&named_g_nm); isc_timermgr_destroy(&named_g_timermgr); isc_socketmgr_destroy(&named_g_socketmgr); - - /* - * At this point is safe to destroy the netmgr. - */ - isc_nm_destroy(&named_g_nm); } static void diff --git a/bin/named/server.c b/bin/named/server.c index 8ec8b4343a..6f050af36d 100644 --- a/bin/named/server.c +++ b/bin/named/server.c @@ -9866,7 +9866,7 @@ view_loaded(void *arg) { } static isc_result_t -load_zones(named_server_t *server, bool init, bool reconfig) { +load_zones(named_server_t *server, bool reconfig) { isc_result_t result; dns_view_t *view; ns_zoneload_t *zl; @@ -9921,16 +9921,6 @@ cleanup: if (isc_refcount_decrement(&zl->refs) == 1) { isc_refcount_destroy(&zl->refs); isc_mem_put(server->mctx, zl, sizeof(*zl)); - } else if (init) { - /* - * Place the task manager into privileged mode. This - * ensures that after we leave task-exclusive mode, no - * other tasks will be able to run except for the ones - * that are loading zones. (This should only be done during - * the initial server setup; it isn't necessary during - * a reload.) - */ - isc_taskmgr_setprivilegedmode(named_g_taskmgr); } isc_task_endexclusive(server->task); @@ -9998,7 +9988,7 @@ run_server(isc_task_t *task, isc_event_t *event) { CHECKFATAL(load_configuration(named_g_conffile, server, true), "loading configuration"); - CHECKFATAL(load_zones(server, true, false), "loading zones"); + CHECKFATAL(load_zones(server, false), "loading zones"); #ifdef ENABLE_AFL named_g_run_done = true; #endif /* ifdef ENABLE_AFL */ @@ -10511,7 +10501,7 @@ reload(named_server_t *server) { CHECK(loadconfig(server)); - result = load_zones(server, false, false); + result = load_zones(server, false); if (result == ISC_R_SUCCESS) { isc_log_write(named_g_lctx, NAMED_LOGCATEGORY_GENERAL, NAMED_LOGMODULE_SERVER, ISC_LOG_INFO, @@ -10880,7 +10870,7 @@ named_server_reconfigcommand(named_server_t *server) { CHECK(loadconfig(server)); - result = load_zones(server, false, true); + result = load_zones(server, true); if (result == ISC_R_SUCCESS) { isc_log_write(named_g_lctx, NAMED_LOGCATEGORY_GENERAL, NAMED_LOGMODULE_SERVER, ISC_LOG_INFO, diff --git a/bin/nsupdate/nsupdate.c b/bin/nsupdate/nsupdate.c index 73cb363fe3..3b505b9694 100644 --- a/bin/nsupdate/nsupdate.c +++ b/bin/nsupdate/nsupdate.c @@ -125,6 +125,7 @@ static bool usegsstsig = false; static bool use_win2k_gsstsig = false; static bool tried_other_gsstsig = false; static bool local_only = false; +static isc_nm_t *netmgr = NULL; static isc_taskmgr_t *taskmgr = NULL; static isc_task_t *global_task = NULL; static isc_event_t *global_event = NULL; @@ -927,7 +928,9 @@ setup_system(void) { result = isc_timermgr_create(gmctx, &timermgr); check_result(result, "dns_timermgr_create"); - result = isc_taskmgr_create(gmctx, 1, 0, NULL, &taskmgr); + netmgr = isc_nm_start(gmctx, 1); + + result = isc_taskmgr_create(gmctx, 0, netmgr, &taskmgr); check_result(result, "isc_taskmgr_create"); result = isc_task_create(taskmgr, 0, &global_task); @@ -3311,6 +3314,9 @@ cleanup(void) { ddebug("Shutting down task manager"); isc_taskmgr_destroy(&taskmgr); + ddebug("Shutting down network manager"); + isc_nm_destroy(&netmgr); + ddebug("Destroying event"); isc_event_free(&global_event); diff --git a/bin/rndc/rndc.c b/bin/rndc/rndc.c index 47c862e24f..624910a02d 100644 --- a/bin/rndc/rndc.c +++ b/bin/rndc/rndc.c @@ -1032,7 +1032,7 @@ main(int argc, char **argv) { isc_mem_create(&rndc_mctx); netmgr = isc_nm_start(rndc_mctx, 1); DO("create task manager", - isc_taskmgr_create(rndc_mctx, 1, 0, netmgr, &taskmgr)); + isc_taskmgr_create(rndc_mctx, 0, netmgr, &taskmgr)); DO("create task", isc_task_create(taskmgr, 0, &rndc_task)); isc_log_create(rndc_mctx, &log, &logconfig); isc_log_setcontext(log); diff --git a/bin/tests/system/.gitignore b/bin/tests/system/.gitignore index d8ac7e9268..82dbeafec9 100644 --- a/bin/tests/system/.gitignore +++ b/bin/tests/system/.gitignore @@ -13,6 +13,7 @@ named.run parallel.mk /*.log /*.trs +/resolve /run.sh /run.log /start.sh diff --git a/bin/tests/system/pipelined/pipequeries.c b/bin/tests/system/pipelined/pipequeries.c index 38335a148e..02f312ee2f 100644 --- a/bin/tests/system/pipelined/pipequeries.c +++ b/bin/tests/system/pipelined/pipequeries.c @@ -206,7 +206,8 @@ main(int argc, char *argv[]) { isc_result_t result; isc_log_t *lctx; isc_logconfig_t *lcfg; - isc_taskmgr_t *taskmgr; + isc_nm_t *netmgr = NULL; + isc_taskmgr_t *taskmgr = NULL; isc_task_t *task; isc_timermgr_t *timermgr; isc_socketmgr_t *socketmgr; @@ -276,8 +277,9 @@ main(int argc, char *argv[]) { RUNCHECK(dst_lib_init(mctx, NULL)); - taskmgr = NULL; - RUNCHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr)); + netmgr = isc_nm_start(mctx, 1); + + RUNCHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr)); task = NULL; RUNCHECK(isc_task_create(taskmgr, 0, &task)); timermgr = NULL; @@ -322,6 +324,7 @@ main(int argc, char *argv[]) { isc_task_shutdown(task); isc_task_detach(&task); isc_taskmgr_destroy(&taskmgr); + isc_nm_destroy(&netmgr); dst_lib_destroy(); diff --git a/bin/tests/system/resolve.c b/bin/tests/system/resolve.c index 88f95986c3..b2fc5225a8 100644 --- a/bin/tests/system/resolve.c +++ b/bin/tests/system/resolve.c @@ -53,6 +53,84 @@ #include +/* + * Global contexts + */ + +isc_mem_t *ctxs_mctx = NULL; +isc_appctx_t *ctxs_actx = NULL; +isc_nm_t *ctxs_netmgr = NULL; +isc_taskmgr_t *ctxs_taskmgr = NULL; +isc_socketmgr_t *ctxs_socketmgr = NULL; +isc_timermgr_t *ctxs_timermgr = NULL; + +static void +ctxs_destroy(void) { + if (ctxs_netmgr != NULL) { + isc_nm_closedown(ctxs_netmgr); + } + + if (ctxs_taskmgr != NULL) { + isc_taskmgr_destroy(&ctxs_taskmgr); + } + + if (ctxs_netmgr != NULL) { + isc_nm_destroy(&ctxs_netmgr); + } + + if (ctxs_timermgr != NULL) { + isc_timermgr_destroy(&ctxs_timermgr); + } + + if (ctxs_socketmgr != NULL) { + isc_socketmgr_destroy(&ctxs_socketmgr); + } + + if (ctxs_actx != NULL) { + isc_appctx_destroy(&ctxs_actx); + } + + if (ctxs_mctx != NULL) { + isc_mem_destroy(&ctxs_mctx); + } +} + +static isc_result_t +ctxs_init(void) { + isc_result_t result; + + isc_mem_create(&ctxs_mctx); + + result = isc_appctx_create(ctxs_mctx, &ctxs_actx); + if (result != ISC_R_SUCCESS) { + goto fail; + } + + ctxs_netmgr = isc_nm_start(ctxs_mctx, 1); + + result = isc_taskmgr_create(ctxs_mctx, 0, ctxs_netmgr, &ctxs_taskmgr); + if (result != ISC_R_SUCCESS) { + goto fail; + } + + result = isc_socketmgr_create(ctxs_mctx, &ctxs_socketmgr); + if (result != ISC_R_SUCCESS) { + goto fail; + } + + result = isc_timermgr_create(ctxs_mctx, &ctxs_timermgr); + if (result != ISC_R_SUCCESS) { + goto fail; + } + + return (ISC_R_SUCCESS); + +fail: + ctxs_destroy(); + + return (result); +} + static char *algname; static isc_result_t @@ -93,8 +171,7 @@ usage(void) { } static void -set_key(dns_client_t *client, char *keynamestr, char *keystr, bool is_sep, - isc_mem_t **mctxp) { +set_key(dns_client_t *client, char *keynamestr, char *keystr, bool is_sep) { isc_result_t result; dns_fixedname_t fkeyname; unsigned int namelen; @@ -109,8 +186,6 @@ set_key(dns_client_t *client, char *keynamestr, char *keystr, bool is_sep, isc_region_t r; dns_secalg_t alg; - isc_mem_create(mctxp); - if (algname != NULL) { tr.base = algname; tr.length = strlen(algname); @@ -176,7 +251,6 @@ addserver(dns_client_t *client, const char *addrstr, const char *port, isc_sockaddr_t sa; isc_sockaddrlist_t servers; isc_result_t result; - unsigned int namelen; isc_buffer_t b; dns_fixedname_t fname; dns_name_t *name = NULL; @@ -201,7 +275,7 @@ addserver(dns_client_t *client, const char *addrstr, const char *port, ISC_LIST_APPEND(servers, &sa, link); if (name_space != NULL) { - namelen = strlen(name_space); + unsigned int namelen = strlen(name_space); isc_buffer_constinit(&b, name_space, namelen); isc_buffer_add(&b, namelen); name = dns_fixedname_initname(&fname); @@ -240,15 +314,9 @@ main(int argc, char *argv[]) { dns_rdatatype_t type = dns_rdatatype_a; dns_rdataset_t *rdataset; dns_namelist_t namelist; - isc_mem_t *keymctx = NULL; unsigned int clientopt, resopt = 0; bool is_sep = false; const char *port = "53"; - isc_mem_t *mctx = NULL; - isc_appctx_t *actx = NULL; - isc_taskmgr_t *taskmgr = NULL; - isc_socketmgr_t *socketmgr = NULL; - isc_timermgr_t *timermgr = NULL; struct in_addr in4; struct in6_addr in6; isc_sockaddr_t a4, a6; @@ -361,32 +429,15 @@ main(int argc, char *argv[]) { exit(1); } - isc_mem_create(&mctx); - - result = isc_appctx_create(mctx, &actx); - if (result != ISC_R_SUCCESS) { - goto cleanup; - } - result = isc_app_ctxstart(actx); - if (result != ISC_R_SUCCESS) { - goto cleanup; - } - result = isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr); - if (result != ISC_R_SUCCESS) { - goto cleanup; - } - result = isc_socketmgr_create(mctx, &socketmgr); - if (result != ISC_R_SUCCESS) { - goto cleanup; - } - result = isc_timermgr_create(mctx, &timermgr); + result = ctxs_init(); if (result != ISC_R_SUCCESS) { goto cleanup; } clientopt = 0; - result = dns_client_create(mctx, actx, taskmgr, socketmgr, timermgr, - clientopt, &client, addr4, addr6); + result = dns_client_create(ctxs_mctx, ctxs_actx, ctxs_taskmgr, + ctxs_socketmgr, ctxs_timermgr, clientopt, + &client, addr4, addr6); if (result != ISC_R_SUCCESS) { fprintf(stderr, "dns_client_create failed: %u, %s\n", result, isc_result_totext(result)); @@ -398,7 +449,8 @@ main(int argc, char *argv[]) { irs_resconf_t *resconf = NULL; isc_sockaddrlist_t *nameservers; - result = irs_resconf_load(mctx, "/etc/resolv.conf", &resconf); + result = irs_resconf_load(ctxs_mctx, "/etc/resolv.conf", + &resconf); if (result != ISC_R_SUCCESS && result != ISC_R_FILENOTFOUND) { fprintf(stderr, "irs_resconf_load failed: %u\n", result); @@ -430,7 +482,7 @@ main(int argc, char *argv[]) { "while key name is provided\n"); exit(1); } - set_key(client, keynamestr, keystr, is_sep, &keymctx); + set_key(client, keynamestr, keystr, is_sep); } /* Construct qname */ @@ -470,25 +522,11 @@ main(int argc, char *argv[]) { /* Cleanup */ cleanup: - dns_client_destroy(&client); + if (client != NULL) { + dns_client_destroy(&client); + } - if (taskmgr != NULL) { - isc_taskmgr_destroy(&taskmgr); - } - if (timermgr != NULL) { - isc_timermgr_destroy(&timermgr); - } - if (socketmgr != NULL) { - isc_socketmgr_destroy(&socketmgr); - } - if (actx != NULL) { - isc_appctx_destroy(&actx); - } - isc_mem_detach(&mctx); - - if (keynamestr != NULL) { - isc_mem_destroy(&keymctx); - } + ctxs_destroy(); dns_lib_shutdown(); return (0); diff --git a/bin/tests/system/tkey/keycreate.c b/bin/tests/system/tkey/keycreate.c index e2ecc74761..51ed1212dd 100644 --- a/bin/tests/system/tkey/keycreate.c +++ b/bin/tests/system/tkey/keycreate.c @@ -192,7 +192,8 @@ sendquery(isc_task_t *task, isc_event_t *event) { int main(int argc, char *argv[]) { char *ourkeyname; - isc_taskmgr_t *taskmgr; + isc_nm_t *netmgr = NULL; + isc_taskmgr_t *taskmgr = NULL; isc_timermgr_t *timermgr; isc_socketmgr_t *socketmgr; isc_socket_t *sock; @@ -234,8 +235,9 @@ main(int argc, char *argv[]) { RUNCHECK(dst_lib_init(mctx, NULL)); - taskmgr = NULL; - RUNCHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr)); + netmgr = isc_nm_start(mctx, 1); + + RUNCHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr)); task = NULL; RUNCHECK(isc_task_create(taskmgr, 0, &task)); timermgr = NULL; @@ -292,6 +294,7 @@ main(int argc, char *argv[]) { isc_task_shutdown(task); isc_task_detach(&task); isc_taskmgr_destroy(&taskmgr); + isc_nm_destroy(&netmgr); isc_socket_detach(&sock); isc_socketmgr_destroy(&socketmgr); isc_timermgr_destroy(&timermgr); diff --git a/bin/tests/system/tkey/keydelete.c b/bin/tests/system/tkey/keydelete.c index 2fb173bd39..cc92df049c 100644 --- a/bin/tests/system/tkey/keydelete.c +++ b/bin/tests/system/tkey/keydelete.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -135,7 +136,8 @@ sendquery(isc_task_t *task, isc_event_t *event) { int main(int argc, char **argv) { char *keyname; - isc_taskmgr_t *taskmgr; + isc_nm_t *netmgr; + isc_taskmgr_t *taskmgr = NULL; isc_timermgr_t *timermgr; isc_socketmgr_t *socketmgr; isc_socket_t *sock; @@ -177,8 +179,9 @@ main(int argc, char **argv) { RUNCHECK(dst_lib_init(mctx, NULL)); - taskmgr = NULL; - RUNCHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr)); + netmgr = isc_nm_start(mctx, 1); + + RUNCHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr)); task = NULL; RUNCHECK(isc_task_create(taskmgr, 0, &task)); timermgr = NULL; @@ -235,6 +238,7 @@ main(int argc, char **argv) { isc_task_shutdown(task); isc_task_detach(&task); isc_taskmgr_destroy(&taskmgr); + isc_nm_destroy(&netmgr); isc_socket_detach(&sock); isc_socketmgr_destroy(&socketmgr); isc_timermgr_destroy(&timermgr); diff --git a/bin/tools/mdig.c b/bin/tools/mdig.c index d48b3e995b..b35292f51e 100644 --- a/bin/tools/mdig.c +++ b/bin/tools/mdig.c @@ -2083,7 +2083,8 @@ main(int argc, char *argv[]) { isc_sockaddr_t bind_any; isc_log_t *lctx; isc_logconfig_t *lcfg; - isc_taskmgr_t *taskmgr; + isc_nm_t *netmgr = NULL; + isc_taskmgr_t *taskmgr = NULL; isc_task_t *task; isc_timermgr_t *timermgr; isc_socketmgr_t *socketmgr; @@ -2144,8 +2145,9 @@ main(int argc, char *argv[]) { fatal("can't choose between IPv4 and IPv6"); } - taskmgr = NULL; - RUNCHECK(isc_taskmgr_create(mctx, 1, 0, NULL, &taskmgr)); + netmgr = isc_nm_start(mctx, 1); + + RUNCHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr)); task = NULL; RUNCHECK(isc_task_create(taskmgr, 0, &task)); timermgr = NULL; @@ -2225,6 +2227,7 @@ main(int argc, char *argv[]) { isc_task_shutdown(task); isc_task_detach(&task); isc_taskmgr_destroy(&taskmgr); + isc_nm_destroy(&netmgr); dst_lib_destroy(); diff --git a/doc/dev/style.md b/doc/dev/style.md index a4fddefb6e..50b767da27 100644 --- a/doc/dev/style.md +++ b/doc/dev/style.md @@ -136,7 +136,7 @@ format. Private header files, describing interfaces that are for internal use within a library but not for public use, are kept in the source tree at the same level as their related C files, and often have `"_p"` in their names, -e.g. `lib/isc/task_p.h`. +e.g. `lib/isc/mem_p.h`. Header files that define modules should have a structure like the following. Note that `` MUST be included by any public header diff --git a/lib/dns/tests/dnstest.c b/lib/dns/tests/dnstest.c index a913b56a24..9562846dd2 100644 --- a/lib/dns/tests/dnstest.c +++ b/lib/dns/tests/dnstest.c @@ -63,6 +63,7 @@ isc_mem_t *dt_mctx = NULL; isc_log_t *lctx = NULL; +isc_nm_t *netmgr = NULL; isc_taskmgr_t *taskmgr = NULL; isc_task_t *maintask = NULL; isc_timermgr_t *timermgr = NULL; @@ -100,6 +101,9 @@ cleanup_managers(void) { if (taskmgr != NULL) { isc_taskmgr_destroy(&taskmgr); } + if (netmgr != NULL) { + isc_nm_destroy(&netmgr); + } if (timermgr != NULL) { isc_timermgr_destroy(&timermgr); } @@ -113,7 +117,8 @@ create_managers(void) { isc_result_t result; ncpus = isc_os_ncpus(); - CHECK(isc_taskmgr_create(dt_mctx, ncpus, 0, NULL, &taskmgr)); + netmgr = isc_nm_start(dt_mctx, ncpus); + CHECK(isc_taskmgr_create(dt_mctx, 0, netmgr, &taskmgr)); CHECK(isc_timermgr_create(dt_mctx, &timermgr)); CHECK(isc_socketmgr_create(dt_mctx, &socketmgr)); CHECK(isc_task_create(taskmgr, 0, &maintask)); diff --git a/lib/dns/zone.c b/lib/dns/zone.c index 935b7b53c6..261c6242a7 100644 --- a/lib/dns/zone.c +++ b/lib/dns/zone.c @@ -18385,41 +18385,34 @@ dns_zonemgr_setsize(dns_zonemgr_t *zmgr, int num_zones) { /* Create or resize the zone task pools. */ if (zmgr->zonetasks == NULL) { result = isc_taskpool_create(zmgr->taskmgr, zmgr->mctx, ntasks, - 2, &pool); + 2, false, &pool); } else { - result = isc_taskpool_expand(&zmgr->zonetasks, ntasks, &pool); + result = isc_taskpool_expand(&zmgr->zonetasks, ntasks, false, + &pool); } if (result == ISC_R_SUCCESS) { zmgr->zonetasks = pool; } - pool = NULL; - if (zmgr->loadtasks == NULL) { - result = isc_taskpool_create(zmgr->taskmgr, zmgr->mctx, ntasks, - 2, &pool); - } else { - result = isc_taskpool_expand(&zmgr->loadtasks, ntasks, &pool); - } - - if (result == ISC_R_SUCCESS) { - zmgr->loadtasks = pool; - } - /* * We always set all tasks in the zone-load task pool to * privileged. This prevents other tasks in the system from * running while the server task manager is in privileged * mode. - * - * NOTE: If we start using task privileges for any other - * part of the system than zone tasks, then this will need to be - * revisted. In that case we'd want to turn on privileges for - * zone tasks only when we were loading, and turn them off the - * rest of the time. For now, however, it's okay to just - * set it and forget it. */ - isc_taskpool_setprivilege(zmgr->loadtasks, true); + pool = NULL; + if (zmgr->loadtasks == NULL) { + result = isc_taskpool_create(zmgr->taskmgr, zmgr->mctx, ntasks, + 2, true, &pool); + } else { + result = isc_taskpool_expand(&zmgr->loadtasks, ntasks, true, + &pool); + } + + if (result == ISC_R_SUCCESS) { + zmgr->loadtasks = pool; + } /* Create or resize the zone memory context pool. */ if (zmgr->mctxpool == NULL) { diff --git a/lib/isc/Makefile.am b/lib/isc/Makefile.am index 71712754c1..22212d9e38 100644 --- a/lib/isc/Makefile.am +++ b/lib/isc/Makefile.am @@ -224,7 +224,6 @@ libisc_la_SOURCES = \ fsaccess_common_p.h \ lib_p.h \ mem_p.h \ - task_p.h \ tls_p.h libisc_la_CPPFLAGS = \ diff --git a/lib/isc/app.c b/lib/isc/app.c index 4c30af6d70..04f4712434 100644 --- a/lib/isc/app.c +++ b/lib/isc/app.c @@ -516,11 +516,10 @@ isc_appctx_create(isc_mem_t *mctx, isc_appctx_t **ctxp) { REQUIRE(ctxp != NULL && *ctxp == NULL); ctx = isc_mem_get(mctx, sizeof(*ctx)); + *ctx = (isc_appctx_t){ .magic = 0 }; - ctx->magic = APPCTX_MAGIC; - - ctx->mctx = NULL; isc_mem_attach(mctx, &ctx->mctx); + ctx->magic = APPCTX_MAGIC; *ctxp = ctx; diff --git a/lib/isc/include/isc/netmgr.h b/lib/isc/include/isc/netmgr.h index a89436345f..5ecb92d022 100644 --- a/lib/isc/include/isc/netmgr.h +++ b/lib/isc/include/isc/netmgr.h @@ -522,3 +522,16 @@ isc_nm_listenhttp(isc_nm_t *mgr, isc_nmiface_t *iface, int backlog, isc_result_t isc_nm_http_endpoint(isc_nmsocket_t *sock, const char *uri, isc_nm_recv_cb_t cb, void *cbarg, size_t extrahandlesize); + +void +isc_nm_task_enqueue(isc_nm_t *mgr, isc_task_t *task, int threadid); +/*%< + * Enqueue the 'task' onto the netmgr ievents queue. + * + * Requires: + * \li 'mgr' is a valid netmgr object + * \li 'task' is a valid task + * \li 'threadid' is either the preferred netmgr tid or -1, in which case + * tid will be picked randomly. The threadid is capped (by modulo) to + * maximum number of 'workers' as specifed in isc_nm_start() + */ diff --git a/lib/isc/include/isc/task.h b/lib/isc/include/isc/task.h index a78cec1a09..29e7c99544 100644 --- a/lib/isc/include/isc/task.h +++ b/lib/isc/include/isc/task.h @@ -102,12 +102,11 @@ typedef enum { isc_result_t isc_task_create(isc_taskmgr_t *manager, unsigned int quantum, isc_task_t **taskp); - isc_result_t isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum, isc_task_t **taskp, int threadid); /*%< - * Create a task. + * Create a task, optionally bound to a particular threadid. * * Notes: * @@ -138,6 +137,23 @@ isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum, *\li #ISC_R_SHUTTINGDOWN */ +isc_result_t +isc_task_run(isc_task_t *task); +/*%< + * Run all the queued events for the 'task', returning + * when the queue is empty or the number of events executed + * exceeds the 'quantum' specified when the task was created. + * + * Requires: + * + *\li 'task' is a valid task. + * + * Returns: + * + *\li #ISC_R_SUCCESS + *\li #ISC_R_QUOTA + */ + void isc_task_attach(isc_task_t *source, isc_task_t **targetp); /*%< @@ -611,20 +627,13 @@ isc_task_privilege(isc_task_t *task); *****/ isc_result_t -isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, - unsigned int default_quantum, isc_nm_t *nm, +isc_taskmgr_create(isc_mem_t *mctx, unsigned int default_quantum, isc_nm_t *nm, isc_taskmgr_t **managerp); /*%< * Create a new task manager. * * Notes: * - *\li 'workers' in the number of worker threads to create. In general, - * the value should be close to the number of processors in the system. - * The 'workers' value is advisory only. An attempt will be made to - * create 'workers' threads, but if at least one thread creation - * succeeds, isc_taskmgr_create() may return ISC_R_SUCCESS. - * *\li If 'default_quantum' is non-zero, then it will be used as the default * quantum value when tasks are created. If zero, then an implementation * defined default quantum will be used. @@ -636,8 +645,6 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, * *\li 'mctx' is a valid memory context. * - *\li workers > 0 - * *\li managerp != NULL && *managerp == NULL * * Ensures: @@ -651,34 +658,14 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, *\li #ISC_R_NOMEMORY *\li #ISC_R_NOTHREADS No threads could be created. *\li #ISC_R_UNEXPECTED An unexpected error occurred. - *\li #ISC_R_SHUTTINGDOWN The non-threaded, shared, task + *\li #ISC_R_SHUTTINGDOWN The non-threaded, shared, task * manager shutting down. */ void -isc_taskmgr_setprivilegedmode(isc_taskmgr_t *manager); - -isc_taskmgrmode_t -isc_taskmgr_mode(isc_taskmgr_t *manager); -/*%< - * Set/get the current operating mode of the task manager. Valid modes are: - * - *\li isc_taskmgrmode_normal - *\li isc_taskmgrmode_privileged - * - * In privileged execution mode, only tasks that have had the "privilege" - * flag set via isc_task_setprivilege() can be executed. When all such - * tasks are complete, the manager automatically returns to normal mode - * and proceeds with running non-privileged ready tasks. This means it is - * necessary to have at least one privileged task waiting on the ready - * queue *before* setting the manager into privileged execution mode, - * which in turn means the task which calls this function should be in - * task-exclusive mode when it does so. - * - * Requires: - * - *\li 'manager' is a valid task manager. - */ +isc_taskmgr_attach(isc_taskmgr_t *, isc_taskmgr_t **); +void +isc_taskmgr_detach(isc_taskmgr_t *); void isc_taskmgr_destroy(isc_taskmgr_t **managerp); diff --git a/lib/isc/include/isc/taskpool.h b/lib/isc/include/isc/taskpool.h index 5a468bf8e3..94bd0c7873 100644 --- a/lib/isc/include/isc/taskpool.h +++ b/lib/isc/include/isc/taskpool.h @@ -51,7 +51,7 @@ typedef struct isc_taskpool isc_taskpool_t; isc_result_t isc_taskpool_create(isc_taskmgr_t *tmgr, isc_mem_t *mctx, unsigned int ntasks, - unsigned int quantum, isc_taskpool_t **poolp); + unsigned int quantum, bool priv, isc_taskpool_t **poolp); /*%< * Create a task pool of "ntasks" tasks, each with quantum * "quantum". @@ -90,7 +90,7 @@ isc_taskpool_size(isc_taskpool_t *pool); */ isc_result_t -isc_taskpool_expand(isc_taskpool_t **sourcep, unsigned int size, +isc_taskpool_expand(isc_taskpool_t **sourcep, unsigned int size, bool priv, isc_taskpool_t **targetp); /*%< @@ -131,19 +131,6 @@ isc_taskpool_destroy(isc_taskpool_t **poolp); * \li '*poolp' is a valid task pool. */ -void -isc_taskpool_setprivilege(isc_taskpool_t *pool, bool priv); -/*%< - * Set the privilege flag on all tasks in 'pool' to 'priv'. If 'priv' is - * true, then when the task manager is set into privileged mode, only - * tasks wihin this pool will be able to execute. (Note: It is important - * to turn the pool tasks' privilege back off before the last task finishes - * executing.) - * - * Requires: - * \li 'pool' is a valid task pool. - */ - ISC_LANG_ENDDECLS #endif /* ISC_TASKPOOL_H */ diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index ed285f8367..521ed58d54 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -41,6 +41,9 @@ #define ISC_NETMGR_TID_UNKNOWN -1 +/* Must be different from ISC_NETMGR_TID_UNKNOWN */ +#define ISC_NETMGR_NON_INTERLOCKED -2 + #define ISC_NETMGR_TLSBUF_SIZE 65536 #if !defined(WIN32) @@ -174,6 +177,8 @@ typedef struct isc__networker { bool finished; isc_thread_t thread; isc_queue_t *ievents; /* incoming async events */ + isc_queue_t *ievents_priv; /* privileged async tasks */ + isc_queue_t *ievents_task; /* async tasks */ isc_queue_t *ievents_prio; /* priority async events * used for listening etc. * can be processed while @@ -236,27 +241,27 @@ struct isc_nmiface { typedef enum isc__netievent_type { netievent_udpconnect, + netievent_udpclose, netievent_udpsend, netievent_udpread, netievent_udpstop, netievent_udpcancel, - netievent_udpclose, netievent_tcpconnect, + netievent_tcpclose, netievent_tcpsend, netievent_tcpstartread, netievent_tcppauseread, netievent_tcpaccept, netievent_tcpstop, netievent_tcpcancel, - netievent_tcpclose, netievent_tcpdnsaccept, netievent_tcpdnsconnect, + netievent_tcpdnsclose, netievent_tcpdnssend, netievent_tcpdnsread, netievent_tcpdnscancel, - netievent_tcpdnsclose, netievent_tcpdnsstop, netievent_tlsclose, @@ -268,19 +273,18 @@ typedef enum isc__netievent_type { netievent_tlsdnsaccept, netievent_tlsdnsconnect, + netievent_tlsdnsclose, netievent_tlsdnssend, netievent_tlsdnsread, netievent_tlsdnscancel, - netievent_tlsdnsclose, netievent_tlsdnsstop, netievent_tlsdnscycle, netievent_tlsdnsshutdown, + netievent_httpclose, netievent_httpstop, netievent_httpsend, - netievent_httpclose, - netievent_close, netievent_shutdown, netievent_stop, netievent_pause, @@ -289,6 +293,9 @@ typedef enum isc__netievent_type { netievent_readcb, netievent_sendcb, + netievent_task, + netievent_privilegedtask, + netievent_prio = 0xff, /* event type values higher than this * will be treated as high-priority * events, which can be processed @@ -300,6 +307,8 @@ typedef enum isc__netievent_type { netievent_tlsdnslisten, netievent_resume, netievent_detach, + netievent_close, + } isc__netievent_type; typedef union { @@ -556,6 +565,36 @@ typedef struct isc__netievent__socket_quota { isc__nm_put_netievent(nm, ievent); \ } +typedef struct isc__netievent__task { + isc__netievent_type type; + isc_task_t *task; +} isc__netievent__task_t; + +#define NETIEVENT_TASK_TYPE(type) \ + typedef isc__netievent__task_t isc__netievent_##type##_t; + +#define NETIEVENT_TASK_DECL(type) \ + isc__netievent_##type##_t *isc__nm_get_netievent_##type( \ + isc_nm_t *nm, isc_task_t *task); \ + void isc__nm_put_netievent_##type(isc_nm_t *nm, \ + isc__netievent_##type##_t *ievent); + +#define NETIEVENT_TASK_DEF(type) \ + isc__netievent_##type##_t *isc__nm_get_netievent_##type( \ + isc_nm_t *nm, isc_task_t *task) { \ + isc__netievent_##type##_t *ievent = \ + isc__nm_get_netievent(nm, netievent_##type); \ + ievent->task = task; \ + \ + return (ievent); \ + } \ + \ + void isc__nm_put_netievent_##type(isc_nm_t *nm, \ + isc__netievent_##type##_t *ievent) { \ + ievent->task = NULL; \ + isc__nm_put_netievent(nm, ievent); \ + } + typedef struct isc__netievent_udpsend { NETIEVENT__SOCKET; isc_sockaddr_t peer; @@ -617,6 +656,7 @@ struct isc_nm { uint32_t nworkers; isc_mutex_t lock; isc_condition_t wkstatecond; + isc_condition_t wkpausecond; isc__networker_t *workers; isc_stats_t *stats; @@ -631,6 +671,8 @@ struct isc_nm { uint_fast32_t workers_paused; atomic_uint_fast32_t maxudp; + atomic_bool paused; + /* * Active connections are being closed and new connections are * no longer allowed. @@ -643,7 +685,7 @@ struct isc_nm { * or pause, or we'll deadlock. We have to either re-enqueue our * event or wait for the other one to finish if we want to pause. */ - atomic_bool interlocked; + atomic_int interlocked; /* * Timeout values for TCP connections, corresponding to @@ -1783,6 +1825,9 @@ NETIEVENT_TYPE(resume); NETIEVENT_TYPE(shutdown); NETIEVENT_TYPE(stop); +NETIEVENT_TASK_TYPE(task); +NETIEVENT_TASK_TYPE(privilegedtask); + /* Now declared the helper functions */ NETIEVENT_SOCKET_DECL(close); @@ -1846,6 +1891,9 @@ NETIEVENT_DECL(resume); NETIEVENT_DECL(shutdown); NETIEVENT_DECL(stop); +NETIEVENT_TASK_DECL(task); +NETIEVENT_TASK_DECL(privilegedtask); + void isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result); void diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 34828e0c2b..c2891f1a63 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -145,10 +146,12 @@ static bool process_queue(isc__networker_t *worker, isc_queue_t *queue); static bool process_priority_queue(isc__networker_t *worker); -static bool -process_normal_queue(isc__networker_t *worker); static void -process_queues(isc__networker_t *worker); +process_privilege_queue(isc__networker_t *worker); +static void +process_tasks_queue(isc__networker_t *worker); +static void +process_normal_queue(isc__networker_t *worker); static void isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0); @@ -217,6 +220,8 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) { isc_nm_t *mgr = NULL; char name[32]; + REQUIRE(workers > 0); + #ifdef WIN32 isc__nm_winsock_initialize(); #endif /* WIN32 */ @@ -227,9 +232,10 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) { isc_mem_attach(mctx, &mgr->mctx); isc_mutex_init(&mgr->lock); isc_condition_init(&mgr->wkstatecond); + isc_condition_init(&mgr->wkpausecond); isc_refcount_init(&mgr->references, 1); atomic_init(&mgr->maxudp, 0); - atomic_init(&mgr->interlocked, false); + atomic_init(&mgr->interlocked, ISC_NETMGR_NON_INTERLOCKED); #ifdef NETMGR_TRACE ISC_LIST_INIT(mgr->active_sockets); @@ -280,6 +286,8 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) { isc_condition_init(&worker->cond); worker->ievents = isc_queue_new(mgr->mctx, 128); + worker->ievents_priv = isc_queue_new(mgr->mctx, 128); + worker->ievents_task = isc_queue_new(mgr->mctx, 128); worker->ievents_prio = isc_queue_new(mgr->mctx, 128); worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE); worker->sendbuf = isc_mem_get(mctx, ISC_NETMGR_SENDBUF_SIZE); @@ -339,6 +347,11 @@ nm_destroy(isc_nm_t **mgr0) { isc_mempool_put(mgr->evpool, ievent); } + INSIST(isc_queue_dequeue(worker->ievents_priv) == + (uintptr_t)NULL); + INSIST(isc_queue_dequeue(worker->ievents_task) == + (uintptr_t)NULL); + while ((ievent = (isc__netievent_t *)isc_queue_dequeue( worker->ievents_prio)) != NULL) { @@ -349,6 +362,8 @@ nm_destroy(isc_nm_t **mgr0) { INSIST(r == 0); isc_queue_destroy(worker->ievents); + isc_queue_destroy(worker->ievents_priv); + isc_queue_destroy(worker->ievents_task); isc_queue_destroy(worker->ievents_prio); isc_mutex_destroy(&worker->lock); isc_condition_destroy(&worker->cond); @@ -365,6 +380,7 @@ nm_destroy(isc_nm_t **mgr0) { } isc_condition_destroy(&mgr->wkstatecond); + isc_condition_destroy(&mgr->wkpausecond); isc_mutex_destroy(&mgr->lock); isc_mempool_destroy(&mgr->evpool); @@ -385,42 +401,58 @@ nm_destroy(isc_nm_t **mgr0) { void isc_nm_pause(isc_nm_t *mgr) { REQUIRE(VALID_NM(mgr)); - REQUIRE(!isc__nm_in_netthread()); + uint_fast32_t pausing = 0; + REQUIRE(!atomic_load(&mgr->paused)); isc__nm_acquire_interlocked_force(mgr); for (size_t i = 0; i < mgr->nworkers; i++) { isc__networker_t *worker = &mgr->workers[i]; - isc__netievent_resume_t *event = - isc__nm_get_netievent_pause(mgr); - isc__nm_enqueue_ievent(worker, (isc__netievent_t *)event); + if (i != (size_t)isc_nm_tid()) { + isc__netievent_resume_t *event = + isc__nm_get_netievent_pause(mgr); + pausing++; + isc__nm_enqueue_ievent(worker, + (isc__netievent_t *)event); + } else { + isc__nm_async_pause(worker, NULL); + } } LOCK(&mgr->lock); - while (mgr->workers_paused != mgr->workers_running) { + while (mgr->workers_paused != pausing) { WAIT(&mgr->wkstatecond, &mgr->lock); } + REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ false }, + true)); UNLOCK(&mgr->lock); } void isc_nm_resume(isc_nm_t *mgr) { REQUIRE(VALID_NM(mgr)); - REQUIRE(!isc__nm_in_netthread()); + REQUIRE(atomic_load(&mgr->paused)); for (size_t i = 0; i < mgr->nworkers; i++) { isc__networker_t *worker = &mgr->workers[i]; - isc__netievent_resume_t *event = - isc__nm_get_netievent_resume(mgr); - isc__nm_enqueue_ievent(worker, (isc__netievent_t *)event); + if (i != (size_t)isc_nm_tid()) { + isc__netievent_resume_t *event = + isc__nm_get_netievent_resume(mgr); + isc__nm_enqueue_ievent(worker, + (isc__netievent_t *)event); + } else { + isc__nm_async_resume(worker, NULL); + } } LOCK(&mgr->lock); while (mgr->workers_paused != 0) { WAIT(&mgr->wkstatecond, &mgr->lock); } + REQUIRE(atomic_compare_exchange_strong(&mgr->paused, &(bool){ true }, + false)); + BROADCAST(&mgr->wkpausecond); UNLOCK(&mgr->lock); - isc__nm_drop_interlocked(mgr); } @@ -465,7 +497,6 @@ void isc_nm_destroy(isc_nm_t **mgr0) { isc_nm_t *mgr = NULL; int counter = 0; - uint_fast32_t references; REQUIRE(mgr0 != NULL); REQUIRE(VALID_NM(*mgr0)); @@ -480,9 +511,7 @@ isc_nm_destroy(isc_nm_t **mgr0) { /* * Wait for the manager to be dereferenced elsewhere. */ - while ((references = isc_refcount_current(&mgr->references)) > 1 && - counter++ < 1000) - { + while (isc_refcount_current(&mgr->references) > 1 && counter++ < 1000) { #ifdef WIN32 _sleep(10); #else /* ifdef WIN32 */ @@ -491,10 +520,23 @@ isc_nm_destroy(isc_nm_t **mgr0) { } #ifdef NETMGR_TRACE - isc__nm_dump_active(mgr); + if (isc_refcount_current(&mgr->references) > 1) { + isc__nm_dump_active(mgr); + INSIST(0); + ISC_UNREACHABLE(); + } #endif - INSIST(references == 1); + /* + * Now just patiently wait + */ + while (isc_refcount_current(&mgr->references) > 1) { +#ifdef WIN32 + _sleep(10); +#else /* ifdef WIN32 */ + usleep(10000); +#endif /* ifdef WIN32 */ + } /* * Detach final reference. @@ -545,7 +587,30 @@ isc_nm_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle, /* * nm_thread is a single worker thread, that runs uv_run event loop * until asked to stop. + * + * There are four queues for asynchronous events: + * + * 1. priority queue - netievents on the priority queue are run even when + * the taskmgr enters exclusive mode and the netmgr is paused. This + * is needed to properly start listening on the interfaces, free + * resources on shutdown, or resume from a pause. + * + * 2. privileged task queue - only privileged tasks are queued here and + * this is the first queue that gets processed when network manager + * is unpaused using isc_nm_resume(). All netmgr workers need to + * clean the privileged task queue before they all proceed to normal + * operation. Both task queues are processed when the workers are + * shutting down. + * + * 3. task queue - only (traditional) tasks are scheduled here, and this + * queue and the privileged task queue are both processed when the + * netmgr workers are finishing. This is needed to process the task + * shutdown events. + * + * 4. normal queue - this is the queue with netmgr events, e.g. reading, + * sending, callbacks, etc. */ + static isc_threadresult_t nm_thread(isc_threadarg_t worker0) { isc__networker_t *worker = (isc__networker_t *)worker0; @@ -555,18 +620,26 @@ nm_thread(isc_threadarg_t worker0) { isc_thread_setaffinity(isc__nm_tid_v); while (true) { + /* + * uv_run() runs async_cb() in a loop, which processes + * all four event queues until a "pause" or "stop" event + * is encountered. On pause, we process only priority and + * privileged events until resuming. + */ int r = uv_run(&worker->loop, UV_RUN_DEFAULT); - /* There's always the async handle until we are done */ INSIST(r > 0 || worker->finished); if (worker->paused) { - LOCK(&worker->lock); - /* We need to lock the worker first otherwise - * isc_nm_resume() might slip in before WAIT() in the - * while loop starts and the signal never gets delivered - * and we are forever stuck in the paused loop. - */ + INSIST(atomic_load(&mgr->interlocked) != isc_nm_tid()); + /* + * We need to lock the worker first; otherwise + * isc_nm_resume() might slip in before WAIT() in + * the while loop starts, then the signal never + * gets delivered and we are stuck forever in the + * paused loop. + */ + LOCK(&worker->lock); LOCK(&mgr->lock); mgr->workers_paused++; SIGNAL(&mgr->wkstatecond); @@ -574,15 +647,28 @@ nm_thread(isc_threadarg_t worker0) { while (worker->paused) { WAIT(&worker->cond, &worker->lock); + UNLOCK(&worker->lock); (void)process_priority_queue(worker); + LOCK(&worker->lock); } LOCK(&mgr->lock); mgr->workers_paused--; SIGNAL(&mgr->wkstatecond); UNLOCK(&mgr->lock); - UNLOCK(&worker->lock); + + /* + * All workers must run the privileged event + * queue before we resume from pause. + */ + process_privilege_queue(worker); + + LOCK(&mgr->lock); + while (atomic_load(&mgr->paused)) { + WAIT(&mgr->wkpausecond, &mgr->lock); + } + UNLOCK(&mgr->lock); } if (r == 0) { @@ -593,11 +679,24 @@ nm_thread(isc_threadarg_t worker0) { INSIST(!worker->finished); /* - * Empty the async queue. + * We've fully resumed from pause. Drain the normal + * asynchronous event queues before resuming the uv_run() + * loop. (This is not strictly necessary, it just ensures + * that all pending events are processed before another + * pause can slip in.) */ - process_queues(worker); + process_tasks_queue(worker); + process_normal_queue(worker); } + /* + * We are shutting down. Process the task queues + * (they may include shutdown events) but do not process + * the netmgr event queue. + */ + process_privilege_queue(worker); + process_tasks_queue(worker); + LOCK(&mgr->lock); mgr->workers_running--; SIGNAL(&mgr->wkstatecond); @@ -607,15 +706,26 @@ nm_thread(isc_threadarg_t worker0) { } /* - * async_cb is a universal callback for 'async' events sent to event loop. + * async_cb() is a universal callback for 'async' events sent to event loop. * It's the only way to safely pass data to the libuv event loop. We use a - * single async event and a lockless queue of 'isc__netievent_t' structures - * passed from other threads. + * single async event and a set of lockless queues of 'isc__netievent_t' + * structures passed from other threads. */ static void async_cb(uv_async_t *handle) { isc__networker_t *worker = (isc__networker_t *)handle->loop->data; - process_queues(worker); + + /* + * process_priority_queue() returns false when pausing or stopping, + * so we don't want to process the other queues in that case. + */ + if (!process_priority_queue(worker)) { + return; + } + + process_privilege_queue(worker); + process_tasks_queue(worker); + process_normal_queue(worker); } static void @@ -624,13 +734,13 @@ isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0) { worker->finished = true; /* Close the async handler */ uv_close((uv_handle_t *)&worker->async, NULL); - /* uv_stop(&worker->loop); */ } static void isc__nm_async_pause(isc__networker_t *worker, isc__netievent_t *ev0) { UNUSED(ev0); REQUIRE(worker->paused == false); + worker->paused = true; uv_stop(&worker->loop); } @@ -639,25 +749,78 @@ static void isc__nm_async_resume(isc__networker_t *worker, isc__netievent_t *ev0) { UNUSED(ev0); REQUIRE(worker->paused == true); + worker->paused = false; } +void +isc_nm_task_enqueue(isc_nm_t *nm, isc_task_t *task, int threadid) { + isc__netievent_t *event = NULL; + int tid; + isc__networker_t *worker = NULL; + + if (threadid == -1) { + tid = (int)isc_random_uniform(nm->nworkers); + } else { + tid = threadid % nm->nworkers; + } + + worker = &nm->workers[tid]; + + if (isc_task_privilege(task)) { + event = (isc__netievent_t *) + isc__nm_get_netievent_privilegedtask(nm, task); + } else { + event = (isc__netievent_t *)isc__nm_get_netievent_task(nm, + task); + } + + isc__nm_enqueue_ievent(worker, event); +} + +#define isc__nm_async_privilegedtask(worker, ev0) \ + isc__nm_async_task(worker, ev0) + +static void +isc__nm_async_task(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_task_t *ievent = (isc__netievent_task_t *)ev0; + isc_result_t result; + + UNUSED(worker); + + result = isc_task_run(ievent->task); + + switch (result) { + case ISC_R_QUOTA: + isc_nm_task_enqueue(worker->mgr, (isc_task_t *)ievent->task, + isc_nm_tid()); + return; + case ISC_R_SUCCESS: + return; + default: + INSIST(0); + ISC_UNREACHABLE(); + } +} + static bool process_priority_queue(isc__networker_t *worker) { return (process_queue(worker, worker->ievents_prio)); } -static bool -process_normal_queue(isc__networker_t *worker) { - return (process_queue(worker, worker->ievents)); +static void +process_privilege_queue(isc__networker_t *worker) { + (void)process_queue(worker, worker->ievents_priv); } static void -process_queues(isc__networker_t *worker) { - if (!process_priority_queue(worker)) { - return; - } - (void)process_normal_queue(worker); +process_tasks_queue(isc__networker_t *worker) { + (void)process_queue(worker, worker->ievents_task); +} + +static void +process_normal_queue(isc__networker_t *worker) { + (void)process_queue(worker, worker->ievents); } /* @@ -690,6 +853,9 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) { /* Don't process more ievents when we are stopping */ NETIEVENT_CASE_NOMORE(stop); + NETIEVENT_CASE(privilegedtask); + NETIEVENT_CASE(task); + NETIEVENT_CASE(udpconnect); NETIEVENT_CASE(udplisten); NETIEVENT_CASE(udpstop); @@ -749,7 +915,6 @@ process_netievent(isc__networker_t *worker, isc__netievent_t *ievent) { NETIEVENT_CASE(shutdown); NETIEVENT_CASE(resume); NETIEVENT_CASE_NOMORE(pause); - default: INSIST(0); ISC_UNREACHABLE(); @@ -843,6 +1008,9 @@ NETIEVENT_DEF(resume); NETIEVENT_DEF(shutdown); NETIEVENT_DEF(stop); +NETIEVENT_TASK_DEF(task); +NETIEVENT_TASK_DEF(privilegedtask); + void isc__nm_maybe_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) { @@ -869,6 +1037,10 @@ isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) { isc_queue_enqueue(worker->ievents_prio, (uintptr_t)event); SIGNAL(&worker->cond); UNLOCK(&worker->lock); + } else if (event->type == netievent_privilegedtask) { + isc_queue_enqueue(worker->ievents_priv, (uintptr_t)event); + } else if (event->type == netievent_task) { + isc_queue_enqueue(worker->ievents_task, (uintptr_t)event); } else { isc_queue_enqueue(worker->ievents, (uintptr_t)event); } @@ -2537,8 +2709,9 @@ isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ev0) { bool isc__nm_acquire_interlocked(isc_nm_t *mgr) { LOCK(&mgr->lock); - bool success = atomic_compare_exchange_strong(&mgr->interlocked, - &(bool){ false }, true); + bool success = atomic_compare_exchange_strong( + &mgr->interlocked, &(int){ ISC_NETMGR_NON_INTERLOCKED }, + isc_nm_tid()); UNLOCK(&mgr->lock); return (success); } @@ -2546,9 +2719,9 @@ isc__nm_acquire_interlocked(isc_nm_t *mgr) { void isc__nm_drop_interlocked(isc_nm_t *mgr) { LOCK(&mgr->lock); - bool success = atomic_compare_exchange_strong(&mgr->interlocked, - &(bool){ true }, false); - INSIST(success); + int tid = atomic_exchange(&mgr->interlocked, + ISC_NETMGR_NON_INTERLOCKED); + INSIST(tid != ISC_NETMGR_NON_INTERLOCKED); BROADCAST(&mgr->wkstatecond); UNLOCK(&mgr->lock); } @@ -2556,8 +2729,9 @@ isc__nm_drop_interlocked(isc_nm_t *mgr) { void isc__nm_acquire_interlocked_force(isc_nm_t *mgr) { LOCK(&mgr->lock); - while (!atomic_compare_exchange_strong(&mgr->interlocked, - &(bool){ false }, true)) + while (!atomic_compare_exchange_strong( + &mgr->interlocked, &(int){ ISC_NETMGR_NON_INTERLOCKED }, + isc_nm_tid())) { WAIT(&mgr->wkstatecond, &mgr->lock); } diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index e4d7182999..d86e1866e5 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -433,8 +433,8 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, REQUIRE(csock->fd >= 0); ievent = isc__nm_get_netievent_tcplisten(mgr, csock); - isc__nm_enqueue_ievent(&mgr->workers[i], - (isc__netievent_t *)ievent); + isc__nm_maybe_enqueue_ievent(&mgr->workers[i], + (isc__netievent_t *)ievent); } #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) @@ -651,15 +651,7 @@ isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) { return; } - /* - * If network manager is interlocked, re-enqueue the event for later. - */ - if (!isc__nm_acquire_interlocked(sock->mgr)) { - enqueue_stoplistening(sock); - } else { - stop_tcp_parent(sock); - isc__nm_drop_interlocked(sock->mgr); - } + stop_tcp_parent(sock); } static void @@ -1200,6 +1192,8 @@ timer_close_cb(uv_handle_t *handle) { static void stop_tcp_child(isc_nmsocket_t *sock) { + bool last_child = false; + REQUIRE(sock->type == isc_nm_tcpsocket); REQUIRE(sock->tid == isc_nm_tid()); @@ -1212,8 +1206,13 @@ stop_tcp_child(isc_nmsocket_t *sock) { LOCK(&sock->parent->lock); sock->parent->rchildren -= 1; + last_child = (sock->parent->rchildren == 0); UNLOCK(&sock->parent->lock); - BROADCAST(&sock->parent->cond); + + if (last_child) { + atomic_store(&sock->parent->closed, true); + isc__nmsocket_prep_destroy(sock->parent); + } } static void @@ -1228,24 +1227,10 @@ stop_tcp_parent(isc_nmsocket_t *sock) { atomic_store(&csock->active, false); - if (csock->tid == isc_nm_tid()) { - stop_tcp_child(csock); - continue; - } - ievent = isc__nm_get_netievent_tcpstop(sock->mgr, csock); isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid], (isc__netievent_t *)ievent); } - - LOCK(&sock->lock); - while (sock->rchildren > 0) { - WAIT(&sock->cond, &sock->lock); - } - atomic_store(&sock->closed, true); - UNLOCK(&sock->lock); - - isc__nmsocket_prep_destroy(sock); } static void diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 7b2efa419e..f8005bb4d9 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -407,8 +407,8 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, REQUIRE(csock->fd >= 0); ievent = isc__nm_get_netievent_tcpdnslisten(mgr, csock); - isc__nm_enqueue_ievent(&mgr->workers[i], - (isc__netievent_t *)ievent); + isc__nm_maybe_enqueue_ievent(&mgr->workers[i], + (isc__netievent_t *)ievent); } #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) @@ -626,15 +626,7 @@ isc__nm_async_tcpdnsstop(isc__networker_t *worker, isc__netievent_t *ev0) { return; } - /* - * If network manager is interlocked, re-enqueue the event for later. - */ - if (!isc__nm_acquire_interlocked(sock->mgr)) { - enqueue_stoplistening(sock); - } else { - stop_tcpdns_parent(sock); - isc__nm_drop_interlocked(sock->mgr); - } + stop_tcpdns_parent(sock); } void @@ -1230,6 +1222,8 @@ timer_close_cb(uv_handle_t *timer) { static void stop_tcpdns_child(isc_nmsocket_t *sock) { + bool last_child = false; + REQUIRE(sock->type == isc_nm_tcpdnssocket); REQUIRE(sock->tid == isc_nm_tid()); @@ -1242,8 +1236,13 @@ stop_tcpdns_child(isc_nmsocket_t *sock) { LOCK(&sock->parent->lock); sock->parent->rchildren -= 1; + last_child = (sock->parent->rchildren == 0); UNLOCK(&sock->parent->lock); - BROADCAST(&sock->parent->cond); + + if (last_child) { + atomic_store(&sock->parent->closed, true); + isc__nmsocket_prep_destroy(sock->parent); + } } static void @@ -1258,24 +1257,10 @@ stop_tcpdns_parent(isc_nmsocket_t *sock) { atomic_store(&csock->active, false); - if (csock->tid == isc_nm_tid()) { - stop_tcpdns_child(csock); - continue; - } - ievent = isc__nm_get_netievent_tcpdnsstop(sock->mgr, csock); isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid], (isc__netievent_t *)ievent); } - - LOCK(&sock->lock); - while (sock->rchildren > 0) { - WAIT(&sock->cond, &sock->lock); - } - atomic_store(&sock->closed, true); - UNLOCK(&sock->lock); - - isc__nmsocket_prep_destroy(sock); } static void diff --git a/lib/isc/netmgr/tlsdns.c b/lib/isc/netmgr/tlsdns.c index 7b80dcc43a..ff83fa4624 100644 --- a/lib/isc/netmgr/tlsdns.c +++ b/lib/isc/netmgr/tlsdns.c @@ -475,8 +475,8 @@ isc_nm_listentlsdns(isc_nm_t *mgr, isc_nmiface_t *iface, REQUIRE(csock->fd >= 0); ievent = isc__nm_get_netievent_tlsdnslisten(mgr, csock); - isc__nm_enqueue_ievent(&mgr->workers[i], - (isc__netievent_t *)ievent); + isc__nm_maybe_enqueue_ievent(&mgr->workers[i], + (isc__netievent_t *)ievent); } #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) @@ -770,16 +770,7 @@ isc__nm_async_tlsdnsstop(isc__networker_t *worker, isc__netievent_t *ev0) { return; } - /* - * If network manager is interlocked, re-enqueue the event for - * later. - */ - if (!isc__nm_acquire_interlocked(sock->mgr)) { - enqueue_stoplistening(sock); - } else { - stop_tlsdns_parent(sock); - isc__nm_drop_interlocked(sock->mgr); - } + stop_tlsdns_parent(sock); } void @@ -1777,6 +1768,8 @@ timer_close_cb(uv_handle_t *handle) { static void stop_tlsdns_child(isc_nmsocket_t *sock) { + bool last_child = false; + REQUIRE(sock->type == isc_nm_tlsdnssocket); REQUIRE(sock->tid == isc_nm_tid()); @@ -1789,8 +1782,13 @@ stop_tlsdns_child(isc_nmsocket_t *sock) { LOCK(&sock->parent->lock); sock->parent->rchildren -= 1; + last_child = (sock->parent->rchildren == 0); UNLOCK(&sock->parent->lock); - BROADCAST(&sock->parent->cond); + + if (last_child) { + atomic_store(&sock->parent->closed, true); + isc__nmsocket_prep_destroy(sock->parent); + } } static void @@ -1806,24 +1804,10 @@ stop_tlsdns_parent(isc_nmsocket_t *sock) { atomic_store(&csock->active, false); - if (csock->tid == isc_nm_tid()) { - stop_tlsdns_child(csock); - continue; - } - ievent = isc__nm_get_netievent_tlsdnsstop(sock->mgr, csock); isc__nm_enqueue_ievent(&sock->mgr->workers[csock->tid], (isc__netievent_t *)ievent); } - - LOCK(&sock->lock); - while (sock->rchildren > 0) { - WAIT(&sock->cond, &sock->lock); - } - atomic_store(&sock->closed, true); - UNLOCK(&sock->lock); - - isc__nmsocket_prep_destroy(sock); } static void diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index b7bbe6598e..eb57c954be 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -140,8 +140,8 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, REQUIRE(csock->fd >= 0); ievent = isc__nm_get_netievent_udplisten(mgr, csock); - isc__nm_enqueue_ievent(&mgr->workers[i], - (isc__netievent_t *)ievent); + isc__nm_maybe_enqueue_ievent(&mgr->workers[i], + (isc__netievent_t *)ievent); } #if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) @@ -324,12 +324,7 @@ isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0) { /* * If network manager is paused, re-enqueue the event for later. */ - if (!isc__nm_acquire_interlocked(sock->mgr)) { - enqueue_stoplistening(sock); - } else { - stop_udp_parent(sock); - isc__nm_drop_interlocked(sock->mgr); - } + stop_udp_parent(sock); } /* @@ -435,20 +430,13 @@ void isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region, isc_nm_cb_t cb, void *cbarg) { isc_nmsocket_t *sock = handle->sock; - isc_nmsocket_t *psock = NULL, *rsock = sock; + isc_nmsocket_t *rsock = NULL; isc_sockaddr_t *peer = &handle->peer; isc__nm_uvreq_t *uvreq = NULL; uint32_t maxudp = atomic_load(&sock->mgr->maxudp); int ntid; - uvreq = isc__nm_uvreq_get(sock->mgr, sock); - uvreq->uvbuf.base = (char *)region->base; - uvreq->uvbuf.len = region->length; - - isc_nmhandle_attach(handle, &uvreq->handle); - - uvreq->cb.send = cb; - uvreq->cbarg = cbarg; + INSIST(sock->type == isc_nm_udpsocket); /* * We're simulating a firewall blocking UDP packets bigger than @@ -459,41 +447,45 @@ isc__nm_udp_send(isc_nmhandle_t *handle, const isc_region_t *region, * we need to do so here. */ if (maxudp != 0 && region->length > maxudp) { - isc__nm_uvreq_put(&uvreq, sock); - isc_nmhandle_detach(&handle); /* FIXME? */ + isc_nmhandle_detach(&handle); return; } - if (sock->type == isc_nm_udpsocket && !atomic_load(&sock->client)) { - INSIST(sock->parent != NULL); - psock = sock->parent; - } else if (sock->type == isc_nm_udplistener) { - psock = sock; - } else if (!atomic_load(&sock->client)) { - INSIST(0); - ISC_UNREACHABLE(); - } - - /* - * If we're in the network thread, we can send directly. If the - * handle is associated with a UDP socket, we can reuse its - * thread (assuming CPU affinity). Otherwise, pick a thread at - * random. - */ - if (isc__nm_in_netthread()) { - ntid = isc_nm_tid(); - } else if (sock->type == isc_nm_udpsocket && - !atomic_load(&sock->client)) { - ntid = sock->tid; + if (atomic_load(&sock->client)) { + /* + * When we are sending from the client socket, we directly use + * the socket provided. + */ + rsock = sock; + goto send; } else { - ntid = (int)isc_random_uniform(sock->nchildren); + /* + * When we are sending from the server socket, we either use the + * socket associated with the network thread we are in, or we + * use the thread from the socket associated with the handle. + */ + INSIST(sock->parent != NULL); + + if (isc__nm_in_netthread()) { + ntid = isc_nm_tid(); + } else { + ntid = sock->tid; + } + rsock = &sock->parent->children[ntid]; } - if (psock != NULL) { - rsock = &psock->children[ntid]; - } +send: + uvreq = isc__nm_uvreq_get(rsock->mgr, rsock); + uvreq->uvbuf.base = (char *)region->base; + uvreq->uvbuf.len = region->length; + + isc_nmhandle_attach(handle, &uvreq->handle); + + uvreq->cb.send = cb; + uvreq->cbarg = cbarg; if (isc_nm_tid() == rsock->tid) { + REQUIRE(rsock->tid == isc_nm_tid()); isc__netievent_udpsend_t ievent = { .sock = rsock, .req = uvreq, .peer = *peer }; @@ -544,6 +536,7 @@ udp_send_cb(uv_udp_send_t *req, int status) { REQUIRE(VALID_UVREQ(uvreq)); REQUIRE(VALID_NMHANDLE(uvreq->handle)); + REQUIRE(sock->tid == isc_nm_tid()); if (status < 0) { result = isc__nm_uverr2result(status); @@ -976,6 +969,8 @@ stop_udp_child(isc_nmsocket_t *sock) { REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(sock->tid == isc_nm_tid()); + bool last_child = false; + if (!atomic_compare_exchange_strong(&sock->closing, &(bool){ false }, true)) { return; @@ -985,8 +980,13 @@ stop_udp_child(isc_nmsocket_t *sock) { LOCK(&sock->parent->lock); sock->parent->rchildren -= 1; + last_child = (sock->parent->rchildren == 0); UNLOCK(&sock->parent->lock); - BROADCAST(&sock->parent->cond); + + if (last_child) { + atomic_store(&sock->parent->closed, true); + isc__nmsocket_prep_destroy(sock->parent); + } } static void @@ -1001,24 +1001,10 @@ stop_udp_parent(isc_nmsocket_t *sock) { atomic_store(&csock->active, false); - if (csock->tid == isc_nm_tid()) { - stop_udp_child(csock); - continue; - } - ievent = isc__nm_get_netievent_udpstop(sock->mgr, csock); isc__nm_enqueue_ievent(&sock->mgr->workers[i], (isc__netievent_t *)ievent); } - - LOCK(&sock->lock); - while (sock->rchildren > 0) { - WAIT(&sock->cond, &sock->lock); - } - atomic_store(&sock->closed, true); - UNLOCK(&sock->lock); - - isc__nmsocket_prep_destroy(sock); } static void diff --git a/lib/isc/task.c b/lib/isc/task.c index b5dccb7b28..b8996a7f0e 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -61,14 +61,11 @@ */ #ifdef ISC_TASK_TRACE -#define XTRACE(m) \ - fprintf(stderr, "task %p thread %" PRIuPTR ": %s\n", task, \ - isc_thread_self(), (m)) -#define XTTRACE(t, m) \ - fprintf(stderr, "task %p thread %" PRIuPTR ": %s\n", (t), \ - isc_thread_self(), (m)) -#define XTHREADTRACE(m) \ - fprintf(stderr, "thread %" PRIuPTR ": %s\n", isc_thread_self(), (m)) +#define XTRACE(m) \ + fprintf(stderr, "task %p thread %zu: %s\n", task, isc_tid_v, (m)) +#define XTTRACE(t, m) \ + fprintf(stderr, "task %p thread %zu: %s\n", (t), isc_tid_v, (m)) +#define XTHREADTRACE(m) fprintf(stderr, "thread %zu: %s\n", isc_tid_v, (m)) #else /* ifdef ISC_TASK_TRACE */ #define XTRACE(m) #define XTTRACE(t, m) @@ -97,13 +94,12 @@ static const char *statenames[] = { #define TASK_MAGIC ISC_MAGIC('T', 'A', 'S', 'K') #define VALID_TASK(t) ISC_MAGIC_VALID(t, TASK_MAGIC) -typedef struct isc__taskqueue isc__taskqueue_t; - struct isc_task { /* Not locked. */ unsigned int magic; isc_taskmgr_t *manager; isc_mutex_t lock; + int threadid; /* Locked by task lock. */ task_state_t state; int pause_cnt; @@ -116,14 +112,11 @@ struct isc_task { isc_time_t tnow; char name[16]; void *tag; - unsigned int threadid; bool bound; /* Protected by atomics */ atomic_uint_fast32_t flags; /* Locked by task manager lock. */ LINK(isc_task_t) link; - LINK(isc_task_t) ready_link; - LINK(isc_task_t) ready_priority_link; }; #define TASK_F_SHUTTINGDOWN 0x01 @@ -140,39 +133,20 @@ struct isc_task { #define TASK_MANAGER_MAGIC ISC_MAGIC('T', 'S', 'K', 'M') #define VALID_MANAGER(m) ISC_MAGIC_VALID(m, TASK_MANAGER_MAGIC) -typedef ISC_LIST(isc_task_t) isc__tasklist_t; - -struct isc__taskqueue { - /* Everything locked by lock */ - isc_mutex_t lock; - isc__tasklist_t ready_tasks; - isc__tasklist_t ready_priority_tasks; - isc_condition_t work_available; - isc_thread_t thread; - unsigned int threadid; - isc_taskmgr_t *manager; -}; - struct isc_taskmgr { /* Not locked. */ unsigned int magic; + isc_refcount_t references; isc_mem_t *mctx; isc_mutex_t lock; - isc_mutex_t halt_lock; - isc_condition_t halt_cond; - unsigned int workers; atomic_uint_fast32_t tasks_running; atomic_uint_fast32_t tasks_ready; - atomic_uint_fast32_t curq; atomic_uint_fast32_t tasks_count; - isc__taskqueue_t *queues; isc_nm_t *nm; /* Locked by task manager lock. */ unsigned int default_quantum; LIST(isc_task_t) tasks; - atomic_uint_fast32_t mode; - atomic_bool pause_req; atomic_bool exclusive_req; atomic_bool exiting; @@ -188,11 +162,6 @@ struct isc_taskmgr { isc_task_t *excl; }; -void -isc__taskmgr_pause(isc_taskmgr_t *manager0); -void -isc__taskmgr_resume(isc_taskmgr_t *manager0); - #define DEFAULT_DEFAULT_QUANTUM 25 #define FINISHED(m) \ (atomic_load_relaxed(&((m)->exiting)) && \ @@ -210,34 +179,15 @@ void isc_taskmgr_setexcltask(isc_taskmgr_t *mgr, isc_task_t *task); isc_result_t isc_taskmgr_excltask(isc_taskmgr_t *mgr, isc_task_t **taskp); -static inline bool -empty_readyq(isc_taskmgr_t *manager, int c); - -static inline isc_task_t * -pop_readyq(isc_taskmgr_t *manager, int c); - -static inline void -push_readyq(isc_taskmgr_t *manager, isc_task_t *task, int c); - -static inline void -wake_all_queues(isc_taskmgr_t *manager); /*** *** Tasks. ***/ -static inline void -wake_all_queues(isc_taskmgr_t *manager) { - for (unsigned int i = 0; i < manager->workers; i++) { - LOCK(&manager->queues[i].lock); - BROADCAST(&manager->queues[i].work_available); - UNLOCK(&manager->queues[i].lock); - } -} - static void task_finished(isc_task_t *task) { isc_taskmgr_t *manager = task->manager; + isc_mem_t *mctx = manager->mctx; REQUIRE(EMPTY(task->events)); REQUIRE(task->nevents == 0); REQUIRE(EMPTY(task->on_shutdown)); @@ -251,39 +201,35 @@ task_finished(isc_task_t *task) { UNLINK(manager->tasks, task, link); atomic_fetch_sub(&manager->tasks_count, 1); UNLOCK(&manager->lock); - if (FINISHED(manager)) { - /* - * All tasks have completed and the - * task manager is exiting. Wake up - * any idle worker threads so they - * can exit. - */ - wake_all_queues(manager); - } + isc_mutex_destroy(&task->lock); task->magic = 0; - isc_mem_put(manager->mctx, task, sizeof(*task)); + isc_mem_put(mctx, task, sizeof(*task)); + + isc_taskmgr_detach(manager); } isc_result_t -isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum, +isc_task_create(isc_taskmgr_t *manager, unsigned int quantum, isc_task_t **taskp) { - return (isc_task_create_bound(manager0, quantum, taskp, -1)); + return (isc_task_create_bound(manager, quantum, taskp, -1)); } isc_result_t -isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum, +isc_task_create_bound(isc_taskmgr_t *manager, unsigned int quantum, isc_task_t **taskp, int threadid) { - isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0; isc_task_t *task; bool exiting; REQUIRE(VALID_MANAGER(manager)); REQUIRE(taskp != NULL && *taskp == NULL); - task = isc_mem_get(manager->mctx, sizeof(*task)); XTRACE("isc_task_create"); - task->manager = manager; + + task = isc_mem_get(manager->mctx, sizeof(*task)); + *task = (isc_task_t){ 0 }; + + isc_taskmgr_attach(manager, &task->manager); if (threadid == -1) { /* @@ -292,14 +238,14 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum, * randomly or specified by isc_task_sendto. */ task->bound = false; - task->threadid = 0; + task->threadid = -1; } else { /* * Task is pinned to a queue, it'll always be run * by a specific thread. */ task->bound = true; - task->threadid = threadid % manager->workers; + task->threadid = threadid; } isc_mutex_init(&task->lock); @@ -317,8 +263,6 @@ isc_task_create_bound(isc_taskmgr_t *manager0, unsigned int quantum, memset(task->name, 0, sizeof(task->name)); task->tag = NULL; INIT_LINK(task, link); - INIT_LINK(task, ready_link); - INIT_LINK(task, ready_priority_link); exiting = false; LOCK(&manager->lock); @@ -405,18 +349,10 @@ task_shutdown(isc_task_t *task) { static inline void task_ready(isc_task_t *task) { isc_taskmgr_t *manager = task->manager; - bool has_privilege = isc_task_privilege(task); - REQUIRE(VALID_MANAGER(manager)); XTRACE("task_ready"); - LOCK(&manager->queues[task->threadid].lock); - push_readyq(manager, task, task->threadid); - if (atomic_load(&manager->mode) == isc_taskmgrmode_normal || - has_privilege) { - SIGNAL(&manager->queues[task->threadid].work_available); - } - UNLOCK(&manager->queues[task->threadid].lock); + isc_nm_task_enqueue(manager->nm, task, task->threadid); } static inline bool @@ -491,6 +427,12 @@ task_send(isc_task_t *task, isc_event_t **eventp, int c) { XTRACE("task_send"); + if (task->bound) { + c = task->threadid; + } else if (c < 0) { + c = -1; + } + if (task->state == task_state_idle) { was_idle = true; task->threadid = c; @@ -534,14 +476,6 @@ isc_task_sendto(isc_task_t *task, isc_event_t **eventp, int c) { * some processing is deferred until after the lock is released. */ LOCK(&task->lock); - /* If task is bound ignore provided cpu. */ - if (task->bound) { - c = task->threadid; - } else if (c < 0) { - c = atomic_fetch_add_explicit(&task->manager->curq, 1, - memory_order_relaxed); - } - c %= task->manager->workers; was_idle = task_send(task, eventp, c); UNLOCK(&task->lock); @@ -581,13 +515,6 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) { XTRACE("isc_task_sendanddetach"); LOCK(&task->lock); - if (task->bound) { - c = task->threadid; - } else if (c < 0) { - c = atomic_fetch_add_explicit(&task->manager->curq, 1, - memory_order_relaxed); - } - c %= task->manager->workers; idle1 = task_send(task, eventp, c); idle2 = task_detach(task); UNLOCK(&task->lock); @@ -747,8 +674,7 @@ isc_task_unsendrange(isc_task_t *task, void *sender, isc_eventtype_t first, XTRACE("isc_task_unsendrange"); - return (dequeue_events((isc_task_t *)task, sender, first, last, tag, - events, false)); + return (dequeue_events(task, sender, first, last, tag, events, false)); } unsigned int @@ -760,8 +686,7 @@ isc_task_unsend(isc_task_t *task, void *sender, isc_eventtype_t type, void *tag, XTRACE("isc_task_unsend"); - return (dequeue_events((isc_task_t *)task, sender, type, type, tag, - events, false)); + return (dequeue_events(task, sender, type, type, tag, events, false)); } isc_result_t @@ -881,499 +806,184 @@ isc_task_getcurrenttimex(isc_task_t *task, isc_time_t *t) { *** Task Manager. ***/ -/* - * Return true if the current ready list for the manager, which is - * either ready_tasks or the ready_priority_tasks, depending on whether - * the manager is currently in normal or privileged execution mode. - * - * Caller must hold the task manager lock. - */ -static inline bool -empty_readyq(isc_taskmgr_t *manager, int c) { - isc__tasklist_t queue; +static isc_result_t +task_run(isc_task_t *task) { + unsigned int dispatch_count = 0; + bool finished = false; + isc_event_t *event = NULL; + isc_result_t result = ISC_R_SUCCESS; - if (atomic_load_relaxed(&manager->mode) == isc_taskmgrmode_normal) { - queue = manager->queues[c].ready_tasks; - } else { - queue = manager->queues[c].ready_priority_tasks; - } - return (EMPTY(queue)); -} - -/* - * Dequeue and return a pointer to the first task on the current ready - * list for the manager. - * If the task is privileged, dequeue it from the other ready list - * as well. - * - * Caller must hold the task manager lock. - */ -static inline isc_task_t * -pop_readyq(isc_taskmgr_t *manager, int c) { - isc_task_t *task; - - if (atomic_load_relaxed(&manager->mode) == isc_taskmgrmode_normal) { - task = HEAD(manager->queues[c].ready_tasks); - } else { - task = HEAD(manager->queues[c].ready_priority_tasks); - } - - if (task != NULL) { - DEQUEUE(manager->queues[c].ready_tasks, task, ready_link); - if (ISC_LINK_LINKED(task, ready_priority_link)) { - DEQUEUE(manager->queues[c].ready_priority_tasks, task, - ready_priority_link); - } - } - - return (task); -} - -/* - * Push 'task' onto the ready_tasks queue. If 'task' has the privilege - * flag set, then also push it onto the ready_priority_tasks queue. - * - * Caller must hold the task queue lock. - */ -static inline void -push_readyq(isc_taskmgr_t *manager, isc_task_t *task, int c) { - if (ISC_LINK_LINKED(task, ready_link)) { - return; - } - ENQUEUE(manager->queues[c].ready_tasks, task, ready_link); - if (TASK_PRIVILEGED(task)) { - ENQUEUE(manager->queues[c].ready_priority_tasks, task, - ready_priority_link); - } - atomic_fetch_add_explicit(&manager->tasks_ready, 1, - memory_order_acquire); -} - -static void -dispatch(isc_taskmgr_t *manager, unsigned int threadid) { - isc_task_t *task; - - REQUIRE(VALID_MANAGER(manager)); - - /* Wait for everything to initialize */ - LOCK(&manager->lock); - UNLOCK(&manager->lock); + REQUIRE(VALID_TASK(task)); + LOCK(&task->lock); /* - * Again we're trying to hold the lock for as short a time as possible - * and to do as little locking and unlocking as possible. - * - * In both while loops, the appropriate lock must be held before the - * while body starts. Code which acquired the lock at the top of - * the loop would be more readable, but would result in a lot of - * extra locking. Compare: - * - * Straightforward: - * - * LOCK(); - * ... - * UNLOCK(); - * while (expression) { - * LOCK(); - * ... - * UNLOCK(); - * - * Unlocked part here... - * - * LOCK(); - * ... - * UNLOCK(); - * } - * - * Note how if the loop continues we unlock and then immediately lock. - * For N iterations of the loop, this code does 2N+1 locks and 2N+1 - * unlocks. Also note that the lock is not held when the while - * condition is tested, which may or may not be important, depending - * on the expression. - * - * As written: - * - * LOCK(); - * while (expression) { - * ... - * UNLOCK(); - * - * Unlocked part here... - * - * LOCK(); - * ... - * } - * UNLOCK(); - * - * For N iterations of the loop, this code does N+1 locks and N+1 - * unlocks. The while expression is always protected by the lock. + * It is possible because that we have a paused task in the queue - it + * might have been paused in the meantime and we never hold both queue + * and task lock to avoid deadlocks, just bail then. */ - LOCK(&manager->queues[threadid].lock); + if (task->state != task_state_ready) { + UNLOCK(&task->lock); + return (ISC_R_SUCCESS); + } - while (!FINISHED(manager)) { - /* - * For reasons similar to those given in the comment in - * isc_task_send() above, it is safe for us to dequeue - * the task while only holding the manager lock, and then - * change the task to running state while only holding the - * task lock. - * - * If a pause has been requested, don't do any work - * until it's been released. - */ - while ((empty_readyq(manager, threadid) && - !atomic_load_relaxed(&manager->pause_req) && - !atomic_load_relaxed(&manager->exclusive_req)) && - !FINISHED(manager)) - { - XTHREADTRACE("wait"); - XTHREADTRACE(atomic_load_relaxed(&manager->pause_req) - ? "paused" - : "notpaused"); - XTHREADTRACE( - atomic_load_relaxed(&manager->exclusive_req) - ? "excreq" - : "notexcreq"); - WAIT(&manager->queues[threadid].work_available, - &manager->queues[threadid].lock); - XTHREADTRACE("awake"); - } - XTHREADTRACE("working"); + INSIST(task->state == task_state_ready); + task->state = task_state_running; + XTRACE("running"); + XTRACE(task->name); + TIME_NOW(&task->tnow); + task->now = isc_time_seconds(&task->tnow); - if (atomic_load_relaxed(&manager->pause_req) || - atomic_load_relaxed(&manager->exclusive_req)) - { - UNLOCK(&manager->queues[threadid].lock); - XTHREADTRACE("halting"); + while (true) { + if (!EMPTY(task->events)) { + event = HEAD(task->events); + DEQUEUE(task->events, event, ev_link); + task->nevents--; /* - * Switching to exclusive mode is done as a - * 2-phase-lock, checking if we have to switch is - * done without any locks on pause_req and - * exclusive_req to save time - the worst - * thing that can happen is that we'll launch one - * task more and exclusive task will be postponed a - * bit. - * - * Broadcasting on halt_cond seems suboptimal, but - * exclusive tasks are rare enough that we don't - * care. + * Execute the event action. */ - LOCK(&manager->halt_lock); - manager->halted++; - BROADCAST(&manager->halt_cond); - while (atomic_load_relaxed(&manager->pause_req) || - atomic_load_relaxed(&manager->exclusive_req)) - { - WAIT(&manager->halt_cond, &manager->halt_lock); - } - manager->halted--; - SIGNAL(&manager->halt_cond); - UNLOCK(&manager->halt_lock); - - LOCK(&manager->queues[threadid].lock); - /* Restart the loop after */ - continue; - } - - task = pop_readyq(manager, threadid); - if (task != NULL) { - unsigned int dispatch_count = 0; - bool done = false; - bool requeue = false; - bool finished = false; - isc_event_t *event; - - INSIST(VALID_TASK(task)); - - /* - * Note we only unlock the queue lock if we actually - * have a task to do. We must reacquire the queue - * lock before exiting the 'if (task != NULL)' block. - */ - UNLOCK(&manager->queues[threadid].lock); - RUNTIME_CHECK(atomic_fetch_sub_explicit( - &manager->tasks_ready, 1, - memory_order_release) > 0); - atomic_fetch_add_explicit(&manager->tasks_running, 1, - memory_order_acquire); - - LOCK(&task->lock); - /* - * It is possible because that we have a paused task - * in the queue - it might have been paused in the - * meantime and we never hold both queue and task lock - * to avoid deadlocks, just bail then. - */ - if (task->state != task_state_ready) { - UNLOCK(&task->lock); - LOCK(&manager->queues[threadid].lock); - continue; - } - INSIST(task->state == task_state_ready); - task->state = task_state_running; - XTRACE("running"); + XTRACE("execute action"); XTRACE(task->name); - TIME_NOW(&task->tnow); - task->now = isc_time_seconds(&task->tnow); - do { - if (!EMPTY(task->events)) { - event = HEAD(task->events); - DEQUEUE(task->events, event, ev_link); - task->nevents--; + if (event->ev_action != NULL) { + UNLOCK(&task->lock); + (event->ev_action)(task, event); + LOCK(&task->lock); + } + XTRACE("execution complete"); + dispatch_count++; + } - /* - * Execute the event action. - */ - XTRACE("execute action"); - XTRACE(task->name); - if (event->ev_action != NULL) { - UNLOCK(&task->lock); - (event->ev_action)( - (isc_task_t *)task, - event); - LOCK(&task->lock); - } - XTRACE("execution complete"); - dispatch_count++; - } + if (isc_refcount_current(&task->references) == 0 && + EMPTY(task->events) && !TASK_SHUTTINGDOWN(task)) + { + /* + * There are no references and no pending events for + * this task, which means it will not become runnable + * again via an external action (such as sending an + * event or detaching). + * + * We initiate shutdown to prevent it from becoming a + * zombie. + * + * We do this here instead of in the "if + * EMPTY(task->events)" block below because: + * + * If we post no shutdown events, we want the task + * to finish. + * + * If we did post shutdown events, will still want + * the task's quantum to be applied. + */ + INSIST(!task_shutdown(task)); + } - if (isc_refcount_current(&task->references) == - 0 && - EMPTY(task->events) && - !TASK_SHUTTINGDOWN(task)) - { - bool was_idle; - - /* - * There are no references and no - * pending events for this task, - * which means it will not become - * runnable again via an external - * action (such as sending an event - * or detaching). - * - * We initiate shutdown to prevent - * it from becoming a zombie. - * - * We do this here instead of in - * the "if EMPTY(task->events)" block - * below because: - * - * If we post no shutdown events, - * we want the task to finish. - * - * If we did post shutdown events, - * will still want the task's - * quantum to be applied. - */ - was_idle = task_shutdown(task); - INSIST(!was_idle); - } - - if (EMPTY(task->events)) { - /* - * Nothing else to do for this task - * right now. - */ - XTRACE("empty"); - if (isc_refcount_current( - &task->references) == 0 && - TASK_SHUTTINGDOWN(task)) { - /* - * The task is done. - */ - XTRACE("done"); - finished = true; - task->state = task_state_done; - } else { - if (task->state == - task_state_running) { - task->state = - task_state_idle; - } else if (task->state == - task_state_pausing) { - task->state = - task_state_paused; - } - } - done = true; + if (EMPTY(task->events)) { + /* + * Nothing else to do for this task right now. + */ + XTRACE("empty"); + if (isc_refcount_current(&task->references) == 0 && + TASK_SHUTTINGDOWN(task)) { + /* + * The task is done. + */ + XTRACE("done"); + finished = true; + task->state = task_state_done; + } else { + if (task->state == task_state_running) { + XTRACE("idling"); + task->state = task_state_idle; } else if (task->state == task_state_pausing) { - /* - * We got a pause request on this task, - * stop working on it and switch the - * state to paused. - */ XTRACE("pausing"); task->state = task_state_paused; - done = true; - } else if (dispatch_count >= task->quantum) { - /* - * Our quantum has expired, but - * there is more work to be done. - * We'll requeue it to the ready - * queue later. - * - * We don't check quantum until - * dispatching at least one event, - * so the minimum quantum is one. - */ - XTRACE("quantum"); - task->state = task_state_ready; - requeue = true; - done = true; } - } while (!done); - UNLOCK(&task->lock); - - if (finished) { - task_finished(task); } - - RUNTIME_CHECK(atomic_fetch_sub_explicit( - &manager->tasks_running, 1, - memory_order_release) > 0); - LOCK(&manager->queues[threadid].lock); - if (requeue) { - /* - * We know we're awake, so we don't have - * to wakeup any sleeping threads if the - * ready queue is empty before we requeue. - * - * A possible optimization if the queue is - * empty is to 'goto' the 'if (task != NULL)' - * block, avoiding the ENQUEUE of the task - * and the subsequent immediate DEQUEUE - * (since it is the only executable task). - * We don't do this because then we'd be - * skipping the exit_requested check. The - * cost of ENQUEUE is low anyway, especially - * when you consider that we'd have to do - * an extra EMPTY check to see if we could - * do the optimization. If the ready queue - * were usually nonempty, the 'optimization' - * might even hurt rather than help. - */ - push_readyq(manager, task, threadid); - } - } - - /* - * If we are in privileged execution mode and there are no - * tasks remaining on the current ready queue, then - * we're stuck. Automatically drop privileges at that - * point and continue with the regular ready queue. - */ - if (atomic_load_relaxed(&manager->mode) != - isc_taskmgrmode_normal && - atomic_load_explicit(&manager->tasks_running, - memory_order_acquire) == 0) - { - UNLOCK(&manager->queues[threadid].lock); - LOCK(&manager->lock); + break; + } else if (task->state == task_state_pausing) { /* - * Check once again, under lock. Mode can only - * change from privileged to normal anyway, and - * if we enter this loop twice at the same time - * we'll end up in a deadlock over queue locks. - * + * We got a pause request on this task, stop working on + * it and switch the state to paused. */ - if (atomic_load(&manager->mode) != - isc_taskmgrmode_normal && - atomic_load_explicit(&manager->tasks_running, - memory_order_acquire) == 0) - { - bool empty = true; - unsigned int i; - for (i = 0; i < manager->workers && empty; i++) - { - LOCK(&manager->queues[i].lock); - empty &= empty_readyq(manager, i); - UNLOCK(&manager->queues[i].lock); - } - if (empty) { - atomic_store(&manager->mode, - isc_taskmgrmode_normal); - wake_all_queues(manager); - } - } - UNLOCK(&manager->lock); - LOCK(&manager->queues[threadid].lock); + XTRACE("pausing"); + task->state = task_state_paused; + break; + } else if (dispatch_count >= task->quantum) { + /* + * Our quantum has expired, but there is more work to be + * done. We'll requeue it to the ready queue later. + * + * We don't check quantum until dispatching at least one + * event, so the minimum quantum is one. + */ + XTRACE("quantum"); + task->state = task_state_ready; + result = ISC_R_QUOTA; + break; } } - UNLOCK(&manager->queues[threadid].lock); - /* - * There might be other dispatchers waiting on empty tasks, - * wake them up. - */ - wake_all_queues(manager); + UNLOCK(&task->lock); + + if (finished) { + task_finished(task); + } + + return (result); } -static isc_threadresult_t -#ifdef _WIN32 - WINAPI -#endif /* ifdef _WIN32 */ - run(void *queuep) { - isc__taskqueue_t *tq = queuep; - isc_taskmgr_t *manager = tq->manager; - int threadid = tq->threadid; - isc_thread_setaffinity(threadid); - - XTHREADTRACE("starting"); - - dispatch(manager, threadid); - - XTHREADTRACE("exiting"); - -#ifdef OPENSSL_LEAKS - ERR_remove_state(0); -#endif /* ifdef OPENSSL_LEAKS */ - - return ((isc_threadresult_t)0); +isc_result_t +isc_task_run(isc_task_t *task) { + return (task_run(task)); } static void manager_free(isc_taskmgr_t *manager) { - for (unsigned int i = 0; i < manager->workers; i++) { - isc_mutex_destroy(&manager->queues[i].lock); - isc_condition_destroy(&manager->queues[i].work_available); - } + isc_refcount_destroy(&manager->references); + isc_nm_detach(&manager->nm); + isc_mutex_destroy(&manager->lock); isc_mutex_destroy(&manager->excl_lock); - isc_mutex_destroy(&manager->halt_lock); - isc_condition_destroy(&manager->halt_cond); - isc_mem_put(manager->mctx, manager->queues, - manager->workers * sizeof(isc__taskqueue_t)); manager->magic = 0; isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager)); } +void +isc_taskmgr_attach(isc_taskmgr_t *source, isc_taskmgr_t **targetp) { + REQUIRE(VALID_MANAGER(source)); + REQUIRE(targetp != NULL && *targetp == NULL); + + isc_refcount_increment(&source->references); + + *targetp = source; +} + +void +isc_taskmgr_detach(isc_taskmgr_t *manager) { + REQUIRE(VALID_MANAGER(manager)); + + if (isc_refcount_decrement(&manager->references) == 1) { + manager_free(manager); + } +} + isc_result_t -isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, - unsigned int default_quantum, isc_nm_t *nm, +isc_taskmgr_create(isc_mem_t *mctx, unsigned int default_quantum, isc_nm_t *nm, isc_taskmgr_t **managerp) { - unsigned int i; isc_taskmgr_t *manager; /* * Create a new task manager. */ - REQUIRE(workers > 0); REQUIRE(managerp != NULL && *managerp == NULL); + REQUIRE(nm != NULL); manager = isc_mem_get(mctx, sizeof(*manager)); *manager = (isc_taskmgr_t){ .magic = TASK_MANAGER_MAGIC }; - atomic_store(&manager->mode, isc_taskmgrmode_normal); isc_mutex_init(&manager->lock); isc_mutex_init(&manager->excl_lock); - isc_mutex_init(&manager->halt_lock); - isc_condition_init(&manager->halt_cond); - - manager->workers = workers; - if (default_quantum == 0) { default_quantum = DEFAULT_DEFAULT_QUANTUM; } @@ -1384,42 +994,17 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, } INIT_LIST(manager->tasks); - atomic_store(&manager->tasks_count, 0); - manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t)); - RUNTIME_CHECK(manager->queues != NULL); - + atomic_init(&manager->tasks_count, 0); atomic_init(&manager->tasks_running, 0); atomic_init(&manager->tasks_ready, 0); - atomic_init(&manager->curq, 0); atomic_init(&manager->exiting, false); atomic_store_relaxed(&manager->exclusive_req, false); - atomic_store_relaxed(&manager->pause_req, false); isc_mem_attach(mctx, &manager->mctx); - LOCK(&manager->lock); - /* - * Start workers. - */ - for (i = 0; i < workers; i++) { - INIT_LIST(manager->queues[i].ready_tasks); - INIT_LIST(manager->queues[i].ready_priority_tasks); - isc_mutex_init(&manager->queues[i].lock); - isc_condition_init(&manager->queues[i].work_available); + isc_refcount_init(&manager->references, 1); - manager->queues[i].manager = manager; - manager->queues[i].threadid = i; - isc_thread_create(run, &manager->queues[i], - &manager->queues[i].thread); - char name[21]; - snprintf(name, sizeof(name), "isc-worker%04u", i); - isc_thread_setname(manager->queues[i].thread, name); - } - UNLOCK(&manager->lock); - - isc_thread_setconcurrency(workers); - - *managerp = (isc_taskmgr_t *)manager; + *managerp = manager; return (ISC_R_SUCCESS); } @@ -1428,15 +1013,13 @@ void isc_taskmgr_destroy(isc_taskmgr_t **managerp) { isc_taskmgr_t *manager; isc_task_t *task; - unsigned int i; - bool exiting; /* * Destroy '*managerp'. */ REQUIRE(managerp != NULL); - manager = (isc_taskmgr_t *)*managerp; + manager = *managerp; REQUIRE(VALID_MANAGER(manager)); XTHREADTRACE("isc_taskmgr_destroy"); @@ -1471,117 +1054,39 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { /* * Make sure we only get called once. */ - exiting = false; - - INSIST(!!atomic_compare_exchange_strong(&manager->exiting, &exiting, - true)); - - /* - * If privileged mode was on, turn it off. - */ - atomic_store(&manager->mode, isc_taskmgrmode_normal); + INSIST(atomic_compare_exchange_strong(&manager->exiting, + &(bool){ false }, true)); /* * Post shutdown event(s) to every task (if they haven't already been - * posted). To make things easier post idle tasks to worker 0. + * posted). */ - LOCK(&manager->queues[0].lock); for (task = HEAD(manager->tasks); task != NULL; task = NEXT(task, link)) { LOCK(&task->lock); if (task_shutdown(task)) { task->threadid = 0; - push_readyq(manager, task, 0); + task_ready(task); } UNLOCK(&task->lock); } - UNLOCK(&manager->queues[0].lock); - /* - * Wake up any sleeping workers. This ensures we get work done if - * there's work left to do, and if there are already no tasks left - * it will cause the workers to see manager->exiting. - */ - wake_all_queues(manager); UNLOCK(&manager->lock); - /* - * Wait for all the worker threads to exit. - */ - for (i = 0; i < manager->workers; i++) { - isc_thread_join(manager->queues[i].thread, NULL); - } - - /* - * Detach from the network manager if it was set. - */ - if (manager->nm != NULL) { - isc_nm_detach(&manager->nm); - } - - manager_free(manager); + isc_taskmgr_detach(manager); *managerp = NULL; } -void -isc_taskmgr_setprivilegedmode(isc_taskmgr_t *manager0) { - isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0; - - atomic_store(&manager->mode, isc_taskmgrmode_privileged); -} - -isc_taskmgrmode_t -isc_taskmgr_mode(isc_taskmgr_t *manager0) { - isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0; - return (atomic_load(&manager->mode)); -} - -void -isc__taskmgr_pause(isc_taskmgr_t *manager0) { - isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0; - - LOCK(&manager->halt_lock); - while (atomic_load_relaxed(&manager->exclusive_req) || - atomic_load_relaxed(&manager->pause_req)) - { - UNLOCK(&manager->halt_lock); - /* This is ugly but pause is used EXCLUSIVELY in tests */ - isc_thread_yield(); - LOCK(&manager->halt_lock); - } - - atomic_store_relaxed(&manager->pause_req, true); - while (manager->halted < manager->workers) { - wake_all_queues(manager); - WAIT(&manager->halt_cond, &manager->halt_lock); - } - UNLOCK(&manager->halt_lock); -} - -void -isc__taskmgr_resume(isc_taskmgr_t *manager0) { - isc_taskmgr_t *manager = (isc_taskmgr_t *)manager0; - LOCK(&manager->halt_lock); - if (atomic_load(&manager->pause_req)) { - atomic_store(&manager->pause_req, false); - while (manager->halted > 0) { - BROADCAST(&manager->halt_cond); - WAIT(&manager->halt_cond, &manager->halt_lock); - } - } - UNLOCK(&manager->halt_lock); -} - void isc_taskmgr_setexcltask(isc_taskmgr_t *mgr, isc_task_t *task) { REQUIRE(VALID_MANAGER(mgr)); REQUIRE(VALID_TASK(task)); LOCK(&mgr->excl_lock); if (mgr->excl != NULL) { - isc_task_detach((isc_task_t **)&mgr->excl); + isc_task_detach(&mgr->excl); } - isc_task_attach(task, (isc_task_t **)&mgr->excl); + isc_task_attach(task, &mgr->excl); UNLOCK(&mgr->excl_lock); } @@ -1594,7 +1099,7 @@ isc_taskmgr_excltask(isc_taskmgr_t *mgr, isc_task_t **taskp) { LOCK(&mgr->excl_lock); if (mgr->excl != NULL) { - isc_task_attach((isc_task_t *)mgr->excl, taskp); + isc_task_attach(mgr->excl, taskp); } else { result = ISC_R_NOTFOUND; } @@ -1619,24 +1124,14 @@ isc_task_beginexclusive(isc_task_t *task) { task->manager->excl == NULL)); UNLOCK(&manager->excl_lock); - if (atomic_load_relaxed(&manager->exclusive_req) || - atomic_load_relaxed(&manager->pause_req)) + if (!atomic_compare_exchange_strong(&manager->exclusive_req, + &(bool){ false }, true)) { return (ISC_R_LOCKBUSY); } - LOCK(&manager->halt_lock); - INSIST(!atomic_load_relaxed(&manager->exclusive_req) && - !atomic_load_relaxed(&manager->pause_req)); - atomic_store_relaxed(&manager->exclusive_req, true); - while (manager->halted + 1 < manager->workers) { - wake_all_queues(manager); - WAIT(&manager->halt_cond, &manager->halt_lock); - } - UNLOCK(&manager->halt_lock); - if (manager->nm != NULL) { - isc_nm_pause(manager->nm); - } + isc_nm_pause(manager->nm); + return (ISC_R_SUCCESS); } @@ -1648,17 +1143,9 @@ isc_task_endexclusive(isc_task_t *task) { REQUIRE(task->state == task_state_running); manager = task->manager; - if (manager->nm != NULL) { - isc_nm_resume(manager->nm); - } - LOCK(&manager->halt_lock); - REQUIRE(atomic_load_relaxed(&manager->exclusive_req)); - atomic_store_relaxed(&manager->exclusive_req, false); - while (manager->halted > 0) { - BROADCAST(&manager->halt_cond); - WAIT(&manager->halt_cond, &manager->halt_lock); - } - UNLOCK(&manager->halt_lock); + isc_nm_resume(manager->nm); + REQUIRE(atomic_compare_exchange_strong(&manager->exclusive_req, + &(bool){ true }, false)); } void @@ -1669,7 +1156,7 @@ isc_task_pause(isc_task_t *task) { task->pause_cnt++; if (task->pause_cnt > 1) { /* - * Someone already paused this thread, just increase + * Someone already paused this task, just increase * the number of pausing clients. */ UNLOCK(&task->lock); @@ -1723,7 +1210,6 @@ isc_task_unpause(isc_task_t *task) { void isc_task_setprivilege(isc_task_t *task, bool priv) { REQUIRE(VALID_TASK(task)); - isc_taskmgr_t *manager = task->manager; uint_fast32_t oldflags, newflags; oldflags = atomic_load_acquire(&task->flags); @@ -1738,16 +1224,6 @@ isc_task_setprivilege(isc_task_t *task, bool priv) { } } while (!atomic_compare_exchange_weak_acq_rel(&task->flags, &oldflags, newflags)); - - LOCK(&manager->queues[task->threadid].lock); - if (priv && ISC_LINK_LINKED(task, ready_link)) { - ENQUEUE(manager->queues[task->threadid].ready_priority_tasks, - task, ready_priority_link); - } else if (!priv && ISC_LINK_LINKED(task, ready_priority_link)) { - DEQUEUE(manager->queues[task->threadid].ready_priority_tasks, - task, ready_priority_link); - } - UNLOCK(&manager->queues[task->threadid].lock); } bool @@ -1758,8 +1234,7 @@ isc_task_privilege(isc_task_t *task) { } bool -isc_task_exiting(isc_task_t *t) { - isc_task_t *task = (isc_task_t *)t; +isc_task_exiting(isc_task_t *task) { REQUIRE(VALID_TASK(task)); return (TASK_SHUTTINGDOWN(task)); @@ -1789,10 +1264,6 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr, void *writer0) { TRY0(xmlTextWriterWriteString(writer, ISC_XMLCHAR "threaded")); TRY0(xmlTextWriterEndElement(writer)); /* type */ - TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "worker-threads")); - TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->workers)); - TRY0(xmlTextWriterEndElement(writer)); /* worker-threads */ - TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "default-quantum")); TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->default_quantum)); @@ -1898,10 +1369,6 @@ isc_taskmgr_renderjson(isc_taskmgr_t *mgr, void *tasks0) { CHECKMEM(obj); json_object_object_add(tasks, "thread-model", obj); - obj = json_object_new_int(mgr->workers); - CHECKMEM(obj); - json_object_object_add(tasks, "worker-threads", obj); - obj = json_object_new_int(mgr->default_quantum); CHECKMEM(obj); json_object_object_add(tasks, "default-quantum", obj); diff --git a/lib/isc/task_p.h b/lib/isc/task_p.h deleted file mode 100644 index ae8f095733..0000000000 --- a/lib/isc/task_p.h +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (C) Internet Systems Consortium, Inc. ("ISC") - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, you can obtain one at https://mozilla.org/MPL/2.0/. - * - * See the COPYRIGHT file distributed with this work for additional - * information regarding copyright ownership. - */ - -#ifndef ISC_TASK_P_H -#define ISC_TASK_P_H - -/*! \file */ - -/*% - * These functions allow unit tests to manipulate the processing - * of the task queue. They are not intended as part of the public API. - */ -void -isc__taskmgr_pause(isc_taskmgr_t *taskmgr); -void -isc__taskmgr_resume(isc_taskmgr_t *taskmgr); - -#endif /* ISC_TASK_P_H */ diff --git a/lib/isc/taskpool.c b/lib/isc/taskpool.c index ca794c8a49..81652d7d3a 100644 --- a/lib/isc/taskpool.c +++ b/lib/isc/taskpool.c @@ -57,10 +57,9 @@ alloc_pool(isc_taskmgr_t *tmgr, isc_mem_t *mctx, unsigned int ntasks, isc_result_t isc_taskpool_create(isc_taskmgr_t *tmgr, isc_mem_t *mctx, unsigned int ntasks, - unsigned int quantum, isc_taskpool_t **poolp) { + unsigned int quantum, bool priv, isc_taskpool_t **poolp) { unsigned int i; isc_taskpool_t *pool = NULL; - isc_result_t result; INSIST(ntasks > 0); @@ -69,11 +68,13 @@ isc_taskpool_create(isc_taskmgr_t *tmgr, isc_mem_t *mctx, unsigned int ntasks, /* Create the tasks */ for (i = 0; i < ntasks; i++) { - result = isc_task_create(tmgr, quantum, &pool->tasks[i]); + isc_result_t result = isc_task_create_bound(tmgr, quantum, + &pool->tasks[i], i); if (result != ISC_R_SUCCESS) { isc_taskpool_destroy(&pool); return (result); } + isc_task_setprivilege(pool->tasks[i], priv); isc_task_setname(pool->tasks[i], "taskpool", NULL); } @@ -93,9 +94,8 @@ isc_taskpool_size(isc_taskpool_t *pool) { } isc_result_t -isc_taskpool_expand(isc_taskpool_t **sourcep, unsigned int size, +isc_taskpool_expand(isc_taskpool_t **sourcep, unsigned int size, bool priv, isc_taskpool_t **targetp) { - isc_result_t result; isc_taskpool_t *pool; REQUIRE(sourcep != NULL && *sourcep != NULL); @@ -119,13 +119,15 @@ isc_taskpool_expand(isc_taskpool_t **sourcep, unsigned int size, /* Create new tasks */ for (i = pool->ntasks; i < size; i++) { - result = isc_task_create(pool->tmgr, pool->quantum, - &newpool->tasks[i]); + isc_result_t result = + isc_task_create_bound(pool->tmgr, pool->quantum, + &newpool->tasks[i], i); if (result != ISC_R_SUCCESS) { *sourcep = pool; isc_taskpool_destroy(&newpool); return (result); } + isc_task_setprivilege(newpool->tasks[i], priv); isc_task_setname(newpool->tasks[i], "taskpool", NULL); } @@ -151,16 +153,3 @@ isc_taskpool_destroy(isc_taskpool_t **poolp) { pool->ntasks * sizeof(isc_task_t *)); isc_mem_putanddetach(&pool->mctx, pool, sizeof(*pool)); } - -void -isc_taskpool_setprivilege(isc_taskpool_t *pool, bool priv) { - unsigned int i; - - REQUIRE(pool != NULL); - - for (i = 0; i < pool->ntasks; i++) { - if (pool->tasks[i] != NULL) { - isc_task_setprivilege(pool->tasks[i], priv); - } - } -} diff --git a/lib/isc/tests/isctest.c b/lib/isc/tests/isctest.c index 0bd220c2fd..d86007903b 100644 --- a/lib/isc/tests/isctest.c +++ b/lib/isc/tests/isctest.c @@ -64,12 +64,12 @@ cleanup_managers(void) { if (taskmgr != NULL) { isc_taskmgr_destroy(&taskmgr); } + if (netmgr != NULL) { + isc_nm_destroy(&netmgr); + } if (timermgr != NULL) { isc_timermgr_destroy(&timermgr); } - if (netmgr != NULL) { - isc_nm_detach(&netmgr); - } } static isc_result_t @@ -89,7 +89,7 @@ create_managers(unsigned int workers) { isc_hp_init(6 * workers); netmgr = isc_nm_start(test_mctx, workers); - CHECK(isc_taskmgr_create(test_mctx, workers, 0, netmgr, &taskmgr)); + CHECK(isc_taskmgr_create(test_mctx, 0, netmgr, &taskmgr)); CHECK(isc_task_create(taskmgr, 0, &maintask)); isc_taskmgr_setexcltask(taskmgr, maintask); diff --git a/lib/isc/tests/task_test.c b/lib/isc/tests/task_test.c index 0aad028fb1..f6b40f203d 100644 --- a/lib/isc/tests/task_test.c +++ b/lib/isc/tests/task_test.c @@ -37,7 +37,6 @@ #include #include -#include "../task_p.h" #include "isctest.h" /* Set to true (or use -v option) for verbose output */ @@ -120,6 +119,8 @@ set(isc_task_t *task, isc_event_t *event) { atomic_store(value, atomic_fetch_add(&counter, 1)); } +#include + static void set_and_drop(isc_task_t *task, isc_event_t *event) { atomic_int_fast32_t *value = (atomic_int_fast32_t *)event->ev_arg; @@ -128,8 +129,7 @@ set_and_drop(isc_task_t *task, isc_event_t *event) { isc_event_free(&event); LOCK(&lock); - atomic_store(value, (int)isc_taskmgr_mode(taskmgr)); - atomic_fetch_add(&counter, 1); + atomic_store(value, atomic_fetch_add(&counter, 1)); UNLOCK(&lock); } @@ -204,17 +204,17 @@ privileged_events(void **state) { UNUSED(state); atomic_init(&counter, 1); - atomic_init(&a, 0); - atomic_init(&b, 0); - atomic_init(&c, 0); - atomic_init(&d, 0); - atomic_init(&e, 0); + atomic_init(&a, -1); + atomic_init(&b, -1); + atomic_init(&c, -1); + atomic_init(&d, -1); + atomic_init(&e, -1); /* - * Pause the task manager so we can fill up the work queue - * without things happening while we do it. + * Pause the net/task manager so we can fill up the work + * queue without things happening while we do it. */ - isc__taskmgr_pause(taskmgr); + isc_nm_pause(netmgr); result = isc_task_create(taskmgr, 0, &task1); assert_int_equal(result, ISC_R_SUCCESS); @@ -233,7 +233,7 @@ privileged_events(void **state) { &a, sizeof(isc_event_t)); assert_non_null(event); - assert_int_equal(atomic_load(&a), 0); + assert_int_equal(atomic_load(&a), -1); isc_task_send(task1, &event); /* Second event: not privileged */ @@ -241,7 +241,7 @@ privileged_events(void **state) { &b, sizeof(isc_event_t)); assert_non_null(event); - assert_int_equal(atomic_load(&b), 0); + assert_int_equal(atomic_load(&b), -1); isc_task_send(task2, &event); /* Third event: privileged */ @@ -249,7 +249,7 @@ privileged_events(void **state) { &c, sizeof(isc_event_t)); assert_non_null(event); - assert_int_equal(atomic_load(&c), 0); + assert_int_equal(atomic_load(&c), -1); isc_task_send(task1, &event); /* Fourth event: privileged */ @@ -257,7 +257,7 @@ privileged_events(void **state) { &d, sizeof(isc_event_t)); assert_non_null(event); - assert_int_equal(atomic_load(&d), 0); + assert_int_equal(atomic_load(&d), -1); isc_task_send(task1, &event); /* Fifth event: not privileged */ @@ -265,19 +265,15 @@ privileged_events(void **state) { &e, sizeof(isc_event_t)); assert_non_null(event); - assert_int_equal(atomic_load(&e), 0); + assert_int_equal(atomic_load(&e), -1); isc_task_send(task2, &event); - assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal); - isc_taskmgr_setprivilegedmode(taskmgr); - assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_privileged); - - isc__taskmgr_resume(taskmgr); + isc_nm_resume(netmgr); /* We're waiting for *all* variables to be set */ - while ((atomic_load(&a) == 0 || atomic_load(&b) == 0 || - atomic_load(&c) == 0 || atomic_load(&d) == 0 || - atomic_load(&e) == 0) && + while ((atomic_load(&a) < 0 || atomic_load(&b) < 0 || + atomic_load(&c) < 0 || atomic_load(&d) < 0 || + atomic_load(&e) < 0) && i++ < 5000) { isc_test_nap(1000); @@ -293,16 +289,14 @@ privileged_events(void **state) { assert_true(atomic_load(&d) <= 3); /* ...and the non-privileged tasks that set b and e, last */ - assert_true(atomic_load(&b) >= 4); - assert_true(atomic_load(&e) >= 4); + assert_true(atomic_load(&b) > 3); + assert_true(atomic_load(&e) > 3); assert_int_equal(atomic_load(&counter), 6); isc_task_setprivilege(task1, false); assert_false(isc_task_privilege(task1)); - assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal); - isc_task_destroy(&task1); assert_null(task1); isc_task_destroy(&task2); @@ -331,10 +325,10 @@ privilege_drop(void **state) { atomic_init(&e, -1); /* - * Pause the task manager so we can fill up the work queue + * Pause the net/task manager so we can fill up the work queue * without things happening while we do it. */ - isc__taskmgr_pause(taskmgr); + isc_nm_pause(netmgr); result = isc_task_create(taskmgr, 0, &task1); assert_int_equal(result, ISC_R_SUCCESS); @@ -388,11 +382,7 @@ privilege_drop(void **state) { assert_int_equal(atomic_load(&e), -1); isc_task_send(task2, &event); - assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal); - isc_taskmgr_setprivilegedmode(taskmgr); - assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_privileged); - - isc__taskmgr_resume(taskmgr); + isc_nm_resume(netmgr); /* We're waiting for all variables to be set. */ while ((atomic_load(&a) == -1 || atomic_load(&b) == -1 || @@ -407,19 +397,17 @@ privilege_drop(void **state) { * We need to check that all privilege mode events were fired * in privileged mode, and non privileged in non-privileged. */ - assert_true(atomic_load(&a) == isc_taskmgrmode_privileged || - atomic_load(&c) == isc_taskmgrmode_privileged || - atomic_load(&d) == isc_taskmgrmode_privileged); + assert_true(atomic_load(&a) <= 3); + assert_true(atomic_load(&c) <= 3); + assert_true(atomic_load(&d) <= 3); /* ...and neither of the non-privileged tasks did... */ - assert_true(atomic_load(&b) == isc_taskmgrmode_normal || - atomic_load(&e) == isc_taskmgrmode_normal); + assert_true(atomic_load(&b) > 3); + assert_true(atomic_load(&e) > 3); /* ...but all five of them did run. */ assert_int_equal(atomic_load(&counter), 6); - assert_int_equal(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal); - isc_task_destroy(&task1); assert_null(task1); isc_task_destroy(&task2); @@ -695,6 +683,7 @@ exclusive_cb(isc_task_t *task, isc_event_t *event) { if (atomic_load(&done)) { isc_mem_put(event->ev_destroy_arg, event->ev_arg, sizeof(int)); isc_event_free(&event); + atomic_fetch_sub(&counter, 1); } else { isc_task_send(task, &event); } @@ -708,6 +697,8 @@ task_exclusive(void **state) { UNUSED(state); + atomic_init(&counter, 0); + for (i = 0; i < 10; i++) { isc_event_t *event = NULL; int *v; @@ -732,11 +723,16 @@ task_exclusive(void **state) { assert_non_null(event); isc_task_send(tasks[i], &event); + atomic_fetch_add(&counter, 1); } for (i = 0; i < 10; i++) { isc_task_detach(&tasks[i]); } + + while (atomic_load(&counter) > 0) { + isc_test_nap(1000); + } } /* @@ -805,7 +801,8 @@ manytasks(void **state) { isc_mem_debugging = ISC_MEM_DEBUGRECORD; isc_mem_create(&mctx); - result = isc_taskmgr_create(mctx, 4, 0, NULL, &taskmgr); + netmgr = isc_nm_start(mctx, 4); + result = isc_taskmgr_create(mctx, 0, netmgr, &taskmgr); assert_int_equal(result, ISC_R_SUCCESS); atomic_init(&done, false); @@ -822,6 +819,7 @@ manytasks(void **state) { UNLOCK(&lock); isc_taskmgr_destroy(&taskmgr); + isc_nm_destroy(&netmgr); isc_mem_destroy(&mctx); isc_condition_destroy(&cv); isc_mutex_destroy(&lock); diff --git a/lib/isc/tests/taskpool_test.c b/lib/isc/tests/taskpool_test.c index a22b0511a0..9878599721 100644 --- a/lib/isc/tests/taskpool_test.c +++ b/lib/isc/tests/taskpool_test.c @@ -28,6 +28,9 @@ #include "isctest.h" +#define TASK_MAGIC ISC_MAGIC('T', 'A', 'S', 'K') +#define VALID_TASK(t) ISC_MAGIC_VALID(t, TASK_MAGIC) + static int _setup(void **state) { isc_result_t result; @@ -57,7 +60,7 @@ create_pool(void **state) { UNUSED(state); - result = isc_taskpool_create(taskmgr, test_mctx, 8, 2, &pool); + result = isc_taskpool_create(taskmgr, test_mctx, 8, 2, false, &pool); assert_int_equal(result, ISC_R_SUCCESS); assert_int_equal(isc_taskpool_size(pool), 8); @@ -73,13 +76,13 @@ expand_pool(void **state) { UNUSED(state); - result = isc_taskpool_create(taskmgr, test_mctx, 10, 2, &pool1); + result = isc_taskpool_create(taskmgr, test_mctx, 10, 2, false, &pool1); assert_int_equal(result, ISC_R_SUCCESS); assert_int_equal(isc_taskpool_size(pool1), 10); /* resizing to a smaller size should have no effect */ hold = pool1; - result = isc_taskpool_expand(&pool1, 5, &pool2); + result = isc_taskpool_expand(&pool1, 5, false, &pool2); assert_int_equal(result, ISC_R_SUCCESS); assert_int_equal(isc_taskpool_size(pool2), 10); assert_ptr_equal(pool2, hold); @@ -89,7 +92,7 @@ expand_pool(void **state) { /* resizing to the same size should have no effect */ hold = pool1; - result = isc_taskpool_expand(&pool1, 10, &pool2); + result = isc_taskpool_expand(&pool1, 10, false, &pool2); assert_int_equal(result, ISC_R_SUCCESS); assert_int_equal(isc_taskpool_size(pool2), 10); assert_ptr_equal(pool2, hold); @@ -99,7 +102,7 @@ expand_pool(void **state) { /* resizing to larger size should make a new pool */ hold = pool1; - result = isc_taskpool_expand(&pool1, 20, &pool2); + result = isc_taskpool_expand(&pool1, 20, false, &pool2); assert_int_equal(result, ISC_R_SUCCESS); assert_int_equal(isc_taskpool_size(pool2), 20); assert_ptr_not_equal(pool2, hold); @@ -118,19 +121,19 @@ get_tasks(void **state) { UNUSED(state); - result = isc_taskpool_create(taskmgr, test_mctx, 2, 2, &pool); + result = isc_taskpool_create(taskmgr, test_mctx, 2, 2, false, &pool); assert_int_equal(result, ISC_R_SUCCESS); assert_int_equal(isc_taskpool_size(pool), 2); /* two tasks in pool; make sure we can access them more than twice */ isc_taskpool_gettask(pool, &task1); - assert_non_null(task1); + assert_true(VALID_TASK(task1)); isc_taskpool_gettask(pool, &task2); - assert_non_null(task2); + assert_true(VALID_TASK(task2)); isc_taskpool_gettask(pool, &task3); - assert_non_null(task3); + assert_true(VALID_TASK(task3)); isc_task_destroy(&task1); isc_task_destroy(&task2); @@ -149,30 +152,22 @@ set_privilege(void **state) { UNUSED(state); - result = isc_taskpool_create(taskmgr, test_mctx, 2, 2, &pool); + result = isc_taskpool_create(taskmgr, test_mctx, 2, 2, true, &pool); assert_int_equal(result, ISC_R_SUCCESS); assert_int_equal(isc_taskpool_size(pool), 2); - isc_taskpool_setprivilege(pool, true); - isc_taskpool_gettask(pool, &task1); isc_taskpool_gettask(pool, &task2); isc_taskpool_gettask(pool, &task3); - assert_non_null(task1); - assert_non_null(task2); - assert_non_null(task3); + assert_true(VALID_TASK(task1)); + assert_true(VALID_TASK(task2)); + assert_true(VALID_TASK(task3)); assert_true(isc_task_privilege(task1)); assert_true(isc_task_privilege(task2)); assert_true(isc_task_privilege(task3)); - isc_taskpool_setprivilege(pool, false); - - assert_false(isc_task_privilege(task1)); - assert_false(isc_task_privilege(task2)); - assert_false(isc_task_privilege(task3)); - isc_task_destroy(&task1); isc_task_destroy(&task2); isc_task_destroy(&task3); diff --git a/lib/isc/tests/timer_test.c b/lib/isc/tests/timer_test.c index 4d02ff3b74..21b5679de2 100644 --- a/lib/isc/tests/timer_test.c +++ b/lib/isc/tests/timer_test.c @@ -435,8 +435,8 @@ reset(void **state) { setup_test(isc_timertype_ticker, &expires, &interval, test_reset); } -static int startflag; -static int shutdownflag; +static atomic_bool startflag; +static atomic_bool shutdownflag; static isc_timer_t *tickertimer = NULL; static isc_timer_t *oncetimer = NULL; static isc_task_t *task1 = NULL; @@ -447,23 +447,6 @@ static isc_task_t *task2 = NULL; * in its queue, until signaled by task2. */ -static void -start_event(isc_task_t *task, isc_event_t *event) { - UNUSED(task); - - if (verbose) { - print_message("# start_event\n"); - } - - LOCK(&mx); - while (!startflag) { - (void)isc_condition_wait(&cv, &mx); - } - UNLOCK(&mx); - - isc_event_free(&event); -} - static void tick_event(isc_task_t *task, isc_event_t *event) { isc_result_t result; @@ -472,6 +455,14 @@ tick_event(isc_task_t *task, isc_event_t *event) { UNUSED(task); + if (!atomic_load(&startflag)) { + if (verbose) { + print_message("# tick_event %d\n", -1); + } + isc_event_free(&event); + return; + } + int tick = atomic_fetch_add(&eventcnt, 1); if (verbose) { print_message("# tick_event %d\n", tick); @@ -496,8 +487,6 @@ tick_event(isc_task_t *task, isc_event_t *event) { static void once_event(isc_task_t *task, isc_event_t *event) { - isc_result_t result; - if (verbose) { print_message("# once_event\n"); } @@ -505,12 +494,7 @@ once_event(isc_task_t *task, isc_event_t *event) { /* * Allow task1 to start processing events. */ - LOCK(&mx); - startflag = 1; - - result = isc_condition_broadcast(&cv); - subthread_assert_result_equal(result, ISC_R_SUCCESS); - UNLOCK(&mx); + atomic_store(&startflag, true); isc_event_free(&event); isc_task_shutdown(task); @@ -518,8 +502,6 @@ once_event(isc_task_t *task, isc_event_t *event) { static void shutdown_purge(isc_task_t *task, isc_event_t *event) { - isc_result_t result; - UNUSED(task); UNUSED(event); @@ -530,12 +512,7 @@ shutdown_purge(isc_task_t *task, isc_event_t *event) { /* * Signal shutdown processing complete. */ - LOCK(&mx); - shutdownflag = 1; - - result = isc_condition_signal(&cv); - subthread_assert_result_equal(result, ISC_R_SUCCESS); - UNLOCK(&mx); + atomic_store(&shutdownflag, 1); isc_event_free(&event); } @@ -544,22 +521,17 @@ shutdown_purge(isc_task_t *task, isc_event_t *event) { static void purge(void **state) { isc_result_t result; - isc_event_t *event = NULL; isc_time_t expires; isc_interval_t interval; UNUSED(state); - startflag = 0; - shutdownflag = 0; + atomic_init(&startflag, 0); + atomic_init(&shutdownflag, 0); atomic_init(&eventcnt, 0); seconds = 1; nanoseconds = 0; - isc_mutex_init(&mx); - - isc_condition_init(&cv); - result = isc_task_create(taskmgr, 0, &task1); assert_int_equal(result, ISC_R_SUCCESS); @@ -569,13 +541,6 @@ purge(void **state) { result = isc_task_create(taskmgr, 0, &task2); assert_int_equal(result, ISC_R_SUCCESS); - LOCK(&mx); - - event = isc_event_allocate(test_mctx, (void *)1, (isc_eventtype_t)1, - start_event, NULL, sizeof(*event)); - assert_non_null(event); - isc_task_send(task1, &event); - isc_time_settoepoch(&expires); isc_interval_set(&interval, seconds, 0); @@ -600,13 +565,10 @@ purge(void **state) { /* * Wait for shutdown processing to complete. */ - while (!shutdownflag) { - result = isc_condition_wait(&cv, &mx); - assert_int_equal(result, ISC_R_SUCCESS); + while (!atomic_load(&shutdownflag)) { + isc_test_nap(1000); } - UNLOCK(&mx); - assert_int_equal(atomic_load(&errcnt), ISC_R_SUCCESS); assert_int_equal(atomic_load(&eventcnt), 1); @@ -615,7 +577,6 @@ purge(void **state) { isc_timer_detach(&oncetimer); isc_task_destroy(&task1); isc_task_destroy(&task2); - isc_mutex_destroy(&mx); } int diff --git a/lib/isc/timer.c b/lib/isc/timer.c index 2798491d85..af49801660 100644 --- a/lib/isc/timer.c +++ b/lib/isc/timer.c @@ -94,14 +94,13 @@ struct isc_timermgr { }; void -isc_timermgr_poke(isc_timermgr_t *manager0); +isc_timermgr_poke(isc_timermgr_t *manager); static inline isc_result_t schedule(isc_timer_t *timer, isc_time_t *now, bool signal_ok) { isc_result_t result; isc_timermgr_t *manager; isc_time_t due; - int cmp; /*! * Note: the caller must ensure locking. @@ -145,7 +144,7 @@ schedule(isc_timer_t *timer, isc_time_t *now, bool signal_ok) { /* * Already scheduled. */ - cmp = isc_time_compare(&due, &timer->due); + int cmp = isc_time_compare(&due, &timer->due); timer->due = due; switch (cmp) { case -1: @@ -187,7 +186,6 @@ schedule(isc_timer_t *timer, isc_time_t *now, bool signal_ok) { static inline void deschedule(isc_timer_t *timer) { - bool need_wakeup = false; isc_timermgr_t *manager; /* @@ -196,6 +194,7 @@ deschedule(isc_timer_t *timer) { manager = timer->manager; if (timer->index > 0) { + bool need_wakeup = false; if (timer->index == 1) { need_wakeup = true; } @@ -223,6 +222,7 @@ destroy(isc_timer_t *timer) { (void)isc_task_purgerange(timer->task, timer, ISC_TIMEREVENT_FIRSTEVENT, ISC_TIMEREVENT_LASTEVENT, NULL); deschedule(timer); + UNLINK(manager->timers, timer, link); UNLOCK(&manager->lock); @@ -467,10 +467,6 @@ isc_timer_touch(isc_timer_t *timer) { void isc_timer_attach(isc_timer_t *timer, isc_timer_t **timerp) { - /* - * Attach *timerp to timer. - */ - REQUIRE(VALID_TIMER(timer)); REQUIRE(timerp != NULL && *timerp == NULL); isc_refcount_increment(&timer->references); diff --git a/lib/isc/win32/libisc.def.in b/lib/isc/win32/libisc.def.in index 6889fb4f0e..1404a8316c 100644 --- a/lib/isc/win32/libisc.def.in +++ b/lib/isc/win32/libisc.def.in @@ -104,10 +104,10 @@ isc_socketmgr_setreserved isc_socketmgr_setstats isc_task_getname isc_task_gettag +isc_task_run isc_task_unsendrange -isc_taskmgr_mode -isc__taskmgr_pause -isc__taskmgr_resume +isc_taskmgr_attach +isc_taskmgr_detach isc_aes128_crypt isc_aes192_crypt isc_aes256_crypt @@ -654,7 +654,6 @@ isc_task_unsend isc_taskmgr_create isc_taskmgr_destroy isc_taskmgr_excltask -isc_taskmgr_mode @IF NOTYET isc_taskmgr_renderjson @END NOTYET @@ -662,12 +661,10 @@ isc_taskmgr_renderjson isc_taskmgr_renderxml @END LIBXML2 isc_taskmgr_setexcltask -isc_taskmgr_setprivilegedmode isc_taskpool_create isc_taskpool_destroy isc_taskpool_expand isc_taskpool_gettask -isc_taskpool_setprivilege isc_taskpool_size isc_thread_create isc_thread_join diff --git a/lib/ns/tests/nstest.c b/lib/ns/tests/nstest.c index 5595a22c57..0f309f01a8 100644 --- a/lib/ns/tests/nstest.c +++ b/lib/ns/tests/nstest.c @@ -51,6 +51,7 @@ isc_mem_t *mctx = NULL; isc_log_t *lctx = NULL; +isc_nm_t *netmgr = NULL; isc_taskmgr_t *taskmgr = NULL; isc_task_t *maintask = NULL; isc_timermgr_t *timermgr = NULL; @@ -213,6 +214,9 @@ cleanup_managers(void) { if (taskmgr != NULL) { isc_taskmgr_destroy(&taskmgr); } + if (netmgr != NULL) { + isc_nm_destroy(&netmgr); + } if (timermgr != NULL) { isc_timermgr_destroy(&timermgr); } @@ -237,7 +241,8 @@ create_managers(void) { isc_event_t *event = NULL; ncpus = isc_os_ncpus(); - CHECK(isc_taskmgr_create(mctx, ncpus, 0, NULL, &taskmgr)); + netmgr = isc_nm_start(mctx, ncpus); + CHECK(isc_taskmgr_create(mctx, 0, netmgr, &taskmgr)); CHECK(isc_task_create(taskmgr, 0, &maintask)); isc_taskmgr_setexcltask(taskmgr, maintask); CHECK(isc_task_onshutdown(maintask, shutdown_managers, NULL)); diff --git a/util/copyrights b/util/copyrights index c69312fe83..e1c7d591a7 100644 --- a/util/copyrights +++ b/util/copyrights @@ -1981,7 +1981,6 @@ ./lib/isc/string.c C 1999,2000,2001,2003,2004,2005,2006,2007,2011,2012,2014,2015,2016,2018,2019,2020,2021 ./lib/isc/symtab.c C 1996,1997,1998,1999,2000,2001,2004,2005,2007,2011,2012,2013,2016,2018,2019,2020,2021 ./lib/isc/task.c C 1998,1999,2000,2001,2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017,2018,2019,2020,2021 -./lib/isc/task_p.h C 2018,2019,2020,2021 ./lib/isc/taskpool.c C 1999,2000,2001,2004,2005,2007,2011,2012,2013,2016,2018,2019,2020,2021 ./lib/isc/tests/aes_test.c C 2014,2016,2018,2019,2020,2021 ./lib/isc/tests/buffer_test.c C 2014,2015,2016,2017,2018,2019,2020,2021