diff --git a/lib/isc/task.c b/lib/isc/task.c index e2cf9793f5..d36143d32a 100644 --- a/lib/isc/task.c +++ b/lib/isc/task.c @@ -43,18 +43,6 @@ #include #endif -/*% - * For BIND9 internal applications: - * when built with threads we use multiple worker threads shared by the whole - * application. - * when built without threads we share a single global task manager and use - * an integrated event loop for socket, timer, and other generic task events. - * For generic library: - * we don't use either of them: an application can have multiple task managers - * whether or not it's threaded, and if the application is threaded each thread - * is expected to have a separate manager; no "worker threads" are shared by - * the application threads. - */ #ifdef ISC_TASK_TRACE #define XTRACE(m) fprintf(stderr, "task %p thread %lu: %s\n", \ task, isc_thread_self(), (m)) @@ -88,6 +76,7 @@ static const char *statenames[] = { typedef struct isc__task isc__task_t; typedef struct isc__taskmgr isc__taskmgr_t; +typedef struct isc__taskqueue isc__taskqueue_t; struct isc__task { /* Not locked. */ @@ -124,32 +113,43 @@ struct isc__task { typedef ISC_LIST(isc__task_t) isc__tasklist_t; +struct isc__taskqueue { + isc__tasklist_t ready_tasks; + isc__tasklist_t ready_priority_tasks; + isc_condition_t work_available; + isc_mutex_t lock; + isc_thread_t thread; + unsigned int threadid; + isc__taskmgr_t *manager; +}; + struct isc__taskmgr { /* Not locked. */ isc_taskmgr_t common; isc_mem_t * mctx; isc_mutex_t lock; - isc_mutex_t **locks; + isc_mutex_t prehalt_lock; + isc_mutex_t posthalt_lock; + isc_condition_t halt_cond; + isc_condition_t halt_avail_cond; unsigned int workers; - unsigned int queues; - isc_thread_t * threads; atomic_uint_fast32_t tasks_running; atomic_uint_fast32_t tasks_ready; atomic_uint_fast32_t curq; + isc__taskqueue_t *queues; /* Locked by task manager lock. */ unsigned int default_quantum; LIST(isc__task_t) tasks; - isc__tasklist_t *ready_tasks; - isc__tasklist_t *ready_priority_tasks; isc_taskmgrmode_t mode; - isc_condition_t *work_available; - isc_condition_t exclusive_granted; - isc_condition_t paused; bool pause_requested; bool exclusive_requested; bool exiting; + /* Locked by {pre/post}halt_lock combo */ + unsigned int halted; + + /* * Multiple threads can read/write 'excl' at the same time, so we need * to protect the access. We can't use 'lock' since isc_task_detach() @@ -215,8 +215,8 @@ task_finished(isc__task_t *task) { * any idle worker threads so they * can exit. */ - for (unsigned int i=0; iqueues; i++) { - BROADCAST(&manager->work_available[i]); + for (unsigned int i=0; iworkers; i++) { + BROADCAST(&manager->queues[i].work_available); } } DESTROYLOCK(&task->lock); @@ -242,7 +242,7 @@ isc_task_create(isc_taskmgr_t *manager0, unsigned int quantum, return (ISC_R_NOMEMORY); XTRACE("isc_task_create"); task->manager = manager; - task->threadid = -1; + task->threadid = atomic_fetch_add_explicit(&manager->curq, 1, memory_order_relaxed) % manager->workers; result = isc_mutex_init(&task->lock); if (result != ISC_R_SUCCESS) { isc_mem_put(manager->mctx, task, sizeof(*task)); @@ -354,17 +354,16 @@ static inline void task_ready(isc__task_t *task) { isc__taskmgr_t *manager = task->manager; bool has_privilege = isc_task_privilege((isc_task_t *) task); - int queue = task->threadid % manager->queues; REQUIRE(VALID_MANAGER(manager)); REQUIRE(task->state == task_state_ready); XTRACE("task_ready"); - LOCK(manager->locks[queue]); - push_readyq(manager, task, queue); + LOCK(&manager->queues[task->threadid].lock); + push_readyq(manager, task, task->threadid); if (manager->mode == isc_taskmgrmode_normal || has_privilege) - SIGNAL(&manager->work_available[queue]); - UNLOCK(manager->locks[queue]); + SIGNAL(&manager->queues[task->threadid].work_available); + UNLOCK(&manager->queues[task->threadid].lock); } static inline bool @@ -468,9 +467,6 @@ void isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) { isc__task_t *task = (isc__task_t *)task0; bool was_idle; - if (c == -1) { - c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed); - } /* * Send '*event' to 'task'. @@ -478,6 +474,12 @@ isc_task_sendto(isc_task_t *task0, isc_event_t **eventp, int c) { REQUIRE(VALID_TASK(task)); + if (c == -1) { + c = atomic_fetch_add_explicit(&task->manager->curq, 1, + memory_order_relaxed) + % task->manager->workers; + } + XTRACE("isc_task_send"); /* @@ -523,7 +525,9 @@ isc_task_sendtoanddetach(isc_task_t **taskp, isc_event_t **eventp, int c) { task = (isc__task_t *)*taskp; REQUIRE(VALID_TASK(task)); if (c == -1) { - c = atomic_fetch_add_explicit(&task->manager->curq, 1, memory_order_relaxed); + c = atomic_fetch_add_explicit(&task->manager->curq, 1, + memory_order_relaxed) + % task->manager->workers; } XTRACE("isc_task_sendanddetach"); @@ -857,9 +861,9 @@ empty_readyq(isc__taskmgr_t *manager, int c) { isc__tasklist_t queue; if (manager->mode == isc_taskmgrmode_normal) - queue = manager->ready_tasks[c]; + queue = manager->queues[c].ready_tasks; else - queue = manager->ready_priority_tasks[c]; + queue = manager->queues[c].ready_priority_tasks; return (EMPTY(queue)); } @@ -877,14 +881,14 @@ pop_readyq(isc__taskmgr_t *manager, int c) { isc__task_t *task; if (manager->mode == isc_taskmgrmode_normal) - task = HEAD(manager->ready_tasks[c]); + task = HEAD(manager->queues[c].ready_tasks); else - task = HEAD(manager->ready_priority_tasks[c]); + task = HEAD(manager->queues[c].ready_priority_tasks); if (task != NULL) { - DEQUEUE(manager->ready_tasks[c], task, ready_link); + DEQUEUE(manager->queues[c].ready_tasks, task, ready_link); if (ISC_LINK_LINKED(task, ready_priority_link)) - DEQUEUE(manager->ready_priority_tasks[c], task, + DEQUEUE(manager->queues[c].ready_priority_tasks, task, ready_priority_link); } @@ -899,9 +903,9 @@ pop_readyq(isc__taskmgr_t *manager, int c) { */ static inline void push_readyq(isc__taskmgr_t *manager, isc__task_t *task, int c) { - ENQUEUE(manager->ready_tasks[c], task, ready_link); + ENQUEUE(manager->queues[c].ready_tasks, task, ready_link); if ((task->flags & TASK_F_PRIVILEGED) != 0) - ENQUEUE(manager->ready_priority_tasks[c], task, + ENQUEUE(manager->queues[c].ready_priority_tasks, task, ready_priority_link); atomic_fetch_add_explicit(&manager->tasks_ready, 1, memory_order_relaxed); } @@ -916,8 +920,6 @@ dispatch(isc__taskmgr_t *manager, int threadid) { LOCK(&manager->lock); UNLOCK(&manager->lock); - int queue = threadid % manager->queues; - /* * Again we're trying to hold the lock for as short a time as possible * and to do as little locking and unlocking as possible. @@ -967,7 +969,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) { * For N iterations of the loop, this code does N+1 locks and N+1 * unlocks. The while expression is always protected by the lock. */ - LOCK(manager->locks[queue]); + LOCK(&manager->queues[threadid].lock); while (!FINISHED(manager)) { /* @@ -980,8 +982,8 @@ dispatch(isc__taskmgr_t *manager, int threadid) { * If a pause has been requested, don't do any work * until it's been released. */ - while ((empty_readyq(manager, queue) || manager->pause_requested || - manager->exclusive_requested) && !FINISHED(manager)) + while ((empty_readyq(manager, threadid) && !manager->pause_requested && + !manager->exclusive_requested) && !FINISHED(manager)) { XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, @@ -992,7 +994,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) { XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_WAIT, manager->exclusive_requested ? "excreq" : "notexcreq")); - WAIT(&manager->work_available[queue], manager->locks[queue]); + WAIT(&manager->queues[threadid].work_available, &manager->queues[threadid].lock); XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK, ISC_MSG_AWAKE, "awake")); @@ -1000,7 +1002,33 @@ dispatch(isc__taskmgr_t *manager, int threadid) { XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK, ISC_MSG_WORKING, "working")); - task = pop_readyq(manager, queue); + if (manager->pause_requested || manager->exclusive_requested) { + UNLOCK(&manager->queues[threadid].lock); + XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_TASK, + ISC_MSG_WORKING, "halting")); + + /* + * First we increase 'halted' and signal the thread + * that's waiting on exclusivity/pause. Then we + * try locking posthalt lock, which will be locked + * by exclusive/pausing thread. It is only unlocked + * after exclusivity/pause is done. + */ + LOCK(&manager->prehalt_lock); + manager->halted++; + SIGNAL(&manager->halt_cond); + UNLOCK(&manager->prehalt_lock); + + LOCK(&manager->posthalt_lock); + manager->halted--; + UNLOCK(&manager->posthalt_lock); + + LOCK(&manager->queues[threadid].lock); + /* Restart the loop after */ + continue; + } + + task = pop_readyq(manager, threadid); if (task != NULL) { unsigned int dispatch_count = 0; bool done = false; @@ -1015,7 +1043,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) { * have a task to do. We must reacquire the manager * lock before exiting the 'if (task != NULL)' block. */ - UNLOCK(manager->locks[queue]); + UNLOCK(&manager->queues[threadid].lock); atomic_fetch_add_explicit(&manager->tasks_ready, -1, memory_order_relaxed); atomic_fetch_add_explicit(&manager->tasks_running, 1, memory_order_relaxed); @@ -1132,14 +1160,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) { task_finished(task); atomic_fetch_add_explicit(&manager->tasks_running, -1, memory_order_relaxed); - if (manager->exclusive_requested && - manager->tasks_running == 1) { - SIGNAL(&manager->exclusive_granted); - } else if (manager->pause_requested && - manager->tasks_running == 0) { - SIGNAL(&manager->paused); - } - LOCK(manager->locks[queue]); + LOCK(&manager->queues[threadid].lock); if (requeue) { /* * We know we're awake, so we don't have @@ -1160,7 +1181,7 @@ dispatch(isc__taskmgr_t *manager, int threadid) { * were usually nonempty, the 'optimization' * might even hurt rather than help. */ - push_readyq(manager, task, queue); + push_readyq(manager, task, threadid); } } @@ -1170,39 +1191,33 @@ dispatch(isc__taskmgr_t *manager, int threadid) { * we're stuck. Automatically drop privileges at that * point and continue with the regular ready queue. */ - if (manager->tasks_running == 0 && empty_readyq(manager, queue)) { + if (manager->tasks_running == 0 && empty_readyq(manager, threadid)) { manager->mode = isc_taskmgrmode_normal; for (unsigned i=0; i < manager->workers; i++) { - BROADCAST(&manager->work_available[i]); + BROADCAST(&manager->queues[i].work_available); } } } - UNLOCK(manager->locks[queue]); + UNLOCK(&manager->queues[threadid].lock); /* * There might be other dispatchers waiting on empty tasks, * wake them up. */ for (unsigned i=0; i < manager->workers; i++) { - LOCK(manager->locks[i]); - BROADCAST(&manager->work_available[i]); - UNLOCK(manager->locks[i]); + LOCK(&manager->queues[i].lock); + BROADCAST(&manager->queues[i].work_available); + UNLOCK(&manager->queues[i].lock); } } -typedef struct st { - isc__taskmgr_t *manager; - int threadid; -} stt; - static isc_threadresult_t #ifdef _WIN32 WINAPI #endif -run(void *uap) { - stt *st = uap; - isc__taskmgr_t *manager = st->manager; - int threadid = st->threadid; - isc_mem_put(manager->mctx, st, sizeof(*st)); +run(void *queuep) { + isc__taskqueue_t *tq = queuep; + isc__taskmgr_t *manager = tq->manager; + int threadid = tq->threadid; isc_thread_setaffinity(threadid); XTHREADTRACE(isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, @@ -1222,19 +1237,15 @@ run(void *uap) { static void manager_free(isc__taskmgr_t *manager) { - isc_mem_t *mctx; + /* TODO */ - (void)isc_condition_destroy(&manager->exclusive_granted); - (void)isc_condition_destroy(&manager->work_available[0]); - (void)isc_condition_destroy(&manager->paused); - isc_mem_free(manager->mctx, manager->threads); DESTROYLOCK(&manager->lock); - DESTROYLOCK(&manager->excl_lock); + DESTROYLOCK(&manager->prehalt_lock); + DESTROYLOCK(&manager->posthalt_lock); + isc_mem_put(manager->mctx, manager->queues, manager->workers * sizeof(isc__taskqueue_t)); manager->common.impmagic = 0; manager->common.magic = 0; - mctx = manager->mctx; - isc_mem_put(mctx, manager, sizeof(*manager)); - isc_mem_detach(&mctx); + isc_mem_putanddetach(&manager->mctx, manager, sizeof(*manager)); } isc_result_t @@ -1242,7 +1253,7 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, unsigned int default_quantum, isc_taskmgr_t **managerp) { isc_result_t result; - unsigned int i, started = 0; + unsigned int i; isc__taskmgr_t *manager; /* @@ -1268,45 +1279,26 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, goto cleanup_mgr; } - manager->workers = 0; - manager->queues = 0; - manager->threads = isc_mem_allocate(mctx, - workers * sizeof(isc_thread_t)); - if (manager->threads == NULL) { - result = ISC_R_NOMEMORY; - goto cleanup_lock; - } - if (isc_condition_init(&manager->exclusive_granted) != ISC_R_SUCCESS) { - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_condition_init() %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - result = ISC_R_UNEXPECTED; - goto cleanup_workavailable; - } - if (isc_condition_init(&manager->paused) != ISC_R_SUCCESS) { - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_condition_init() %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - result = ISC_R_UNEXPECTED; - goto cleanup_exclusivegranted; - } + RUNTIME_CHECK(isc_mutex_init(&manager->prehalt_lock) == ISC_R_SUCCESS); + RUNTIME_CHECK(isc_mutex_init(&manager->posthalt_lock) == ISC_R_SUCCESS); + + RUNTIME_CHECK(isc_condition_init(&manager->halt_cond) == ISC_R_SUCCESS); + + manager->workers = workers; + if (default_quantum == 0) default_quantum = DEFAULT_DEFAULT_QUANTUM; manager->default_quantum = default_quantum; INIT_LIST(manager->tasks); - manager->ready_tasks = malloc(workers * sizeof(isc__tasklist_t)); - manager->locks = malloc(workers * sizeof(isc_mutex_t *)); - manager->work_available = malloc(workers * sizeof(isc_condition_t)); - manager->ready_priority_tasks = malloc(workers * sizeof(isc__tasklist_t)); + manager->queues = isc_mem_get(mctx, workers * sizeof(isc__taskqueue_t)); manager->tasks_running = 0; manager->tasks_ready = 0; - manager->exclusive_requested = false; - manager->pause_requested = false; manager->curq = 0; manager->exiting = false; manager->excl = NULL; + manager->halted = 0; + manager->exclusive_requested = false; + manager->pause_requested = false; isc_mem_attach(mctx, &manager->mctx); @@ -1315,53 +1307,30 @@ isc_taskmgr_create(isc_mem_t *mctx, unsigned int workers, * Start workers. */ for (i = 0; i < workers; i++) { - INIT_LIST(manager->ready_tasks[i]); - INIT_LIST(manager->ready_priority_tasks[i]); - manager->locks[i] = malloc(4096); - isc_mutex_init(manager->locks[i]); - if (isc_condition_init(&manager->work_available[i]) != ISC_R_SUCCESS) { - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_condition_init() %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - result = ISC_R_UNEXPECTED; - goto cleanup_threads; - } - stt *st = isc_mem_get(mctx, sizeof(stt)); - st->manager = manager; - st->threadid = i; - if (isc_thread_create(run, st, - &manager->threads[i]) != ISC_R_SUCCESS) { - goto cleanup_threads; - } - char name[16]; /* thread name limit on Linux */ + INIT_LIST(manager->queues[i].ready_tasks); + INIT_LIST(manager->queues[i].ready_priority_tasks); + RUNTIME_CHECK(isc_mutex_init(&manager->queues[i].lock) + == ISC_R_SUCCESS); + RUNTIME_CHECK(isc_condition_init( + &manager->queues[i].work_available) + == ISC_R_SUCCESS); + manager->queues[i].manager = manager; + manager->queues[i].threadid = i; + RUNTIME_CHECK(isc_thread_create(run, &manager->queues[i], + &manager->queues[i].thread) + == ISC_R_SUCCESS); + char name[16]; snprintf(name, sizeof(name), "isc-worker%04u", i); - isc_thread_setname(manager->threads[manager->workers], - name); - manager->workers++; - started++; + isc_thread_setname(manager->queues[i].thread, name); } - manager->queues = manager->workers; UNLOCK(&manager->lock); - if (started == 0) { - manager_free(manager); - return (ISC_R_NOTHREADS); - } isc_thread_setconcurrency(workers); *managerp = (isc_taskmgr_t *)manager; return (ISC_R_SUCCESS); - cleanup_exclusivegranted: - (void)isc_condition_destroy(&manager->exclusive_granted); - cleanup_workavailable: - (void)isc_condition_destroy(&manager->work_available[0]); - cleanup_threads: - isc_mem_free(mctx, manager->threads); - cleanup_lock: - DESTROYLOCK(&manager->lock); cleanup_mgr: isc_mem_put(mctx, manager, sizeof(*manager)); return (result); @@ -1429,8 +1398,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { task = NEXT(task, link)) { LOCK(&task->lock); if (task_shutdown(task)) { - int queue = task->threadid % manager->queues; - push_readyq(manager, task, queue); + push_readyq(manager, task, task->threadid); } UNLOCK(&task->lock); } @@ -1439,10 +1407,10 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { * there's work left to do, and if there are already no tasks left * it will cause the workers to see manager->exiting. */ - for (i = 0; i < manager->queues; i++) { - LOCK(manager->locks[i]); - BROADCAST(&manager->work_available[i]); - UNLOCK(manager->locks[i]); + for (i = 0; i < manager->workers; i++) { + LOCK(&manager->queues[i].lock); + BROADCAST(&manager->queues[i].work_available); + UNLOCK(&manager->queues[i].lock); } UNLOCK(&manager->lock); @@ -1450,7 +1418,7 @@ isc_taskmgr_destroy(isc_taskmgr_t **managerp) { * Wait for all the worker threads to exit. */ for (i = 0; i < manager->workers; i++) - (void)isc_thread_join(manager->threads[i], NULL); + (void)isc_thread_join(manager->queues[i].thread, NULL); manager_free(manager); @@ -1479,24 +1447,33 @@ isc_taskmgr_mode(isc_taskmgr_t *manager0) { void isc__taskmgr_pause(isc_taskmgr_t *manager0) { isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0; - manager->pause_requested = true; - LOCK(&manager->lock); - while (manager->tasks_running > 0) { - WAIT(&manager->paused, &manager->lock); + unsigned int i; + + LOCK(&manager->posthalt_lock); + while (manager->exclusive_requested || manager->pause_requested) { + UNLOCK(&manager->posthalt_lock); + /* This is ugly but pause is used EXCLUSIVELY in tests */ + isc_thread_yield(); + LOCK(&manager->posthalt_lock); } - UNLOCK(&manager->lock); + manager->pause_requested = true; + LOCK(&manager->prehalt_lock); + while (manager->halted < manager->workers) { + for (i = 0; i < manager->workers; i++) { + BROADCAST(&manager->queues[i].work_available); + } + WAIT(&manager->halt_cond, &manager->prehalt_lock); + } + UNLOCK(&manager->prehalt_lock); } void isc__taskmgr_resume(isc_taskmgr_t *manager0) { isc__taskmgr_t *manager = (isc__taskmgr_t *)manager0; - - LOCK(&manager->lock); if (manager->pause_requested) { manager->pause_requested = false; - BROADCAST(&manager->work_available[0]); + UNLOCK(&manager->posthalt_lock); } - UNLOCK(&manager->lock); } void @@ -1535,24 +1512,32 @@ isc_result_t isc_task_beginexclusive(isc_task_t *task0) { isc__task_t *task = (isc__task_t *)task0; isc__taskmgr_t *manager = task->manager; + unsigned int i; + REQUIRE(VALID_TASK(task)); REQUIRE(task->state == task_state_running); + /* * TODO REQUIRE(task == task->manager->excl); * it should be here, it fails on shutdown server->task */ - LOCK(&manager->lock); - if (manager->exclusive_requested) { - UNLOCK(&manager->lock); + if (manager->exclusive_requested || manager->pause_requested) { return (ISC_R_LOCKBUSY); } + + LOCK(&manager->posthalt_lock); + INSIST(!manager->exclusive_requested && !manager->pause_requested); manager->exclusive_requested = true; - while (manager->tasks_running > 1) { - WAIT(&manager->exclusive_granted, &manager->lock); + LOCK(&manager->prehalt_lock); + while (manager->halted + 1 < manager->workers) { + for (i = 0; i < manager->workers; i++) { + BROADCAST(&manager->queues[i].work_available); + } + WAIT(&manager->halt_cond, &manager->prehalt_lock); } - UNLOCK(&manager->lock); + UNLOCK(&manager->prehalt_lock); return (ISC_R_SUCCESS); } @@ -1563,13 +1548,9 @@ isc_task_endexclusive(isc_task_t *task0) { REQUIRE(VALID_TASK(task)); REQUIRE(task->state == task_state_running); - LOCK(&manager->lock); REQUIRE(manager->exclusive_requested); manager->exclusive_requested = false; - for (unsigned int i=0; i < manager->workers; i++) { - BROADCAST(&manager->work_available[i]); - } - UNLOCK(&manager->lock); + UNLOCK(&manager->posthalt_lock); } void @@ -1577,7 +1558,6 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) { REQUIRE(ISCAPI_TASK_VALID(task0)); isc__task_t *task = (isc__task_t *)task0; isc__taskmgr_t *manager = task->manager; - int queue = task->threadid % manager->queues; bool oldpriv; LOCK(&task->lock); @@ -1591,14 +1571,14 @@ isc_task_setprivilege(isc_task_t *task0, bool priv) { if (priv == oldpriv) return; - LOCK(manager->locks[queue]); + LOCK(&manager->queues[task->threadid].lock); if (priv && ISC_LINK_LINKED(task, ready_link)) - ENQUEUE(manager->ready_priority_tasks[queue], task, - ready_priority_link); + ENQUEUE(manager->queues[task->threadid].ready_priority_tasks, + task, ready_priority_link); else if (!priv && ISC_LINK_LINKED(task, ready_priority_link)) - DEQUEUE(manager->ready_priority_tasks[queue], task, - ready_priority_link); - UNLOCK(manager->locks[queue]); + DEQUEUE(manager->queues[task->threadid].ready_priority_tasks, + task, ready_priority_link); + UNLOCK(&manager->queues[task->threadid].lock); } bool