mirror of
https://gitlab.isc.org/isc-projects/bind9
synced 2025-08-30 05:57:52 +00:00
Merge branch 'wpk-multiple-task-queues' into 'master'
Multiple task queues See merge request isc-projects/bind9!878
This commit is contained in:
commit
319c166a50
3
CHANGES
3
CHANGES
@ -1,3 +1,6 @@
|
||||
5081. [func] Use per-worker queues in task manager, make task
|
||||
runners CPU-affine. [GL #659]
|
||||
|
||||
5080. [func] Improvements to "rndc nta" user interface:
|
||||
- catch and report invalid command line options
|
||||
- when removing an NTA from all views, do not
|
||||
|
@ -9327,8 +9327,7 @@ load_zones(named_server_t *server, bool init, bool reconfig) {
|
||||
* the initial server setup; it isn't necessary during
|
||||
* a reload.)
|
||||
*/
|
||||
isc_taskmgr_setmode(named_g_taskmgr,
|
||||
isc_taskmgrmode_privileged);
|
||||
isc_taskmgr_setprivilegedmode(named_g_taskmgr);
|
||||
}
|
||||
|
||||
isc_task_endexclusive(server->task);
|
||||
|
@ -166,7 +166,7 @@ touch ans4/norespond
|
||||
for try in 1 2 3 4 5; do
|
||||
burst b $try 400
|
||||
$DIG @10.53.0.3 -p ${PORT} a ${try}.example > dig.out.ns3.$try
|
||||
stat 360 || exceeded=`expr $exceeded + 1`
|
||||
stat 370 || exceeded=`expr $exceeded + 1`
|
||||
grep "status: NOERROR" dig.out.ns3.$try > /dev/null 2>&1 && \
|
||||
success=`expr $success + 1`
|
||||
grep "status: SERVFAIL" dig.out.ns3.$try > /dev/null 2>&1 && \
|
||||
@ -177,7 +177,7 @@ echo_i "$success successful valid queries (expected 5)"
|
||||
[ "$success" -eq 5 ] || { echo_i "failed"; ret=1; }
|
||||
echo_i "$fail SERVFAIL responses (expected 0)"
|
||||
[ "$fail" -eq 0 ] || { echo_i "failed"; ret=1; }
|
||||
echo_i "clients count exceeded 360 on $exceeded trials (expected 0)"
|
||||
echo_i "clients count exceeded 370 on $exceeded trials (expected 0)"
|
||||
[ "$exceeded" -eq 0 ] || { echo_i "failed"; ret=1; }
|
||||
if [ $ret != 0 ]; then echo_i "failed"; fi
|
||||
status=`expr $status + $ret`
|
||||
|
21
config.h.in
21
config.h.in
@ -75,6 +75,9 @@
|
||||
/* Define to 1 if you have the <cmocka.h> header file. */
|
||||
#undef HAVE_CMOCKA_H
|
||||
|
||||
/* Define to 1 if you have the `cpuset_setaffinty' function. */
|
||||
#undef HAVE_CPUSET_SETAFFINITY
|
||||
|
||||
/* Define to 1 if you have the `CRYPTO_zalloc' function. */
|
||||
#undef HAVE_CRYPTO_ZALLOC
|
||||
|
||||
@ -285,6 +288,9 @@
|
||||
/* define if OpenSSL supports Ed25519 */
|
||||
#undef HAVE_OPENSSL_ED25519
|
||||
|
||||
/* Define to 1 if you have the `processor_bind' function. */
|
||||
#undef HAVE_PROCESSOR_BIND
|
||||
|
||||
/* Define if you have POSIX threads libraries and header files. */
|
||||
#undef HAVE_PTHREAD
|
||||
|
||||
@ -303,6 +309,9 @@
|
||||
/* Have PTHREAD_PRIO_INHERIT. */
|
||||
#undef HAVE_PTHREAD_PRIO_INHERIT
|
||||
|
||||
/* Define to 1 if you have the `pthread_setaffinity_np' function. */
|
||||
#undef HAVE_PTHREAD_SETAFFINITY_NP
|
||||
|
||||
/* Define to 1 if you have the `pthread_setname_np' function. */
|
||||
#undef HAVE_PTHREAD_SETNAME_NP
|
||||
|
||||
@ -333,6 +342,9 @@
|
||||
/* Define to 1 if you have the <sched.h> header file. */
|
||||
#undef HAVE_SCHED_H
|
||||
|
||||
/* Define to 1 if you have the `sched_setaffinity' function. */
|
||||
#undef HAVE_SCHED_SETAFFINITY
|
||||
|
||||
/* Define to 1 if you have the `sched_yield' function. */
|
||||
#undef HAVE_SCHED_YIELD
|
||||
|
||||
@ -387,6 +399,9 @@
|
||||
/* Define to 1 if you have the <sys/capability.h> header file. */
|
||||
#undef HAVE_SYS_CAPABILITY_H
|
||||
|
||||
/* Define to 1 if you have the <sys/cpuset.h> header file. */
|
||||
#undef HAVE_SYS_CPUSET_H
|
||||
|
||||
/* Define to 1 if you have the <sys/devpoll.h> header file. */
|
||||
#undef HAVE_SYS_DEVPOLL_H
|
||||
|
||||
@ -396,6 +411,12 @@
|
||||
/* Define to 1 if you have the <sys/param.h> header file. */
|
||||
#undef HAVE_SYS_PARAM_H
|
||||
|
||||
/* Define to 1 if you have the <sys/prctl.h> header file. */
|
||||
#undef HAVE_SYS_PRCTL_H
|
||||
|
||||
/* Define to 1 if you have the <sys/procset.h> header file. */
|
||||
#undef HAVE_SYS_PROCSET_H
|
||||
|
||||
/* Define to 1 if you have the <sys/select.h> header file. */
|
||||
#undef HAVE_SYS_SELECT_H
|
||||
|
||||
|
51
configure
vendored
51
configure
vendored
@ -844,6 +844,7 @@ infodir
|
||||
docdir
|
||||
oldincludedir
|
||||
includedir
|
||||
runstatedir
|
||||
localstatedir
|
||||
sharedstatedir
|
||||
sysconfdir
|
||||
@ -1003,6 +1004,7 @@ datadir='${datarootdir}'
|
||||
sysconfdir='${prefix}/etc'
|
||||
sharedstatedir='${prefix}/com'
|
||||
localstatedir='${prefix}/var'
|
||||
runstatedir='${localstatedir}/run'
|
||||
includedir='${prefix}/include'
|
||||
oldincludedir='/usr/include'
|
||||
docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
|
||||
@ -1255,6 +1257,15 @@ do
|
||||
| -silent | --silent | --silen | --sile | --sil)
|
||||
silent=yes ;;
|
||||
|
||||
-runstatedir | --runstatedir | --runstatedi | --runstated \
|
||||
| --runstate | --runstat | --runsta | --runst | --runs \
|
||||
| --run | --ru | --r)
|
||||
ac_prev=runstatedir ;;
|
||||
-runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
|
||||
| --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
|
||||
| --run=* | --ru=* | --r=*)
|
||||
runstatedir=$ac_optarg ;;
|
||||
|
||||
-sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
|
||||
ac_prev=sbindir ;;
|
||||
-sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
|
||||
@ -1392,7 +1403,7 @@ fi
|
||||
for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \
|
||||
datadir sysconfdir sharedstatedir localstatedir includedir \
|
||||
oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
|
||||
libdir localedir mandir
|
||||
libdir localedir mandir runstatedir
|
||||
do
|
||||
eval ac_val=\$$ac_var
|
||||
# Remove trailing slashes.
|
||||
@ -1545,6 +1556,7 @@ Fine tuning of the installation directories:
|
||||
--sysconfdir=DIR read-only single-machine data [PREFIX/etc]
|
||||
--sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com]
|
||||
--localstatedir=DIR modifiable single-machine data [PREFIX/var]
|
||||
--runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run]
|
||||
--libdir=DIR object code libraries [EPREFIX/lib]
|
||||
--includedir=DIR C header files [PREFIX/include]
|
||||
--oldincludedir=DIR C header files for non-gcc [/usr/include]
|
||||
@ -15235,6 +15247,43 @@ fi
|
||||
done
|
||||
|
||||
|
||||
for ac_header in sys/cpuset.h
|
||||
do :
|
||||
ac_fn_c_check_header_mongrel "$LINENO" "sys/cpuset.h" "ac_cv_header_sys_cpuset_h" "$ac_includes_default"
|
||||
if test "x$ac_cv_header_sys_cpuset_h" = xyes; then :
|
||||
cat >>confdefs.h <<_ACEOF
|
||||
#define HAVE_SYS_CPUSET_H 1
|
||||
_ACEOF
|
||||
|
||||
fi
|
||||
|
||||
done
|
||||
|
||||
for ac_header in sys/procset.h
|
||||
do :
|
||||
ac_fn_c_check_header_mongrel "$LINENO" "sys/procset.h" "ac_cv_header_sys_procset_h" "$ac_includes_default"
|
||||
if test "x$ac_cv_header_sys_procset_h" = xyes; then :
|
||||
cat >>confdefs.h <<_ACEOF
|
||||
#define HAVE_SYS_PROCSET_H 1
|
||||
_ACEOF
|
||||
|
||||
fi
|
||||
|
||||
done
|
||||
|
||||
for ac_func in pthread_setaffinity_np cpuset_setaffinity processor_bind sched_setaffinity
|
||||
do :
|
||||
as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh`
|
||||
ac_fn_c_check_func "$LINENO" "$ac_func" "$as_ac_var"
|
||||
if eval test \"x\$"$as_ac_var"\" = x"yes"; then :
|
||||
cat >>confdefs.h <<_ACEOF
|
||||
#define `$as_echo "HAVE_$ac_func" | $as_tr_cpp` 1
|
||||
_ACEOF
|
||||
|
||||
fi
|
||||
done
|
||||
|
||||
|
||||
# Look for functions relating to thread naming
|
||||
for ac_func in pthread_setname_np pthread_set_name_np
|
||||
do :
|
||||
|
@ -701,6 +701,10 @@ AC_CHECK_HEADERS([sched.h])
|
||||
AC_SEARCH_LIBS([sched_yield],[rt])
|
||||
AC_CHECK_FUNCS([sched_yield pthread_yield pthread_yield_np])
|
||||
|
||||
AC_CHECK_HEADERS([sys/cpuset.h])
|
||||
AC_CHECK_HEADERS([sys/procset.h])
|
||||
AC_CHECK_FUNCS([pthread_setaffinity_np cpuset_setaffinity processor_bind sched_setaffinity])
|
||||
|
||||
# Look for functions relating to thread naming
|
||||
AC_CHECK_FUNCS([pthread_setname_np pthread_set_name_np])
|
||||
AC_CHECK_HEADERS([pthread_np.h], [], [], [#include <pthread.h>])
|
||||
|
@ -461,7 +461,7 @@ dns_client_createx(isc_mem_t *mctx, isc_appctx_t *actx,
|
||||
client->timermgr = timermgr;
|
||||
|
||||
client->task = NULL;
|
||||
result = isc_task_create(client->taskmgr, 0, &client->task);
|
||||
result = isc_task_create(client->taskmgr, 50, &client->task);
|
||||
if (result != ISC_R_SUCCESS)
|
||||
goto cleanup;
|
||||
|
||||
|
@ -2503,7 +2503,7 @@ dns_dispatch_createtcp(dns_dispatchmgr_t *mgr, isc_socket_t *sock,
|
||||
|
||||
disp->ntasks = 1;
|
||||
disp->task[0] = NULL;
|
||||
result = isc_task_create(taskmgr, 0, &disp->task[0]);
|
||||
result = isc_task_create(taskmgr, 50, &disp->task[0]);
|
||||
if (result != ISC_R_SUCCESS)
|
||||
goto kill_socket;
|
||||
|
||||
@ -2941,7 +2941,7 @@ dispatch_createudp(dns_dispatchmgr_t *mgr, isc_socketmgr_t *sockmgr,
|
||||
disp->ntasks = 1;
|
||||
for (i = 0; i < disp->ntasks; i++) {
|
||||
disp->task[i] = NULL;
|
||||
result = isc_task_create(taskmgr, 0, &disp->task[i]);
|
||||
result = isc_task_create(taskmgr, 50, &disp->task[i]);
|
||||
if (result != ISC_R_SUCCESS) {
|
||||
while (--i >= 0) {
|
||||
isc_task_shutdown(disp->task[i]);
|
||||
|
@ -208,8 +208,12 @@ isc_task_detach(isc_task_t **taskp);
|
||||
|
||||
void
|
||||
isc_task_send(isc_task_t *task, isc_event_t **eventp);
|
||||
|
||||
void
|
||||
isc_task_sendto(isc_task_t *task, isc_event_t **eventp, int c);
|
||||
/*%<
|
||||
* Send '*event' to 'task'.
|
||||
* Send '*event' to 'task', if task is idle try starting it on cpu 'c'
|
||||
* If 'c' is smaller than 0 then cpu is selected randomly.
|
||||
*
|
||||
* Requires:
|
||||
*
|
||||
@ -221,11 +225,15 @@ isc_task_send(isc_task_t *task, isc_event_t **eventp);
|
||||
*\li *eventp == NULL.
|
||||
*/
|
||||
|
||||
void
|
||||
isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c);
|
||||
|
||||
void
|
||||
isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp);
|
||||
/*%<
|
||||
* Send '*event' to '*taskp' and then detach '*taskp' from its
|
||||
* task.
|
||||
* task. If task is idle try starting it on cpu 'c'
|
||||
* If 'c' is smaller than 0 then cpu is selected randomly.
|
||||
*
|
||||
* Requires:
|
||||
*
|
||||
@ -666,7 +674,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
*/
|
||||
|
||||
void
|
||||
isc_taskmgr_setmode(isc_taskmgr_t *manager, isc_taskmgrmode_t mode);
|
||||
isc_taskmgr_setprivilegedmode(isc_taskmgr_t *manager);
|
||||
|
||||
isc_taskmgrmode_t
|
||||
isc_taskmgr_mode(isc_taskmgr_t *manager);
|
||||
|
@ -44,6 +44,9 @@ isc_thread_yield(void);
|
||||
void
|
||||
isc_thread_setname(isc_thread_t thread, const char *name);
|
||||
|
||||
isc_result_t
|
||||
isc_thread_setaffinity(int cpu);
|
||||
|
||||
/* XXX We could do fancier error handling... */
|
||||
|
||||
#define isc_thread_join(t, rp) \
|
||||
|
@ -18,6 +18,17 @@
|
||||
#include <sched.h>
|
||||
#endif
|
||||
|
||||
#if defined(HAVE_CPUSET_H)
|
||||
#include <sys/param.h>
|
||||
#include <sys/cpuset.h>
|
||||
#endif
|
||||
|
||||
#if defined(HAVE_SYS_PROCESET_H)
|
||||
#include <sys/types.h>
|
||||
#include <sys/processor.h>
|
||||
#include <sys/procset.h>
|
||||
#endif
|
||||
|
||||
#include <isc/thread.h>
|
||||
#include <isc/util.h>
|
||||
|
||||
@ -91,3 +102,31 @@ isc_thread_yield(void) {
|
||||
pthread_yield_np();
|
||||
#endif
|
||||
}
|
||||
|
||||
isc_result_t
|
||||
isc_thread_setaffinity(int cpu) {
|
||||
#if defined(HAVE_CPUSET_SETAFFINITY)
|
||||
cpuset_t cpuset;
|
||||
CPU_ZERO(&cpuset);
|
||||
CPU_SET(cpu, &cpuset);
|
||||
if (cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, -1,
|
||||
&cpuset, sizeof(cpuset)) != 0) {
|
||||
return (ISC_R_FAILURE);
|
||||
}
|
||||
#elif defined(HAVE_PTHREAD_SETAFFINITY_NP)
|
||||
cpu_set_t set;
|
||||
CPU_ZERO(&set);
|
||||
CPU_SET(cpu, &set);
|
||||
if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t),
|
||||
&set) != 0) {
|
||||
return (ISC_R_FAILURE);
|
||||
}
|
||||
#elif defined(HAVE_PROCESSOR_BIND)
|
||||
if (processor_bind(P_LWPID, P_MYID, cpu, NULL) != 0) {
|
||||
return (ISC_R_FAILURE);
|
||||
}
|
||||
#else
|
||||
UNUSED(cpu);
|
||||
#endif
|
||||
return (ISC_R_SUCCESS);
|
||||
}
|
||||
|
505
lib/isc/task.c
505
lib/isc/task.c
@ -21,6 +21,7 @@
|
||||
#include <stdbool.h>
|
||||
|
||||
#include <isc/app.h>
|
||||
#include <isc/atomic.h>
|
||||
#include <isc/condition.h>
|
||||
#include <isc/event.h>
|
||||
#include <isc/json.h>
|
||||
@ -31,6 +32,7 @@
|
||||
#include <isc/platform.h>
|
||||
#include <isc/print.h>
|
||||
#include <isc/string.h>
|
||||
#include <isc/random.h>
|
||||
#include <isc/task.h>
|
||||
#include <isc/thread.h>
|
||||
#include <isc/time.h>
|
||||
@ -41,18 +43,14 @@
|
||||
#include <openssl/err.h>
|
||||
#endif
|
||||
|
||||
/*%
|
||||
* For BIND9 internal applications:
|
||||
* when built with threads we use multiple worker threads shared by the whole
|
||||
* application.
|
||||
* when built without threads we share a single global task manager and use
|
||||
* an integrated event loop for socket, timer, and other generic task events.
|
||||
* For generic library:
|
||||
* we don't use either of them: an application can have multiple task managers
|
||||
* whether or not it's threaded, and if the application is threaded each thread
|
||||
* is expected to have a separate manager; no "worker threads" are shared by
|
||||
* the application threads.
|
||||
/*
|
||||
* Task manager is built around 'as little locking as possible' concept.
|
||||
* Each thread has his own queue of tasks to be run, if a task is in running
|
||||
* state it will stay on the runner it's currently on, if a task is in idle
|
||||
* state it can be woken up on a specific runner with isc_task_sendto - that
|
||||
* helps with data locality on CPU.
|
||||
*/
|
||||
|
||||
#ifdef ISC_TASK_TRACE
|
||||
#define XTRACE(m) fprintf(stderr, "task %p thread %lu: %s\n", \
|
||||
task, isc_thread_self(), (m))
|
||||
@ -86,6 +84,7 @@ static const char *statenames[] = {
|
||||
|
||||
typedef struct isc__task isc__task_t;
|
||||
typedef struct isc__taskmgr isc__taskmgr_t;
|
||||
typedef struct isc__taskqueue isc__taskqueue_t;
|
||||
|
||||
struct isc__task {
|
||||
/* Not locked. */
|
||||
@ -104,6 +103,7 @@ struct isc__task {
|
||||
isc_time_t tnow;
|
||||
char name[16];
|
||||
void * tag;
|
||||
unsigned int threadid;
|
||||
/* Locked by task manager lock. */
|
||||
LINK(isc__task_t) link;
|
||||
LINK(isc__task_t) ready_link;
|
||||
@ -121,27 +121,40 @@ struct isc__task {
|
||||
|
||||
typedef ISC_LIST(isc__task_t) isc__tasklist_t;
|
||||
|
||||
struct isc__taskqueue {
|
||||
/* Everything locked by lock */
|
||||
isc_mutex_t lock;
|
||||
isc__tasklist_t ready_tasks;
|
||||
isc__tasklist_t ready_priority_tasks;
|
||||
isc_condition_t work_available;
|
||||
isc_thread_t thread;
|
||||
unsigned int threadid;
|
||||
isc__taskmgr_t *manager;
|
||||
};
|
||||
|
||||
struct isc__taskmgr {
|
||||
/* Not locked. */
|
||||
isc_taskmgr_t common;
|
||||
isc_mem_t * mctx;
|
||||
isc_mutex_t lock;
|
||||
isc_mutex_t halt_lock;
|
||||
isc_condition_t halt_cond;
|
||||
unsigned int workers;
|
||||
isc_thread_t * threads;
|
||||
atomic_uint_fast32_t tasks_running;
|
||||
atomic_uint_fast32_t tasks_ready;
|
||||
atomic_uint_fast32_t curq;
|
||||
isc__taskqueue_t *queues;
|
||||
|
||||
/* Locked by task manager lock. */
|
||||
unsigned int default_quantum;
|
||||
LIST(isc__task_t) tasks;
|
||||
isc__tasklist_t ready_tasks;
|
||||
isc__tasklist_t ready_priority_tasks;
|
||||
isc_taskmgrmode_t mode;
|
||||
isc_condition_t work_available;
|
||||
isc_condition_t exclusive_granted;
|
||||
isc_condition_t paused;
|
||||
unsigned int tasks_running;
|
||||
unsigned int tasks_ready;
|
||||
bool pause_requested;
|
||||
bool exclusive_requested;
|
||||
bool exiting;
|
||||
bool pause_requested;
|
||||
bool exclusive_requested;
|
||||
bool exiting;
|
||||
|
||||
/* Locked by halt_lock */
|
||||
unsigned int halted;
|
||||
|
||||
/*
|
||||
* Multiple threads can read/write 'excl' at the same time, so we need
|
||||
@ -175,22 +188,33 @@ isc_taskmgr_setexcltask(isc_taskmgr_t *mgr0, isc_task_t *task0);
|
||||
isc_result_t
|
||||
isc_taskmgr_excltask(isc_taskmgr_t *mgr0, isc_task_t **taskp);
|
||||
static inline bool
|
||||
empty_readyq(isc__taskmgr_t *manager);
|
||||
empty_readyq(isc__taskmgr_t *manager, int c);
|
||||
|
||||
static inline isc__task_t *
|
||||
pop_readyq(isc__taskmgr_t *manager);
|
||||
pop_readyq(isc__taskmgr_t *manager, int c);
|
||||
|
||||
static inline void
|
||||
push_readyq(isc__taskmgr_t *manager, isc__task_t *task);
|
||||
push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c);
|
||||
|
||||
static inline void
|
||||
wake_all_queues(isc__taskmgr_t *manager);
|
||||
|
||||
/***
|
||||
*** Tasks.
|
||||
***/
|
||||
|
||||
static inline void
|
||||
wake_all_queues(isc__taskmgr_t *manager) {
|
||||
for (unsigned i=0; i < manager->workers; i++) {
|
||||
LOCK(&manager->queues[i].lock);
|
||||
BROADCAST(&manager->queues[i].work_available);
|
||||
UNLOCK(&manager->queues[i].lock);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
task_finished(isc__task_t *task) {
|
||||
isc__taskmgr_t *manager = task->manager;
|
||||
|
||||
REQUIRE(EMPTY(task->events));
|
||||
REQUIRE(task->nevents == 0);
|
||||
REQUIRE(EMPTY(task->on_shutdown));
|
||||
@ -201,6 +225,7 @@ task_finished(isc__task_t *task) {
|
||||
|
||||
LOCK(&manager->lock);
|
||||
UNLINK(manager->tasks, task, link);
|
||||
UNLOCK(&manager->lock);
|
||||
if (FINISHED(manager)) {
|
||||
/*
|
||||
* All tasks have completed and the
|
||||
@ -208,10 +233,8 @@ task_finished(isc__task_t *task) {
|
||||
* any idle worker threads so they
|
||||
* can exit.
|
||||
*/
|
||||
BROADCAST(&manager->work_available);
|
||||
wake_all_queues(manager);
|
||||
}
|
||||
UNLOCK(&manager->lock);
|
||||
|
||||
DESTROYLOCK(&task->lock);
|
||||
task->common.impmagic = 0;
|
||||
task->common.magic = 0;
|
||||
@ -235,6 +258,9 @@ isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum,
|
||||
return (ISC_R_NOMEMORY);
|
||||
XTRACE("isc_task_create");
|
||||
task->manager = manager;
|
||||
task->threadid = atomic_fetch_add_explicit(&manager->curq, 1,
|
||||
memory_order_relaxed)
|
||||
% manager->workers;
|
||||
result = isc_mutex_init(&task->lock);
|
||||
if (result != ISC_R_SUCCESS) {
|
||||
isc_mem_put(manager->mctx, task, sizeof(*task));
|
||||
@ -351,12 +377,11 @@ task_ready(isc__task_t *task) {
|
||||
REQUIRE(task->state == task_state_ready);
|
||||
|
||||
XTRACE("task_ready");
|
||||
|
||||
LOCK(&manager->lock);
|
||||
push_readyq(manager, task);
|
||||
LOCK(&manager->queues[task->threadid].lock);
|
||||
push_readyq(manager, task, task->threadid);
|
||||
if (manager->mode == isc_taskmgrmode_normal || has_privilege)
|
||||
SIGNAL(&manager->work_available);
|
||||
UNLOCK(&manager->lock);
|
||||
SIGNAL(&manager->queues[task->threadid].work_available);
|
||||
UNLOCK(&manager->queues[task->threadid].lock);
|
||||
}
|
||||
|
||||
static inline bool
|
||||
@ -414,7 +439,7 @@ isc_task_detach(isc_task_t **taskp) {
|
||||
}
|
||||
|
||||
static inline bool
|
||||
task_send(isc__task_t *task, isc_event_t **eventp) {
|
||||
task_send(isc__task_t *task, isc_event_t **eventp, int c) {
|
||||
bool was_idle = false;
|
||||
isc_event_t *event;
|
||||
|
||||
@ -433,6 +458,7 @@ task_send(isc__task_t *task, isc_event_t **eventp) {
|
||||
|
||||
if (task->state == task_state_idle) {
|
||||
was_idle = true;
|
||||
task->threadid = c;
|
||||
INSIST(EMPTY(task->events));
|
||||
task->state = task_state_ready;
|
||||
}
|
||||
@ -447,6 +473,16 @@ task_send(isc__task_t *task, isc_event_t **eventp) {
|
||||
|
||||
void
|
||||
isc_task_send(isc_task_t *task0, isc_event_t **eventp) {
|
||||
isc_task_sendto(task0, eventp, -1);
|
||||
}
|
||||
|
||||
void
|
||||
isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) {
|
||||
isc_task_sendtoanddetach(taskp, eventp, -1);
|
||||
}
|
||||
|
||||
void
|
||||
isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
|
||||
isc__task_t *task = (isc__task_t *)task0;
|
||||
bool was_idle;
|
||||
|
||||
@ -455,16 +491,21 @@ isc_task_send(isc_task_t *task0, isc_event_t **eventp) {
|
||||
*/
|
||||
|
||||
REQUIRE(VALID_TASK(task));
|
||||
|
||||
XTRACE("isc_task_send");
|
||||
|
||||
if (c < 0) {
|
||||
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
|
||||
memory_order_relaxed);
|
||||
}
|
||||
c %= task->manager->workers;
|
||||
|
||||
/*
|
||||
* We're trying hard to hold locks for as short a time as possible.
|
||||
* We're also trying to hold as few locks as possible. This is why
|
||||
* some processing is deferred until after the lock is released.
|
||||
*/
|
||||
LOCK(&task->lock);
|
||||
was_idle = task_send(task, eventp);
|
||||
was_idle = task_send(task, eventp, c);
|
||||
UNLOCK(&task->lock);
|
||||
|
||||
if (was_idle) {
|
||||
@ -488,7 +529,7 @@ isc_task_send(isc_task_t *task0, isc_event_t **eventp) {
|
||||
}
|
||||
|
||||
void
|
||||
isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) {
|
||||
isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) {
|
||||
bool idle1, idle2;
|
||||
isc__task_t *task;
|
||||
|
||||
@ -500,11 +541,16 @@ isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) {
|
||||
REQUIRE(taskp != NULL);
|
||||
task = (isc__task_t *)*taskp;
|
||||
REQUIRE(VALID_TASK(task));
|
||||
|
||||
XTRACE("isc_task_sendanddetach");
|
||||
|
||||
if (c < 0) {
|
||||
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
|
||||
memory_order_relaxed);
|
||||
}
|
||||
c %= task->manager->workers;
|
||||
|
||||
LOCK(&task->lock);
|
||||
idle1 = task_send(task, eventp);
|
||||
idle1 = task_send(task, eventp, c);
|
||||
idle2 = task_detach(task);
|
||||
UNLOCK(&task->lock);
|
||||
|
||||
@ -829,13 +875,13 @@ isc_task_getcurrenttimex(isc_task_t *task0, isc_time_t *t) {
|
||||
* Caller must hold the task manager lock.
|
||||
*/
|
||||
static inline bool
|
||||
empty_readyq(isc__taskmgr_t *manager) {
|
||||
empty_readyq(isc__taskmgr_t *manager, int c) {
|
||||
isc__tasklist_t queue;
|
||||
|
||||
if (manager->mode == isc_taskmgrmode_normal)
|
||||
queue = manager->ready_tasks;
|
||||
queue = manager->queues[c].ready_tasks;
|
||||
else
|
||||
queue = manager->ready_priority_tasks;
|
||||
queue = manager->queues[c].ready_priority_tasks;
|
||||
|
||||
return (EMPTY(queue));
|
||||
}
|
||||
@ -849,18 +895,18 @@ empty_readyq(isc__taskmgr_t *manager) {
|
||||
* Caller must hold the task manager lock.
|
||||
*/
|
||||
static inline isc__task_t *
|
||||
pop_readyq(isc__taskmgr_t *manager) {
|
||||
pop_readyq(isc__taskmgr_t *manager, int c) {
|
||||
isc__task_t *task;
|
||||
|
||||
if (manager->mode == isc_taskmgrmode_normal)
|
||||
task = HEAD(manager->ready_tasks);
|
||||
task = HEAD(manager->queues[c].ready_tasks);
|
||||
else
|
||||
task = HEAD(manager->ready_priority_tasks);
|
||||
task = HEAD(manager->queues[c].ready_priority_tasks);
|
||||
|
||||
if (task != NULL) {
|
||||
DEQUEUE(manager->ready_tasks, task, ready_link);
|
||||
DEQUEUE(manager->queues[c].ready_tasks, task, ready_link);
|
||||
if (ISC_LINK_LINKED(task, ready_priority_link))
|
||||
DEQUEUE(manager->ready_priority_tasks, task,
|
||||
DEQUEUE(manager->queues[c].ready_priority_tasks, task,
|
||||
ready_priority_link);
|
||||
}
|
||||
|
||||
@ -874,20 +920,25 @@ pop_readyq(isc__taskmgr_t *manager) {
|
||||
* Caller must hold the task manager lock.
|
||||
*/
|
||||
static inline void
|
||||
push_readyq(isc__taskmgr_t *manager, isc__task_t *task) {
|
||||
ENQUEUE(manager->ready_tasks, task, ready_link);
|
||||
push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) {
|
||||
ENQUEUE(manager->queues[c].ready_tasks, task, ready_link);
|
||||
if ((task->flags & TASK_F_PRIVILEGED) != 0)
|
||||
ENQUEUE(manager->ready_priority_tasks, task,
|
||||
ENQUEUE(manager->queues[c].ready_priority_tasks, task,
|
||||
ready_priority_link);
|
||||
manager->tasks_ready++;
|
||||
atomic_fetch_add_explicit(&manager->tasks_ready, 1,
|
||||
memory_order_acquire);
|
||||
}
|
||||
|
||||
static void
|
||||
dispatch(isc__taskmgr_t *manager) {
|
||||
dispatch(isc__taskmgr_t *manager, unsigned int threadid) {
|
||||
isc__task_t *task;
|
||||
|
||||
REQUIRE(VALID_MANAGER(manager));
|
||||
|
||||
/* Wait for everything to initialize */
|
||||
LOCK(&manager->lock);
|
||||
UNLOCK(&manager->lock);
|
||||
|
||||
/*
|
||||
* Again we're trying to hold the lock for as short a time as possible
|
||||
* and to do as little locking and unlocking as possible.
|
||||
@ -937,8 +988,7 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
* For N iterations of the loop, this code does N+1 locks and N+1
|
||||
* unlocks. The while expression is always protected by the lock.
|
||||
*/
|
||||
|
||||
LOCK(&manager->lock);
|
||||
LOCK(&manager->queues[threadid].lock);
|
||||
|
||||
while (!FINISHED(manager)) {
|
||||
/*
|
||||
@ -951,13 +1001,19 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
* If a pause has been requested, don't do any work
|
||||
* until it's been released.
|
||||
*/
|
||||
while ((empty_readyq(manager) || manager->pause_requested ||
|
||||
manager->exclusive_requested) && !FINISHED(manager))
|
||||
while ((empty_readyq(manager, threadid) && !manager->pause_requested &&
|
||||
!manager->exclusive_requested) && !FINISHED(manager))
|
||||
{
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
|
||||
ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_WAIT, "wait"));
|
||||
WAIT(&manager->work_available, &manager->lock);
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
|
||||
ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_WAIT, manager->pause_requested ? "paused" : "notpaused"));
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
|
||||
ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_WAIT, manager->exclusive_requested ? "excreq" : "notexcreq"));
|
||||
WAIT(&manager->queues[threadid].work_available, &manager->queues[threadid].lock);
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
|
||||
ISC_MSGSET_TASK,
|
||||
ISC_MSG_AWAKE, "awake"));
|
||||
@ -965,7 +1021,37 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK,
|
||||
ISC_MSG_WORKING, "working"));
|
||||
|
||||
task = pop_readyq(manager);
|
||||
if (manager->pause_requested || manager->exclusive_requested) {
|
||||
UNLOCK(&manager->queues[threadid].lock);
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK,
|
||||
ISC_MSG_WORKING, "halting"));
|
||||
|
||||
/*
|
||||
* Switching to exclusive mode is done as a 2-phase-lock,
|
||||
* checking if we have to switch is done without any locks
|
||||
* on pause_requested and exclusive_requested to save time -
|
||||
* the worst thing that can happen is that we'll launch one task
|
||||
* more and exclusive task will be postponed a bit.
|
||||
*
|
||||
* Broadcasting on halt_cond seems suboptimal, but exclusive tasks
|
||||
* are rare enought that we don't care.
|
||||
*/
|
||||
LOCK(&manager->halt_lock);
|
||||
manager->halted++;
|
||||
BROADCAST(&manager->halt_cond);
|
||||
while (manager->pause_requested || manager->exclusive_requested) {
|
||||
WAIT(&manager->halt_cond, &manager->halt_lock);
|
||||
}
|
||||
manager->halted--;
|
||||
SIGNAL(&manager->halt_cond);
|
||||
UNLOCK(&manager->halt_lock);
|
||||
|
||||
LOCK(&manager->queues[threadid].lock);
|
||||
/* Restart the loop after */
|
||||
continue;
|
||||
}
|
||||
|
||||
task = pop_readyq(manager, threadid);
|
||||
if (task != NULL) {
|
||||
unsigned int dispatch_count = 0;
|
||||
bool done = false;
|
||||
@ -976,19 +1062,22 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
INSIST(VALID_TASK(task));
|
||||
|
||||
/*
|
||||
* Note we only unlock the manager lock if we actually
|
||||
* have a task to do. We must reacquire the manager
|
||||
* Note we only unlock the queue lock if we actually
|
||||
* have a task to do. We must reacquire the queue
|
||||
* lock before exiting the 'if (task != NULL)' block.
|
||||
*/
|
||||
manager->tasks_ready--;
|
||||
manager->tasks_running++;
|
||||
UNLOCK(&manager->lock);
|
||||
UNLOCK(&manager->queues[threadid].lock);
|
||||
RUNTIME_CHECK(atomic_fetch_sub_explicit(&manager->tasks_ready,
|
||||
1, memory_order_release) > 0);
|
||||
atomic_fetch_add_explicit(&manager->tasks_running, 1,
|
||||
memory_order_acquire);
|
||||
|
||||
LOCK(&task->lock);
|
||||
INSIST(task->state == task_state_ready);
|
||||
task->state = task_state_running;
|
||||
XTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_RUNNING, "running"));
|
||||
XTRACE(task->name);
|
||||
TIME_NOW(&task->tnow);
|
||||
task->now = isc_time_seconds(&task->tnow);
|
||||
do {
|
||||
@ -1004,6 +1093,7 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
ISC_MSGSET_TASK,
|
||||
ISC_MSG_EXECUTE,
|
||||
"execute action"));
|
||||
XTRACE(task->name);
|
||||
if (event->ev_action != NULL) {
|
||||
UNLOCK(&task->lock);
|
||||
(event->ev_action)(
|
||||
@ -1094,15 +1184,9 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
if (finished)
|
||||
task_finished(task);
|
||||
|
||||
LOCK(&manager->lock);
|
||||
manager->tasks_running--;
|
||||
if (manager->exclusive_requested &&
|
||||
manager->tasks_running == 1) {
|
||||
SIGNAL(&manager->exclusive_granted);
|
||||
} else if (manager->pause_requested &&
|
||||
manager->tasks_running == 0) {
|
||||
SIGNAL(&manager->paused);
|
||||
}
|
||||
RUNTIME_CHECK(atomic_fetch_sub_explicit(&manager->tasks_running,
|
||||
1, memory_order_release) > 0);
|
||||
LOCK(&manager->queues[threadid].lock);
|
||||
if (requeue) {
|
||||
/*
|
||||
* We know we're awake, so we don't have
|
||||
@ -1123,7 +1207,7 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
* were usually nonempty, the 'optimization'
|
||||
* might even hurt rather than help.
|
||||
*/
|
||||
push_readyq(manager, task);
|
||||
push_readyq(manager, task, threadid);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1133,27 +1217,62 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
* we're stuck. Automatically drop privileges at that
|
||||
* point and continue with the regular ready queue.
|
||||
*/
|
||||
if (manager->tasks_running == 0 && empty_readyq(manager)) {
|
||||
manager->mode = isc_taskmgrmode_normal;
|
||||
if (!empty_readyq(manager))
|
||||
BROADCAST(&manager->work_available);
|
||||
if (manager->mode != isc_taskmgrmode_normal &&
|
||||
atomic_load_explicit(&manager->tasks_running,
|
||||
memory_order_acquire) == 0)
|
||||
{
|
||||
UNLOCK(&manager->queues[threadid].lock);
|
||||
LOCK(&manager->lock);
|
||||
/*
|
||||
* Check once again, under lock. Mode can only
|
||||
* change from privileged to normal anyway, and
|
||||
* if we enter this loop twice at the same time
|
||||
* we'll end up in a deadlock over queue locks.
|
||||
*
|
||||
*/
|
||||
if (manager->mode != isc_taskmgrmode_normal &&
|
||||
atomic_load_explicit(&manager->tasks_running,
|
||||
memory_order_acquire) == 0)
|
||||
{
|
||||
bool empty = true;
|
||||
unsigned int i;
|
||||
for (i=0; i<manager->workers && empty; i++)
|
||||
{
|
||||
LOCK(&manager->queues[i].lock);
|
||||
empty &= empty_readyq(manager, i);
|
||||
UNLOCK(&manager->queues[i].lock);
|
||||
}
|
||||
if (empty) {
|
||||
manager->mode = isc_taskmgrmode_normal;
|
||||
wake_all_queues(manager);
|
||||
}
|
||||
}
|
||||
UNLOCK(&manager->lock);
|
||||
LOCK(&manager->queues[threadid].lock);
|
||||
}
|
||||
}
|
||||
|
||||
UNLOCK(&manager->lock);
|
||||
UNLOCK(&manager->queues[threadid].lock);
|
||||
/*
|
||||
* There might be other dispatchers waiting on empty tasks,
|
||||
* wake them up.
|
||||
*/
|
||||
wake_all_queues(manager);
|
||||
}
|
||||
|
||||
static isc_threadresult_t
|
||||
#ifdef _WIN32
|
||||
WINAPI
|
||||
#endif
|
||||
run(void *uap) {
|
||||
isc__taskmgr_t *manager = uap;
|
||||
run(void *queuep) {
|
||||
isc__taskqueue_t *tq = queuep;
|
||||
isc__taskmgr_t *manager = tq->manager;
|
||||
int threadid = tq->threadid;
|
||||
isc_thread_setaffinity(threadid);
|
||||
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_STARTING, "starting"));
|
||||
|
||||
dispatch(manager);
|
||||
dispatch(manager, threadid);
|
||||
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_EXITING, "exiting"));
|
||||
@ -1167,27 +1286,23 @@ run(void *uap) {
|
||||
|
||||
static void
|
||||
manager_free(isc__taskmgr_t *manager) {
|
||||
isc_mem_t *mctx;
|
||||
|
||||
(void)isc_condition_destroy(&manager->exclusive_granted);
|
||||
(void)isc_condition_destroy(&manager->work_available);
|
||||
(void)isc_condition_destroy(&manager->paused);
|
||||
isc_mem_free(manager->mctx, manager->threads);
|
||||
for (unsigned int i=0; i < manager->workers; i++) {
|
||||
DESTROYLOCK(&manager->queues[i].lock);
|
||||
}
|
||||
DESTROYLOCK(&manager->lock);
|
||||
DESTROYLOCK(&manager->excl_lock);
|
||||
DESTROYLOCK(&manager->halt_lock);
|
||||
isc_mem_put(manager->mctx, manager->queues,
|
||||
manager->workers * sizeof(isc__taskqueue_t));
|
||||
manager->common.impmagic = 0;
|
||||
manager->common.magic = 0;
|
||||
mctx = manager->mctx;
|
||||
isc_mem_put(mctx, manager, sizeof(*manager));
|
||||
isc_mem_detach(&mctx);
|
||||
isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager));
|
||||
}
|
||||
|
||||
isc_result_t
|
||||
isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
unsigned int default_quantum, isc_taskmgr_t **managerp)
|
||||
{
|
||||
isc_result_t result;
|
||||
unsigned int i, started = 0;
|
||||
unsigned int i;
|
||||
isc__taskmgr_t *manager;
|
||||
|
||||
/*
|
||||
@ -1198,64 +1313,37 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
REQUIRE(managerp != NULL && *managerp == NULL);
|
||||
|
||||
manager = isc_mem_get(mctx, sizeof(*manager));
|
||||
if (manager == NULL)
|
||||
return (ISC_R_NOMEMORY);
|
||||
RUNTIME_CHECK(manager != NULL);
|
||||
manager->common.impmagic = TASK_MANAGER_MAGIC;
|
||||
manager->common.magic = ISCAPI_TASKMGR_MAGIC;
|
||||
manager->mode = isc_taskmgrmode_normal;
|
||||
manager->mctx = NULL;
|
||||
result = isc_mutex_init(&manager->lock);
|
||||
if (result != ISC_R_SUCCESS)
|
||||
goto cleanup_mgr;
|
||||
result = isc_mutex_init(&manager->excl_lock);
|
||||
if (result != ISC_R_SUCCESS) {
|
||||
DESTROYLOCK(&manager->lock);
|
||||
goto cleanup_mgr;
|
||||
}
|
||||
RUNTIME_CHECK(isc_mutex_init(&manager->lock) == ISC_R_SUCCESS);
|
||||
RUNTIME_CHECK(isc_mutex_init(&manager->excl_lock) == ISC_R_SUCCESS);
|
||||
|
||||
RUNTIME_CHECK(isc_mutex_init(&manager->halt_lock)
|
||||
== ISC_R_SUCCESS);
|
||||
RUNTIME_CHECK(isc_condition_init(&manager->halt_cond)
|
||||
== ISC_R_SUCCESS);
|
||||
|
||||
manager->workers = workers;
|
||||
|
||||
manager->workers = 0;
|
||||
manager->threads = isc_mem_allocate(mctx,
|
||||
workers * sizeof(isc_thread_t));
|
||||
if (manager->threads == NULL) {
|
||||
result = ISC_R_NOMEMORY;
|
||||
goto cleanup_lock;
|
||||
}
|
||||
if (isc_condition_init(&manager->work_available) != ISC_R_SUCCESS) {
|
||||
UNEXPECTED_ERROR(__FILE__, __LINE__,
|
||||
"isc_condition_init() %s",
|
||||
isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_FAILED, "failed"));
|
||||
result = ISC_R_UNEXPECTED;
|
||||
goto cleanup_threads;
|
||||
}
|
||||
if (isc_condition_init(&manager->exclusive_granted) != ISC_R_SUCCESS) {
|
||||
UNEXPECTED_ERROR(__FILE__, __LINE__,
|
||||
"isc_condition_init() %s",
|
||||
isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_FAILED, "failed"));
|
||||
result = ISC_R_UNEXPECTED;
|
||||
goto cleanup_workavailable;
|
||||
}
|
||||
if (isc_condition_init(&manager->paused) != ISC_R_SUCCESS) {
|
||||
UNEXPECTED_ERROR(__FILE__, __LINE__,
|
||||
"isc_condition_init() %s",
|
||||
isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_FAILED, "failed"));
|
||||
result = ISC_R_UNEXPECTED;
|
||||
goto cleanup_exclusivegranted;
|
||||
}
|
||||
if (default_quantum == 0)
|
||||
default_quantum = DEFAULT_DEFAULT_QUANTUM;
|
||||
manager->default_quantum = default_quantum;
|
||||
INIT_LIST(manager->tasks);
|
||||
INIT_LIST(manager->ready_tasks);
|
||||
INIT_LIST(manager->ready_priority_tasks);
|
||||
manager->queues = isc_mem_get(mctx, workers *
|
||||
sizeof(isc__taskqueue_t));
|
||||
RUNTIME_CHECK(manager->queues != NULL);
|
||||
|
||||
manager->tasks_running = 0;
|
||||
manager->tasks_ready = 0;
|
||||
manager->exclusive_requested = false;
|
||||
manager->pause_requested = false;
|
||||
manager->curq = 0;
|
||||
manager->exiting = false;
|
||||
manager->excl = NULL;
|
||||
manager->halted = 0;
|
||||
manager->exclusive_requested = false;
|
||||
manager->pause_requested = false;
|
||||
|
||||
isc_mem_attach(mctx, &manager->mctx);
|
||||
|
||||
@ -1264,40 +1352,29 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
* Start workers.
|
||||
*/
|
||||
for (i = 0; i < workers; i++) {
|
||||
if (isc_thread_create(run, manager,
|
||||
&manager->threads[manager->workers]) ==
|
||||
ISC_R_SUCCESS) {
|
||||
char name[16]; /* thread name limit on Linux */
|
||||
snprintf(name, sizeof(name), "isc-worker%04u", i);
|
||||
isc_thread_setname(manager->threads[manager->workers],
|
||||
name);
|
||||
manager->workers++;
|
||||
started++;
|
||||
}
|
||||
INIT_LIST(manager->queues[i].ready_tasks);
|
||||
INIT_LIST(manager->queues[i].ready_priority_tasks);
|
||||
RUNTIME_CHECK(isc_mutex_init(&manager->queues[i].lock)
|
||||
== ISC_R_SUCCESS);
|
||||
RUNTIME_CHECK(isc_condition_init(
|
||||
&manager->queues[i].work_available)
|
||||
== ISC_R_SUCCESS);
|
||||
manager->queues[i].manager = manager;
|
||||
manager->queues[i].threadid = i;
|
||||
RUNTIME_CHECK(isc_thread_create(run, &manager->queues[i],
|
||||
&manager->queues[i].thread)
|
||||
== ISC_R_SUCCESS);
|
||||
char name[16];
|
||||
snprintf(name, sizeof(name), "isc-worker%04u", i);
|
||||
isc_thread_setname(manager->queues[i].thread, name);
|
||||
}
|
||||
UNLOCK(&manager->lock);
|
||||
|
||||
if (started == 0) {
|
||||
manager_free(manager);
|
||||
return (ISC_R_NOTHREADS);
|
||||
}
|
||||
isc_thread_setconcurrency(workers);
|
||||
|
||||
*managerp = (isc_taskmgr_t *)manager;
|
||||
|
||||
return (ISC_R_SUCCESS);
|
||||
|
||||
cleanup_exclusivegranted:
|
||||
(void)isc_condition_destroy(&manager->exclusive_granted);
|
||||
cleanup_workavailable:
|
||||
(void)isc_condition_destroy(&manager->work_available);
|
||||
cleanup_threads:
|
||||
isc_mem_free(mctx, manager->threads);
|
||||
cleanup_lock:
|
||||
DESTROYLOCK(&manager->lock);
|
||||
cleanup_mgr:
|
||||
isc_mem_put(mctx, manager, sizeof(*manager));
|
||||
return (result);
|
||||
}
|
||||
|
||||
void
|
||||
@ -1355,29 +1432,34 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
|
||||
|
||||
/*
|
||||
* Post shutdown event(s) to every task (if they haven't already been
|
||||
* posted).
|
||||
* posted). To make things easier post idle tasks to worker 0.
|
||||
*/
|
||||
LOCK(&manager->queues[0].lock);
|
||||
for (task = HEAD(manager->tasks);
|
||||
task != NULL;
|
||||
task = NEXT(task, link)) {
|
||||
LOCK(&task->lock);
|
||||
if (task_shutdown(task))
|
||||
push_readyq(manager, task);
|
||||
if (task_shutdown(task)) {
|
||||
task->threadid = 0;
|
||||
push_readyq(manager, task, 0);
|
||||
}
|
||||
UNLOCK(&task->lock);
|
||||
}
|
||||
UNLOCK(&manager->queues[0].lock);
|
||||
|
||||
/*
|
||||
* Wake up any sleeping workers. This ensures we get work done if
|
||||
* there's work left to do, and if there are already no tasks left
|
||||
* it will cause the workers to see manager->exiting.
|
||||
*/
|
||||
BROADCAST(&manager->work_available);
|
||||
wake_all_queues(manager);
|
||||
UNLOCK(&manager->lock);
|
||||
|
||||
/*
|
||||
* Wait for all the worker threads to exit.
|
||||
*/
|
||||
for (i = 0; i < manager->workers; i++)
|
||||
(void)isc_thread_join(manager->threads[i], NULL);
|
||||
(void)isc_thread_join(manager->queues[i].thread, NULL);
|
||||
|
||||
manager_free(manager);
|
||||
|
||||
@ -1385,11 +1467,11 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
|
||||
}
|
||||
|
||||
void
|
||||
isc_taskmgr_setmode(isc_taskmgr_t *manager0, isc_taskmgrmode_t mode) {
|
||||
isc_taskmgr_setprivilegedmode(isc_taskmgr_t *manager0) {
|
||||
isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
|
||||
|
||||
LOCK(&manager->lock);
|
||||
manager->mode = mode;
|
||||
manager->mode = isc_taskmgrmode_privileged;
|
||||
UNLOCK(&manager->lock);
|
||||
}
|
||||
|
||||
@ -1406,24 +1488,35 @@ isc_taskmgr_mode(isc_taskmgr_t *manager0) {
|
||||
void
|
||||
isc__taskmgr_pause(isc_taskmgr_t *manager0) {
|
||||
isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
|
||||
manager->pause_requested = true;
|
||||
LOCK(&manager->lock);
|
||||
while (manager->tasks_running > 0) {
|
||||
WAIT(&manager->paused, &manager->lock);
|
||||
|
||||
LOCK(&manager->halt_lock);
|
||||
while (manager->exclusive_requested || manager->pause_requested) {
|
||||
UNLOCK(&manager->halt_lock);
|
||||
/* This is ugly but pause is used EXCLUSIVELY in tests */
|
||||
isc_thread_yield();
|
||||
LOCK(&manager->halt_lock);
|
||||
}
|
||||
UNLOCK(&manager->lock);
|
||||
|
||||
manager->pause_requested = true;
|
||||
while (manager->halted < manager->workers) {
|
||||
wake_all_queues(manager);
|
||||
WAIT(&manager->halt_cond, &manager->halt_lock);
|
||||
}
|
||||
UNLOCK(&manager->halt_lock);
|
||||
}
|
||||
|
||||
void
|
||||
isc__taskmgr_resume(isc_taskmgr_t *manager0) {
|
||||
isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
|
||||
|
||||
LOCK(&manager->lock);
|
||||
LOCK(&manager->halt_lock);
|
||||
if (manager->pause_requested) {
|
||||
manager->pause_requested = false;
|
||||
BROADCAST(&manager->work_available);
|
||||
while (manager->halted > 0) {
|
||||
BROADCAST(&manager->halt_cond);
|
||||
WAIT(&manager->halt_cond, &manager->halt_lock);
|
||||
}
|
||||
}
|
||||
UNLOCK(&manager->lock);
|
||||
UNLOCK(&manager->halt_lock);
|
||||
}
|
||||
|
||||
void
|
||||
@ -1462,24 +1555,25 @@ isc_result_t
|
||||
isc_task_beginexclusive(isc_task_t *task0) {
|
||||
isc__task_t *task = (isc__task_t *)task0;
|
||||
isc__taskmgr_t *manager = task->manager;
|
||||
|
||||
REQUIRE(VALID_TASK(task));
|
||||
|
||||
REQUIRE(task->state == task_state_running);
|
||||
/*
|
||||
* TODO REQUIRE(task == task->manager->excl);
|
||||
* it should be here, it fails on shutdown server->task
|
||||
*/
|
||||
REQUIRE(task == task->manager->excl ||
|
||||
(task->manager->exiting && task->manager->excl == NULL));
|
||||
|
||||
LOCK(&manager->lock);
|
||||
if (manager->exclusive_requested) {
|
||||
UNLOCK(&manager->lock);
|
||||
if (manager->exclusive_requested || manager->pause_requested) {
|
||||
return (ISC_R_LOCKBUSY);
|
||||
}
|
||||
|
||||
LOCK(&manager->halt_lock);
|
||||
INSIST(!manager->exclusive_requested && !manager->pause_requested);
|
||||
manager->exclusive_requested = true;
|
||||
while (manager->tasks_running > 1) {
|
||||
WAIT(&manager->exclusive_granted, &manager->lock);
|
||||
while (manager->halted + 1 < manager->workers) {
|
||||
wake_all_queues(manager);
|
||||
WAIT(&manager->halt_cond, &manager->halt_lock);
|
||||
}
|
||||
UNLOCK(&manager->lock);
|
||||
UNLOCK(&manager->halt_lock);
|
||||
return (ISC_R_SUCCESS);
|
||||
}
|
||||
|
||||
@ -1490,11 +1584,14 @@ isc_task_endexclusive(isc_task_t *task0) {
|
||||
|
||||
REQUIRE(VALID_TASK(task));
|
||||
REQUIRE(task->state == task_state_running);
|
||||
LOCK(&manager->lock);
|
||||
LOCK(&manager->halt_lock);
|
||||
REQUIRE(manager->exclusive_requested);
|
||||
manager->exclusive_requested = false;
|
||||
BROADCAST(&manager->work_available);
|
||||
UNLOCK(&manager->lock);
|
||||
while (manager->halted > 0) {
|
||||
BROADCAST(&manager->halt_cond);
|
||||
WAIT(&manager->halt_cond, &manager->halt_lock);
|
||||
}
|
||||
UNLOCK(&manager->halt_lock);
|
||||
}
|
||||
|
||||
void
|
||||
@ -1515,14 +1612,14 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) {
|
||||
if (priv == oldpriv)
|
||||
return;
|
||||
|
||||
LOCK(&manager->lock);
|
||||
LOCK(&manager->queues[task->threadid].lock);
|
||||
if (priv && ISC_LINK_LINKED(task, ready_link))
|
||||
ENQUEUE(manager->ready_priority_tasks, task,
|
||||
ready_priority_link);
|
||||
ENQUEUE(manager->queues[task->threadid].ready_priority_tasks,
|
||||
task, ready_priority_link);
|
||||
else if (!priv && ISC_LINK_LINKED(task, ready_priority_link))
|
||||
DEQUEUE(manager->ready_priority_tasks, task,
|
||||
ready_priority_link);
|
||||
UNLOCK(&manager->lock);
|
||||
DEQUEUE(manager->queues[task->threadid].ready_priority_tasks,
|
||||
task, ready_priority_link);
|
||||
UNLOCK(&manager->queues[task->threadid].lock);
|
||||
}
|
||||
|
||||
bool
|
||||
@ -1575,11 +1672,13 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr0, xmlTextWriterPtr writer) {
|
||||
TRY0(xmlTextWriterEndElement(writer)); /* default-quantum */
|
||||
|
||||
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-running"));
|
||||
TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->tasks_running));
|
||||
TRY0(xmlTextWriterWriteFormatString(writer, "%d",
|
||||
(int) mgr->tasks_running));
|
||||
TRY0(xmlTextWriterEndElement(writer)); /* tasks-running */
|
||||
|
||||
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-ready"));
|
||||
TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->tasks_ready));
|
||||
TRY0(xmlTextWriterWriteFormatString(writer, "%d",
|
||||
(int) mgr->tasks_ready));
|
||||
TRY0(xmlTextWriterEndElement(writer)); /* tasks-ready */
|
||||
|
||||
TRY0(xmlTextWriterEndElement(writer)); /* thread-model */
|
||||
|
@ -67,7 +67,6 @@ set_and_drop(isc_task_t *task, isc_event_t *event) {
|
||||
*value = (int) isc_taskmgr_mode(taskmgr);
|
||||
counter++;
|
||||
UNLOCK(&lock);
|
||||
isc_taskmgr_setmode(taskmgr, isc_taskmgrmode_normal);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -230,7 +229,7 @@ ATF_TC_BODY(privileged_events, tc) {
|
||||
isc_task_send(task2, &event);
|
||||
|
||||
ATF_CHECK_EQ(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal);
|
||||
isc_taskmgr_setmode(taskmgr, isc_taskmgrmode_privileged);
|
||||
isc_taskmgr_setprivilegedmode(taskmgr);
|
||||
ATF_CHECK_EQ(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_privileged);
|
||||
|
||||
isc__taskmgr_resume(taskmgr);
|
||||
@ -351,7 +350,7 @@ ATF_TC_BODY(privilege_drop, tc) {
|
||||
isc_task_send(task2, &event);
|
||||
|
||||
ATF_CHECK_EQ(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_normal);
|
||||
isc_taskmgr_setmode(taskmgr, isc_taskmgrmode_privileged);
|
||||
isc_taskmgr_setprivilegedmode(taskmgr);
|
||||
ATF_CHECK_EQ(isc_taskmgr_mode(taskmgr), isc_taskmgrmode_privileged);
|
||||
|
||||
isc__taskmgr_resume(taskmgr);
|
||||
@ -363,14 +362,12 @@ ATF_TC_BODY(privilege_drop, tc) {
|
||||
}
|
||||
|
||||
/*
|
||||
* We can't guarantee what order the events fire, but
|
||||
* we do know *exactly one* of the privileged tasks will
|
||||
* have run in privileged mode...
|
||||
* We need to check that all privilege mode events were fired
|
||||
* in privileged mode, and non privileged in non-privileged.
|
||||
*/
|
||||
ATF_CHECK(a == isc_taskmgrmode_privileged ||
|
||||
c == isc_taskmgrmode_privileged ||
|
||||
ATF_CHECK(a == isc_taskmgrmode_privileged &&
|
||||
c == isc_taskmgrmode_privileged &&
|
||||
d == isc_taskmgrmode_privileged);
|
||||
ATF_CHECK(a + c + d == isc_taskmgrmode_privileged);
|
||||
|
||||
/* ...and neither of the non-privileged tasks did... */
|
||||
ATF_CHECK(b == isc_taskmgrmode_normal || e == isc_taskmgrmode_normal);
|
||||
|
@ -79,6 +79,9 @@ isc_thread_setconcurrency(unsigned int level);
|
||||
void
|
||||
isc_thread_setname(isc_thread_t, const char *);
|
||||
|
||||
isc_result_t
|
||||
isc_thread_setaffinity(int cpu);
|
||||
|
||||
int
|
||||
isc_thread_key_create(isc_thread_key_t *key, void (*func)(void *));
|
||||
|
||||
|
@ -580,6 +580,8 @@ isc_task_purgeevent
|
||||
isc_task_purgerange
|
||||
isc_task_send
|
||||
isc_task_sendanddetach
|
||||
isc_task_sendto
|
||||
isc_task_sendtoanddetach
|
||||
isc_task_setname
|
||||
isc_task_setprivilege
|
||||
isc_task_shutdown
|
||||
@ -596,7 +598,7 @@ isc_taskmgr_renderjson
|
||||
isc_taskmgr_renderxml
|
||||
@END LIBXML2
|
||||
isc_taskmgr_setexcltask
|
||||
isc_taskmgr_setmode
|
||||
isc_taskmgr_setprivilegedmode
|
||||
isc_taskpool_create
|
||||
isc_taskpool_destroy
|
||||
isc_taskpool_expand
|
||||
@ -609,6 +611,7 @@ isc_thread_key_create
|
||||
isc_thread_key_delete
|
||||
isc_thread_key_getspecific
|
||||
isc_thread_key_setspecific
|
||||
isc_thread_setaffinity
|
||||
isc_thread_setconcurrency
|
||||
isc_thread_setname
|
||||
isc_time_add
|
||||
|
@ -66,6 +66,12 @@ isc_thread_setname(isc_thread_t thread, const char *name) {
|
||||
UNUSED(name);
|
||||
}
|
||||
|
||||
isc_result_t
|
||||
isc_thread_setaffinity(int cpu) {
|
||||
/* no-op on Windows for now */
|
||||
return (ISC_R_SUCCESS);
|
||||
}
|
||||
|
||||
void *
|
||||
isc_thread_key_getspecific(isc_thread_key_t key) {
|
||||
return(TlsGetValue(key));
|
||||
|
@ -2960,7 +2960,7 @@ client_create(ns_clientmgr_t *manager, ns_client_t **clientp) {
|
||||
ns_server_attach(manager->sctx, &client->sctx);
|
||||
|
||||
client->task = NULL;
|
||||
result = isc_task_create(manager->taskmgr, 0, &client->task);
|
||||
result = isc_task_create(manager->taskmgr, 50, &client->task);
|
||||
if (result != ISC_R_SUCCESS)
|
||||
goto cleanup_client;
|
||||
isc_task_setname(client->task, "client", client);
|
||||
|
Loading…
x
Reference in New Issue
Block a user