From 7b6721b27f39b0feeb74a74d8b5b065439c90260 Mon Sep 17 00:00:00 2001 From: Witold Krecicki Date: Wed, 3 Oct 2018 23:20:22 +0200 Subject: [PATCH 01/18] isc_thread_setaffinity() --- config.h.in | 21 +++++++++++ configure | 51 ++++++++++++++++++++++++++- configure.ac | 4 +++ lib/isc/pthreads/include/isc/thread.h | 3 ++ lib/isc/pthreads/thread.c | 39 ++++++++++++++++++++ lib/isc/win32/include/isc/thread.h | 3 ++ lib/isc/win32/libisc.def.in | 1 + lib/isc/win32/thread.c | 6 ++++ 8 files changed, 127 insertions(+), 1 deletion(-) diff --git a/config.h.in b/config.h.in index 08e2f9c483..8fccd17b65 100644 --- a/config.h.in +++ b/config.h.in @@ -75,6 +75,9 @@ /* Define to 1 if you have the header file. */ #undef HAVE_CMOCKA_H +/* Define to 1 if you have the `cpuset_setaffinty' function. */ +#undef HAVE_CPUSET_SETAFFINITY + /* Define to 1 if you have the `CRYPTO_zalloc' function. */ #undef HAVE_CRYPTO_ZALLOC @@ -285,6 +288,9 @@ /* define if OpenSSL supports Ed25519 */ #undef HAVE_OPENSSL_ED25519 +/* Define to 1 if you have the `processor_bind' function. */ +#undef HAVE_PROCESSOR_BIND + /* Define if you have POSIX threads libraries and header files. */ #undef HAVE_PTHREAD @@ -303,6 +309,9 @@ /* Have PTHREAD_PRIO_INHERIT. */ #undef HAVE_PTHREAD_PRIO_INHERIT +/* Define to 1 if you have the `pthread_setaffinity_np' function. */ +#undef HAVE_PTHREAD_SETAFFINITY_NP + /* Define to 1 if you have the `pthread_setname_np' function. */ #undef HAVE_PTHREAD_SETNAME_NP @@ -333,6 +342,9 @@ /* Define to 1 if you have the header file. */ #undef HAVE_SCHED_H +/* Define to 1 if you have the `sched_setaffinity' function. */ +#undef HAVE_SCHED_SETAFFINITY + /* Define to 1 if you have the `sched_yield' function. */ #undef HAVE_SCHED_YIELD @@ -387,6 +399,9 @@ /* Define to 1 if you have the header file. */ #undef HAVE_SYS_CAPABILITY_H +/* Define to 1 if you have the header file. */ +#undef HAVE_SYS_CPUSET_H + /* Define to 1 if you have the header file. 
*/ #undef HAVE_SYS_DEVPOLL_H @@ -396,6 +411,12 @@ /* Define to 1 if you have the header file. */ #undef HAVE_SYS_PARAM_H +/* Define to 1 if you have the header file. */ +#undef HAVE_SYS_PRCTL_H + +/* Define to 1 if you have the header file. */ +#undef HAVE_SYS_PROCSET_H + /* Define to 1 if you have the header file. */ #undef HAVE_SYS_SELECT_H diff --git a/configure b/configure index 6ed2702470..b6a780f78f 100755 --- a/configure +++ b/configure @@ -844,6 +844,7 @@ infodir docdir oldincludedir includedir +runstatedir localstatedir sharedstatedir sysconfdir @@ -1003,6 +1004,7 @@ datadir='${datarootdir}' sysconfdir='${prefix}/etc' sharedstatedir='${prefix}/com' localstatedir='${prefix}/var' +runstatedir='${localstatedir}/run' includedir='${prefix}/include' oldincludedir='/usr/include' docdir='${datarootdir}/doc/${PACKAGE_TARNAME}' @@ -1255,6 +1257,15 @@ do | -silent | --silent | --silen | --sile | --sil) silent=yes ;; + -runstatedir | --runstatedir | --runstatedi | --runstated \ + | --runstate | --runstat | --runsta | --runst | --runs \ + | --run | --ru | --r) + ac_prev=runstatedir ;; + -runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \ + | --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \ + | --run=* | --ru=* | --r=*) + runstatedir=$ac_optarg ;; + -sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb) ac_prev=sbindir ;; -sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \ @@ -1392,7 +1403,7 @@ fi for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \ datadir sysconfdir sharedstatedir localstatedir includedir \ oldincludedir docdir infodir htmldir dvidir pdfdir psdir \ - libdir localedir mandir + libdir localedir mandir runstatedir do eval ac_val=\$$ac_var # Remove trailing slashes. 
@@ -1545,6 +1556,7 @@ Fine tuning of the installation directories: --sysconfdir=DIR read-only single-machine data [PREFIX/etc] --sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com] --localstatedir=DIR modifiable single-machine data [PREFIX/var] + --runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run] --libdir=DIR object code libraries [EPREFIX/lib] --includedir=DIR C header files [PREFIX/include] --oldincludedir=DIR C header files for non-gcc [/usr/include] @@ -15235,6 +15247,43 @@ fi done +for ac_header in sys/cpuset.h +do : + ac_fn_c_check_header_mongrel "$LINENO" "sys/cpuset.h" "ac_cv_header_sys_cpuset_h" "$ac_includes_default" +if test "x$ac_cv_header_sys_cpuset_h" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_SYS_CPUSET_H 1 +_ACEOF + +fi + +done + +for ac_header in sys/procset.h +do : + ac_fn_c_check_header_mongrel "$LINENO" "sys/procset.h" "ac_cv_header_sys_procset_h" "$ac_includes_default" +if test "x$ac_cv_header_sys_procset_h" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_SYS_PROCSET_H 1 +_ACEOF + +fi + +done + +for ac_func in pthread_setaffinity_np cpuset_setaffinity processor_bind sched_setaffinity +do : + as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh` +ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var" +if eval test \"x\$"$as_ac_var"\" = x"yes"; then : + cat >>confdefs.h <<_ACEOF +#define `$as_echo "HAVE_$ac_func" | $as_tr_cpp` 1 +_ACEOF + +fi +done + + # Look for functions relating to thread naming for ac_func in pthread_setname_np pthread_set_name_np do : diff --git a/configure.ac b/configure.ac index 941053b8a5..2b27558a6e 100644 --- a/configure.ac +++ b/configure.ac @@ -701,6 +701,10 @@ AC_CHECK_HEADERS([sched.h]) AC_SEARCH_LIBS([sched_yield],[rt]) AC_CHECK_FUNCS([sched_yield pthread_yield pthread_yield_np]) +AC_CHECK_HEADERS([sys/cpuset.h]) +AC_CHECK_HEADERS([sys/procset.h]) +AC_CHECK_FUNCS([pthread_setaffinity_np cpuset_setaffinity processor_bind sched_setaffinity]) + # Look for 
functions relating to thread naming AC_CHECK_FUNCS([pthread_setname_np pthread_set_name_np]) AC_CHECK_HEADERS([pthread_np.h], [], [], [#include ]) diff --git a/lib/isc/pthreads/include/isc/thread.h b/lib/isc/pthreads/include/isc/thread.h index 798af338ac..6d96950028 100644 --- a/lib/isc/pthreads/include/isc/thread.h +++ b/lib/isc/pthreads/include/isc/thread.h @@ -44,6 +44,9 @@ isc_thread_yield(void); void isc_thread_setname(isc_thread_t thread, const char *name); +isc_result_t +isc_thread_setaffinity(int cpu); + /* XXX We could do fancier error handling... */ #define isc_thread_join(t, rp) \ diff --git a/lib/isc/pthreads/thread.c b/lib/isc/pthreads/thread.c index ba7ae52daa..f8029e26ed 100644 --- a/lib/isc/pthreads/thread.c +++ b/lib/isc/pthreads/thread.c @@ -18,6 +18,17 @@ #include #endif +#if defined(HAVE_CPUSET_H) +#include +#include +#endif + +#if defined(HAVE_SYS_PROCESET_H) +#include +#include +#include +#endif + #include #include @@ -91,3 +102,31 @@ isc_thread_yield(void) { pthread_yield_np(); #endif } + +isc_result_t +isc_thread_setaffinity(int cpu) { +#if defined(HAVE_CPUSET_SETAFFINITY) + cpuset_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(cpu, &cpuset); + if (cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, -1, + &cpuset, sizeof(cpuset)) != 0) { + return (ISC_R_FAILURE); + } +#elif defined(HAVE_PTHREAD_SETAFFINITY_NP) + cpu_set_t set; + CPU_ZERO(&set); + CPU_SET(cpu, &set); + if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), + &set) != 0) { + return (ISC_R_FAILURE); + } +#elif defined(HAVE_PROCESSOR_BIND) + if (processor_bind(P_LWPID, P_MYID, cpu, NULL) != 0) { + return (ISC_R_FAILURE); + } +#else + UNUSED(cpu); +#endif + return (ISC_R_SUCCESS); +} diff --git a/lib/isc/win32/include/isc/thread.h b/lib/isc/win32/include/isc/thread.h index be1343afcb..25b83af073 100644 --- a/lib/isc/win32/include/isc/thread.h +++ b/lib/isc/win32/include/isc/thread.h @@ -79,6 +79,9 @@ isc_thread_setconcurrency(unsigned int level); void 
isc_thread_setname(isc_thread_t, const char *); +isc_result_t +isc_thread_setaffinity(int cpu); + int isc_thread_key_create(isc_thread_key_t *key, void (*func)(void *)); diff --git a/lib/isc/win32/libisc.def.in b/lib/isc/win32/libisc.def.in index 88faffb193..c468a89fe2 100644 --- a/lib/isc/win32/libisc.def.in +++ b/lib/isc/win32/libisc.def.in @@ -609,6 +609,7 @@ isc_thread_key_create isc_thread_key_delete isc_thread_key_getspecific isc_thread_key_setspecific +isc_thread_setaffinity isc_thread_setconcurrency isc_thread_setname isc_time_add diff --git a/lib/isc/win32/thread.c b/lib/isc/win32/thread.c index ebc2f2936f..7c147164d2 100644 --- a/lib/isc/win32/thread.c +++ b/lib/isc/win32/thread.c @@ -66,6 +66,12 @@ isc_thread_setname(isc_thread_t thread, const char *name) { UNUSED(name); } +isc_result_t +isc_thread_setaffinity(int cpu) { + /* no-op on Windows for now */ + return (ISC_R_SUCCESS); +} + void * isc_thread_key_getspecific(isc_thread_key_t key) { return(TlsGetValue(key)); From 81a85070c591668ec97348641527a33c5deb473b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Thu, 11 Oct 2018 13:39:04 +0000 Subject: [PATCH 02/18] Multiple worker queues --- lib/isc/include/isc/task.h | 6 + lib/isc/task.c | 245 +++++++++++++++++++++++------------- lib/isc/win32/libisc.def.in | 2 + 3 files changed, 165 insertions(+), 88 deletions(-) diff --git a/lib/isc/include/isc/task.h b/lib/isc/include/isc/task.h index c1bc09047a..7746f1c12d 100644 --- a/lib/isc/include/isc/task.h +++ b/lib/isc/include/isc/task.h @@ -208,6 +208,9 @@ isc_task_detach(isc_task_t **taskp); void isc_task_send(isc_task_t *task, isc_event_t **eventp); + +void +isc_task_sendto(isc_task_t *task, isc_event_t **eventp, int c); /*%< * Send '*event' to 'task'. * @@ -221,6 +224,9 @@ isc_task_send(isc_task_t *task, isc_event_t **eventp); *\li *eventp == NULL. 
*/ +void +isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c); + void isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp); /*%< diff --git a/lib/isc/task.c b/lib/isc/task.c index 1d03e79127..0338e0f743 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -31,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -104,6 +106,7 @@ struct isc__task { isc_time_t tnow; char name[16]; void * tag; + int threadid; /* Locked by task manager lock. */ LINK(isc__task_t) link; LINK(isc__task_t) ready_link; @@ -126,22 +129,26 @@ struct isc__taskmgr { isc_taskmgr_t common; isc_mem_t * mctx; isc_mutex_t lock; + isc_mutex_t **locks; unsigned int workers; + unsigned int queues; isc_thread_t * threads; + atomic_uint_fast32_t tasks_running; + atomic_uint_fast32_t tasks_ready; + atomic_uint_fast32_t curq; + /* Locked by task manager lock. */ unsigned int default_quantum; LIST(isc__task_t) tasks; - isc__tasklist_t ready_tasks; - isc__tasklist_t ready_priority_tasks; + isc__tasklist_t *ready_tasks; + isc__tasklist_t *ready_priority_tasks; isc_taskmgrmode_t mode; - isc_condition_t work_available; + isc_condition_t *work_available; isc_condition_t exclusive_granted; isc_condition_t paused; - unsigned int tasks_running; - unsigned int tasks_ready; - bool pause_requested; - bool exclusive_requested; - bool exiting; + bool pause_requested; + bool exclusive_requested; + bool exiting; /* * Multiple threads can read/write 'excl' at the same time, so we need @@ -175,13 +182,13 @@ isc_taskmgr_setexcltask(isc_taskmgr_t *mgr0, isc_task_t *task0); isc_result_t isc_taskmgr_excltask(isc_taskmgr_t *mgr0, isc_task_t **taskp); static inline bool -empty_readyq(isc__taskmgr_t *manager); +empty_readyq(isc__taskmgr_t *manager, int c); static inline isc__task_t * -pop_readyq(isc__taskmgr_t *manager); +pop_readyq(isc__taskmgr_t *manager, int c); static inline void 
-push_readyq(isc__taskmgr_t *manager, isc__task_t *task); +push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c); /*** *** Tasks. @@ -190,7 +197,6 @@ push_readyq(isc__taskmgr_t *manager, isc__task_t *task); static void task_finished(isc__task_t *task) { isc__taskmgr_t *manager = task->manager; - REQUIRE(EMPTY(task->events)); REQUIRE(task->nevents == 0); REQUIRE(EMPTY(task->on_shutdown)); @@ -201,6 +207,7 @@ task_finished(isc__task_t *task) { LOCK(&manager->lock); UNLINK(manager->tasks, task, link); + UNLOCK(&manager->lock); if (FINISHED(manager)) { /* * All tasks have completed and the @@ -208,10 +215,10 @@ task_finished(isc__task_t *task) { * any idle worker threads so they * can exit. */ - BROADCAST(&manager->work_available); + for (unsigned int i=0; iqueues; i++) { + BROADCAST(&manager->work_available[i]); + } } - UNLOCK(&manager->lock); - DESTROYLOCK(&task->lock); task->common.impmagic = 0; task->common.magic = 0; @@ -235,6 +242,7 @@ isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum, return (ISC_R_NOMEMORY); XTRACE("isc_task_create"); task->manager = manager; + task->threadid = -1; result = isc_mutex_init(&task->lock); if (result != ISC_R_SUCCESS) { isc_mem_put(manager->mctx, task, sizeof(*task)); @@ -346,17 +354,17 @@ static inline void task_ready(isc__task_t *task) { isc__taskmgr_t *manager = task->manager; bool has_privilege = isc_task_privilege((isc_task_t *) task); + int queue = task->threadid % manager->queues; REQUIRE(VALID_MANAGER(manager)); REQUIRE(task->state == task_state_ready); XTRACE("task_ready"); - - LOCK(&manager->lock); - push_readyq(manager, task); + LOCK(manager->locks[queue]); + push_readyq(manager, task, queue); if (manager->mode == isc_taskmgrmode_normal || has_privilege) - SIGNAL(&manager->work_available); - UNLOCK(&manager->lock); + SIGNAL(&manager->work_available[queue]); + UNLOCK(manager->locks[queue]); } static inline bool @@ -414,7 +422,7 @@ isc_task_detach(isc_task_t **taskp) { } static inline bool 
-task_send(isc__task_t *task, isc_event_t **eventp) { +task_send(isc__task_t *task, isc_event_t **eventp, int c) { bool was_idle = false; isc_event_t *event; @@ -433,6 +441,7 @@ task_send(isc__task_t *task, isc_event_t **eventp) { if (task->state == task_state_idle) { was_idle = true; + task->threadid = c; INSIST(EMPTY(task->events)); task->state = task_state_ready; } @@ -447,8 +456,21 @@ task_send(isc__task_t *task, isc_event_t **eventp) { void isc_task_send(isc_task_t *task0, isc_event_t **eventp) { + isc_task_sendto(task0, eventp, -1); +} + +void +isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) { + isc_task_sendtoanddetach(taskp, eventp, -1); +} + +void +isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) { isc__task_t *task = (isc__task_t *)task0; bool was_idle; + if (c == -1) { + c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed); + } /* * Send '*event' to 'task'. @@ -464,7 +486,7 @@ isc_task_send(isc_task_t *task0, isc_event_t **eventp) { * some processing is deferred until after the lock is released. 
*/ LOCK(&task->lock); - was_idle = task_send(task, eventp); + was_idle = task_send(task, eventp, c); UNLOCK(&task->lock); if (was_idle) { @@ -488,7 +510,7 @@ isc_task_send(isc_task_t *task0, isc_event_t **eventp) { } void -isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) { +isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) { bool idle1, idle2; isc__task_t *task; @@ -500,11 +522,13 @@ isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) { REQUIRE(taskp != NULL); task = (isc__task_t *)*taskp; REQUIRE(VALID_TASK(task)); + if (c == -1) { + c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed); + } XTRACE("isc_task_sendanddetach"); - LOCK(&task->lock); - idle1 = task_send(task, eventp); + idle1 = task_send(task, eventp, c); idle2 = task_detach(task); UNLOCK(&task->lock); @@ -829,13 +853,13 @@ isc_task_getcurrenttimex(isc_task_t *task0, isc_time_t *t) { * Caller must hold the task manager lock. */ static inline bool -empty_readyq(isc__taskmgr_t *manager) { +empty_readyq(isc__taskmgr_t *manager, int c) { isc__tasklist_t queue; if (manager->mode == isc_taskmgrmode_normal) - queue = manager->ready_tasks; + queue = manager->ready_tasks[c]; else - queue = manager->ready_priority_tasks; + queue = manager->ready_priority_tasks[c]; return (EMPTY(queue)); } @@ -849,18 +873,18 @@ empty_readyq(isc__taskmgr_t *manager) { * Caller must hold the task manager lock. 
*/ static inline isc__task_t * -pop_readyq(isc__taskmgr_t *manager) { +pop_readyq(isc__taskmgr_t *manager, int c) { isc__task_t *task; if (manager->mode == isc_taskmgrmode_normal) - task = HEAD(manager->ready_tasks); + task = HEAD(manager->ready_tasks[c]); else - task = HEAD(manager->ready_priority_tasks); + task = HEAD(manager->ready_priority_tasks[c]); if (task != NULL) { - DEQUEUE(manager->ready_tasks, task, ready_link); + DEQUEUE(manager->ready_tasks[c], task, ready_link); if (ISC_LINK_LINKED(task, ready_priority_link)) - DEQUEUE(manager->ready_priority_tasks, task, + DEQUEUE(manager->ready_priority_tasks[c], task, ready_priority_link); } @@ -874,20 +898,26 @@ pop_readyq(isc__taskmgr_t *manager) { * Caller must hold the task manager lock. */ static inline void -push_readyq(isc__taskmgr_t *manager, isc__task_t *task) { - ENQUEUE(manager->ready_tasks, task, ready_link); +push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) { + ENQUEUE(manager->ready_tasks[c], task, ready_link); if ((task->flags & TASK_F_PRIVILEGED) != 0) - ENQUEUE(manager->ready_priority_tasks, task, + ENQUEUE(manager->ready_priority_tasks[c], task, ready_priority_link); - manager->tasks_ready++; + atomic_fetch_add_explicit(&manager->tasks_ready, 1, memory_order_relaxed); } static void -dispatch(isc__taskmgr_t *manager) { +dispatch(isc__taskmgr_t *manager, int threadid) { isc__task_t *task; REQUIRE(VALID_MANAGER(manager)); + /* Wait for everything to initialize */ + LOCK(&manager->lock); + UNLOCK(&manager->lock); + + int queue = threadid % manager->queues; + /* * Again we're trying to hold the lock for as short a time as possible * and to do as little locking and unlocking as possible. @@ -937,8 +967,7 @@ dispatch(isc__taskmgr_t *manager) { * For N iterations of the loop, this code does N+1 locks and N+1 * unlocks. The while expression is always protected by the lock. 
*/ - - LOCK(&manager->lock); + LOCK(manager->locks[queue]); while (!FINISHED(manager)) { /* @@ -951,13 +980,19 @@ dispatch(isc__taskmgr_t *manager) { * If a pause has been requested, don't do any work * until it's been released. */ - while ((empty_readyq(manager) || manager->pause_requested || + while ((empty_readyq(manager, queue) || manager->pause_requested || manager->exclusive_requested) && !FINISHED(manager)) { XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_WAIT, "wait")); - WAIT(&manager->work_available, &manager->lock); + XTHREADTRACE(isc_msgcat_get(isc_msgcat, + ISC_MSGSET_GENERAL, + ISC_MSG_WAIT, manager->pause_requested ? "paused" : "notpaused")); + XTHREADTRACE(isc_msgcat_get(isc_msgcat, + ISC_MSGSET_GENERAL, + ISC_MSG_WAIT, manager->exclusive_requested ? "excreq" : "notexcreq")); + WAIT(&manager->work_available[queue], manager->locks[queue]); XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK, ISC_MSG_AWAKE, "awake")); @@ -965,7 +1000,7 @@ dispatch(isc__taskmgr_t *manager) { XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK, ISC_MSG_WORKING, "working")); - task = pop_readyq(manager); + task = pop_readyq(manager, queue); if (task != NULL) { unsigned int dispatch_count = 0; bool done = false; @@ -980,15 +1015,16 @@ dispatch(isc__taskmgr_t *manager) { * have a task to do. We must reacquire the manager * lock before exiting the 'if (task != NULL)' block. 
*/ - manager->tasks_ready--; - manager->tasks_running++; - UNLOCK(&manager->lock); + UNLOCK(manager->locks[queue]); + atomic_fetch_add_explicit(&manager->tasks_ready, -1, memory_order_relaxed); + atomic_fetch_add_explicit(&manager->tasks_running, 1, memory_order_relaxed); LOCK(&task->lock); INSIST(task->state == task_state_ready); task->state = task_state_running; XTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_RUNNING, "running")); + XTRACE(task->name); TIME_NOW(&task->tnow); task->now = isc_time_seconds(&task->tnow); do { @@ -1004,6 +1040,7 @@ dispatch(isc__taskmgr_t *manager) { ISC_MSGSET_TASK, ISC_MSG_EXECUTE, "execute action")); + XTRACE(task->name); if (event->ev_action != NULL) { UNLOCK(&task->lock); (event->ev_action)( @@ -1094,8 +1131,7 @@ dispatch(isc__taskmgr_t *manager) { if (finished) task_finished(task); - LOCK(&manager->lock); - manager->tasks_running--; + atomic_fetch_add_explicit(&manager->tasks_running, -1, memory_order_relaxed); if (manager->exclusive_requested && manager->tasks_running == 1) { SIGNAL(&manager->exclusive_granted); @@ -1103,6 +1139,7 @@ dispatch(isc__taskmgr_t *manager) { manager->tasks_running == 0) { SIGNAL(&manager->paused); } + LOCK(manager->locks[queue]); if (requeue) { /* * We know we're awake, so we don't have @@ -1123,7 +1160,7 @@ dispatch(isc__taskmgr_t *manager) { * were usually nonempty, the 'optimization' * might even hurt rather than help. */ - push_readyq(manager, task); + push_readyq(manager, task, queue); } } @@ -1133,27 +1170,38 @@ dispatch(isc__taskmgr_t *manager) { * we're stuck. Automatically drop privileges at that * point and continue with the regular ready queue. 
*/ - if (manager->tasks_running == 0 && empty_readyq(manager)) { - manager->mode = isc_taskmgrmode_normal; - if (!empty_readyq(manager)) - BROADCAST(&manager->work_available); + if (manager->tasks_running == 0 && empty_readyq(manager, queue)) { + if (manager->mode != isc_taskmgrmode_normal) { + manager->mode = isc_taskmgrmode_normal; + for (unsigned i=0; i < manager->workers; i++) { + BROADCAST(&manager->work_available[i]); + } + } } } - - UNLOCK(&manager->lock); + UNLOCK(manager->locks[queue]); } +typedef struct st { + isc__taskmgr_t *manager; + int threadid; +} stt; + static isc_threadresult_t #ifdef _WIN32 WINAPI #endif run(void *uap) { - isc__taskmgr_t *manager = uap; + stt *st = uap; + isc__taskmgr_t *manager = st->manager; + int threadid = st->threadid; + isc_mem_put(manager->mctx, st, sizeof(*st)); + isc_thread_setaffinity(threadid); XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_STARTING, "starting")); - dispatch(manager); + dispatch(manager, threadid); XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_EXITING, "exiting")); @@ -1170,7 +1218,7 @@ manager_free(isc__taskmgr_t *manager) { isc_mem_t *mctx; (void)isc_condition_destroy(&manager->exclusive_granted); - (void)isc_condition_destroy(&manager->work_available); + (void)isc_condition_destroy(&manager->work_available[0]); (void)isc_condition_destroy(&manager->paused); isc_mem_free(manager->mctx, manager->threads); DESTROYLOCK(&manager->lock); @@ -1214,20 +1262,13 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, } manager->workers = 0; + manager->queues = 0; manager->threads = isc_mem_allocate(mctx, workers * sizeof(isc_thread_t)); if (manager->threads == NULL) { result = ISC_R_NOMEMORY; goto cleanup_lock; } - if (isc_condition_init(&manager->work_available) != ISC_R_SUCCESS) { - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_condition_init() %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - result = ISC_R_UNEXPECTED; - goto 
cleanup_threads; - } if (isc_condition_init(&manager->exclusive_granted) != ISC_R_SUCCESS) { UNEXPECTED_ERROR(__FILE__, __LINE__, "isc_condition_init() %s", @@ -1248,12 +1289,15 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, default_quantum = DEFAULT_DEFAULT_QUANTUM; manager->default_quantum = default_quantum; INIT_LIST(manager->tasks); - INIT_LIST(manager->ready_tasks); - INIT_LIST(manager->ready_priority_tasks); + manager->ready_tasks = malloc(workers * sizeof(isc__tasklist_t)); + manager->locks = malloc(workers * sizeof(isc_mutex_t *)); + manager->work_available = malloc(workers * sizeof(isc_condition_t)); + manager->ready_priority_tasks = malloc(workers * sizeof(isc__tasklist_t)); manager->tasks_running = 0; manager->tasks_ready = 0; manager->exclusive_requested = false; manager->pause_requested = false; + manager->curq = 0; manager->exiting = false; manager->excl = NULL; @@ -1264,17 +1308,33 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, * Start workers. 
*/ for (i = 0; i < workers; i++) { - if (isc_thread_create(run, manager, - &manager->threads[manager->workers]) == - ISC_R_SUCCESS) { - char name[16]; /* thread name limit on Linux */ - snprintf(name, sizeof(name), "isc-worker%04u", i); - isc_thread_setname(manager->threads[manager->workers], - name); - manager->workers++; - started++; + INIT_LIST(manager->ready_tasks[i]); + INIT_LIST(manager->ready_priority_tasks[i]); + manager->locks[i] = malloc(4096); + isc_mutex_init(manager->locks[i]); + if (isc_condition_init(&manager->work_available[i]) != ISC_R_SUCCESS) { + UNEXPECTED_ERROR(__FILE__, __LINE__, + "isc_condition_init() %s", + isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, + ISC_MSG_FAILED, "failed")); + result = ISC_R_UNEXPECTED; + goto cleanup_threads; } + stt *st = isc_mem_get(mctx, sizeof(stt)); + st->manager = manager; + st->threadid = i; + if (isc_thread_create(run, st, + &manager->threads[i]) != ISC_R_SUCCESS) { + goto cleanup_threads; + } + char name[16]; /* thread name limit on Linux */ + snprintf(name, sizeof(name), "isc-worker%04u", i); + isc_thread_setname(manager->threads[manager->workers], + name); + manager->workers++; + started++; } + manager->queues = manager->workers; UNLOCK(&manager->lock); if (started == 0) { @@ -1290,7 +1350,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, cleanup_exclusivegranted: (void)isc_condition_destroy(&manager->exclusive_granted); cleanup_workavailable: - (void)isc_condition_destroy(&manager->work_available); + (void)isc_condition_destroy(&manager->work_available[0]); cleanup_threads: isc_mem_free(mctx, manager->threads); cleanup_lock: @@ -1362,7 +1422,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { task = NEXT(task, link)) { LOCK(&task->lock); if (task_shutdown(task)) - push_readyq(manager, task); + push_readyq(manager, task, 0); UNLOCK(&task->lock); } /* @@ -1370,7 +1430,11 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { * there's work left to do, and if there are already no tasks left * it 
will cause the workers to see manager->exiting. */ - BROADCAST(&manager->work_available); + for (i = 0; i < manager->queues; i++) { + LOCK(manager->locks[i]); + BROADCAST(&manager->work_available[i]); + UNLOCK(manager->locks[i]); + } UNLOCK(&manager->lock); /* @@ -1421,7 +1485,7 @@ isc__taskmgr_resume(isc_taskmgr_t *manager0) { LOCK(&manager->lock); if (manager->pause_requested) { manager->pause_requested = false; - BROADCAST(&manager->work_available); + BROADCAST(&manager->work_available[0]); } UNLOCK(&manager->lock); } @@ -1493,7 +1557,9 @@ isc_task_endexclusive(isc_task_t *task0) { LOCK(&manager->lock); REQUIRE(manager->exclusive_requested); manager->exclusive_requested = false; - BROADCAST(&manager->work_available); + for (unsigned int i=0; i < manager->workers; i++) { + BROADCAST(&manager->work_available[i]); + } UNLOCK(&manager->lock); } @@ -1502,6 +1568,7 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) { REQUIRE(ISCAPI_TASK_VALID(task0)); isc__task_t *task = (isc__task_t *)task0; isc__taskmgr_t *manager = task->manager; + int queue = task->threadid % manager->queues; bool oldpriv; LOCK(&task->lock); @@ -1515,14 +1582,14 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) { if (priv == oldpriv) return; - LOCK(&manager->lock); + LOCK(manager->locks[queue]); if (priv && ISC_LINK_LINKED(task, ready_link)) - ENQUEUE(manager->ready_priority_tasks, task, + ENQUEUE(manager->ready_priority_tasks[queue], task, ready_priority_link); else if (!priv && ISC_LINK_LINKED(task, ready_priority_link)) - DEQUEUE(manager->ready_priority_tasks, task, + DEQUEUE(manager->ready_priority_tasks[queue], task, ready_priority_link); - UNLOCK(&manager->lock); + UNLOCK(manager->locks[queue]); } bool @@ -1575,11 +1642,13 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr0, xmlTextWriterPtr writer) { TRY0(xmlTextWriterEndElement(writer)); /* default-quantum */ TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-running")); - TRY0(xmlTextWriterWriteFormatString(writer, "%d", 
mgr->tasks_running)); + TRY0(xmlTextWriterWriteFormatString(writer, "%d", + (int) mgr->tasks_running)); TRY0(xmlTextWriterEndElement(writer)); /* tasks-running */ TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-ready")); - TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->tasks_ready)); + TRY0(xmlTextWriterWriteFormatString(writer, "%d", + (int) mgr->tasks_ready)); TRY0(xmlTextWriterEndElement(writer)); /* tasks-ready */ TRY0(xmlTextWriterEndElement(writer)); /* thread-model */ diff --git a/lib/isc/win32/libisc.def.in b/lib/isc/win32/libisc.def.in index c468a89fe2..2133f5248f 100644 --- a/lib/isc/win32/libisc.def.in +++ b/lib/isc/win32/libisc.def.in @@ -580,6 +580,8 @@ isc_task_purgeevent isc_task_purgerange isc_task_send isc_task_sendanddetach +isc_task_sendto +isc_task_sendtoanddetach isc_task_setname isc_task_setprivilege isc_task_shutdown From d7be8afea59b432c9b36796069ed585d29190b1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Thu, 18 Oct 2018 18:16:25 +0000 Subject: [PATCH 03/18] Taskmgr shutdown fixes --- lib/isc/task.c | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/lib/isc/task.c b/lib/isc/task.c index 0338e0f743..e8d2fc711c 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -1180,6 +1180,15 @@ dispatch(isc__taskmgr_t *manager, int threadid) { } } UNLOCK(manager->locks[queue]); + /* + * There might be other dispatchers waiting on empty tasks, + * wake them up. 
+ */ + for (unsigned i=0; i < manager->workers; i++) { + LOCK(manager->locks[i]); + BROADCAST(&manager->work_available[i]); + UNLOCK(manager->locks[i]); + } } typedef struct st { @@ -1421,8 +1430,10 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { task != NULL; task = NEXT(task, link)) { LOCK(&task->lock); - if (task_shutdown(task)) - push_readyq(manager, task, 0); + if (task_shutdown(task)) { + int queue = task->threadid % manager->queues; + push_readyq(manager, task, queue); + } UNLOCK(&task->lock); } /* From 818d63a3a17a4e354934c3ac579f63184742b00d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Fri, 19 Oct 2018 10:13:20 +0000 Subject: [PATCH 04/18] Always restart dispatchers on empty readyq --- lib/isc/task.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/isc/task.c b/lib/isc/task.c index e8d2fc711c..e2cf9793f5 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -1171,11 +1171,9 @@ dispatch(isc__taskmgr_t *manager, int threadid) { * point and continue with the regular ready queue. 
*/ if (manager->tasks_running == 0 && empty_readyq(manager, queue)) { - if (manager->mode != isc_taskmgrmode_normal) { - manager->mode = isc_taskmgrmode_normal; - for (unsigned i=0; i < manager->workers; i++) { - BROADCAST(&manager->work_available[i]); - } + manager->mode = isc_taskmgrmode_normal; + for (unsigned i=0; i < manager->workers; i++) { + BROADCAST(&manager->work_available[i]); } } } From c416389d32e4baf6265262aad4b0411355b8424a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Mon, 22 Oct 2018 09:37:17 +0000 Subject: [PATCH 05/18] Separate structure for each thread/queue; 2-phase-locking for exclusive tasks --- lib/isc/task.c | 348 +++++++++++++++++++++++-------------------------- 1 file changed, 164 insertions(+), 184 deletions(-) diff --git a/lib/isc/task.c b/lib/isc/task.c index e2cf9793f5..d36143d32a 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -43,18 +43,6 @@ #include #endif -/*% - * For BIND9 internal applications: - * when built with threads we use multiple worker threads shared by the whole - * application. - * when built without threads we share a single global task manager and use - * an integrated event loop for socket, timer, and other generic task events. - * For generic library: - * we don't use either of them: an application can have multiple task managers - * whether or not it's threaded, and if the application is threaded each thread - * is expected to have a separate manager; no "worker threads" are shared by - * the application threads. - */ #ifdef ISC_TASK_TRACE #define XTRACE(m) fprintf(stderr, "task %p thread %lu: %s\n", \ task, isc_thread_self(), (m)) @@ -88,6 +76,7 @@ static const char *statenames[] = { typedef struct isc__task isc__task_t; typedef struct isc__taskmgr isc__taskmgr_t; +typedef struct isc__taskqueue isc__taskqueue_t; struct isc__task { /* Not locked. 
*/ @@ -124,32 +113,43 @@ struct isc__task { typedef ISC_LIST(isc__task_t) isc__tasklist_t; +struct isc__taskqueue { + isc__tasklist_t ready_tasks; + isc__tasklist_t ready_priority_tasks; + isc_condition_t work_available; + isc_mutex_t lock; + isc_thread_t thread; + unsigned int threadid; + isc__taskmgr_t *manager; +}; + struct isc__taskmgr { /* Not locked. */ isc_taskmgr_t common; isc_mem_t * mctx; isc_mutex_t lock; - isc_mutex_t **locks; + isc_mutex_t prehalt_lock; + isc_mutex_t posthalt_lock; + isc_condition_t halt_cond; + isc_condition_t halt_avail_cond; unsigned int workers; - unsigned int queues; - isc_thread_t * threads; atomic_uint_fast32_t tasks_running; atomic_uint_fast32_t tasks_ready; atomic_uint_fast32_t curq; + isc__taskqueue_t *queues; /* Locked by task manager lock. */ unsigned int default_quantum; LIST(isc__task_t) tasks; - isc__tasklist_t *ready_tasks; - isc__tasklist_t *ready_priority_tasks; isc_taskmgrmode_t mode; - isc_condition_t *work_available; - isc_condition_t exclusive_granted; - isc_condition_t paused; bool pause_requested; bool exclusive_requested; bool exiting; + /* Locked by {pre/post}halt_lock combo */ + unsigned int halted; + + /* * Multiple threads can read/write 'excl' at the same time, so we need * to protect the access. We can't use 'lock' since isc_task_detach() @@ -215,8 +215,8 @@ task_finished(isc__task_t *task) { * any idle worker threads so they * can exit. 
*/ - for (unsigned int i=0; iqueues; i++) { - BROADCAST(&manager->work_available[i]); + for (unsigned int i=0; iworkers; i++) { + BROADCAST(&manager->queues[i].work_available); } } DESTROYLOCK(&task->lock); @@ -242,7 +242,7 @@ isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum, return (ISC_R_NOMEMORY); XTRACE("isc_task_create"); task->manager = manager; - task->threadid = -1; + task->threadid = atomic_fetch_add_explicit(&manager->curq, 1, memory_order_relaxed) % manager->workers; result = isc_mutex_init(&task->lock); if (result != ISC_R_SUCCESS) { isc_mem_put(manager->mctx, task, sizeof(*task)); @@ -354,17 +354,16 @@ static inline void task_ready(isc__task_t *task) { isc__taskmgr_t *manager = task->manager; bool has_privilege = isc_task_privilege((isc_task_t *) task); - int queue = task->threadid % manager->queues; REQUIRE(VALID_MANAGER(manager)); REQUIRE(task->state == task_state_ready); XTRACE("task_ready"); - LOCK(manager->locks[queue]); - push_readyq(manager, task, queue); + LOCK(&manager->queues[task->threadid].lock); + push_readyq(manager, task, task->threadid); if (manager->mode == isc_taskmgrmode_normal || has_privilege) - SIGNAL(&manager->work_available[queue]); - UNLOCK(manager->locks[queue]); + SIGNAL(&manager->queues[task->threadid].work_available); + UNLOCK(&manager->queues[task->threadid].lock); } static inline bool @@ -468,9 +467,6 @@ void isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) { isc__task_t *task = (isc__task_t *)task0; bool was_idle; - if (c == -1) { - c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed); - } /* * Send '*event' to 'task'. 
@@ -478,6 +474,12 @@ isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) { REQUIRE(VALID_TASK(task)); + if (c == -1) { + c = atomic_fetch_add_explicit(&task->manager->curq, 1, + memory_order_relaxed) + % task->manager->workers; + } + XTRACE("isc_task_send"); /* @@ -523,7 +525,9 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) { task = (isc__task_t *)*taskp; REQUIRE(VALID_TASK(task)); if (c == -1) { - c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed); + c = atomic_fetch_add_explicit(&task->manager->curq, 1, + memory_order_relaxed) + % task->manager->workers; } XTRACE("isc_task_sendanddetach"); @@ -857,9 +861,9 @@ empty_readyq(isc__taskmgr_t *manager, int c) { isc__tasklist_t queue; if (manager->mode == isc_taskmgrmode_normal) - queue = manager->ready_tasks[c]; + queue = manager->queues[c].ready_tasks; else - queue = manager->ready_priority_tasks[c]; + queue = manager->queues[c].ready_priority_tasks; return (EMPTY(queue)); } @@ -877,14 +881,14 @@ pop_readyq(isc__taskmgr_t *manager, int c) { isc__task_t *task; if (manager->mode == isc_taskmgrmode_normal) - task = HEAD(manager->ready_tasks[c]); + task = HEAD(manager->queues[c].ready_tasks); else - task = HEAD(manager->ready_priority_tasks[c]); + task = HEAD(manager->queues[c].ready_priority_tasks); if (task != NULL) { - DEQUEUE(manager->ready_tasks[c], task, ready_link); + DEQUEUE(manager->queues[c].ready_tasks, task, ready_link); if (ISC_LINK_LINKED(task, ready_priority_link)) - DEQUEUE(manager->ready_priority_tasks[c], task, + DEQUEUE(manager->queues[c].ready_priority_tasks, task, ready_priority_link); } @@ -899,9 +903,9 @@ pop_readyq(isc__taskmgr_t *manager, int c) { */ static inline void push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) { - ENQUEUE(manager->ready_tasks[c], task, ready_link); + ENQUEUE(manager->queues[c].ready_tasks, task, ready_link); if ((task->flags & TASK_F_PRIVILEGED) != 0) - 
ENQUEUE(manager->ready_priority_tasks[c], task, + ENQUEUE(manager->queues[c].ready_priority_tasks, task, ready_priority_link); atomic_fetch_add_explicit(&manager->tasks_ready, 1, memory_order_relaxed); } @@ -916,8 +920,6 @@ dispatch(isc__taskmgr_t *manager, int threadid) { LOCK(&manager->lock); UNLOCK(&manager->lock); - int queue = threadid % manager->queues; - /* * Again we're trying to hold the lock for as short a time as possible * and to do as little locking and unlocking as possible. @@ -967,7 +969,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) { * For N iterations of the loop, this code does N+1 locks and N+1 * unlocks. The while expression is always protected by the lock. */ - LOCK(manager->locks[queue]); + LOCK(&manager->queues[threadid].lock); while (!FINISHED(manager)) { /* @@ -980,8 +982,8 @@ dispatch(isc__taskmgr_t *manager, int threadid) { * If a pause has been requested, don't do any work * until it's been released. */ - while ((empty_readyq(manager, queue) || manager->pause_requested || - manager->exclusive_requested) && !FINISHED(manager)) + while ((empty_readyq(manager, threadid) && !manager->pause_requested && + !manager->exclusive_requested) && !FINISHED(manager)) { XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, @@ -992,7 +994,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) { XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_WAIT, manager->exclusive_requested ? 
"excreq" : "notexcreq")); - WAIT(&manager->work_available[queue], manager->locks[queue]); + WAIT(&manager->queues[threadid].work_available, &manager->queues[threadid].lock); XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK, ISC_MSG_AWAKE, "awake")); @@ -1000,7 +1002,33 @@ dispatch(isc__taskmgr_t *manager, int threadid) { XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK, ISC_MSG_WORKING, "working")); - task = pop_readyq(manager, queue); + if (manager->pause_requested || manager->exclusive_requested) { + UNLOCK(&manager->queues[threadid].lock); + XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK, + ISC_MSG_WORKING, "halting")); + + /* + * First we increase 'halted' and signal the thread + * that's waiting on exclusivity/pause. Then we + * try locking posthalt lock, which will be locked + * by exclusive/pausing thread. It is only unlocked + * after exclusivity/pause is done. + */ + LOCK(&manager->prehalt_lock); + manager->halted++; + SIGNAL(&manager->halt_cond); + UNLOCK(&manager->prehalt_lock); + + LOCK(&manager->posthalt_lock); + manager->halted--; + UNLOCK(&manager->posthalt_lock); + + LOCK(&manager->queues[threadid].lock); + /* Restart the loop after */ + continue; + } + + task = pop_readyq(manager, threadid); if (task != NULL) { unsigned int dispatch_count = 0; bool done = false; @@ -1015,7 +1043,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) { * have a task to do. We must reacquire the manager * lock before exiting the 'if (task != NULL)' block. 
*/ - UNLOCK(manager->locks[queue]); + UNLOCK(&manager->queues[threadid].lock); atomic_fetch_add_explicit(&manager->tasks_ready, -1, memory_order_relaxed); atomic_fetch_add_explicit(&manager->tasks_running, 1, memory_order_relaxed); @@ -1132,14 +1160,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) { task_finished(task); atomic_fetch_add_explicit(&manager->tasks_running, -1, memory_order_relaxed); - if (manager->exclusive_requested && - manager->tasks_running == 1) { - SIGNAL(&manager->exclusive_granted); - } else if (manager->pause_requested && - manager->tasks_running == 0) { - SIGNAL(&manager->paused); - } - LOCK(manager->locks[queue]); + LOCK(&manager->queues[threadid].lock); if (requeue) { /* * We know we're awake, so we don't have @@ -1160,7 +1181,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) { * were usually nonempty, the 'optimization' * might even hurt rather than help. */ - push_readyq(manager, task, queue); + push_readyq(manager, task, threadid); } } @@ -1170,39 +1191,33 @@ dispatch(isc__taskmgr_t *manager, int threadid) { * we're stuck. Automatically drop privileges at that * point and continue with the regular ready queue. */ - if (manager->tasks_running == 0 && empty_readyq(manager, queue)) { + if (manager->tasks_running == 0 && empty_readyq(manager, threadid)) { manager->mode = isc_taskmgrmode_normal; for (unsigned i=0; i < manager->workers; i++) { - BROADCAST(&manager->work_available[i]); + BROADCAST(&manager->queues[i].work_available); } } } - UNLOCK(manager->locks[queue]); + UNLOCK(&manager->queues[threadid].lock); /* * There might be other dispatchers waiting on empty tasks, * wake them up. 
*/ for (unsigned i=0; i < manager->workers; i++) { - LOCK(manager->locks[i]); - BROADCAST(&manager->work_available[i]); - UNLOCK(manager->locks[i]); + LOCK(&manager->queues[i].lock); + BROADCAST(&manager->queues[i].work_available); + UNLOCK(&manager->queues[i].lock); } } -typedef struct st { - isc__taskmgr_t *manager; - int threadid; -} stt; - static isc_threadresult_t #ifdef _WIN32 WINAPI #endif -run(void *uap) { - stt *st = uap; - isc__taskmgr_t *manager = st->manager; - int threadid = st->threadid; - isc_mem_put(manager->mctx, st, sizeof(*st)); +run(void *queuep) { + isc__taskqueue_t *tq = queuep; + isc__taskmgr_t *manager = tq->manager; + int threadid = tq->threadid; isc_thread_setaffinity(threadid); XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, @@ -1222,19 +1237,15 @@ run(void *uap) { static void manager_free(isc__taskmgr_t *manager) { - isc_mem_t *mctx; + /* TODO */ - (void)isc_condition_destroy(&manager->exclusive_granted); - (void)isc_condition_destroy(&manager->work_available[0]); - (void)isc_condition_destroy(&manager->paused); - isc_mem_free(manager->mctx, manager->threads); DESTROYLOCK(&manager->lock); - DESTROYLOCK(&manager->excl_lock); + DESTROYLOCK(&manager->prehalt_lock); + DESTROYLOCK(&manager->posthalt_lock); + isc_mem_put(manager->mctx, manager->queues, manager->workers * sizeof(isc__taskqueue_t)); manager->common.impmagic = 0; manager->common.magic = 0; - mctx = manager->mctx; - isc_mem_put(mctx, manager, sizeof(*manager)); - isc_mem_detach(&mctx); + isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager)); } isc_result_t @@ -1242,7 +1253,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, unsigned int default_quantum, isc_taskmgr_t **managerp) { isc_result_t result; - unsigned int i, started = 0; + unsigned int i; isc__taskmgr_t *manager; /* @@ -1268,45 +1279,26 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, goto cleanup_mgr; } - manager->workers = 0; - manager->queues = 0; - manager->threads = 
isc_mem_allocate(mctx, - workers * sizeof(isc_thread_t)); - if (manager->threads == NULL) { - result = ISC_R_NOMEMORY; - goto cleanup_lock; - } - if (isc_condition_init(&manager->exclusive_granted) != ISC_R_SUCCESS) { - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_condition_init() %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - result = ISC_R_UNEXPECTED; - goto cleanup_workavailable; - } - if (isc_condition_init(&manager->paused) != ISC_R_SUCCESS) { - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_condition_init() %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - result = ISC_R_UNEXPECTED; - goto cleanup_exclusivegranted; - } + RUNTIME_CHECK(isc_mutex_init(&manager->prehalt_lock) == ISC_R_SUCCESS); + RUNTIME_CHECK(isc_mutex_init(&manager->posthalt_lock) == ISC_R_SUCCESS); + + RUNTIME_CHECK(isc_condition_init(&manager->halt_cond) == ISC_R_SUCCESS); + + manager->workers = workers; + if (default_quantum == 0) default_quantum = DEFAULT_DEFAULT_QUANTUM; manager->default_quantum = default_quantum; INIT_LIST(manager->tasks); - manager->ready_tasks = malloc(workers * sizeof(isc__tasklist_t)); - manager->locks = malloc(workers * sizeof(isc_mutex_t *)); - manager->work_available = malloc(workers * sizeof(isc_condition_t)); - manager->ready_priority_tasks = malloc(workers * sizeof(isc__tasklist_t)); + manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t)); manager->tasks_running = 0; manager->tasks_ready = 0; - manager->exclusive_requested = false; - manager->pause_requested = false; manager->curq = 0; manager->exiting = false; manager->excl = NULL; + manager->halted = 0; + manager->exclusive_requested = false; + manager->pause_requested = false; isc_mem_attach(mctx, &manager->mctx); @@ -1315,53 +1307,30 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, * Start workers. 
*/ for (i = 0; i < workers; i++) { - INIT_LIST(manager->ready_tasks[i]); - INIT_LIST(manager->ready_priority_tasks[i]); - manager->locks[i] = malloc(4096); - isc_mutex_init(manager->locks[i]); - if (isc_condition_init(&manager->work_available[i]) != ISC_R_SUCCESS) { - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_condition_init() %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - result = ISC_R_UNEXPECTED; - goto cleanup_threads; - } - stt *st = isc_mem_get(mctx, sizeof(stt)); - st->manager = manager; - st->threadid = i; - if (isc_thread_create(run, st, - &manager->threads[i]) != ISC_R_SUCCESS) { - goto cleanup_threads; - } - char name[16]; /* thread name limit on Linux */ + INIT_LIST(manager->queues[i].ready_tasks); + INIT_LIST(manager->queues[i].ready_priority_tasks); + RUNTIME_CHECK(isc_mutex_init(&manager->queues[i].lock) + == ISC_R_SUCCESS); + RUNTIME_CHECK(isc_condition_init( + &manager->queues[i].work_available) + == ISC_R_SUCCESS); + manager->queues[i].manager = manager; + manager->queues[i].threadid = i; + RUNTIME_CHECK(isc_thread_create(run, &manager->queues[i], + &manager->queues[i].thread) + == ISC_R_SUCCESS); + char name[16]; snprintf(name, sizeof(name), "isc-worker%04u", i); - isc_thread_setname(manager->threads[manager->workers], - name); - manager->workers++; - started++; + isc_thread_setname(manager->queues[i].thread, name); } - manager->queues = manager->workers; UNLOCK(&manager->lock); - if (started == 0) { - manager_free(manager); - return (ISC_R_NOTHREADS); - } isc_thread_setconcurrency(workers); *managerp = (isc_taskmgr_t *)manager; return (ISC_R_SUCCESS); - cleanup_exclusivegranted: - (void)isc_condition_destroy(&manager->exclusive_granted); - cleanup_workavailable: - (void)isc_condition_destroy(&manager->work_available[0]); - cleanup_threads: - isc_mem_free(mctx, manager->threads); - cleanup_lock: - DESTROYLOCK(&manager->lock); cleanup_mgr: isc_mem_put(mctx, manager, sizeof(*manager)); return (result); @@ 
-1429,8 +1398,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { task = NEXT(task, link)) { LOCK(&task->lock); if (task_shutdown(task)) { - int queue = task->threadid % manager->queues; - push_readyq(manager, task, queue); + push_readyq(manager, task, task->threadid); } UNLOCK(&task->lock); } @@ -1439,10 +1407,10 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { * there's work left to do, and if there are already no tasks left * it will cause the workers to see manager->exiting. */ - for (i = 0; i < manager->queues; i++) { - LOCK(manager->locks[i]); - BROADCAST(&manager->work_available[i]); - UNLOCK(manager->locks[i]); + for (i = 0; i < manager->workers; i++) { + LOCK(&manager->queues[i].lock); + BROADCAST(&manager->queues[i].work_available); + UNLOCK(&manager->queues[i].lock); } UNLOCK(&manager->lock); @@ -1450,7 +1418,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { * Wait for all the worker threads to exit. */ for (i = 0; i < manager->workers; i++) - (void)isc_thread_join(manager->threads[i], NULL); + (void)isc_thread_join(manager->queues[i].thread, NULL); manager_free(manager); @@ -1479,24 +1447,33 @@ isc_taskmgr_mode(isc_taskmgr_t *manager0) { void isc__taskmgr_pause(isc_taskmgr_t *manager0) { isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0; - manager->pause_requested = true; - LOCK(&manager->lock); - while (manager->tasks_running > 0) { - WAIT(&manager->paused, &manager->lock); + unsigned int i; + + LOCK(&manager->posthalt_lock); + while (manager->exclusive_requested || manager->pause_requested) { + UNLOCK(&manager->posthalt_lock); + /* This is ugly but pause is used EXCLUSIVELY in tests */ + isc_thread_yield(); + LOCK(&manager->posthalt_lock); } - UNLOCK(&manager->lock); + manager->pause_requested = true; + LOCK(&manager->prehalt_lock); + while (manager->halted < manager->workers) { + for (i = 0; i < manager->workers; i++) { + BROADCAST(&manager->queues[i].work_available); + } + WAIT(&manager->halt_cond, &manager->prehalt_lock); + } + 
UNLOCK(&manager->prehalt_lock); } void isc__taskmgr_resume(isc_taskmgr_t *manager0) { isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0; - - LOCK(&manager->lock); if (manager->pause_requested) { manager->pause_requested = false; - BROADCAST(&manager->work_available[0]); + UNLOCK(&manager->posthalt_lock); } - UNLOCK(&manager->lock); } void @@ -1535,24 +1512,32 @@ isc_result_t isc_task_beginexclusive(isc_task_t *task0) { isc__task_t *task = (isc__task_t *)task0; isc__taskmgr_t *manager = task->manager; + unsigned int i; + REQUIRE(VALID_TASK(task)); REQUIRE(task->state == task_state_running); + /* * TODO REQUIRE(task == task->manager->excl); * it should be here, it fails on shutdown server->task */ - LOCK(&manager->lock); - if (manager->exclusive_requested) { - UNLOCK(&manager->lock); + if (manager->exclusive_requested || manager->pause_requested) { return (ISC_R_LOCKBUSY); } + + LOCK(&manager->posthalt_lock); + INSIST(!manager->exclusive_requested && !manager->pause_requested); manager->exclusive_requested = true; - while (manager->tasks_running > 1) { - WAIT(&manager->exclusive_granted, &manager->lock); + LOCK(&manager->prehalt_lock); + while (manager->halted + 1 < manager->workers) { + for (i = 0; i < manager->workers; i++) { + BROADCAST(&manager->queues[i].work_available); + } + WAIT(&manager->halt_cond, &manager->prehalt_lock); } - UNLOCK(&manager->lock); + UNLOCK(&manager->prehalt_lock); return (ISC_R_SUCCESS); } @@ -1563,13 +1548,9 @@ isc_task_endexclusive(isc_task_t *task0) { REQUIRE(VALID_TASK(task)); REQUIRE(task->state == task_state_running); - LOCK(&manager->lock); REQUIRE(manager->exclusive_requested); manager->exclusive_requested = false; - for (unsigned int i=0; i < manager->workers; i++) { - BROADCAST(&manager->work_available[i]); - } - UNLOCK(&manager->lock); + UNLOCK(&manager->posthalt_lock); } void @@ -1577,7 +1558,6 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) { REQUIRE(ISCAPI_TASK_VALID(task0)); isc__task_t *task = (isc__task_t 
*)task0; isc__taskmgr_t *manager = task->manager; - int queue = task->threadid % manager->queues; bool oldpriv; LOCK(&task->lock); @@ -1591,14 +1571,14 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) { if (priv == oldpriv) return; - LOCK(manager->locks[queue]); + LOCK(&manager->queues[task->threadid].lock); if (priv && ISC_LINK_LINKED(task, ready_link)) - ENQUEUE(manager->ready_priority_tasks[queue], task, - ready_priority_link); + ENQUEUE(manager->queues[task->threadid].ready_priority_tasks, + task, ready_priority_link); else if (!priv && ISC_LINK_LINKED(task, ready_priority_link)) - DEQUEUE(manager->ready_priority_tasks[queue], task, - ready_priority_link); - UNLOCK(manager->locks[queue]); + DEQUEUE(manager->queues[task->threadid].ready_priority_tasks, + task, ready_priority_link); + UNLOCK(&manager->queues[task->threadid].lock); } bool From 669a694d3bce30407afe9a1a9595d2a02819fd73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Mon, 22 Oct 2018 12:26:27 +0000 Subject: [PATCH 06/18] Post shutting down tasks always to manager 0 --- lib/isc/task.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/isc/task.c b/lib/isc/task.c index d36143d32a..9c1f769b8b 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -1391,17 +1391,21 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { /* * Post shutdown event(s) to every task (if they haven't already been - * posted). + * posted). To make things easier post idle tasks to worker 0. */ + LOCK(&manager->queues[0].lock); for (task = HEAD(manager->tasks); task != NULL; task = NEXT(task, link)) { LOCK(&task->lock); if (task_shutdown(task)) { - push_readyq(manager, task, task->threadid); + task->threadid = 0; + push_readyq(manager, task, 0); } UNLOCK(&task->lock); } + UNLOCK(&manager->queues[0].lock); + /* * Wake up any sleeping workers. 
This ensures we get work done if * there's work left to do, and if there are already no tasks left From 64020dd7bca3163441065d5434dad4051b2ab056 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Mon, 22 Oct 2018 10:57:05 +0000 Subject: [PATCH 07/18] Make sure all priority tasks are done before entering normal execution --- lib/isc/task.c | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/lib/isc/task.c b/lib/isc/task.c index 9c1f769b8b..f6a4dc003f 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -43,6 +43,14 @@ #include #endif +/* + * Task manager is built around 'as little locking as possible' concept. + * Each thread has his own queue of tasks to be run, if a task is in running + * state it will stay on the runner it's currently on, if a task is in idle + * state it can be woken up on a specific runner with isc_task_sendto - that + * helps with data locality on CPU. + */ + #ifdef ISC_TASK_TRACE #define XTRACE(m) fprintf(stderr, "task %p thread %lu: %s\n", \ task, isc_thread_self(), (m)) @@ -95,7 +103,7 @@ struct isc__task { isc_time_t tnow; char name[16]; void * tag; - int threadid; + unsigned int threadid; /* Locked by task manager lock. */ LINK(isc__task_t) link; LINK(isc__task_t) ready_link; @@ -911,7 +919,7 @@ push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) { } static void -dispatch(isc__taskmgr_t *manager, int threadid) { +dispatch(isc__taskmgr_t *manager, unsigned int threadid) { isc__task_t *task; REQUIRE(VALID_MANAGER(manager)); @@ -1191,10 +1199,22 @@ dispatch(isc__taskmgr_t *manager, int threadid) { * we're stuck. Automatically drop privileges at that * point and continue with the regular ready queue. 
*/ - if (manager->tasks_running == 0 && empty_readyq(manager, threadid)) { - manager->mode = isc_taskmgrmode_normal; - for (unsigned i=0; i < manager->workers; i++) { - BROADCAST(&manager->queues[i].work_available); + if (atomic_load(&manager->tasks_running) == 0 && manager->mode != isc_taskmgrmode_normal) { + bool empty = true; + for (unsigned i=0; iworkers && empty; i++) { + if (i != threadid) { + LOCK(&manager->queues[i].lock); + } + empty &= empty_readyq(manager, i); + if (i != threadid) { + UNLOCK(&manager->queues[i].lock); + } + } + if (empty) { + manager->mode = isc_taskmgrmode_normal; + for (unsigned i=0; i < manager->workers; i++) { + BROADCAST(&manager->queues[i].work_available); + } } } } From c80e25e482cf8cdf0818326e264b3d22f257b811 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Mon, 22 Oct 2018 11:18:45 +0000 Subject: [PATCH 08/18] Get rid of isc_taskmgr_setmode, we only use it to set privileged mode --- bin/named/server.c | 3 +-- lib/isc/include/isc/task.h | 2 +- lib/isc/task.c | 4 ++-- lib/isc/tests/task_test.c | 15 ++++++--------- lib/isc/win32/libisc.def.in | 2 +- 5 files changed, 11 insertions(+), 15 deletions(-) diff --git a/bin/named/server.c b/bin/named/server.c index e9773fba55..4f14900f7f 100644 --- a/bin/named/server.c +++ b/bin/named/server.c @@ -9327,8 +9327,7 @@ load_zones(named_server_t *server, bool init, bool reconfig) { * the initial server setup; it isn't necessary during * a reload.) 
*/ - isc_taskmgr_setmode(named_g_taskmgr, - isc_taskmgrmode_privileged); + isc_taskmgr_setprivilegedmode(named_g_taskmgr); } isc_task_endexclusive(server->task); diff --git a/lib/isc/include/isc/task.h b/lib/isc/include/isc/task.h index 7746f1c12d..d3146f51a7 100644 --- a/lib/isc/include/isc/task.h +++ b/lib/isc/include/isc/task.h @@ -672,7 +672,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, */ void -isc_taskmgr_setmode(isc_taskmgr_t *manager, isc_taskmgrmode_t mode); +isc_taskmgr_setprivilegedmode(isc_taskmgr_t *manager); isc_taskmgrmode_t isc_taskmgr_mode(isc_taskmgr_t *manager); diff --git a/lib/isc/task.c b/lib/isc/task.c index f6a4dc003f..2dd05a4e96 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -1450,11 +1450,11 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { } void -isc_taskmgr_setmode(isc_taskmgr_t *manager0, isc_taskmgrmode_t mode) { +isc_taskmgr_setprivilegedmode(isc_taskmgr_t *manager0) { isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0; LOCK(&manager->lock); - manager->mode = mode; + manager->mode = isc_taskmgrmode_privileged; UNLOCK(&manager->lock); } diff --git a/lib/isc/tests/task_test.c b/lib/isc/tests/task_test.c index 2cb299d4de..fb1ce8499b 100644 --- a/lib/isc/tests/task_test.c +++ b/lib/isc/tests/task_test.c @@ -67,7 +67,6 @@ set_and_drop(isc_task_t *task, isc_event_t *event) { *value = (int) isc_taskmgr_mode(taskmgr); counter++; UNLOCK(&lock); - isc_taskmgr_setmode(taskmgr, isc_taskmgrmode_normal); } /* @@ -230,7 +229,7 @@ ATF_TC_BODY(privileged_events, tc) { isc_task_send(task2, &event); ATF_CHECK_EQ(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal); - isc_taskmgr_setmode(taskmgr, isc_taskmgrmode_privileged); + isc_taskmgr_setprivilegedmode(taskmgr); ATF_CHECK_EQ(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_privileged); isc__taskmgr_resume(taskmgr); @@ -351,7 +350,7 @@ ATF_TC_BODY(privilege_drop, tc) { isc_task_send(task2, &event); ATF_CHECK_EQ(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal); - 
isc_taskmgr_setmode(taskmgr, isc_taskmgrmode_privileged); + isc_taskmgr_setprivilegedmode(taskmgr); ATF_CHECK_EQ(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_privileged); isc__taskmgr_resume(taskmgr); @@ -363,14 +362,12 @@ ATF_TC_BODY(privilege_drop, tc) { } /* - * We can't guarantee what order the events fire, but - * we do know *exactly one* of the privileged tasks will - * have run in privileged mode... + * We need to check that all privilege mode events were fired + * in privileged mode, and non privileged in non-privileged. */ - ATF_CHECK(a == isc_taskmgrmode_privileged || - c == isc_taskmgrmode_privileged || + ATF_CHECK(a == isc_taskmgrmode_privileged && + c == isc_taskmgrmode_privileged && d == isc_taskmgrmode_privileged); - ATF_CHECK(a + c + d == isc_taskmgrmode_privileged); /* ...and neither of the non-privileged tasks did... */ ATF_CHECK(b == isc_taskmgrmode_normal || e == isc_taskmgrmode_normal); diff --git a/lib/isc/win32/libisc.def.in b/lib/isc/win32/libisc.def.in index 2133f5248f..e323f54d04 100644 --- a/lib/isc/win32/libisc.def.in +++ b/lib/isc/win32/libisc.def.in @@ -598,7 +598,7 @@ isc_taskmgr_renderjson isc_taskmgr_renderxml @END LIBXML2 isc_taskmgr_setexcltask -isc_taskmgr_setmode +isc_taskmgr_setprivilegedmode isc_taskpool_create isc_taskpool_destroy isc_taskpool_expand From 17d46fd48bb070243f6b299dd4d30d98a4b13601 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Tue, 23 Oct 2018 08:20:17 +0000 Subject: [PATCH 09/18] Formatting --- lib/isc/task.c | 49 +++++++++++++++++++++++-------------------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/lib/isc/task.c b/lib/isc/task.c index 2dd05a4e96..b749fd5625 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -122,10 +122,11 @@ struct isc__task { typedef ISC_LIST(isc__task_t) isc__tasklist_t; struct isc__taskqueue { + /* Everything locked by lock */ + isc_mutex_t lock; isc__tasklist_t ready_tasks; isc__tasklist_t ready_priority_tasks; isc_condition_t 
work_available; - isc_mutex_t lock; isc_thread_t thread; unsigned int threadid; isc__taskmgr_t *manager; @@ -157,7 +158,6 @@ struct isc__taskmgr { /* Locked by {pre/post}halt_lock combo */ unsigned int halted; - /* * Multiple threads can read/write 'excl' at the same time, so we need * to protect the access. We can't use 'lock' since isc_task_detach() @@ -250,7 +250,9 @@ isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum, return (ISC_R_NOMEMORY); XTRACE("isc_task_create"); task->manager = manager; - task->threadid = atomic_fetch_add_explicit(&manager->curq, 1, memory_order_relaxed) % manager->workers; + task->threadid = atomic_fetch_add_explicit(&manager->curq, 1, + memory_order_relaxed) + % manager->workers; result = isc_mutex_init(&task->lock); if (result != ISC_R_SUCCESS) { isc_mem_put(manager->mctx, task, sizeof(*task)); @@ -1257,12 +1259,14 @@ run(void *queuep) { static void manager_free(isc__taskmgr_t *manager) { - /* TODO */ - + for (unsigned int i=0; i < manager->workers; i++) { + DESTROYLOCK(&manager->queues[i].lock); + } DESTROYLOCK(&manager->lock); DESTROYLOCK(&manager->prehalt_lock); DESTROYLOCK(&manager->posthalt_lock); - isc_mem_put(manager->mctx, manager->queues, manager->workers * sizeof(isc__taskqueue_t)); + isc_mem_put(manager->mctx, manager->queues, + manager->workers * sizeof(isc__taskqueue_t)); manager->common.impmagic = 0; manager->common.magic = 0; isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager)); @@ -1272,7 +1276,6 @@ isc_result_t isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, unsigned int default_quantum, isc_taskmgr_t **managerp) { - isc_result_t result; unsigned int i; isc__taskmgr_t *manager; @@ -1284,25 +1287,20 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, REQUIRE(managerp != NULL && *managerp == NULL); manager = isc_mem_get(mctx, sizeof(*manager)); - if (manager == NULL) - return (ISC_R_NOMEMORY); + RUNTIME_CHECK(manager != NULL); manager->common.impmagic = TASK_MANAGER_MAGIC; 
manager->common.magic = ISCAPI_TASKMGR_MAGIC; manager->mode = isc_taskmgrmode_normal; manager->mctx = NULL; - result = isc_mutex_init(&manager->lock); - if (result != ISC_R_SUCCESS) - goto cleanup_mgr; - result = isc_mutex_init(&manager->excl_lock); - if (result != ISC_R_SUCCESS) { - DESTROYLOCK(&manager->lock); - goto cleanup_mgr; - } + RUNTIME_CHECK(isc_mutex_init(&manager->lock) == ISC_R_SUCCESS); + RUNTIME_CHECK(isc_mutex_init(&manager->excl_lock) == ISC_R_SUCCESS); - RUNTIME_CHECK(isc_mutex_init(&manager->prehalt_lock) == ISC_R_SUCCESS); - RUNTIME_CHECK(isc_mutex_init(&manager->posthalt_lock) == ISC_R_SUCCESS); - - RUNTIME_CHECK(isc_condition_init(&manager->halt_cond) == ISC_R_SUCCESS); + RUNTIME_CHECK(isc_mutex_init(&manager->prehalt_lock) + == ISC_R_SUCCESS); + RUNTIME_CHECK(isc_mutex_init(&manager->posthalt_lock) + == ISC_R_SUCCESS); + RUNTIME_CHECK(isc_condition_init(&manager->halt_cond) + == ISC_R_SUCCESS); manager->workers = workers; @@ -1310,7 +1308,10 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, default_quantum = DEFAULT_DEFAULT_QUANTUM; manager->default_quantum = default_quantum; INIT_LIST(manager->tasks); - manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t)); + manager->queues = isc_mem_get(mctx, workers * + sizeof(isc__taskqueue_t)); + RUNTIME_CHECK(manager->queues != NULL); + manager->tasks_running = 0; manager->tasks_ready = 0; manager->curq = 0; @@ -1350,10 +1351,6 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, *managerp = (isc_taskmgr_t *)manager; return (ISC_R_SUCCESS); - - cleanup_mgr: - isc_mem_put(mctx, manager, sizeof(*manager)); - return (result); } void From 913856911adf171a0d7d866743ceb5311d4dd995 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Tue, 23 Oct 2018 08:47:44 +0000 Subject: [PATCH 10/18] Saner exclusive task handling in taskmgr --- lib/isc/task.c | 66 ++++++++++++++++++++++++-------------------------- 1 file changed, 32 insertions(+), 34 deletions(-) 
diff --git a/lib/isc/task.c b/lib/isc/task.c index b749fd5625..2d01c9288c 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -137,10 +137,8 @@ struct isc__taskmgr { isc_taskmgr_t common; isc_mem_t * mctx; isc_mutex_t lock; - isc_mutex_t prehalt_lock; - isc_mutex_t posthalt_lock; + isc_mutex_t halt_lock; isc_condition_t halt_cond; - isc_condition_t halt_avail_cond; unsigned int workers; atomic_uint_fast32_t tasks_running; atomic_uint_fast32_t tasks_ready; @@ -155,7 +153,7 @@ struct isc__taskmgr { bool exclusive_requested; bool exiting; - /* Locked by {pre/post}halt_lock combo */ + /* Locked by halt_lock */ unsigned int halted; /* @@ -1017,21 +1015,15 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK, ISC_MSG_WORKING, "halting")); - /* - * First we increase 'halted' and signal the thread - * that's waiting on exclusivity/pause. Then we - * try locking posthalt lock, which will be locked - * by exclusive/pausing thread. It is only unlocked - * after exclusivity/pause is done. 
- */ - LOCK(&manager->prehalt_lock); + LOCK(&manager->halt_lock); manager->halted++; - SIGNAL(&manager->halt_cond); - UNLOCK(&manager->prehalt_lock); - - LOCK(&manager->posthalt_lock); + BROADCAST(&manager->halt_cond); + while (manager->pause_requested || manager->exclusive_requested) { + WAIT(&manager->halt_cond, &manager->halt_lock); + } manager->halted--; - UNLOCK(&manager->posthalt_lock); + SIGNAL(&manager->halt_cond); + UNLOCK(&manager->halt_lock); LOCK(&manager->queues[threadid].lock); /* Restart the loop after */ @@ -1263,8 +1255,7 @@ manager_free(isc__taskmgr_t *manager) { DESTROYLOCK(&manager->queues[i].lock); } DESTROYLOCK(&manager->lock); - DESTROYLOCK(&manager->prehalt_lock); - DESTROYLOCK(&manager->posthalt_lock); + DESTROYLOCK(&manager->halt_lock); isc_mem_put(manager->mctx, manager->queues, manager->workers * sizeof(isc__taskqueue_t)); manager->common.impmagic = 0; @@ -1295,9 +1286,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, RUNTIME_CHECK(isc_mutex_init(&manager->lock) == ISC_R_SUCCESS); RUNTIME_CHECK(isc_mutex_init(&manager->excl_lock) == ISC_R_SUCCESS); - RUNTIME_CHECK(isc_mutex_init(&manager->prehalt_lock) - == ISC_R_SUCCESS); - RUNTIME_CHECK(isc_mutex_init(&manager->posthalt_lock) + RUNTIME_CHECK(isc_mutex_init(&manager->halt_lock) == ISC_R_SUCCESS); RUNTIME_CHECK(isc_condition_init(&manager->halt_cond) == ISC_R_SUCCESS); @@ -1470,31 +1459,36 @@ isc__taskmgr_pause(isc_taskmgr_t *manager0) { isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0; unsigned int i; - LOCK(&manager->posthalt_lock); + LOCK(&manager->halt_lock); while (manager->exclusive_requested || manager->pause_requested) { - UNLOCK(&manager->posthalt_lock); + UNLOCK(&manager->halt_lock); /* This is ugly but pause is used EXCLUSIVELY in tests */ isc_thread_yield(); - LOCK(&manager->posthalt_lock); + LOCK(&manager->halt_lock); } + manager->pause_requested = true; - LOCK(&manager->prehalt_lock); while (manager->halted < manager->workers) { for (i = 0; i < 
manager->workers; i++) { BROADCAST(&manager->queues[i].work_available); } - WAIT(&manager->halt_cond, &manager->prehalt_lock); + WAIT(&manager->halt_cond, &manager->halt_lock); } - UNLOCK(&manager->prehalt_lock); + UNLOCK(&manager->halt_lock); } void isc__taskmgr_resume(isc_taskmgr_t *manager0) { isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0; + LOCK(&manager->halt_lock); if (manager->pause_requested) { manager->pause_requested = false; - UNLOCK(&manager->posthalt_lock); + while (manager->halted > 0) { + BROADCAST(&manager->halt_cond); + WAIT(&manager->halt_cond, &manager->halt_lock); + } } + UNLOCK(&manager->halt_lock); } void @@ -1548,17 +1542,16 @@ isc_task_beginexclusive(isc_task_t *task0) { return (ISC_R_LOCKBUSY); } - LOCK(&manager->posthalt_lock); + LOCK(&manager->halt_lock); INSIST(!manager->exclusive_requested && !manager->pause_requested); manager->exclusive_requested = true; - LOCK(&manager->prehalt_lock); while (manager->halted + 1 < manager->workers) { for (i = 0; i < manager->workers; i++) { BROADCAST(&manager->queues[i].work_available); } - WAIT(&manager->halt_cond, &manager->prehalt_lock); + WAIT(&manager->halt_cond, &manager->halt_lock); } - UNLOCK(&manager->prehalt_lock); + UNLOCK(&manager->halt_lock); return (ISC_R_SUCCESS); } @@ -1569,9 +1562,14 @@ isc_task_endexclusive(isc_task_t *task0) { REQUIRE(VALID_TASK(task)); REQUIRE(task->state == task_state_running); + LOCK(&manager->halt_lock); REQUIRE(manager->exclusive_requested); manager->exclusive_requested = false; - UNLOCK(&manager->posthalt_lock); + while (manager->halted > 0) { + BROADCAST(&manager->halt_cond); + WAIT(&manager->halt_cond, &manager->halt_lock); + } + UNLOCK(&manager->halt_lock); } void From 152c437bb84629a57e0e4d9a33500df99a75c269 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Tue, 23 Oct 2018 09:03:31 +0000 Subject: [PATCH 11/18] Allow slight over-quota in'checking lame server clients are dropped at the soft limit' test --- 
bin/tests/system/fetchlimit/tests.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/tests/system/fetchlimit/tests.sh b/bin/tests/system/fetchlimit/tests.sh index 9250c46253..f62825a25b 100644 --- a/bin/tests/system/fetchlimit/tests.sh +++ b/bin/tests/system/fetchlimit/tests.sh @@ -166,7 +166,7 @@ touch ans4/norespond for try in 1 2 3 4 5; do burst b $try 400 $DIG @10.53.0.3 -p ${PORT} a ${try}.example > dig.out.ns3.$try - stat 360 || exceeded=`expr $exceeded + 1` + stat 370 || exceeded=`expr $exceeded + 1` grep "status: NOERROR" dig.out.ns3.$try > /dev/null 2>&1 && \ success=`expr $success + 1` grep "status: SERVFAIL" dig.out.ns3.$try > /dev/null 2>&1 && \ @@ -177,7 +177,7 @@ echo_i "$success successful valid queries (expected 5)" [ "$success" -eq 5 ] || { echo_i "failed"; ret=1; } echo_i "$fail SERVFAIL responses (expected 0)" [ "$fail" -eq 0 ] || { echo_i "failed"; ret=1; } -echo_i "clients count exceeded 360 on $exceeded trials (expected 0)" +echo_i "clients count exceeded 370 on $exceeded trials (expected 0)" [ "$exceeded" -eq 0 ] || { echo_i "failed"; ret=1; } if [ $ret != 0 ]; then echo_i "failed"; fi status=`expr $status + $ret` From 8fb5bc783f836eff2cb90e5356aad059c1d66c3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Tue, 23 Oct 2018 09:39:56 +0000 Subject: [PATCH 12/18] Comment about taskmgr exclusive mode, fix a REQUIRE. 
--- lib/isc/task.c | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/lib/isc/task.c b/lib/isc/task.c index 2d01c9288c..87ece27ec0 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -1015,6 +1015,16 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK, ISC_MSG_WORKING, "halting")); + /* + * Switching to exclusive mode is done as a 2-phase-lock, + * checking if we have to switch is done without any locks + * on pause_requested and exclusive_requested to save time - + * the worst thing that can happen is that we'll launch one task + * more and exclusive task will be postponed a bit. + * + * Broadcasting on halt_cond seems suboptimal, but exclusive tasks + * are rare enough that we don't care. + */ LOCK(&manager->halt_lock); manager->halted++; BROADCAST(&manager->halt_cond); @@ -1532,11 +1542,8 @@ isc_task_beginexclusive(isc_task_t *task0) { REQUIRE(VALID_TASK(task)); REQUIRE(task->state == task_state_running); - -/* - * TODO REQUIRE(task == task->manager->excl); - * it should be here, it fails on shutdown server->task - */ + REQUIRE(task == task->manager->excl || + (task->manager->exiting && task->manager->excl == NULL)); if (manager->exclusive_requested || manager->pause_requested) { return (ISC_R_LOCKBUSY); From 025c74adee0cf2ace500e3b41a30ef26f5cbc06f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Thu, 25 Oct 2018 06:27:24 +0000 Subject: [PATCH 13/18] Use proper memory ordering for tasks_running/tasks_ready --- lib/isc/task.c | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/lib/isc/task.c b/lib/isc/task.c index 87ece27ec0..55418590bf 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -915,7 +915,8 @@ push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) { if ((task->flags & TASK_F_PRIVILEGED) != 0) ENQUEUE(manager->queues[c].ready_priority_tasks, task, ready_priority_link); - 
atomic_fetch_add_explicit(&manager->tasks_ready, 1, memory_order_relaxed); + atomic_fetch_add_explicit(&manager->tasks_ready, 1, + memory_order_acquire); } static void @@ -1051,13 +1052,15 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { INSIST(VALID_TASK(task)); /* - * Note we only unlock the manager lock if we actually - * have a task to do. We must reacquire the manager + * Note we only unlock the queue lock if we actually + * have a task to do. We must reacquire the queue * lock before exiting the 'if (task != NULL)' block. */ UNLOCK(&manager->queues[threadid].lock); - atomic_fetch_add_explicit(&manager->tasks_ready, -1, memory_order_relaxed); - atomic_fetch_add_explicit(&manager->tasks_running, 1, memory_order_relaxed); + RUNTIME_CHECK(atomic_fetch_sub_explicit(&manager->tasks_ready, + 1, memory_order_release) > 0); + atomic_fetch_add_explicit(&manager->tasks_running, 1, + memory_order_acquire); LOCK(&task->lock); INSIST(task->state == task_state_ready); @@ -1171,7 +1174,8 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { if (finished) task_finished(task); - atomic_fetch_add_explicit(&manager->tasks_running, -1, memory_order_relaxed); + RUNTIME_CHECK(atomic_fetch_sub_explicit(&manager->tasks_running, + 1, memory_order_release) > 0); LOCK(&manager->queues[threadid].lock); if (requeue) { /* @@ -1203,7 +1207,10 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { * we're stuck. Automatically drop privileges at that * point and continue with the regular ready queue. 
*/ - if (atomic_load(&manager->tasks_running) == 0 && manager->mode != isc_taskmgrmode_normal) { + if (atomic_load_explicit(&manager->tasks_running, + memory_order_acquire) == 0 && + manager->mode != isc_taskmgrmode_normal) + { bool empty = true; for (unsigned i=0; i<manager->workers && empty; i++) { if (i != threadid) { From f166cabcae9c11d34bf269dd45435a6c13df6900 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Thu, 25 Oct 2018 06:31:53 +0000 Subject: [PATCH 14/18] Document isc_task_sendto properly, make sure that cpu we're sending to is always sane --- lib/isc/include/isc/task.h | 6 ++++-- lib/isc/task.c | 26 +++++++++++++------------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/lib/isc/include/isc/task.h b/lib/isc/include/isc/task.h index d3146f51a7..4ee783aedc 100644 --- a/lib/isc/include/isc/task.h +++ b/lib/isc/include/isc/task.h @@ -212,7 +212,8 @@ isc_task_send(isc_task_t *task, isc_event_t **eventp); void isc_task_sendto(isc_task_t *task, isc_event_t **eventp, int c); /*%< - * Send '*event' to 'task'. + * Send '*event' to 'task', if task is idle try starting it on cpu 'c' + * If 'c' is smaller than 0 then cpu is selected randomly. * * Requires: * @@ -231,7 +232,8 @@ void isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp); /*%< * Send '*event' to '*taskp' and then detach '*taskp' from its - * task. + * task. If task is idle try starting it on cpu 'c' + * If 'c' is smaller than 0 then cpu is selected randomly. 
* * Requires: * diff --git a/lib/isc/task.c b/lib/isc/task.c index 55418590bf..e2124dd53c 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -481,15 +481,14 @@ isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) { */ REQUIRE(VALID_TASK(task)); - - if (c == -1) { - c = atomic_fetch_add_explicit(&task->manager->curq, 1, - memory_order_relaxed) - % task->manager->workers; - } - XTRACE("isc_task_send"); + if (c < 0) { + c = atomic_fetch_add_explicit(&task->manager->curq, 1, + memory_order_relaxed); + } + c %= task->manager->workers; + /* * We're trying hard to hold locks for as short a time as possible. * We're also trying to hold as few locks as possible. This is why @@ -532,13 +531,14 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) { REQUIRE(taskp != NULL); task = (isc__task_t *)*taskp; REQUIRE(VALID_TASK(task)); - if (c == -1) { - c = atomic_fetch_add_explicit(&task->manager->curq, 1, - memory_order_relaxed) - % task->manager->workers; - } - XTRACE("isc_task_sendanddetach"); + + if (c < 0) { + c = atomic_fetch_add_explicit(&task->manager->curq, 1, + memory_order_relaxed); + } + c %= task->manager->workers; + LOCK(&task->lock); idle1 = task_send(task, eventp, c); idle2 = task_detach(task); From b3827319e01f7b4ab4b2a2241121ba34228bba46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Thu, 25 Oct 2018 12:41:59 +0000 Subject: [PATCH 15/18] Switch from privileged to un-privileged mode under lock --- lib/isc/task.c | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/lib/isc/task.c b/lib/isc/task.c index e2124dd53c..3862f14c53 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -1211,22 +1211,34 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { memory_order_acquire) == 0 && manager->mode != isc_taskmgrmode_normal) { - bool empty = true; - for (unsigned i=0; iworkers && empty; i++) { - if (i != threadid) { + UNLOCK(&manager->queues[threadid].lock); 
+ LOCK(&manager->lock); + /* + * Check once again, under lock. Mode can only + * change from privileged to normal anyway, and + * if we enter this loop twice at the same time + * we'll end up in a deadlock over queue locks. + * + */ + if (atomic_load_explicit(&manager->tasks_running, + memory_order_acquire) == 0 && + manager->mode != isc_taskmgrmode_normal) + { + bool empty = true; + for (unsigned i=0; iworkers && empty; i++) { LOCK(&manager->queues[i].lock); - } - empty &= empty_readyq(manager, i); - if (i != threadid) { + empty &= empty_readyq(manager, i); UNLOCK(&manager->queues[i].lock); } - } - if (empty) { - manager->mode = isc_taskmgrmode_normal; - for (unsigned i=0; i < manager->workers; i++) { - BROADCAST(&manager->queues[i].work_available); + if (empty) { + manager->mode = isc_taskmgrmode_normal; + for (unsigned i=0; i < manager->workers; i++) { + BROADCAST(&manager->queues[i].work_available); + } } } + UNLOCK(&manager->lock); + LOCK(&manager->queues[threadid].lock); } } UNLOCK(&manager->queues[threadid].lock); From 460c8038c13b31ee5d53498f8d78cf5651a52e1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Thu, 25 Oct 2018 15:01:25 +0000 Subject: [PATCH 16/18] Use a single wake_all_queues() function to wake all queues --- lib/isc/task.c | 58 +++++++++++++++++++++++--------------------------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/lib/isc/task.c b/lib/isc/task.c index 3862f14c53..6e5b9a031e 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -196,10 +196,22 @@ pop_readyq(isc__taskmgr_t *manager, int c); static inline void push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c); +static inline void +wake_all_queues(isc__taskmgr_t *manager); + /*** *** Tasks. 
***/ +static inline void +wake_all_queues(isc__taskmgr_t *manager) { + for (unsigned i=0; i < manager->workers; i++) { + LOCK(&manager->queues[i].lock); + BROADCAST(&manager->queues[i].work_available); + UNLOCK(&manager->queues[i].lock); + } +} + static void task_finished(isc__task_t *task) { isc__taskmgr_t *manager = task->manager; @@ -221,9 +233,7 @@ task_finished(isc__task_t *task) { * any idle worker threads so they * can exit. */ - for (unsigned int i=0; i<manager->workers; i++) { - BROADCAST(&manager->queues[i].work_available); - } + wake_all_queues(manager); } DESTROYLOCK(&task->lock); task->common.impmagic = 0; @@ -1207,9 +1217,9 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { * we're stuck. Automatically drop privileges at that * point and continue with the regular ready queue. */ - if (atomic_load_explicit(&manager->tasks_running, - memory_order_acquire) == 0 && - manager->mode != isc_taskmgrmode_normal) + if (manager->mode != isc_taskmgrmode_normal && + atomic_load_explicit(&manager->tasks_running, + memory_order_acquire) == 0) { UNLOCK(&manager->queues[threadid].lock); LOCK(&manager->lock); @@ -1220,21 +1230,21 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { * Check once again, under lock. Mode can only * change from privileged to normal anyway, and * if we enter this loop twice at the same time * we'll end up in a deadlock over queue locks. 
* */ - if (atomic_load_explicit(&manager->tasks_running, - memory_order_acquire) == 0 && - manager->mode != isc_taskmgrmode_normal) + if (manager->mode != isc_taskmgrmode_normal && + atomic_load_explicit(&manager->tasks_running, + memory_order_acquire) == 0) { bool empty = true; - for (unsigned i=0; i<manager->workers && empty; i++) { + unsigned int i; + for (i=0; i<manager->workers && empty; i++) + { LOCK(&manager->queues[i].lock); empty &= empty_readyq(manager, i); UNLOCK(&manager->queues[i].lock); } if (empty) { manager->mode = isc_taskmgrmode_normal; - for (unsigned i=0; i < manager->workers; i++) { - BROADCAST(&manager->queues[i].work_available); - } + wake_all_queues(manager); } } UNLOCK(&manager->lock); @@ -1246,11 +1256,7 @@ dispatch(isc__taskmgr_t *manager, unsigned int threadid) { * There might be other dispatchers waiting on empty tasks, * wake them up. */ - for (unsigned i=0; i < manager->workers; i++) { - LOCK(&manager->queues[i].lock); - BROADCAST(&manager->queues[i].work_available); - UNLOCK(&manager->queues[i].lock); - } + wake_all_queues(manager); } static isc_threadresult_t @@ -1446,11 +1452,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { * there's work left to do, and if there are already no tasks left * it will cause the workers to see manager->exiting. 
*/ - for (i = 0; i < manager->workers; i++) { - LOCK(&manager->queues[i].lock); - BROADCAST(&manager->queues[i].work_available); - UNLOCK(&manager->queues[i].lock); - } + wake_all_queues(manager); UNLOCK(&manager->lock); /* @@ -1486,7 +1488,6 @@ isc_taskmgr_mode(isc_taskmgr_t *manager0) { void isc__taskmgr_pause(isc_taskmgr_t *manager0) { isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0; - unsigned int i; LOCK(&manager->halt_lock); while (manager->exclusive_requested || manager->pause_requested) { @@ -1498,9 +1499,7 @@ isc__taskmgr_pause(isc_taskmgr_t *manager0) { manager->pause_requested = true; while (manager->halted < manager->workers) { - for (i = 0; i < manager->workers; i++) { - BROADCAST(&manager->queues[i].work_available); - } + wake_all_queues(manager); WAIT(&manager->halt_cond, &manager->halt_lock); } UNLOCK(&manager->halt_lock); @@ -1556,7 +1555,6 @@ isc_result_t isc_task_beginexclusive(isc_task_t *task0) { isc__task_t *task = (isc__task_t *)task0; isc__taskmgr_t *manager = task->manager; - unsigned int i; REQUIRE(VALID_TASK(task)); @@ -1572,9 +1570,7 @@ isc_task_beginexclusive(isc_task_t *task0) { INSIST(!manager->exclusive_requested && !manager->pause_requested); manager->exclusive_requested = true; while (manager->halted + 1 < manager->workers) { - for (i = 0; i < manager->workers; i++) { - BROADCAST(&manager->queues[i].work_available); - } + wake_all_queues(manager); WAIT(&manager->halt_cond, &manager->halt_lock); } UNLOCK(&manager->halt_lock); From 9a903789ed258ce721f440158834215c4f3955f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Tue, 30 Oct 2018 15:07:25 +0000 Subject: [PATCH 17/18] Use larger quantum for network tasks --- lib/dns/client.c | 2 +- lib/dns/dispatch.c | 4 ++-- lib/ns/client.c | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/dns/client.c b/lib/dns/client.c index c70841cf34..eea0ed8ace 100644 --- a/lib/dns/client.c +++ b/lib/dns/client.c @@ -461,7 +461,7 @@ 
dns_client_createx(isc_mem_t *mctx, isc_appctx_t *actx, client->timermgr = timermgr; client->task = NULL; - result = isc_task_create(client->taskmgr, 0, &client->task); + result = isc_task_create(client->taskmgr, 50, &client->task); if (result != ISC_R_SUCCESS) goto cleanup; diff --git a/lib/dns/dispatch.c b/lib/dns/dispatch.c index 623be924f4..9c440d370e 100644 --- a/lib/dns/dispatch.c +++ b/lib/dns/dispatch.c @@ -2503,7 +2503,7 @@ dns_dispatch_createtcp(dns_dispatchmgr_t *mgr, isc_socket_t *sock, disp->ntasks = 1; disp->task[0] = NULL; - result = isc_task_create(taskmgr, 0, &disp->task[0]); + result = isc_task_create(taskmgr, 50, &disp->task[0]); if (result != ISC_R_SUCCESS) goto kill_socket; @@ -2941,7 +2941,7 @@ dispatch_createudp(dns_dispatchmgr_t *mgr, isc_socketmgr_t *sockmgr, disp->ntasks = 1; for (i = 0; i < disp->ntasks; i++) { disp->task[i] = NULL; - result = isc_task_create(taskmgr, 0, &disp->task[i]); + result = isc_task_create(taskmgr, 50, &disp->task[i]); if (result != ISC_R_SUCCESS) { while (--i >= 0) { isc_task_shutdown(disp->task[i]); diff --git a/lib/ns/client.c b/lib/ns/client.c index 7b789047e6..7df2336c1b 100644 --- a/lib/ns/client.c +++ b/lib/ns/client.c @@ -2960,7 +2960,7 @@ client_create(ns_clientmgr_t *manager, ns_client_t **clientp) { ns_server_attach(manager->sctx, &client->sctx); client->task = NULL; - result = isc_task_create(manager->taskmgr, 0, &client->task); + result = isc_task_create(manager->taskmgr, 50, &client->task); if (result != ISC_R_SUCCESS) goto cleanup_client; isc_task_setname(client->task, "client", client); From b673f509d23f670a4b56db7daed660a761eca049 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Witold=20Kr=C4=99cicki?= Date: Tue, 6 Nov 2018 08:17:33 +0000 Subject: [PATCH 18/18] CHANGES note --- CHANGES | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGES b/CHANGES index 255fd9c81b..d44ef6711e 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,6 @@ +5081. 
[func] Use per-worker queues in task manager, make task + runners CPU-affine. [GL #659] + 5080. [func] Improvements to "rndc nta" user interface: - catch and report invalid command line options - when removing an NTA from all views, do not