mirror of
https://gitlab.isc.org/isc-projects/bind9
synced 2025-08-30 14:07:59 +00:00
Separate structure for each thread/queue; 2-phase-locking for exclusive tasks
This commit is contained in:
348
lib/isc/task.c
348
lib/isc/task.c
@@ -43,18 +43,6 @@
|
|||||||
#include <openssl/err.h>
|
#include <openssl/err.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/*%
|
|
||||||
* For BIND9 internal applications:
|
|
||||||
* when built with threads we use multiple worker threads shared by the whole
|
|
||||||
* application.
|
|
||||||
* when built without threads we share a single global task manager and use
|
|
||||||
* an integrated event loop for socket, timer, and other generic task events.
|
|
||||||
* For generic library:
|
|
||||||
* we don't use either of them: an application can have multiple task managers
|
|
||||||
* whether or not it's threaded, and if the application is threaded each thread
|
|
||||||
* is expected to have a separate manager; no "worker threads" are shared by
|
|
||||||
* the application threads.
|
|
||||||
*/
|
|
||||||
#ifdef ISC_TASK_TRACE
|
#ifdef ISC_TASK_TRACE
|
||||||
#define XTRACE(m) fprintf(stderr, "task %p thread %lu: %s\n", \
|
#define XTRACE(m) fprintf(stderr, "task %p thread %lu: %s\n", \
|
||||||
task, isc_thread_self(), (m))
|
task, isc_thread_self(), (m))
|
||||||
@@ -88,6 +76,7 @@ static const char *statenames[] = {
|
|||||||
|
|
||||||
typedef struct isc__task isc__task_t;
|
typedef struct isc__task isc__task_t;
|
||||||
typedef struct isc__taskmgr isc__taskmgr_t;
|
typedef struct isc__taskmgr isc__taskmgr_t;
|
||||||
|
typedef struct isc__taskqueue isc__taskqueue_t;
|
||||||
|
|
||||||
struct isc__task {
|
struct isc__task {
|
||||||
/* Not locked. */
|
/* Not locked. */
|
||||||
@@ -124,32 +113,43 @@ struct isc__task {
|
|||||||
|
|
||||||
typedef ISC_LIST(isc__task_t) isc__tasklist_t;
|
typedef ISC_LIST(isc__task_t) isc__tasklist_t;
|
||||||
|
|
||||||
|
struct isc__taskqueue {
|
||||||
|
isc__tasklist_t ready_tasks;
|
||||||
|
isc__tasklist_t ready_priority_tasks;
|
||||||
|
isc_condition_t work_available;
|
||||||
|
isc_mutex_t lock;
|
||||||
|
isc_thread_t thread;
|
||||||
|
unsigned int threadid;
|
||||||
|
isc__taskmgr_t *manager;
|
||||||
|
};
|
||||||
|
|
||||||
struct isc__taskmgr {
|
struct isc__taskmgr {
|
||||||
/* Not locked. */
|
/* Not locked. */
|
||||||
isc_taskmgr_t common;
|
isc_taskmgr_t common;
|
||||||
isc_mem_t * mctx;
|
isc_mem_t * mctx;
|
||||||
isc_mutex_t lock;
|
isc_mutex_t lock;
|
||||||
isc_mutex_t **locks;
|
isc_mutex_t prehalt_lock;
|
||||||
|
isc_mutex_t posthalt_lock;
|
||||||
|
isc_condition_t halt_cond;
|
||||||
|
isc_condition_t halt_avail_cond;
|
||||||
unsigned int workers;
|
unsigned int workers;
|
||||||
unsigned int queues;
|
|
||||||
isc_thread_t * threads;
|
|
||||||
atomic_uint_fast32_t tasks_running;
|
atomic_uint_fast32_t tasks_running;
|
||||||
atomic_uint_fast32_t tasks_ready;
|
atomic_uint_fast32_t tasks_ready;
|
||||||
atomic_uint_fast32_t curq;
|
atomic_uint_fast32_t curq;
|
||||||
|
isc__taskqueue_t *queues;
|
||||||
|
|
||||||
/* Locked by task manager lock. */
|
/* Locked by task manager lock. */
|
||||||
unsigned int default_quantum;
|
unsigned int default_quantum;
|
||||||
LIST(isc__task_t) tasks;
|
LIST(isc__task_t) tasks;
|
||||||
isc__tasklist_t *ready_tasks;
|
|
||||||
isc__tasklist_t *ready_priority_tasks;
|
|
||||||
isc_taskmgrmode_t mode;
|
isc_taskmgrmode_t mode;
|
||||||
isc_condition_t *work_available;
|
|
||||||
isc_condition_t exclusive_granted;
|
|
||||||
isc_condition_t paused;
|
|
||||||
bool pause_requested;
|
bool pause_requested;
|
||||||
bool exclusive_requested;
|
bool exclusive_requested;
|
||||||
bool exiting;
|
bool exiting;
|
||||||
|
|
||||||
|
/* Locked by {pre/post}halt_lock combo */
|
||||||
|
unsigned int halted;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Multiple threads can read/write 'excl' at the same time, so we need
|
* Multiple threads can read/write 'excl' at the same time, so we need
|
||||||
* to protect the access. We can't use 'lock' since isc_task_detach()
|
* to protect the access. We can't use 'lock' since isc_task_detach()
|
||||||
@@ -215,8 +215,8 @@ task_finished(isc__task_t *task) {
|
|||||||
* any idle worker threads so they
|
* any idle worker threads so they
|
||||||
* can exit.
|
* can exit.
|
||||||
*/
|
*/
|
||||||
for (unsigned int i=0; i<manager->queues; i++) {
|
for (unsigned int i=0; i<manager->workers; i++) {
|
||||||
BROADCAST(&manager->work_available[i]);
|
BROADCAST(&manager->queues[i].work_available);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
DESTROYLOCK(&task->lock);
|
DESTROYLOCK(&task->lock);
|
||||||
@@ -242,7 +242,7 @@ isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum,
|
|||||||
return (ISC_R_NOMEMORY);
|
return (ISC_R_NOMEMORY);
|
||||||
XTRACE("isc_task_create");
|
XTRACE("isc_task_create");
|
||||||
task->manager = manager;
|
task->manager = manager;
|
||||||
task->threadid = -1;
|
task->threadid = atomic_fetch_add_explicit(&manager->curq, 1, memory_order_relaxed) % manager->workers;
|
||||||
result = isc_mutex_init(&task->lock);
|
result = isc_mutex_init(&task->lock);
|
||||||
if (result != ISC_R_SUCCESS) {
|
if (result != ISC_R_SUCCESS) {
|
||||||
isc_mem_put(manager->mctx, task, sizeof(*task));
|
isc_mem_put(manager->mctx, task, sizeof(*task));
|
||||||
@@ -354,17 +354,16 @@ static inline void
|
|||||||
task_ready(isc__task_t *task) {
|
task_ready(isc__task_t *task) {
|
||||||
isc__taskmgr_t *manager = task->manager;
|
isc__taskmgr_t *manager = task->manager;
|
||||||
bool has_privilege = isc_task_privilege((isc_task_t *) task);
|
bool has_privilege = isc_task_privilege((isc_task_t *) task);
|
||||||
int queue = task->threadid % manager->queues;
|
|
||||||
|
|
||||||
REQUIRE(VALID_MANAGER(manager));
|
REQUIRE(VALID_MANAGER(manager));
|
||||||
REQUIRE(task->state == task_state_ready);
|
REQUIRE(task->state == task_state_ready);
|
||||||
|
|
||||||
XTRACE("task_ready");
|
XTRACE("task_ready");
|
||||||
LOCK(manager->locks[queue]);
|
LOCK(&manager->queues[task->threadid].lock);
|
||||||
push_readyq(manager, task, queue);
|
push_readyq(manager, task, task->threadid);
|
||||||
if (manager->mode == isc_taskmgrmode_normal || has_privilege)
|
if (manager->mode == isc_taskmgrmode_normal || has_privilege)
|
||||||
SIGNAL(&manager->work_available[queue]);
|
SIGNAL(&manager->queues[task->threadid].work_available);
|
||||||
UNLOCK(manager->locks[queue]);
|
UNLOCK(&manager->queues[task->threadid].lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline bool
|
static inline bool
|
||||||
@@ -468,9 +467,6 @@ void
|
|||||||
isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
|
isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
|
||||||
isc__task_t *task = (isc__task_t *)task0;
|
isc__task_t *task = (isc__task_t *)task0;
|
||||||
bool was_idle;
|
bool was_idle;
|
||||||
if (c == -1) {
|
|
||||||
c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Send '*event' to 'task'.
|
* Send '*event' to 'task'.
|
||||||
@@ -478,6 +474,12 @@ isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
|
|||||||
|
|
||||||
REQUIRE(VALID_TASK(task));
|
REQUIRE(VALID_TASK(task));
|
||||||
|
|
||||||
|
if (c == -1) {
|
||||||
|
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
|
||||||
|
memory_order_relaxed)
|
||||||
|
% task->manager->workers;
|
||||||
|
}
|
||||||
|
|
||||||
XTRACE("isc_task_send");
|
XTRACE("isc_task_send");
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -523,7 +525,9 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) {
|
|||||||
task = (isc__task_t *)*taskp;
|
task = (isc__task_t *)*taskp;
|
||||||
REQUIRE(VALID_TASK(task));
|
REQUIRE(VALID_TASK(task));
|
||||||
if (c == -1) {
|
if (c == -1) {
|
||||||
c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed);
|
c = atomic_fetch_add_explicit(&task->manager->curq, 1,
|
||||||
|
memory_order_relaxed)
|
||||||
|
% task->manager->workers;
|
||||||
}
|
}
|
||||||
|
|
||||||
XTRACE("isc_task_sendanddetach");
|
XTRACE("isc_task_sendanddetach");
|
||||||
@@ -857,9 +861,9 @@ empty_readyq(isc__taskmgr_t *manager, int c) {
|
|||||||
isc__tasklist_t queue;
|
isc__tasklist_t queue;
|
||||||
|
|
||||||
if (manager->mode == isc_taskmgrmode_normal)
|
if (manager->mode == isc_taskmgrmode_normal)
|
||||||
queue = manager->ready_tasks[c];
|
queue = manager->queues[c].ready_tasks;
|
||||||
else
|
else
|
||||||
queue = manager->ready_priority_tasks[c];
|
queue = manager->queues[c].ready_priority_tasks;
|
||||||
|
|
||||||
return (EMPTY(queue));
|
return (EMPTY(queue));
|
||||||
}
|
}
|
||||||
@@ -877,14 +881,14 @@ pop_readyq(isc__taskmgr_t *manager, int c) {
|
|||||||
isc__task_t *task;
|
isc__task_t *task;
|
||||||
|
|
||||||
if (manager->mode == isc_taskmgrmode_normal)
|
if (manager->mode == isc_taskmgrmode_normal)
|
||||||
task = HEAD(manager->ready_tasks[c]);
|
task = HEAD(manager->queues[c].ready_tasks);
|
||||||
else
|
else
|
||||||
task = HEAD(manager->ready_priority_tasks[c]);
|
task = HEAD(manager->queues[c].ready_priority_tasks);
|
||||||
|
|
||||||
if (task != NULL) {
|
if (task != NULL) {
|
||||||
DEQUEUE(manager->ready_tasks[c], task, ready_link);
|
DEQUEUE(manager->queues[c].ready_tasks, task, ready_link);
|
||||||
if (ISC_LINK_LINKED(task, ready_priority_link))
|
if (ISC_LINK_LINKED(task, ready_priority_link))
|
||||||
DEQUEUE(manager->ready_priority_tasks[c], task,
|
DEQUEUE(manager->queues[c].ready_priority_tasks, task,
|
||||||
ready_priority_link);
|
ready_priority_link);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -899,9 +903,9 @@ pop_readyq(isc__taskmgr_t *manager, int c) {
|
|||||||
*/
|
*/
|
||||||
static inline void
|
static inline void
|
||||||
push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) {
|
push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) {
|
||||||
ENQUEUE(manager->ready_tasks[c], task, ready_link);
|
ENQUEUE(manager->queues[c].ready_tasks, task, ready_link);
|
||||||
if ((task->flags & TASK_F_PRIVILEGED) != 0)
|
if ((task->flags & TASK_F_PRIVILEGED) != 0)
|
||||||
ENQUEUE(manager->ready_priority_tasks[c], task,
|
ENQUEUE(manager->queues[c].ready_priority_tasks, task,
|
||||||
ready_priority_link);
|
ready_priority_link);
|
||||||
atomic_fetch_add_explicit(&manager->tasks_ready, 1, memory_order_relaxed);
|
atomic_fetch_add_explicit(&manager->tasks_ready, 1, memory_order_relaxed);
|
||||||
}
|
}
|
||||||
@@ -916,8 +920,6 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
|
|||||||
LOCK(&manager->lock);
|
LOCK(&manager->lock);
|
||||||
UNLOCK(&manager->lock);
|
UNLOCK(&manager->lock);
|
||||||
|
|
||||||
int queue = threadid % manager->queues;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Again we're trying to hold the lock for as short a time as possible
|
* Again we're trying to hold the lock for as short a time as possible
|
||||||
* and to do as little locking and unlocking as possible.
|
* and to do as little locking and unlocking as possible.
|
||||||
@@ -967,7 +969,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
|
|||||||
* For N iterations of the loop, this code does N+1 locks and N+1
|
* For N iterations of the loop, this code does N+1 locks and N+1
|
||||||
* unlocks. The while expression is always protected by the lock.
|
* unlocks. The while expression is always protected by the lock.
|
||||||
*/
|
*/
|
||||||
LOCK(manager->locks[queue]);
|
LOCK(&manager->queues[threadid].lock);
|
||||||
|
|
||||||
while (!FINISHED(manager)) {
|
while (!FINISHED(manager)) {
|
||||||
/*
|
/*
|
||||||
@@ -980,8 +982,8 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
|
|||||||
* If a pause has been requested, don't do any work
|
* If a pause has been requested, don't do any work
|
||||||
* until it's been released.
|
* until it's been released.
|
||||||
*/
|
*/
|
||||||
while ((empty_readyq(manager, queue) || manager->pause_requested ||
|
while ((empty_readyq(manager, threadid) && !manager->pause_requested &&
|
||||||
manager->exclusive_requested) && !FINISHED(manager))
|
!manager->exclusive_requested) && !FINISHED(manager))
|
||||||
{
|
{
|
||||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
|
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
|
||||||
ISC_MSGSET_GENERAL,
|
ISC_MSGSET_GENERAL,
|
||||||
@@ -992,7 +994,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
|
|||||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
|
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
|
||||||
ISC_MSGSET_GENERAL,
|
ISC_MSGSET_GENERAL,
|
||||||
ISC_MSG_WAIT, manager->exclusive_requested ? "excreq" : "notexcreq"));
|
ISC_MSG_WAIT, manager->exclusive_requested ? "excreq" : "notexcreq"));
|
||||||
WAIT(&manager->work_available[queue], manager->locks[queue]);
|
WAIT(&manager->queues[threadid].work_available, &manager->queues[threadid].lock);
|
||||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
|
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
|
||||||
ISC_MSGSET_TASK,
|
ISC_MSGSET_TASK,
|
||||||
ISC_MSG_AWAKE, "awake"));
|
ISC_MSG_AWAKE, "awake"));
|
||||||
@@ -1000,7 +1002,33 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
|
|||||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK,
|
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK,
|
||||||
ISC_MSG_WORKING, "working"));
|
ISC_MSG_WORKING, "working"));
|
||||||
|
|
||||||
task = pop_readyq(manager, queue);
|
if (manager->pause_requested || manager->exclusive_requested) {
|
||||||
|
UNLOCK(&manager->queues[threadid].lock);
|
||||||
|
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK,
|
||||||
|
ISC_MSG_WORKING, "halting"));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* First we increase 'halted' and signal the thread
|
||||||
|
* that's waiting on exclusivity/pause. Then we
|
||||||
|
* try locking posthalt lock, which will be locked
|
||||||
|
* by exclusive/pausing thread. It is only unlocked
|
||||||
|
* after exclusivity/pause is done.
|
||||||
|
*/
|
||||||
|
LOCK(&manager->prehalt_lock);
|
||||||
|
manager->halted++;
|
||||||
|
SIGNAL(&manager->halt_cond);
|
||||||
|
UNLOCK(&manager->prehalt_lock);
|
||||||
|
|
||||||
|
LOCK(&manager->posthalt_lock);
|
||||||
|
manager->halted--;
|
||||||
|
UNLOCK(&manager->posthalt_lock);
|
||||||
|
|
||||||
|
LOCK(&manager->queues[threadid].lock);
|
||||||
|
/* Restart the loop after */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
task = pop_readyq(manager, threadid);
|
||||||
if (task != NULL) {
|
if (task != NULL) {
|
||||||
unsigned int dispatch_count = 0;
|
unsigned int dispatch_count = 0;
|
||||||
bool done = false;
|
bool done = false;
|
||||||
@@ -1015,7 +1043,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
|
|||||||
* have a task to do. We must reacquire the manager
|
* have a task to do. We must reacquire the manager
|
||||||
* lock before exiting the 'if (task != NULL)' block.
|
* lock before exiting the 'if (task != NULL)' block.
|
||||||
*/
|
*/
|
||||||
UNLOCK(manager->locks[queue]);
|
UNLOCK(&manager->queues[threadid].lock);
|
||||||
atomic_fetch_add_explicit(&manager->tasks_ready, -1, memory_order_relaxed);
|
atomic_fetch_add_explicit(&manager->tasks_ready, -1, memory_order_relaxed);
|
||||||
atomic_fetch_add_explicit(&manager->tasks_running, 1, memory_order_relaxed);
|
atomic_fetch_add_explicit(&manager->tasks_running, 1, memory_order_relaxed);
|
||||||
|
|
||||||
@@ -1132,14 +1160,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
|
|||||||
task_finished(task);
|
task_finished(task);
|
||||||
|
|
||||||
atomic_fetch_add_explicit(&manager->tasks_running, -1, memory_order_relaxed);
|
atomic_fetch_add_explicit(&manager->tasks_running, -1, memory_order_relaxed);
|
||||||
if (manager->exclusive_requested &&
|
LOCK(&manager->queues[threadid].lock);
|
||||||
manager->tasks_running == 1) {
|
|
||||||
SIGNAL(&manager->exclusive_granted);
|
|
||||||
} else if (manager->pause_requested &&
|
|
||||||
manager->tasks_running == 0) {
|
|
||||||
SIGNAL(&manager->paused);
|
|
||||||
}
|
|
||||||
LOCK(manager->locks[queue]);
|
|
||||||
if (requeue) {
|
if (requeue) {
|
||||||
/*
|
/*
|
||||||
* We know we're awake, so we don't have
|
* We know we're awake, so we don't have
|
||||||
@@ -1160,7 +1181,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
|
|||||||
* were usually nonempty, the 'optimization'
|
* were usually nonempty, the 'optimization'
|
||||||
* might even hurt rather than help.
|
* might even hurt rather than help.
|
||||||
*/
|
*/
|
||||||
push_readyq(manager, task, queue);
|
push_readyq(manager, task, threadid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1170,39 +1191,33 @@ dispatch(isc__taskmgr_t *manager, int threadid) {
|
|||||||
* we're stuck. Automatically drop privileges at that
|
* we're stuck. Automatically drop privileges at that
|
||||||
* point and continue with the regular ready queue.
|
* point and continue with the regular ready queue.
|
||||||
*/
|
*/
|
||||||
if (manager->tasks_running == 0 && empty_readyq(manager, queue)) {
|
if (manager->tasks_running == 0 && empty_readyq(manager, threadid)) {
|
||||||
manager->mode = isc_taskmgrmode_normal;
|
manager->mode = isc_taskmgrmode_normal;
|
||||||
for (unsigned i=0; i < manager->workers; i++) {
|
for (unsigned i=0; i < manager->workers; i++) {
|
||||||
BROADCAST(&manager->work_available[i]);
|
BROADCAST(&manager->queues[i].work_available);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
UNLOCK(manager->locks[queue]);
|
UNLOCK(&manager->queues[threadid].lock);
|
||||||
/*
|
/*
|
||||||
* There might be other dispatchers waiting on empty tasks,
|
* There might be other dispatchers waiting on empty tasks,
|
||||||
* wake them up.
|
* wake them up.
|
||||||
*/
|
*/
|
||||||
for (unsigned i=0; i < manager->workers; i++) {
|
for (unsigned i=0; i < manager->workers; i++) {
|
||||||
LOCK(manager->locks[i]);
|
LOCK(&manager->queues[i].lock);
|
||||||
BROADCAST(&manager->work_available[i]);
|
BROADCAST(&manager->queues[i].work_available);
|
||||||
UNLOCK(manager->locks[i]);
|
UNLOCK(&manager->queues[i].lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct st {
|
|
||||||
isc__taskmgr_t *manager;
|
|
||||||
int threadid;
|
|
||||||
} stt;
|
|
||||||
|
|
||||||
static isc_threadresult_t
|
static isc_threadresult_t
|
||||||
#ifdef _WIN32
|
#ifdef _WIN32
|
||||||
WINAPI
|
WINAPI
|
||||||
#endif
|
#endif
|
||||||
run(void *uap) {
|
run(void *queuep) {
|
||||||
stt *st = uap;
|
isc__taskqueue_t *tq = queuep;
|
||||||
isc__taskmgr_t *manager = st->manager;
|
isc__taskmgr_t *manager = tq->manager;
|
||||||
int threadid = st->threadid;
|
int threadid = tq->threadid;
|
||||||
isc_mem_put(manager->mctx, st, sizeof(*st));
|
|
||||||
isc_thread_setaffinity(threadid);
|
isc_thread_setaffinity(threadid);
|
||||||
|
|
||||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
||||||
@@ -1222,19 +1237,15 @@ run(void *uap) {
|
|||||||
|
|
||||||
static void
|
static void
|
||||||
manager_free(isc__taskmgr_t *manager) {
|
manager_free(isc__taskmgr_t *manager) {
|
||||||
isc_mem_t *mctx;
|
/* TODO */
|
||||||
|
|
||||||
(void)isc_condition_destroy(&manager->exclusive_granted);
|
|
||||||
(void)isc_condition_destroy(&manager->work_available[0]);
|
|
||||||
(void)isc_condition_destroy(&manager->paused);
|
|
||||||
isc_mem_free(manager->mctx, manager->threads);
|
|
||||||
DESTROYLOCK(&manager->lock);
|
DESTROYLOCK(&manager->lock);
|
||||||
DESTROYLOCK(&manager->excl_lock);
|
DESTROYLOCK(&manager->prehalt_lock);
|
||||||
|
DESTROYLOCK(&manager->posthalt_lock);
|
||||||
|
isc_mem_put(manager->mctx, manager->queues, manager->workers * sizeof(isc__taskqueue_t));
|
||||||
manager->common.impmagic = 0;
|
manager->common.impmagic = 0;
|
||||||
manager->common.magic = 0;
|
manager->common.magic = 0;
|
||||||
mctx = manager->mctx;
|
isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager));
|
||||||
isc_mem_put(mctx, manager, sizeof(*manager));
|
|
||||||
isc_mem_detach(&mctx);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
isc_result_t
|
isc_result_t
|
||||||
@@ -1242,7 +1253,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
|||||||
unsigned int default_quantum, isc_taskmgr_t **managerp)
|
unsigned int default_quantum, isc_taskmgr_t **managerp)
|
||||||
{
|
{
|
||||||
isc_result_t result;
|
isc_result_t result;
|
||||||
unsigned int i, started = 0;
|
unsigned int i;
|
||||||
isc__taskmgr_t *manager;
|
isc__taskmgr_t *manager;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -1268,45 +1279,26 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
|||||||
goto cleanup_mgr;
|
goto cleanup_mgr;
|
||||||
}
|
}
|
||||||
|
|
||||||
manager->workers = 0;
|
RUNTIME_CHECK(isc_mutex_init(&manager->prehalt_lock) == ISC_R_SUCCESS);
|
||||||
manager->queues = 0;
|
RUNTIME_CHECK(isc_mutex_init(&manager->posthalt_lock) == ISC_R_SUCCESS);
|
||||||
manager->threads = isc_mem_allocate(mctx,
|
|
||||||
workers * sizeof(isc_thread_t));
|
RUNTIME_CHECK(isc_condition_init(&manager->halt_cond) == ISC_R_SUCCESS);
|
||||||
if (manager->threads == NULL) {
|
|
||||||
result = ISC_R_NOMEMORY;
|
manager->workers = workers;
|
||||||
goto cleanup_lock;
|
|
||||||
}
|
|
||||||
if (isc_condition_init(&manager->exclusive_granted) != ISC_R_SUCCESS) {
|
|
||||||
UNEXPECTED_ERROR(__FILE__, __LINE__,
|
|
||||||
"isc_condition_init() %s",
|
|
||||||
isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
|
||||||
ISC_MSG_FAILED, "failed"));
|
|
||||||
result = ISC_R_UNEXPECTED;
|
|
||||||
goto cleanup_workavailable;
|
|
||||||
}
|
|
||||||
if (isc_condition_init(&manager->paused) != ISC_R_SUCCESS) {
|
|
||||||
UNEXPECTED_ERROR(__FILE__, __LINE__,
|
|
||||||
"isc_condition_init() %s",
|
|
||||||
isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
|
||||||
ISC_MSG_FAILED, "failed"));
|
|
||||||
result = ISC_R_UNEXPECTED;
|
|
||||||
goto cleanup_exclusivegranted;
|
|
||||||
}
|
|
||||||
if (default_quantum == 0)
|
if (default_quantum == 0)
|
||||||
default_quantum = DEFAULT_DEFAULT_QUANTUM;
|
default_quantum = DEFAULT_DEFAULT_QUANTUM;
|
||||||
manager->default_quantum = default_quantum;
|
manager->default_quantum = default_quantum;
|
||||||
INIT_LIST(manager->tasks);
|
INIT_LIST(manager->tasks);
|
||||||
manager->ready_tasks = malloc(workers * sizeof(isc__tasklist_t));
|
manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t));
|
||||||
manager->locks = malloc(workers * sizeof(isc_mutex_t *));
|
|
||||||
manager->work_available = malloc(workers * sizeof(isc_condition_t));
|
|
||||||
manager->ready_priority_tasks = malloc(workers * sizeof(isc__tasklist_t));
|
|
||||||
manager->tasks_running = 0;
|
manager->tasks_running = 0;
|
||||||
manager->tasks_ready = 0;
|
manager->tasks_ready = 0;
|
||||||
manager->exclusive_requested = false;
|
|
||||||
manager->pause_requested = false;
|
|
||||||
manager->curq = 0;
|
manager->curq = 0;
|
||||||
manager->exiting = false;
|
manager->exiting = false;
|
||||||
manager->excl = NULL;
|
manager->excl = NULL;
|
||||||
|
manager->halted = 0;
|
||||||
|
manager->exclusive_requested = false;
|
||||||
|
manager->pause_requested = false;
|
||||||
|
|
||||||
isc_mem_attach(mctx, &manager->mctx);
|
isc_mem_attach(mctx, &manager->mctx);
|
||||||
|
|
||||||
@@ -1315,53 +1307,30 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
|||||||
* Start workers.
|
* Start workers.
|
||||||
*/
|
*/
|
||||||
for (i = 0; i < workers; i++) {
|
for (i = 0; i < workers; i++) {
|
||||||
INIT_LIST(manager->ready_tasks[i]);
|
INIT_LIST(manager->queues[i].ready_tasks);
|
||||||
INIT_LIST(manager->ready_priority_tasks[i]);
|
INIT_LIST(manager->queues[i].ready_priority_tasks);
|
||||||
manager->locks[i] = malloc(4096);
|
RUNTIME_CHECK(isc_mutex_init(&manager->queues[i].lock)
|
||||||
isc_mutex_init(manager->locks[i]);
|
== ISC_R_SUCCESS);
|
||||||
if (isc_condition_init(&manager->work_available[i]) != ISC_R_SUCCESS) {
|
RUNTIME_CHECK(isc_condition_init(
|
||||||
UNEXPECTED_ERROR(__FILE__, __LINE__,
|
&manager->queues[i].work_available)
|
||||||
"isc_condition_init() %s",
|
== ISC_R_SUCCESS);
|
||||||
isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
manager->queues[i].manager = manager;
|
||||||
ISC_MSG_FAILED, "failed"));
|
manager->queues[i].threadid = i;
|
||||||
result = ISC_R_UNEXPECTED;
|
RUNTIME_CHECK(isc_thread_create(run, &manager->queues[i],
|
||||||
goto cleanup_threads;
|
&manager->queues[i].thread)
|
||||||
}
|
== ISC_R_SUCCESS);
|
||||||
stt *st = isc_mem_get(mctx, sizeof(stt));
|
char name[16];
|
||||||
st->manager = manager;
|
|
||||||
st->threadid = i;
|
|
||||||
if (isc_thread_create(run, st,
|
|
||||||
&manager->threads[i]) != ISC_R_SUCCESS) {
|
|
||||||
goto cleanup_threads;
|
|
||||||
}
|
|
||||||
char name[16]; /* thread name limit on Linux */
|
|
||||||
snprintf(name, sizeof(name), "isc-worker%04u", i);
|
snprintf(name, sizeof(name), "isc-worker%04u", i);
|
||||||
isc_thread_setname(manager->threads[manager->workers],
|
isc_thread_setname(manager->queues[i].thread, name);
|
||||||
name);
|
|
||||||
manager->workers++;
|
|
||||||
started++;
|
|
||||||
}
|
}
|
||||||
manager->queues = manager->workers;
|
|
||||||
UNLOCK(&manager->lock);
|
UNLOCK(&manager->lock);
|
||||||
|
|
||||||
if (started == 0) {
|
|
||||||
manager_free(manager);
|
|
||||||
return (ISC_R_NOTHREADS);
|
|
||||||
}
|
|
||||||
isc_thread_setconcurrency(workers);
|
isc_thread_setconcurrency(workers);
|
||||||
|
|
||||||
*managerp = (isc_taskmgr_t *)manager;
|
*managerp = (isc_taskmgr_t *)manager;
|
||||||
|
|
||||||
return (ISC_R_SUCCESS);
|
return (ISC_R_SUCCESS);
|
||||||
|
|
||||||
cleanup_exclusivegranted:
|
|
||||||
(void)isc_condition_destroy(&manager->exclusive_granted);
|
|
||||||
cleanup_workavailable:
|
|
||||||
(void)isc_condition_destroy(&manager->work_available[0]);
|
|
||||||
cleanup_threads:
|
|
||||||
isc_mem_free(mctx, manager->threads);
|
|
||||||
cleanup_lock:
|
|
||||||
DESTROYLOCK(&manager->lock);
|
|
||||||
cleanup_mgr:
|
cleanup_mgr:
|
||||||
isc_mem_put(mctx, manager, sizeof(*manager));
|
isc_mem_put(mctx, manager, sizeof(*manager));
|
||||||
return (result);
|
return (result);
|
||||||
@@ -1429,8 +1398,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
|
|||||||
task = NEXT(task, link)) {
|
task = NEXT(task, link)) {
|
||||||
LOCK(&task->lock);
|
LOCK(&task->lock);
|
||||||
if (task_shutdown(task)) {
|
if (task_shutdown(task)) {
|
||||||
int queue = task->threadid % manager->queues;
|
push_readyq(manager, task, task->threadid);
|
||||||
push_readyq(manager, task, queue);
|
|
||||||
}
|
}
|
||||||
UNLOCK(&task->lock);
|
UNLOCK(&task->lock);
|
||||||
}
|
}
|
||||||
@@ -1439,10 +1407,10 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
|
|||||||
* there's work left to do, and if there are already no tasks left
|
* there's work left to do, and if there are already no tasks left
|
||||||
* it will cause the workers to see manager->exiting.
|
* it will cause the workers to see manager->exiting.
|
||||||
*/
|
*/
|
||||||
for (i = 0; i < manager->queues; i++) {
|
for (i = 0; i < manager->workers; i++) {
|
||||||
LOCK(manager->locks[i]);
|
LOCK(&manager->queues[i].lock);
|
||||||
BROADCAST(&manager->work_available[i]);
|
BROADCAST(&manager->queues[i].work_available);
|
||||||
UNLOCK(manager->locks[i]);
|
UNLOCK(&manager->queues[i].lock);
|
||||||
}
|
}
|
||||||
UNLOCK(&manager->lock);
|
UNLOCK(&manager->lock);
|
||||||
|
|
||||||
@@ -1450,7 +1418,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
|
|||||||
* Wait for all the worker threads to exit.
|
* Wait for all the worker threads to exit.
|
||||||
*/
|
*/
|
||||||
for (i = 0; i < manager->workers; i++)
|
for (i = 0; i < manager->workers; i++)
|
||||||
(void)isc_thread_join(manager->threads[i], NULL);
|
(void)isc_thread_join(manager->queues[i].thread, NULL);
|
||||||
|
|
||||||
manager_free(manager);
|
manager_free(manager);
|
||||||
|
|
||||||
@@ -1479,24 +1447,33 @@ isc_taskmgr_mode(isc_taskmgr_t *manager0) {
|
|||||||
void
|
void
|
||||||
isc__taskmgr_pause(isc_taskmgr_t *manager0) {
|
isc__taskmgr_pause(isc_taskmgr_t *manager0) {
|
||||||
isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
|
isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
|
||||||
manager->pause_requested = true;
|
unsigned int i;
|
||||||
LOCK(&manager->lock);
|
|
||||||
while (manager->tasks_running > 0) {
|
LOCK(&manager->posthalt_lock);
|
||||||
WAIT(&manager->paused, &manager->lock);
|
while (manager->exclusive_requested || manager->pause_requested) {
|
||||||
|
UNLOCK(&manager->posthalt_lock);
|
||||||
|
/* This is ugly but pause is used EXCLUSIVELY in tests */
|
||||||
|
isc_thread_yield();
|
||||||
|
LOCK(&manager->posthalt_lock);
|
||||||
}
|
}
|
||||||
UNLOCK(&manager->lock);
|
manager->pause_requested = true;
|
||||||
|
LOCK(&manager->prehalt_lock);
|
||||||
|
while (manager->halted < manager->workers) {
|
||||||
|
for (i = 0; i < manager->workers; i++) {
|
||||||
|
BROADCAST(&manager->queues[i].work_available);
|
||||||
|
}
|
||||||
|
WAIT(&manager->halt_cond, &manager->prehalt_lock);
|
||||||
|
}
|
||||||
|
UNLOCK(&manager->prehalt_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
isc__taskmgr_resume(isc_taskmgr_t *manager0) {
|
isc__taskmgr_resume(isc_taskmgr_t *manager0) {
|
||||||
isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
|
isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0;
|
||||||
|
|
||||||
LOCK(&manager->lock);
|
|
||||||
if (manager->pause_requested) {
|
if (manager->pause_requested) {
|
||||||
manager->pause_requested = false;
|
manager->pause_requested = false;
|
||||||
BROADCAST(&manager->work_available[0]);
|
UNLOCK(&manager->posthalt_lock);
|
||||||
}
|
}
|
||||||
UNLOCK(&manager->lock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
@@ -1535,24 +1512,32 @@ isc_result_t
|
|||||||
isc_task_beginexclusive(isc_task_t *task0) {
|
isc_task_beginexclusive(isc_task_t *task0) {
|
||||||
isc__task_t *task = (isc__task_t *)task0;
|
isc__task_t *task = (isc__task_t *)task0;
|
||||||
isc__taskmgr_t *manager = task->manager;
|
isc__taskmgr_t *manager = task->manager;
|
||||||
|
unsigned int i;
|
||||||
|
|
||||||
REQUIRE(VALID_TASK(task));
|
REQUIRE(VALID_TASK(task));
|
||||||
|
|
||||||
REQUIRE(task->state == task_state_running);
|
REQUIRE(task->state == task_state_running);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* TODO REQUIRE(task == task->manager->excl);
|
* TODO REQUIRE(task == task->manager->excl);
|
||||||
* it should be here, it fails on shutdown server->task
|
* it should be here, it fails on shutdown server->task
|
||||||
*/
|
*/
|
||||||
|
|
||||||
LOCK(&manager->lock);
|
if (manager->exclusive_requested || manager->pause_requested) {
|
||||||
if (manager->exclusive_requested) {
|
|
||||||
UNLOCK(&manager->lock);
|
|
||||||
return (ISC_R_LOCKBUSY);
|
return (ISC_R_LOCKBUSY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOCK(&manager->posthalt_lock);
|
||||||
|
INSIST(!manager->exclusive_requested && !manager->pause_requested);
|
||||||
manager->exclusive_requested = true;
|
manager->exclusive_requested = true;
|
||||||
while (manager->tasks_running > 1) {
|
LOCK(&manager->prehalt_lock);
|
||||||
WAIT(&manager->exclusive_granted, &manager->lock);
|
while (manager->halted + 1 < manager->workers) {
|
||||||
|
for (i = 0; i < manager->workers; i++) {
|
||||||
|
BROADCAST(&manager->queues[i].work_available);
|
||||||
|
}
|
||||||
|
WAIT(&manager->halt_cond, &manager->prehalt_lock);
|
||||||
}
|
}
|
||||||
UNLOCK(&manager->lock);
|
UNLOCK(&manager->prehalt_lock);
|
||||||
return (ISC_R_SUCCESS);
|
return (ISC_R_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1563,13 +1548,9 @@ isc_task_endexclusive(isc_task_t *task0) {
|
|||||||
|
|
||||||
REQUIRE(VALID_TASK(task));
|
REQUIRE(VALID_TASK(task));
|
||||||
REQUIRE(task->state == task_state_running);
|
REQUIRE(task->state == task_state_running);
|
||||||
LOCK(&manager->lock);
|
|
||||||
REQUIRE(manager->exclusive_requested);
|
REQUIRE(manager->exclusive_requested);
|
||||||
manager->exclusive_requested = false;
|
manager->exclusive_requested = false;
|
||||||
for (unsigned int i=0; i < manager->workers; i++) {
|
UNLOCK(&manager->posthalt_lock);
|
||||||
BROADCAST(&manager->work_available[i]);
|
|
||||||
}
|
|
||||||
UNLOCK(&manager->lock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
@@ -1577,7 +1558,6 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) {
|
|||||||
REQUIRE(ISCAPI_TASK_VALID(task0));
|
REQUIRE(ISCAPI_TASK_VALID(task0));
|
||||||
isc__task_t *task = (isc__task_t *)task0;
|
isc__task_t *task = (isc__task_t *)task0;
|
||||||
isc__taskmgr_t *manager = task->manager;
|
isc__taskmgr_t *manager = task->manager;
|
||||||
int queue = task->threadid % manager->queues;
|
|
||||||
bool oldpriv;
|
bool oldpriv;
|
||||||
|
|
||||||
LOCK(&task->lock);
|
LOCK(&task->lock);
|
||||||
@@ -1591,14 +1571,14 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) {
|
|||||||
if (priv == oldpriv)
|
if (priv == oldpriv)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
LOCK(manager->locks[queue]);
|
LOCK(&manager->queues[task->threadid].lock);
|
||||||
if (priv && ISC_LINK_LINKED(task, ready_link))
|
if (priv && ISC_LINK_LINKED(task, ready_link))
|
||||||
ENQUEUE(manager->ready_priority_tasks[queue], task,
|
ENQUEUE(manager->queues[task->threadid].ready_priority_tasks,
|
||||||
ready_priority_link);
|
task, ready_priority_link);
|
||||||
else if (!priv && ISC_LINK_LINKED(task, ready_priority_link))
|
else if (!priv && ISC_LINK_LINKED(task, ready_priority_link))
|
||||||
DEQUEUE(manager->ready_priority_tasks[queue], task,
|
DEQUEUE(manager->queues[task->threadid].ready_priority_tasks,
|
||||||
ready_priority_link);
|
task, ready_priority_link);
|
||||||
UNLOCK(manager->locks[queue]);
|
UNLOCK(&manager->queues[task->threadid].lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
|
Reference in New Issue
Block a user