mirror of
https://gitlab.isc.org/isc-projects/bind9
synced 2025-08-31 06:25:31 +00:00
Multiple worker queues
This commit is contained in:
245
lib/isc/task.c
245
lib/isc/task.c
@@ -21,6 +21,7 @@
|
||||
#include <stdbool.h>
|
||||
|
||||
#include <isc/app.h>
|
||||
#include <isc/atomic.h>
|
||||
#include <isc/condition.h>
|
||||
#include <isc/event.h>
|
||||
#include <isc/json.h>
|
||||
@@ -31,6 +32,7 @@
|
||||
#include <isc/platform.h>
|
||||
#include <isc/print.h>
|
||||
#include <isc/string.h>
|
||||
#include <isc/random.h>
|
||||
#include <isc/task.h>
|
||||
#include <isc/thread.h>
|
||||
#include <isc/time.h>
|
||||
@@ -104,6 +106,7 @@ struct isc__task {
|
||||
isc_time_t tnow;
|
||||
char name[16];
|
||||
void * tag;
|
||||
int threadid;
|
||||
/* Locked by task manager lock. */
|
||||
LINK(isc__task_t) link;
|
||||
LINK(isc__task_t) ready_link;
|
||||
@@ -126,22 +129,26 @@ struct isc__taskmgr {
|
||||
isc_taskmgr_t common;
|
||||
isc_mem_t * mctx;
|
||||
isc_mutex_t lock;
|
||||
isc_mutex_t **locks;
|
||||
unsigned int workers;
|
||||
unsigned int queues;
|
||||
isc_thread_t * threads;
|
||||
atomic_uint_fast32_t tasks_running;
|
||||
atomic_uint_fast32_t tasks_ready;
|
||||
atomic_uint_fast32_t curq;
|
||||
|
||||
/* Locked by task manager lock. */
|
||||
unsigned int default_quantum;
|
||||
LIST(isc__task_t) tasks;
|
||||
isc__tasklist_t ready_tasks;
|
||||
isc__tasklist_t ready_priority_tasks;
|
||||
isc__tasklist_t *ready_tasks;
|
||||
isc__tasklist_t *ready_priority_tasks;
|
||||
isc_taskmgrmode_t mode;
|
||||
isc_condition_t work_available;
|
||||
isc_condition_t *work_available;
|
||||
isc_condition_t exclusive_granted;
|
||||
isc_condition_t paused;
|
||||
unsigned int tasks_running;
|
||||
unsigned int tasks_ready;
|
||||
bool pause_requested;
|
||||
bool exclusive_requested;
|
||||
bool exiting;
|
||||
bool pause_requested;
|
||||
bool exclusive_requested;
|
||||
bool exiting;
|
||||
|
||||
/*
|
||||
* Multiple threads can read/write 'excl' at the same time, so we need
|
||||
@@ -175,13 +182,13 @@ isc_taskmgr_setexcltask(isc_taskmgr_t *mgr0, isc_task_t *task0);
|
||||
isc_result_t
|
||||
isc_taskmgr_excltask(isc_taskmgr_t *mgr0, isc_task_t **taskp);
|
||||
static inline bool
|
||||
empty_readyq(isc__taskmgr_t *manager);
|
||||
empty_readyq(isc__taskmgr_t *manager, int c);
|
||||
|
||||
static inline isc__task_t *
|
||||
pop_readyq(isc__taskmgr_t *manager);
|
||||
pop_readyq(isc__taskmgr_t *manager, int c);
|
||||
|
||||
static inline void
|
||||
push_readyq(isc__taskmgr_t *manager, isc__task_t *task);
|
||||
push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c);
|
||||
|
||||
/***
|
||||
*** Tasks.
|
||||
@@ -190,7 +197,6 @@ push_readyq(isc__taskmgr_t *manager, isc__task_t *task);
|
||||
static void
|
||||
task_finished(isc__task_t *task) {
|
||||
isc__taskmgr_t *manager = task->manager;
|
||||
|
||||
REQUIRE(EMPTY(task->events));
|
||||
REQUIRE(task->nevents == 0);
|
||||
REQUIRE(EMPTY(task->on_shutdown));
|
||||
@@ -201,6 +207,7 @@ task_finished(isc__task_t *task) {
|
||||
|
||||
LOCK(&manager->lock);
|
||||
UNLINK(manager->tasks, task, link);
|
||||
UNLOCK(&manager->lock);
|
||||
if (FINISHED(manager)) {
|
||||
/*
|
||||
* All tasks have completed and the
|
||||
@@ -208,10 +215,10 @@ task_finished(isc__task_t *task) {
|
||||
* any idle worker threads so they
|
||||
* can exit.
|
||||
*/
|
||||
BROADCAST(&manager->work_available);
|
||||
for (unsigned int i=0; i<manager->queues; i++) {
|
||||
BROADCAST(&manager->work_available[i]);
|
||||
}
|
||||
}
|
||||
UNLOCK(&manager->lock);
|
||||
|
||||
DESTROYLOCK(&task->lock);
|
||||
task->common.impmagic = 0;
|
||||
task->common.magic = 0;
|
||||
@@ -235,6 +242,7 @@ isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum,
|
||||
return (ISC_R_NOMEMORY);
|
||||
XTRACE("isc_task_create");
|
||||
task->manager = manager;
|
||||
task->threadid = -1;
|
||||
result = isc_mutex_init(&task->lock);
|
||||
if (result != ISC_R_SUCCESS) {
|
||||
isc_mem_put(manager->mctx, task, sizeof(*task));
|
||||
@@ -346,17 +354,17 @@ static inline void
|
||||
task_ready(isc__task_t *task) {
|
||||
isc__taskmgr_t *manager = task->manager;
|
||||
bool has_privilege = isc_task_privilege((isc_task_t *) task);
|
||||
int queue = task->threadid % manager->queues;
|
||||
|
||||
REQUIRE(VALID_MANAGER(manager));
|
||||
REQUIRE(task->state == task_state_ready);
|
||||
|
||||
XTRACE("task_ready");
|
||||
|
||||
LOCK(&manager->lock);
|
||||
push_readyq(manager, task);
|
||||
LOCK(manager->locks[queue]);
|
||||
push_readyq(manager, task, queue);
|
||||
if (manager->mode == isc_taskmgrmode_normal || has_privilege)
|
||||
SIGNAL(&manager->work_available);
|
||||
UNLOCK(&manager->lock);
|
||||
SIGNAL(&manager->work_available[queue]);
|
||||
UNLOCK(manager->locks[queue]);
|
||||
}
|
||||
|
||||
static inline bool
|
||||
@@ -414,7 +422,7 @@ isc_task_detach(isc_task_t **taskp) {
|
||||
}
|
||||
|
||||
static inline bool
|
||||
task_send(isc__task_t *task, isc_event_t **eventp) {
|
||||
task_send(isc__task_t *task, isc_event_t **eventp, int c) {
|
||||
bool was_idle = false;
|
||||
isc_event_t *event;
|
||||
|
||||
@@ -433,6 +441,7 @@ task_send(isc__task_t *task, isc_event_t **eventp) {
|
||||
|
||||
if (task->state == task_state_idle) {
|
||||
was_idle = true;
|
||||
task->threadid = c;
|
||||
INSIST(EMPTY(task->events));
|
||||
task->state = task_state_ready;
|
||||
}
|
||||
@@ -447,8 +456,21 @@ task_send(isc__task_t *task, isc_event_t **eventp) {
|
||||
|
||||
void
|
||||
isc_task_send(isc_task_t *task0, isc_event_t **eventp) {
|
||||
isc_task_sendto(task0, eventp, -1);
|
||||
}
|
||||
|
||||
void
|
||||
isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) {
|
||||
isc_task_sendtoanddetach(taskp, eventp, -1);
|
||||
}
|
||||
|
||||
void
|
||||
isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) {
|
||||
isc__task_t *task = (isc__task_t *)task0;
|
||||
bool was_idle;
|
||||
if (c == -1) {
|
||||
c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed);
|
||||
}
|
||||
|
||||
/*
|
||||
* Send '*event' to 'task'.
|
||||
@@ -464,7 +486,7 @@ isc_task_send(isc_task_t *task0, isc_event_t **eventp) {
|
||||
* some processing is deferred until after the lock is released.
|
||||
*/
|
||||
LOCK(&task->lock);
|
||||
was_idle = task_send(task, eventp);
|
||||
was_idle = task_send(task, eventp, c);
|
||||
UNLOCK(&task->lock);
|
||||
|
||||
if (was_idle) {
|
||||
@@ -488,7 +510,7 @@ isc_task_send(isc_task_t *task0, isc_event_t **eventp) {
|
||||
}
|
||||
|
||||
void
|
||||
isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) {
|
||||
isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) {
|
||||
bool idle1, idle2;
|
||||
isc__task_t *task;
|
||||
|
||||
@@ -500,11 +522,13 @@ isc_task_sendanddetach(isc_task_t **taskp, isc_event_t **eventp) {
|
||||
REQUIRE(taskp != NULL);
|
||||
task = (isc__task_t *)*taskp;
|
||||
REQUIRE(VALID_TASK(task));
|
||||
if (c == -1) {
|
||||
c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed);
|
||||
}
|
||||
|
||||
XTRACE("isc_task_sendanddetach");
|
||||
|
||||
LOCK(&task->lock);
|
||||
idle1 = task_send(task, eventp);
|
||||
idle1 = task_send(task, eventp, c);
|
||||
idle2 = task_detach(task);
|
||||
UNLOCK(&task->lock);
|
||||
|
||||
@@ -829,13 +853,13 @@ isc_task_getcurrenttimex(isc_task_t *task0, isc_time_t *t) {
|
||||
* Caller must hold the task manager lock.
|
||||
*/
|
||||
static inline bool
|
||||
empty_readyq(isc__taskmgr_t *manager) {
|
||||
empty_readyq(isc__taskmgr_t *manager, int c) {
|
||||
isc__tasklist_t queue;
|
||||
|
||||
if (manager->mode == isc_taskmgrmode_normal)
|
||||
queue = manager->ready_tasks;
|
||||
queue = manager->ready_tasks[c];
|
||||
else
|
||||
queue = manager->ready_priority_tasks;
|
||||
queue = manager->ready_priority_tasks[c];
|
||||
|
||||
return (EMPTY(queue));
|
||||
}
|
||||
@@ -849,18 +873,18 @@ empty_readyq(isc__taskmgr_t *manager) {
|
||||
* Caller must hold the task manager lock.
|
||||
*/
|
||||
static inline isc__task_t *
|
||||
pop_readyq(isc__taskmgr_t *manager) {
|
||||
pop_readyq(isc__taskmgr_t *manager, int c) {
|
||||
isc__task_t *task;
|
||||
|
||||
if (manager->mode == isc_taskmgrmode_normal)
|
||||
task = HEAD(manager->ready_tasks);
|
||||
task = HEAD(manager->ready_tasks[c]);
|
||||
else
|
||||
task = HEAD(manager->ready_priority_tasks);
|
||||
task = HEAD(manager->ready_priority_tasks[c]);
|
||||
|
||||
if (task != NULL) {
|
||||
DEQUEUE(manager->ready_tasks, task, ready_link);
|
||||
DEQUEUE(manager->ready_tasks[c], task, ready_link);
|
||||
if (ISC_LINK_LINKED(task, ready_priority_link))
|
||||
DEQUEUE(manager->ready_priority_tasks, task,
|
||||
DEQUEUE(manager->ready_priority_tasks[c], task,
|
||||
ready_priority_link);
|
||||
}
|
||||
|
||||
@@ -874,20 +898,26 @@ pop_readyq(isc__taskmgr_t *manager) {
|
||||
* Caller must hold the task manager lock.
|
||||
*/
|
||||
static inline void
|
||||
push_readyq(isc__taskmgr_t *manager, isc__task_t *task) {
|
||||
ENQUEUE(manager->ready_tasks, task, ready_link);
|
||||
push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) {
|
||||
ENQUEUE(manager->ready_tasks[c], task, ready_link);
|
||||
if ((task->flags & TASK_F_PRIVILEGED) != 0)
|
||||
ENQUEUE(manager->ready_priority_tasks, task,
|
||||
ENQUEUE(manager->ready_priority_tasks[c], task,
|
||||
ready_priority_link);
|
||||
manager->tasks_ready++;
|
||||
atomic_fetch_add_explicit(&manager->tasks_ready, 1, memory_order_relaxed);
|
||||
}
|
||||
|
||||
static void
|
||||
dispatch(isc__taskmgr_t *manager) {
|
||||
dispatch(isc__taskmgr_t *manager, int threadid) {
|
||||
isc__task_t *task;
|
||||
|
||||
REQUIRE(VALID_MANAGER(manager));
|
||||
|
||||
/* Wait for everything to initialize */
|
||||
LOCK(&manager->lock);
|
||||
UNLOCK(&manager->lock);
|
||||
|
||||
int queue = threadid % manager->queues;
|
||||
|
||||
/*
|
||||
* Again we're trying to hold the lock for as short a time as possible
|
||||
* and to do as little locking and unlocking as possible.
|
||||
@@ -937,8 +967,7 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
* For N iterations of the loop, this code does N+1 locks and N+1
|
||||
* unlocks. The while expression is always protected by the lock.
|
||||
*/
|
||||
|
||||
LOCK(&manager->lock);
|
||||
LOCK(manager->locks[queue]);
|
||||
|
||||
while (!FINISHED(manager)) {
|
||||
/*
|
||||
@@ -951,13 +980,19 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
* If a pause has been requested, don't do any work
|
||||
* until it's been released.
|
||||
*/
|
||||
while ((empty_readyq(manager) || manager->pause_requested ||
|
||||
while ((empty_readyq(manager, queue) || manager->pause_requested ||
|
||||
manager->exclusive_requested) && !FINISHED(manager))
|
||||
{
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
|
||||
ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_WAIT, "wait"));
|
||||
WAIT(&manager->work_available, &manager->lock);
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
|
||||
ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_WAIT, manager->pause_requested ? "paused" : "notpaused"));
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
|
||||
ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_WAIT, manager->exclusive_requested ? "excreq" : "notexcreq"));
|
||||
WAIT(&manager->work_available[queue], manager->locks[queue]);
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat,
|
||||
ISC_MSGSET_TASK,
|
||||
ISC_MSG_AWAKE, "awake"));
|
||||
@@ -965,7 +1000,7 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK,
|
||||
ISC_MSG_WORKING, "working"));
|
||||
|
||||
task = pop_readyq(manager);
|
||||
task = pop_readyq(manager, queue);
|
||||
if (task != NULL) {
|
||||
unsigned int dispatch_count = 0;
|
||||
bool done = false;
|
||||
@@ -980,15 +1015,16 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
* have a task to do. We must reacquire the manager
|
||||
* lock before exiting the 'if (task != NULL)' block.
|
||||
*/
|
||||
manager->tasks_ready--;
|
||||
manager->tasks_running++;
|
||||
UNLOCK(&manager->lock);
|
||||
UNLOCK(manager->locks[queue]);
|
||||
atomic_fetch_add_explicit(&manager->tasks_ready, -1, memory_order_relaxed);
|
||||
atomic_fetch_add_explicit(&manager->tasks_running, 1, memory_order_relaxed);
|
||||
|
||||
LOCK(&task->lock);
|
||||
INSIST(task->state == task_state_ready);
|
||||
task->state = task_state_running;
|
||||
XTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_RUNNING, "running"));
|
||||
XTRACE(task->name);
|
||||
TIME_NOW(&task->tnow);
|
||||
task->now = isc_time_seconds(&task->tnow);
|
||||
do {
|
||||
@@ -1004,6 +1040,7 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
ISC_MSGSET_TASK,
|
||||
ISC_MSG_EXECUTE,
|
||||
"execute action"));
|
||||
XTRACE(task->name);
|
||||
if (event->ev_action != NULL) {
|
||||
UNLOCK(&task->lock);
|
||||
(event->ev_action)(
|
||||
@@ -1094,8 +1131,7 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
if (finished)
|
||||
task_finished(task);
|
||||
|
||||
LOCK(&manager->lock);
|
||||
manager->tasks_running--;
|
||||
atomic_fetch_add_explicit(&manager->tasks_running, -1, memory_order_relaxed);
|
||||
if (manager->exclusive_requested &&
|
||||
manager->tasks_running == 1) {
|
||||
SIGNAL(&manager->exclusive_granted);
|
||||
@@ -1103,6 +1139,7 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
manager->tasks_running == 0) {
|
||||
SIGNAL(&manager->paused);
|
||||
}
|
||||
LOCK(manager->locks[queue]);
|
||||
if (requeue) {
|
||||
/*
|
||||
* We know we're awake, so we don't have
|
||||
@@ -1123,7 +1160,7 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
* were usually nonempty, the 'optimization'
|
||||
* might even hurt rather than help.
|
||||
*/
|
||||
push_readyq(manager, task);
|
||||
push_readyq(manager, task, queue);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1133,27 +1170,38 @@ dispatch(isc__taskmgr_t *manager) {
|
||||
* we're stuck. Automatically drop privileges at that
|
||||
* point and continue with the regular ready queue.
|
||||
*/
|
||||
if (manager->tasks_running == 0 && empty_readyq(manager)) {
|
||||
manager->mode = isc_taskmgrmode_normal;
|
||||
if (!empty_readyq(manager))
|
||||
BROADCAST(&manager->work_available);
|
||||
if (manager->tasks_running == 0 && empty_readyq(manager, queue)) {
|
||||
if (manager->mode != isc_taskmgrmode_normal) {
|
||||
manager->mode = isc_taskmgrmode_normal;
|
||||
for (unsigned i=0; i < manager->workers; i++) {
|
||||
BROADCAST(&manager->work_available[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
UNLOCK(&manager->lock);
|
||||
UNLOCK(manager->locks[queue]);
|
||||
}
|
||||
|
||||
typedef struct st {
|
||||
isc__taskmgr_t *manager;
|
||||
int threadid;
|
||||
} stt;
|
||||
|
||||
static isc_threadresult_t
|
||||
#ifdef _WIN32
|
||||
WINAPI
|
||||
#endif
|
||||
run(void *uap) {
|
||||
isc__taskmgr_t *manager = uap;
|
||||
stt *st = uap;
|
||||
isc__taskmgr_t *manager = st->manager;
|
||||
int threadid = st->threadid;
|
||||
isc_mem_put(manager->mctx, st, sizeof(*st));
|
||||
isc_thread_setaffinity(threadid);
|
||||
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_STARTING, "starting"));
|
||||
|
||||
dispatch(manager);
|
||||
dispatch(manager, threadid);
|
||||
|
||||
XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_EXITING, "exiting"));
|
||||
@@ -1170,7 +1218,7 @@ manager_free(isc__taskmgr_t *manager) {
|
||||
isc_mem_t *mctx;
|
||||
|
||||
(void)isc_condition_destroy(&manager->exclusive_granted);
|
||||
(void)isc_condition_destroy(&manager->work_available);
|
||||
(void)isc_condition_destroy(&manager->work_available[0]);
|
||||
(void)isc_condition_destroy(&manager->paused);
|
||||
isc_mem_free(manager->mctx, manager->threads);
|
||||
DESTROYLOCK(&manager->lock);
|
||||
@@ -1214,20 +1262,13 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
}
|
||||
|
||||
manager->workers = 0;
|
||||
manager->queues = 0;
|
||||
manager->threads = isc_mem_allocate(mctx,
|
||||
workers * sizeof(isc_thread_t));
|
||||
if (manager->threads == NULL) {
|
||||
result = ISC_R_NOMEMORY;
|
||||
goto cleanup_lock;
|
||||
}
|
||||
if (isc_condition_init(&manager->work_available) != ISC_R_SUCCESS) {
|
||||
UNEXPECTED_ERROR(__FILE__, __LINE__,
|
||||
"isc_condition_init() %s",
|
||||
isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_FAILED, "failed"));
|
||||
result = ISC_R_UNEXPECTED;
|
||||
goto cleanup_threads;
|
||||
}
|
||||
if (isc_condition_init(&manager->exclusive_granted) != ISC_R_SUCCESS) {
|
||||
UNEXPECTED_ERROR(__FILE__, __LINE__,
|
||||
"isc_condition_init() %s",
|
||||
@@ -1248,12 +1289,15 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
default_quantum = DEFAULT_DEFAULT_QUANTUM;
|
||||
manager->default_quantum = default_quantum;
|
||||
INIT_LIST(manager->tasks);
|
||||
INIT_LIST(manager->ready_tasks);
|
||||
INIT_LIST(manager->ready_priority_tasks);
|
||||
manager->ready_tasks = malloc(workers * sizeof(isc__tasklist_t));
|
||||
manager->locks = malloc(workers * sizeof(isc_mutex_t *));
|
||||
manager->work_available = malloc(workers * sizeof(isc_condition_t));
|
||||
manager->ready_priority_tasks = malloc(workers * sizeof(isc__tasklist_t));
|
||||
manager->tasks_running = 0;
|
||||
manager->tasks_ready = 0;
|
||||
manager->exclusive_requested = false;
|
||||
manager->pause_requested = false;
|
||||
manager->curq = 0;
|
||||
manager->exiting = false;
|
||||
manager->excl = NULL;
|
||||
|
||||
@@ -1264,17 +1308,33 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
* Start workers.
|
||||
*/
|
||||
for (i = 0; i < workers; i++) {
|
||||
if (isc_thread_create(run, manager,
|
||||
&manager->threads[manager->workers]) ==
|
||||
ISC_R_SUCCESS) {
|
||||
char name[16]; /* thread name limit on Linux */
|
||||
snprintf(name, sizeof(name), "isc-worker%04u", i);
|
||||
isc_thread_setname(manager->threads[manager->workers],
|
||||
name);
|
||||
manager->workers++;
|
||||
started++;
|
||||
INIT_LIST(manager->ready_tasks[i]);
|
||||
INIT_LIST(manager->ready_priority_tasks[i]);
|
||||
manager->locks[i] = malloc(4096);
|
||||
isc_mutex_init(manager->locks[i]);
|
||||
if (isc_condition_init(&manager->work_available[i]) != ISC_R_SUCCESS) {
|
||||
UNEXPECTED_ERROR(__FILE__, __LINE__,
|
||||
"isc_condition_init() %s",
|
||||
isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL,
|
||||
ISC_MSG_FAILED, "failed"));
|
||||
result = ISC_R_UNEXPECTED;
|
||||
goto cleanup_threads;
|
||||
}
|
||||
stt *st = isc_mem_get(mctx, sizeof(stt));
|
||||
st->manager = manager;
|
||||
st->threadid = i;
|
||||
if (isc_thread_create(run, st,
|
||||
&manager->threads[i]) != ISC_R_SUCCESS) {
|
||||
goto cleanup_threads;
|
||||
}
|
||||
char name[16]; /* thread name limit on Linux */
|
||||
snprintf(name, sizeof(name), "isc-worker%04u", i);
|
||||
isc_thread_setname(manager->threads[manager->workers],
|
||||
name);
|
||||
manager->workers++;
|
||||
started++;
|
||||
}
|
||||
manager->queues = manager->workers;
|
||||
UNLOCK(&manager->lock);
|
||||
|
||||
if (started == 0) {
|
||||
@@ -1290,7 +1350,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers,
|
||||
cleanup_exclusivegranted:
|
||||
(void)isc_condition_destroy(&manager->exclusive_granted);
|
||||
cleanup_workavailable:
|
||||
(void)isc_condition_destroy(&manager->work_available);
|
||||
(void)isc_condition_destroy(&manager->work_available[0]);
|
||||
cleanup_threads:
|
||||
isc_mem_free(mctx, manager->threads);
|
||||
cleanup_lock:
|
||||
@@ -1362,7 +1422,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
|
||||
task = NEXT(task, link)) {
|
||||
LOCK(&task->lock);
|
||||
if (task_shutdown(task))
|
||||
push_readyq(manager, task);
|
||||
push_readyq(manager, task, 0);
|
||||
UNLOCK(&task->lock);
|
||||
}
|
||||
/*
|
||||
@@ -1370,7 +1430,11 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) {
|
||||
* there's work left to do, and if there are already no tasks left
|
||||
* it will cause the workers to see manager->exiting.
|
||||
*/
|
||||
BROADCAST(&manager->work_available);
|
||||
for (i = 0; i < manager->queues; i++) {
|
||||
LOCK(manager->locks[i]);
|
||||
BROADCAST(&manager->work_available[i]);
|
||||
UNLOCK(manager->locks[i]);
|
||||
}
|
||||
UNLOCK(&manager->lock);
|
||||
|
||||
/*
|
||||
@@ -1421,7 +1485,7 @@ isc__taskmgr_resume(isc_taskmgr_t *manager0) {
|
||||
LOCK(&manager->lock);
|
||||
if (manager->pause_requested) {
|
||||
manager->pause_requested = false;
|
||||
BROADCAST(&manager->work_available);
|
||||
BROADCAST(&manager->work_available[0]);
|
||||
}
|
||||
UNLOCK(&manager->lock);
|
||||
}
|
||||
@@ -1493,7 +1557,9 @@ isc_task_endexclusive(isc_task_t *task0) {
|
||||
LOCK(&manager->lock);
|
||||
REQUIRE(manager->exclusive_requested);
|
||||
manager->exclusive_requested = false;
|
||||
BROADCAST(&manager->work_available);
|
||||
for (unsigned int i=0; i < manager->workers; i++) {
|
||||
BROADCAST(&manager->work_available[i]);
|
||||
}
|
||||
UNLOCK(&manager->lock);
|
||||
}
|
||||
|
||||
@@ -1502,6 +1568,7 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) {
|
||||
REQUIRE(ISCAPI_TASK_VALID(task0));
|
||||
isc__task_t *task = (isc__task_t *)task0;
|
||||
isc__taskmgr_t *manager = task->manager;
|
||||
int queue = task->threadid % manager->queues;
|
||||
bool oldpriv;
|
||||
|
||||
LOCK(&task->lock);
|
||||
@@ -1515,14 +1582,14 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) {
|
||||
if (priv == oldpriv)
|
||||
return;
|
||||
|
||||
LOCK(&manager->lock);
|
||||
LOCK(manager->locks[queue]);
|
||||
if (priv && ISC_LINK_LINKED(task, ready_link))
|
||||
ENQUEUE(manager->ready_priority_tasks, task,
|
||||
ENQUEUE(manager->ready_priority_tasks[queue], task,
|
||||
ready_priority_link);
|
||||
else if (!priv && ISC_LINK_LINKED(task, ready_priority_link))
|
||||
DEQUEUE(manager->ready_priority_tasks, task,
|
||||
DEQUEUE(manager->ready_priority_tasks[queue], task,
|
||||
ready_priority_link);
|
||||
UNLOCK(&manager->lock);
|
||||
UNLOCK(manager->locks[queue]);
|
||||
}
|
||||
|
||||
bool
|
||||
@@ -1575,11 +1642,13 @@ isc_taskmgr_renderxml(isc_taskmgr_t *mgr0, xmlTextWriterPtr writer) {
|
||||
TRY0(xmlTextWriterEndElement(writer)); /* default-quantum */
|
||||
|
||||
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-running"));
|
||||
TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->tasks_running));
|
||||
TRY0(xmlTextWriterWriteFormatString(writer, "%d",
|
||||
(int) mgr->tasks_running));
|
||||
TRY0(xmlTextWriterEndElement(writer)); /* tasks-running */
|
||||
|
||||
TRY0(xmlTextWriterStartElement(writer, ISC_XMLCHAR "tasks-ready"));
|
||||
TRY0(xmlTextWriterWriteFormatString(writer, "%d", mgr->tasks_ready));
|
||||
TRY0(xmlTextWriterWriteFormatString(writer, "%d",
|
||||
(int) mgr->tasks_ready));
|
||||
TRY0(xmlTextWriterEndElement(writer)); /* tasks-ready */
|
||||
|
||||
TRY0(xmlTextWriterEndElement(writer)); /* thread-model */
|
||||
|
Reference in New Issue
Block a user