commit 7ee52cc7d195433bb8f55972e2a8ab29668f7bce
Author: Bob Halley
Date:   Mon Aug 17 22:05:58 1998 +0000

    base

diff --git a/bin/tests/mem_test.c b/bin/tests/mem_test.c
new file mode 100644
index 0000000000..064255a614
--- /dev/null
+++ b/bin/tests/mem_test.c
@@ -0,0 +1,145 @@
+
+/*
+ * NOTE(review): the <...> header names were missing from this copy of the
+ * patch.  The ones below are reconstructed from the symbols this file uses;
+ * confirm them (especially the assertions header) against the repository.
+ */
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#ifdef SOLARIS
+#include <thread.h>
+#endif
+
+#include <isc/assertions.h>
+#include "memcluster.h"
+
+#define NPTRS		50000	/* pointers allocated/freed per work() pass */
+#define NPASSES		10	/* work() passes per run() */
+#define ALLOCSZ		100	/* bytes per allocation */
+
+char *ptr1[NPTRS];
+char *ptr2[NPTRS];
+
+/*
+ * Build-time switches selecting what is measured.
+ */
+#undef THREADS
+#undef LOCKMUTEX
+#undef FINELOCK
+#undef GLOBALMUTEX
+#undef GLOBALMEMCTX
+#undef USE_MALLOC
+#undef FILL
+#define STATS
+
+pthread_mutex_t global_mutex = PTHREAD_MUTEX_INITIALIZER;
+mem_context_t global_ctx = NULL;
+
+/*
+ * Allocate 'n' blocks of ALLOCSZ bytes into p[0..n-1], then free them all.
+ *
+ * Blocks come from malloc()/free() when USE_MALLOC is defined and from
+ * mem_allocate()/mem_free() on context 'm' otherwise.  With THREADS and
+ * LOCKMUTEX, 'mutex' is held either around each individual call (FINELOCK)
+ * or around each whole loop.
+ */
+static void
+work(int n, char **p, mem_context_t m, pthread_mutex_t *mutex) {
+	int i;
+
+	/* Not every build configuration uses every parameter. */
+	(void)m;
+	(void)mutex;
+
+#if defined(THREADS) && defined(LOCKMUTEX) && !defined(FINELOCK)
+	INSIST(pthread_mutex_lock(mutex) == 0);
+#endif
+	for (i = 0; i < n; i++) {
+#if defined(THREADS) && defined(LOCKMUTEX) && defined(FINELOCK)
+		INSIST(pthread_mutex_lock(mutex) == 0);
+#endif
+#ifdef USE_MALLOC
+		p[i] = malloc(ALLOCSZ);
+#else
+		p[i] = mem_allocate(m, ALLOCSZ);
+#endif
+#if defined(THREADS) && defined(LOCKMUTEX) && defined(FINELOCK)
+		INSIST(pthread_mutex_unlock(mutex) == 0);
+#endif
+		INSIST(p[i] != NULL);
+#if defined(FILL)
+		{
+			int j;
+
+			for (j = 0; j < ALLOCSZ; j++)
+				p[i][j] = (char)j;
+		}
+#endif
+	}
+#if defined(THREADS) && defined(LOCKMUTEX) && !defined(FINELOCK)
+	INSIST(pthread_mutex_unlock(mutex) == 0);
+#endif
+#if defined(THREADS) && defined(LOCKMUTEX) && !defined(FINELOCK)
+	INSIST(pthread_mutex_lock(mutex) == 0);
+#endif
+	for (i = 0; i < n; i++) {
+#if defined(THREADS) && defined(LOCKMUTEX) && defined(FINELOCK)
+		INSIST(pthread_mutex_lock(mutex) == 0);
+#endif
+#ifdef USE_MALLOC
+		free(p[i]);
+#else
+		mem_free(m, p[i]);
+#endif
+#if defined(THREADS) && defined(LOCKMUTEX) && defined(FINELOCK)
+		INSIST(pthread_mutex_unlock(mutex) == 0);
+#endif
+		p[i] = NULL;
+	}
+#if defined(THREADS) && defined(LOCKMUTEX) && !defined(FINELOCK)
+	INSIST(pthread_mutex_unlock(mutex) == 0);
+#endif
+}
+
+/*
+ * Thread body (also called directly when THREADS is not defined).
+ *
+ * 'arg' is an array of NPTRS char pointers used as scratch space.  Runs
+ * NPASSES passes of work() using either the global or a private mutex and
+ * memory context, then prints the context's statistics if STATS is defined.
+ * Always returns NULL.
+ */
+static void *
+run(void *arg) {
+	char **p = arg;
+	mem_context_t m;
+	pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+	pthread_mutex_t *mutexp;
+	int pass;
+
+#ifdef GLOBALMUTEX
+	mutexp = &global_mutex;
+#else
+	mutexp = &mutex;
+#endif
+
+#ifdef GLOBALMEMCTX
+	m = global_ctx;
+#else
+	INSIST(mem_context_create(0, 0, &m) == 0);
+#endif
+
+	for (pass = 0; pass < NPASSES; pass++)
+		work(NPTRS, p, m, mutexp);
+#ifdef STATS
+	mem_stats(m, stdout);
+#endif
+	return (NULL);
+}
+
+/*
+ * Run the allocation workload over ptr1 and ptr2, in two threads when
+ * THREADS is defined and sequentially otherwise.
+ */
+int
+main(void) {
+#ifdef THREADS
+	pthread_t t1, t2;
+#endif
+
+#ifdef GLOBALMEMCTX
+	INSIST(mem_context_create(0, 0, &global_ctx) == 0);
+#endif
+#ifdef SOLARIS
+	thr_setconcurrency(2);		/* Ick. */
+#endif
+#ifdef THREADS
+	INSIST(pthread_create(&t1, NULL, run, ptr1) == 0);
+	INSIST(pthread_create(&t2, NULL, run, ptr2) == 0);
+	(void)pthread_join(t1, NULL);
+	(void)pthread_join(t2, NULL);
+#else
+	(void)run(ptr1);
+	(void)run(ptr2);
+#endif
+	return (0);
+}
diff --git a/bin/tests/task_test.c b/bin/tests/task_test.c
new file mode 100644
index 0000000000..36de876d98
--- /dev/null
+++ b/bin/tests/task_test.c
@@ -0,0 +1,71 @@
+
+/*
+ * NOTE(review): the <...> header names were missing from this copy of the
+ * patch.  The ones below are reconstructed from the symbols this file uses;
+ * confirm them (especially the assertions header) against the repository.
+ */
+#include <stdio.h>
+#include <unistd.h>
+#include <isc/assertions.h>
+
+#include "memcluster.h"
+#include "task.h"
+
+#define NEVENTS	4	/* events sent to the task by main() */
+
+/*
+ * Event action: print the event type, burn some CPU, and return FALSE.
+ */
+boolean_t
+my_callback(generic_event_t event) {
+	/* volatile, so the empty delay loop below is not optimized away. */
+	volatile int i;
+
+	printf("my callback, event type %d\n", event->type);
+	for (i = 0; i < 1000000; i++)
+		continue;
+	return (FALSE);
+}
+
+/*
+ * Shutdown action: print a message and return TRUE.
+ */
+boolean_t
+my_shutdown(generic_event_t event) {
+	(void)event;
+
+	printf("shutdown\n");
+	return (TRUE);
+}
+
+/*
+ * Allocate an event of 'size' bytes from 'mctx' and fill in its mctx, type
+ * and action fields.  Returns NULL if 'size' is smaller than the generic
+ * event structure or if the allocation fails.
+ */
+generic_event_t
+event_allocate(mem_context_t mctx, event_type_t type, event_action_t action,
+	       size_t size) {
+	generic_event_t event;
+
+	if (size < sizeof *event)
+		return (NULL);
+	event = mem_get(mctx, size);
+	if (event == NULL)
+		return (NULL);
+	event->mctx = mctx;
+	event->type = type;
+	event->action = action;
+
+	return (event);
+}
+
+/*
+ * Create a two-worker task manager and one task, send the task NEVENTS
+ * events, sleep, shut everything down, and print memory statistics.
+ */
+int
+main(void) {
+	mem_context_t mctx = NULL;
+	task_manager_t manager = NULL;
+	task_t task = NULL;
+	generic_event_t event;
+	int i;
+
+	INSIST(mem_context_create(0, 0, &mctx) == 0);
+
+	INSIST(task_manager_create(mctx, 2, 0, &manager) == 2);
+	INSIST(task_allocate(manager, my_shutdown, 0, &task));
+
+	for (i = 0; i < NEVENTS; i++) {
+		event = event_allocate(mctx, 1, my_callback, sizeof *event);
+		INSIST(event != NULL);
+		task_send_event(task, event);
+	}
+
+	printf("presleep\n");
+	sleep(4);
+	printf("postsleep\n");
+
+	task_shutdown(task);
+	task_detach(&task);
+	task_manager_destroy(&manager);
+
+	mem_stats(mctx, stdout);
+
+	return (0);
+}
diff --git a/lib/isc/mem.c
b/lib/isc/mem.c
new file mode 100644
index 0000000000..a0e762cee0
--- /dev/null
+++ b/lib/isc/mem.c
@@ -0,0 +1,501 @@
+/*
+ * Copyright (c) 1997, 1998 by Internet Software Consortium.
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SOFTWARE CONSORTIUM DISCLAIMS
+ * ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES
+ * OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL INTERNET SOFTWARE
+ * CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL
+ * DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR
+ * PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+ * ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
+ * SOFTWARE.
+ */
+
+/* #include "port_before.h" */
+
+/*
+ * NOTE(review): the <...> header names were missing from this copy of the
+ * patch.  The ones below are reconstructed from the symbols this file uses;
+ * confirm them (especially the assertions header) against the repository.
+ */
+#include <sys/types.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "attribute.h"
+#include <isc/assertions.h>
+
+#include "thread.h"
+#include "mutex.h"
+#include "memcluster.h"
+
+/* #include "port_after.h" */
+
+#if !defined(LINT) && !defined(CODECENTER)
+static char rcsid[] __attribute__((unused)) = "$Id: mem.c,v 1.1 1998/08/17 22:05:58 halley Exp $";
+#endif /* not lint */
+
+/*
+ * Types.
+ */
+
+/* Link overlaid on the start of every free fragment and free basic block. */
+typedef struct {
+	void *			next;
+} memcluster_element;
+
+/* Header placed in front of every block handed out by mem_allocate(). */
+typedef struct {
+	size_t			size;
+	/*
+	 * This structure must be ALIGNMENT_SIZE bytes.
+	 */
+} *size_info;
+
+/* Per-size counters; indexed by request size in mem_context.stats[]. */
+struct stats {
+	u_long			gets;		/* currently outstanding */
+	u_long			totalgets;	/* total ever handed out */
+	u_long			blocks;		/* basic blocks carved up */
+	u_long			freefrags;	/* fragments on free list */
+};
+
+#ifdef MEMCLUSTER_RANGES
+/* One contiguous region obtained from malloc(). */
+typedef struct range {
+	u_char *		first;
+	u_char *		last;
+	struct range *		next;
+} range;
+#endif
+
+struct mem_context {
+	size_t			max_size;	/* requests >= this use malloc */
+	size_t			mem_target;	/* size of one basic block */
+	memcluster_element **	freelists;	/* indexed by quantized size */
+	memcluster_element *	basic_blocks;	/* unused basic blocks */
+#ifdef MEMCLUSTER_RANGES
+	range *			ranges;
+	range *			freeranges;
+#else
+	u_char *		lowest;		/* lowest address we own */
+	u_char *		highest;	/* highest address we own */
+#endif
+	struct stats *		stats;		/* max_size + 1 entries */
+	os_mutex_t		mutex;
+};
+
+/* Private Data. */
+static mem_context_t		default_context = NULL;
+
+/* Forward. */
+
+static size_t			quantize(size_t);
+
+/* Macros. */
+
+#define DEF_MAX_SIZE		1100
+#define DEF_MEM_TARGET		4096
+#define ALIGNMENT_SIZE		sizeof (void *)
+#define NUM_BASIC_BLOCKS	64		/* must be > 1 */
+
+#define LOCK_CONTEXT(ctx)	os_mutex_lock(&(ctx)->mutex)
+#define UNLOCK_CONTEXT(ctx)	os_mutex_unlock(&(ctx)->mutex)
+
+/* Private Inline-able. */
+
+/*
+ * Round 'size' up to the next multiple of ALIGNMENT_SIZE.
+ */
+static __inline__ size_t
+quantize(size_t size) {
+	size_t remainder;
+
+	/*
+	 * If there is no remainder for the integer division of
+	 *
+	 *	(size/ALIGNMENT_SIZE)
+	 *
+	 * then we already have a good size; if not, then we need
+	 * to round up the result in order to get a size big
+	 * enough to satisfy the request and be aligned on ALIGNMENT_SIZE
+	 * byte boundaries.
+	 */
+	remainder = size % ALIGNMENT_SIZE;
+	if (remainder != 0)
+		size += ALIGNMENT_SIZE - remainder;
+	return (size);
+}
+
+/* Public.
*/
+
+/*
+ * Create a memory context and store it in '*ctxp'.
+ *
+ * 'init_max_size' is the request size at and above which allocations go
+ * straight to malloc(); 'target_size' is the size of a basic block.  Zero
+ * selects DEF_MAX_SIZE / DEF_MEM_TARGET respectively.
+ *
+ * Returns 0 on success and -1 if memory could not be allocated, in which
+ * case '*ctxp' is not modified.
+ */
+int
+mem_context_create(size_t init_max_size, size_t target_size,
+		   mem_context_t *ctxp) {
+	mem_context_t ctx;
+
+	REQUIRE(ctxp != NULL);
+
+	ctx = malloc(sizeof *ctx);
+	if (ctx == NULL)
+		return (-1);
+	ctx->max_size = (init_max_size == 0) ? DEF_MAX_SIZE : init_max_size;
+	ctx->mem_target = (target_size == 0) ? DEF_MEM_TARGET : target_size;
+	ctx->freelists = calloc(ctx->max_size, sizeof (memcluster_element *));
+	if (ctx->freelists == NULL) {
+		free(ctx);
+		return (-1);
+	}
+	ctx->stats = calloc(ctx->max_size + 1, sizeof (struct stats));
+	if (ctx->stats == NULL) {
+		free(ctx->freelists);
+		free(ctx);
+		return (-1);
+	}
+	ctx->basic_blocks = NULL;
+#ifdef MEMCLUSTER_RANGES
+	ctx->ranges = NULL;
+	ctx->freeranges = NULL;
+#else
+	ctx->lowest = NULL;
+	ctx->highest = NULL;
+#endif
+	os_mutex_init(&ctx->mutex);
+	*ctxp = ctx;
+	return (0);
+}
+
+/*
+ * Detach the caller from '*ctxp' by setting it to NULL.
+ *
+ * XXX This is still a stub: nothing owned by the context (basic blocks,
+ * free lists, statistics, mutex, the context itself) is released.
+ */
+void
+mem_context_destroy(mem_context_t *ctxp) {
+	REQUIRE(ctxp != NULL);
+
+	/* XXX Free Basic Blocks. XXX */
+
+	*ctxp = NULL;
+}
+
+/*
+ * Get 'size' (> 0) bytes from 'ctx'.  Returns NULL if memory is exhausted.
+ *
+ * Requests of max_size or more (before or after rounding up to the
+ * alignment) are passed to malloc(); everything else is served from a
+ * per-size free list that is refilled by carving up one basic block.
+ */
+void *
+__mem_get(mem_context_t ctx, size_t size) {
+	size_t new_size = quantize(size);
+	void *ret;
+
+	REQUIRE(size > 0);
+
+	LOCK_CONTEXT(ctx);
+
+	if (size >= ctx->max_size || new_size >= ctx->max_size) {
+		/* memget() was called on something beyond our upper limit. */
+		ret = malloc(size);
+		if (ret != NULL) {
+			ctx->stats[ctx->max_size].gets++;
+			ctx->stats[ctx->max_size].totalgets++;
+		}
+		goto done;
+	}
+
+	/*
+	 * If there are no blocks in the free list for this size, get a chunk
+	 * of memory and then break it up into "new_size"-sized blocks, adding
+	 * them to the free list.
+	 */
+	if (ctx->freelists[new_size] == NULL) {
+		int i, frags;
+		size_t total_size;
+		void *new;
+		u_char *curr, *next;
+		u_char *first;
+#ifdef MEMCLUSTER_RANGES
+		range *r;
+#else
+		u_char *last;
+#endif
+
+		/*
+		 * A basic block must hold at least one fragment; otherwise
+		 * we would hand out a block smaller than was asked for.
+		 */
+		INSIST(ctx->mem_target >= new_size);
+
+		if (ctx->basic_blocks == NULL) {
+			new = malloc(NUM_BASIC_BLOCKS * ctx->mem_target);
+			if (new == NULL) {
+				ret = NULL;
+				goto done;
+			}
+			curr = new;
+			next = curr + ctx->mem_target;
+			for (i = 0; i < (NUM_BASIC_BLOCKS - 1); i++) {
+				((memcluster_element *)curr)->next = next;
+				curr = next;
+				next += ctx->mem_target;
+			}
+			/*
+			 * curr is now pointing at the last block in the
+			 * array.
+			 */
+			((memcluster_element *)curr)->next = NULL;
+			first = new;
+#ifdef MEMCLUSTER_RANGES
+			if (ctx->freeranges == NULL) {
+				int nsize = quantize(sizeof(range));
+				new = ((memcluster_element *)new)->next;
+				curr = first;
+				next = curr + nsize;
+				frags = ctx->mem_target / nsize;
+				for (i = 0; i < (frags - 1); i++) {
+					((range *)curr)->next = (range *)next;
+					curr = next;
+					next += nsize;
+				}
+				/*
+				 * curr is now pointing at the last block in
+				 * the array.
+				 */
+				((range *)curr)->next = NULL;
+				ctx->freeranges = (range *)first;
+			}
+			r = ctx->freeranges;
+			ctx->freeranges = r->next;
+			r->first = first;
+			r->last = r->first +
+				  NUM_BASIC_BLOCKS * ctx->mem_target - 1;
+			r->next = ctx->ranges;
+			ctx->ranges = r;
+#else
+			last = first + NUM_BASIC_BLOCKS * ctx->mem_target - 1;
+			/* Test for "unset" first; only then compare. */
+			if (ctx->lowest == NULL || first < ctx->lowest)
+				ctx->lowest = first;
+			if (ctx->highest == NULL || last > ctx->highest)
+				ctx->highest = last;
+#endif
+			ctx->basic_blocks = new;
+		}
+		total_size = ctx->mem_target;
+		new = ctx->basic_blocks;
+		ctx->basic_blocks = ctx->basic_blocks->next;
+		frags = total_size / new_size;
+		ctx->stats[new_size].blocks++;
+		ctx->stats[new_size].freefrags += frags;
+		/* Set up a linked-list of blocks of size "new_size". */
+		curr = new;
+		next = curr + new_size;
+		for (i = 0; i < (frags - 1); i++) {
+			((memcluster_element *)curr)->next = next;
+			curr = next;
+			next += new_size;
+		}
+		/* curr is now pointing at the last block in the array. */
+		((memcluster_element *)curr)->next = NULL;
+		ctx->freelists[new_size] = new;
+	}
+
+	/* The free list uses the "rounded-up" size "new_size": */
+	ret = ctx->freelists[new_size];
+	ctx->freelists[new_size] = ctx->freelists[new_size]->next;
+
+	/*
+	 * The stats[] uses the _actual_ "size" requested by the
+	 * caller, with the caveat (in the code above) that "size" >= the
+	 * max. size (max_size) ends up getting recorded as a call to
+	 * max_size.
+	 */
+	ctx->stats[size].gets++;
+	ctx->stats[size].totalgets++;
+	ctx->stats[new_size].freefrags--;
+
+ done:
+	UNLOCK_CONTEXT(ctx);
+
+	return (ret);
+}
+
+/*
+ * Return 'mem', previously obtained from __mem_get(ctx, size), to 'ctx'.
+ * 'size' must be the size that was passed to __mem_get().
+ *
+ * This is a call from an external caller,
+ * so we want to count this as a user "put".
+ */
+void
+__mem_put(mem_context_t ctx, void *mem, size_t size) {
+	size_t new_size = quantize(size);
+
+	REQUIRE(size > 0);
+
+	LOCK_CONTEXT(ctx);
+
+	/* Same "too big for the free lists" test as in __mem_get(). */
+	if (size >= ctx->max_size || new_size >= ctx->max_size) {
+		/* memput() called on something beyond our upper limit */
+		free(mem);
+		INSIST(ctx->stats[ctx->max_size].gets != 0);
+		ctx->stats[ctx->max_size].gets--;
+		goto done;
+	}
+
+	/* The free list uses the "rounded-up" size "new_size": */
+	((memcluster_element *)mem)->next = ctx->freelists[new_size];
+	ctx->freelists[new_size] = (memcluster_element *)mem;
+
+	/*
+	 * The stats[] uses the _actual_ "size" requested by the
+	 * caller, with the caveat (in the code above) that "size" >= the
+	 * max. size (max_size) ends up getting recorded as a call to
+	 * max_size.
+	 */
+	INSIST(ctx->stats[size].gets != 0);
+	ctx->stats[size].gets--;
+	ctx->stats[new_size].freefrags++;
+
+ done:
+	UNLOCK_CONTEXT(ctx);
+}
+
+/*
+ * __mem_get() plus a trace line on stderr naming the calling file and line.
+ */
+void *
+__mem_get_debug(mem_context_t ctx, size_t size, const char *file, int line) {
+	void *ptr;
+
+	ptr = __mem_get(ctx, size);
+	/* %p requires a void pointer argument. */
+	fprintf(stderr, "%s:%d: mem_get(%p, %lu) -> %p\n", file, line,
+		(void *)ctx, (u_long)size, ptr);
+	return (ptr);
+}
+
+/*
+ * __mem_put() preceded by a trace line on stderr naming the calling file
+ * and line.
+ */
+void
+__mem_put_debug(mem_context_t ctx, void *ptr, size_t size, const char *file,
+		int line)
+{
+	fprintf(stderr, "%s:%d: mem_put(%p, %p, %lu)\n", file, line,
+		(void *)ctx, ptr, (u_long)size);
+	__mem_put(ctx, ptr, size);
+}
+
+/*
+ * Print the stats[] on the stream "out" with suitable formatting.
+ */
+void
+mem_stats(mem_context_t ctx, FILE *out) {
+	size_t i;
+
+	LOCK_CONTEXT(ctx);
+
+	/* Leave through 'done' so the context lock is always released. */
+	if (ctx->freelists == NULL)
+		goto done;
+	for (i = 1; i <= ctx->max_size; i++) {
+		const struct stats *s = &ctx->stats[i];
+
+		if (s->totalgets == 0 && s->gets == 0)
+			continue;
+		fprintf(out, "%s%5lu: %11lu gets, %11lu rem",
+			(i == ctx->max_size) ? ">=" : " ",
+			(u_long)i, s->totalgets, s->gets);
+		if (s->blocks != 0)
+			fprintf(out, " (%lu bl, %lu ff)",
+				s->blocks, s->freefrags);
+		fputc('\n', out);
+	}
+
+ done:
+	UNLOCK_CONTEXT(ctx);
+}
+
+/*
+ * Return 1 if 'ptr' lies inside memory that 'ctx' obtained for its basic
+ * blocks, and 0 otherwise.
+ */
+int
+mem_valid(mem_context_t ctx, void *ptr) {
+	u_char *cp = ptr;
+	int ret = 0;
+#ifdef MEMCLUSTER_RANGES
+	range *r;
+#endif
+
+	LOCK_CONTEXT(ctx);
+
+#ifdef MEMCLUSTER_RANGES
+	/* should use a tree for this... */
+	for (r = ctx->ranges; r != NULL; r = r->next) {
+		if (cp >= r->first && cp <= r->last) {
+			ret = 1;
+			break;
+		}
+	}
+#else
+	if (ctx->lowest != NULL && cp >= ctx->lowest && cp <= ctx->highest)
+		ret = 1;
+#endif
+
+	UNLOCK_CONTEXT(ctx);
+
+	return (ret);
+}
+
+/*
+ * Replacements for malloc() and free().
+ */
+
+/*
+ * Allocate 'size' bytes from 'ctx'.  The block is preceded by a hidden
+ * size_info header recording the padded size, so that mem_free() does not
+ * need to be told the size.  Returns NULL on failure.
+ */
+void *
+mem_allocate(mem_context_t ctx, size_t size) {
+	size_info si;
+
+	/* Refuse requests whose size would wrap once the header is added. */
+	if (size > (size_t)-1 - ALIGNMENT_SIZE)
+		return (NULL);
+	size += ALIGNMENT_SIZE;
+	si = mem_get(ctx, size);
+	if (si == NULL)
+		return (NULL);
+	si->size = size;
+	return (&si[1]);
+}
+
+/*
+ * Release a block obtained from mem_allocate() on the same 'ctx'.
+ * As with free(), a NULL 'ptr' is ignored.
+ */
+void
+mem_free(mem_context_t ctx, void *ptr) {
+	size_info si;
+
+	if (ptr == NULL)
+		return;
+	si = &(((size_info)ptr)[-1]);
+	mem_put(ctx, si, si->size);
+}
+
+/*
+ * Public Legacy.
+ */
+
+/*
+ * Create the default context.  Returns -1 if it already exists or cannot
+ * be created, otherwise the result of mem_context_create().
+ */
+int
+meminit(size_t init_max_size, size_t target_size) {
+	/* need default_context lock here */
+	if (default_context != NULL)
+		return (-1);
+	return (mem_context_create(init_max_size, target_size,
+				   &default_context));
+}
+
+/*
+ * Return the default context, creating it with default parameters on
+ * first use.  Returns NULL if it cannot be created.
+ */
+mem_context_t
+mem_default_context(void) {
+	/* need default_context lock here */
+	if (default_context == NULL && meminit(0, 0) == -1)
+		return (NULL);
+	return (default_context);
+}
+
+/*
+ * __mem_get() on the default context, which is created on first use.
+ */
+void *
+__memget(size_t size) {
+	mem_context_t ctx = mem_default_context();
+
+	if (ctx == NULL)
+		return (NULL);
+	return (__mem_get(ctx, size));
+}
+
+/*
+ * __mem_put() on the default context, which must already exist.
+ */
+void
+__memput(void *mem, size_t size) {
+	/* need default_context lock here */
+	REQUIRE(default_context != NULL);
+	__mem_put(default_context, mem, size);
+}
+
+/*
+ * __memget() plus a trace line on stderr.
+ */
+void *
+__memget_debug(size_t size, const char *file, int line) {
+	void *ptr;
+
+	ptr = __memget(size);
+	fprintf(stderr, "%s:%d: memget(%lu) -> %p\n", file, line,
+		(u_long)size, ptr);
+	return (ptr);
+}
+
+/*
+ * __memput() preceded by a trace line on stderr.
+ */
+void
+__memput_debug(void *ptr, size_t size, const char *file, int line) {
+	fprintf(stderr, "%s:%d: memput(%p, %lu)\n", file, line,
+		ptr, (u_long)size);
+	__memput(ptr, size);
+}
+
+/*
+ * mem_valid() on the default context, which must already exist.
+ */
+int
+memvalid(void *ptr) {
+	/* need default_context lock here */
+	REQUIRE(default_context != NULL);
+	return (mem_valid(default_context, ptr));
+}
+
+/*
+ * mem_stats() on the default context, which must already exist.
+ */
+void
+memstats(FILE *out) {
+	/* need default_context lock here */
+	REQUIRE(default_context != NULL);
+	mem_stats(default_context, out);
+}
diff --git a/lib/isc/task.c b/lib/isc/task.c
new file mode 100644
index 0000000000..f6b8755989
--- /dev/null
+++ b/lib/isc/task.c
@@ -0,0 +1,610 @@
+
+/*
+ * NOTE(review): the <...> header name(s) were missing from this copy of the
+ * patch.  <stdio.h> is needed for printf(); the assertions header is a
+ * reconstruction and should be confirmed against the repository.
+ */
+#include <stdio.h>
+#include <isc/assertions.h>
+
+#include "task.h"
+#include "thread.h"
+
+#define VALID_MANAGER(m)	((m) != NULL && \
+				 (m)->magic == TASK_MANAGER_MAGIC)
+#define VALID_TASK(t)		((t) != NULL && \
+				 (t)->magic == TASK_MAGIC)
+
+#define LOCK(lp)		os_mutex_lock((lp))
+#define UNLOCK(lp)		os_mutex_unlock((lp))
+#define WAIT(cvp, lp)		os_condition_wait((cvp), (lp))
+#define BROADCAST(cvp)		os_condition_broadcast((cvp))
+
+#define DEFAULT_DEFAULT_QUANTUM	5
+
+#define FINISHED(m)		((m)->exiting && EMPTY((m)->tasks))
+
+
+/***
+ *** Tasks.
+ ***/
+
+/*
+ * Unlink 'task' from its manager and release its memory.  The task's event
+ * queue must be empty.
+ */
+static void
+task_free(task_t task) {
+	task_manager_t manager = task->manager;
+	mem_context_t mctx;
+
+	printf("free task\n");
+	REQUIRE(EMPTY(task->events));
+
+	LOCK(&manager->lock);
+	/*
+	 * Fetch the memory context while we still hold the manager lock.
+	 * Once the last task is unlinked from an exiting manager and the
+	 * lock is dropped, the manager itself may be freed.
+	 */
+	mctx = manager->mctx;
+	UNLINK(manager->tasks, task, link);
+	if (FINISHED(manager)) {
+		/*
+		 * All tasks have completed and the
+		 * task manager is exiting.  Wake up
+		 * any idle worker threads so they
+		 * can exit.
+		 */
+		BROADCAST(&manager->work_available);
+	}
+	UNLOCK(&manager->lock);
+	os_mutex_destroy(&task->lock);
+	task->magic = 0;
+	mem_put(mctx, task, sizeof *task);
+}
+
+/*
+ * Put 'task' on its manager's ready queue, waking the worker threads if
+ * the queue was empty.  The caller must already have moved the task from
+ * idle to ready state and must not hold the task lock.
+ */
+static void
+task_ready(task_t task) {
+	task_manager_t manager = task->manager;
+	boolean_t need_wakeup = FALSE;
+
+	INSIST(VALID_MANAGER(manager));
+	LOCK(&manager->lock);
+	if (EMPTY(manager->ready_tasks))
+		need_wakeup = TRUE;
+	ENQUEUE(manager->ready_tasks, task, ready_link);
+	UNLOCK(&manager->lock);
+
+	/*
+	 * If the runnable queue is empty, the worker threads could
+	 * either be executing tasks or waiting for something to do.
+	 * We wakeup anyone who is sleeping.
+	 */
+	if (need_wakeup)
+		BROADCAST(&manager->work_available);
+}
+
+/*
+ * Create a task managed by 'manager' and store it in '*taskp', which must
+ * be NULL on entry.  The new task is idle and has one reference.  A
+ * 'quantum' of zero selects the manager's default quantum.
+ *
+ * Returns TRUE on success, FALSE if memory could not be allocated.
+ */
+boolean_t
+task_allocate(task_manager_t manager, event_action_t shutdown_action,
+	      u_int32_t quantum, task_t *taskp) {
+	task_t task;
+
+	REQUIRE(VALID_MANAGER(manager));
+	REQUIRE(taskp != NULL && *taskp == NULL);
+
+	task = mem_get(manager->mctx, sizeof *task);
+	if (task == NULL)
+		return (FALSE);
+
+	task->magic = TASK_MAGIC;
+	task->manager = manager;
+	os_mutex_init(&task->lock);
+	task->state = task_state_idle;
+	task->references = 1;
+	INIT_LIST(task->events);
+	task->quantum = quantum;
+	task->shutdown_pending = FALSE;
+	task->shutdown_action = shutdown_action;
+	INIT_LINK(task, link);
+	INIT_LINK(task, ready_link);
+
+	LOCK(&manager->lock);
+	if (task->quantum == 0)
+		task->quantum = manager->default_quantum;
+	APPEND(manager->tasks, task, link);
+	UNLOCK(&manager->lock);
+
+	*taskp = task;
+
+	return (TRUE);
+}
+
+/*
+ * Add a reference to 'task' and store the task in '*taskp', which must be
+ * NULL on entry.  Always returns TRUE.
+ */
+boolean_t
+task_attach(task_t task, task_t *taskp) {
+
+	REQUIRE(VALID_TASK(task));
+	REQUIRE(taskp != NULL && *taskp == NULL);
+
+	LOCK(&task->lock);
+	task->references++;
+	UNLOCK(&task->lock);
+
+	*taskp = task;
+
+	return (TRUE);
+}
+
+/*
+ * Drop the reference held through '*taskp' and set '*taskp' to NULL.  If
+ * that was the last reference and the task is already a zombie, the task
+ * is freed.  Always returns TRUE.
+ */
+boolean_t
+task_detach(task_t *taskp) {
+	boolean_t free_task = FALSE;
+	task_t task;
+
+	printf("task_detach\n");
+
+	REQUIRE(taskp != NULL);
+	task = *taskp;
+	REQUIRE(VALID_TASK(task));
+
+	LOCK(&task->lock);
+	REQUIRE(task->references > 0);
+	task->references--;
+	if (task->state == task_state_zombie &&
+	    task->references == 0) {
+		INSIST(VALID_MANAGER(task->manager));
+		free_task = TRUE;
+	}
+	UNLOCK(&task->lock);
+
+	if (free_task)
+		task_free(task);
+
+	*taskp = NULL;
+
+	return (TRUE);
+}
+
+/*
+ * Queue 'event' on 'task'.  If the task is a zombie or has a shutdown
+ * pending, the event is freed instead.  Always returns TRUE.
+ */
+boolean_t
+task_send_event(task_t task, generic_event_t event) {
+	boolean_t was_idle = FALSE;
+	boolean_t discard = FALSE;
+
+	REQUIRE(VALID_TASK(task));
+	REQUIRE(event != NULL);
+
+	printf("sending\n");
+	/*
+	 * We're trying hard to hold locks for as short a time as possible.
+	 * We're also trying to hold as few locks as possible.  This is why
+	 * some processing is deferred until after a lock is released.
+	 */
+	LOCK(&task->lock);
+	if (task->state != task_state_zombie && !task->shutdown_pending) {
+		if (task->state == task_state_idle) {
+			was_idle = TRUE;
+			INSIST(EMPTY(task->events));
+			task->state = task_state_ready;
+		}
+		INSIST(task->state == task_state_ready ||
+		       task->state == task_state_running);
+		ENQUEUE(task->events, event, link);
+	} else
+		discard = TRUE;
+	UNLOCK(&task->lock);
+
+	if (discard) {
+		mem_put(event->mctx, event, sizeof *event);
+		return (TRUE);
+	}
+
+	if (was_idle) {
+		/*
+		 * We need to add this task to the ready queue.
+		 *
+		 * We've waited until now to do it, rather than doing it
+		 * while holding the task lock, because we don't want to
+		 * block while holding the task lock.
+		 *
+		 * We've changed the state to ready, so no one else will
+		 * be trying to add this task to the ready queue.  It
+		 * thus doesn't matter if more events have been added to
+		 * the queue after we gave up the task lock.
+		 *
+		 * Shutting down a task requires posting a shutdown event
+		 * to the task's queue and then executing it, so there's
+		 * no way the task can disappear.  A task is always on the
+		 * task manager's 'tasks' list, so the task manager can
+		 * always post a shutdown event to all tasks if it is
+		 * requested to shutdown.
+		 */
+		task_ready(task);
+	}
+
+	printf("sent\n");
+	return (TRUE);
+}
+
+/*
+ * Mark 'task' as having a shutdown pending and, if it was idle, make it
+ * ready to run.  Does nothing if the task is already a zombie.  Always
+ * returns TRUE.
+ */
+boolean_t
+task_shutdown(task_t task) {
+	boolean_t was_idle = FALSE;
+	boolean_t zombie = FALSE;
+
+	REQUIRE(VALID_TASK(task));
+
+	/*
+	 * This routine is very similar to task_send_event() above.
+	 */
+
+	LOCK(&task->lock);
+	if (task->state != task_state_zombie) {
+		if (task->state == task_state_idle) {
+			was_idle = TRUE;
+			INSIST(EMPTY(task->events));
+			task->state = task_state_ready;
+		}
+		INSIST(task->state == task_state_ready ||
+		       task->state == task_state_running);
+		task->shutdown_pending = TRUE;
+	} else
+		zombie = TRUE;
+	UNLOCK(&task->lock);
+
+	if (zombie)
+		return (TRUE);
+
+	if (was_idle)
+		task_ready(task);
+
+	return (TRUE);
+}
+
+
+/***
+ *** Task Manager.
+ ***/
+
+/*
+ * Worker thread body.  Repeatedly takes a task off the manager's ready
+ * queue and dispatches its events until the manager is exiting and has no
+ * tasks left.  'uap' is the task manager.  Always returns NULL.
+ */
+static
+void *task_manager_run(void *uap) {
+	task_manager_t manager = uap;
+	task_t task;
+
+	printf("start %p\n", pthread_self());
+
+	REQUIRE(VALID_MANAGER(manager));
+
+	/*
+	 * Again we're trying to hold the lock for as short a time as possible
+	 * and to do as little locking and unlocking as possible.
+	 *
+	 * In both while loops, the appropriate lock must be held before the
+	 * while body starts.  Code which acquired the lock at the top of
+	 * the loop would be more readable, but would result in a lot of
+	 * extra locking.  Compare:
+	 *
+	 * Straightforward:
+	 *
+	 *	LOCK();
+	 *	...
+	 *	UNLOCK();
+	 *	while (expression) {
+	 *		LOCK();
+	 *		...
+	 *		UNLOCK();
+	 *
+	 *	       	Unlocked part here...
+	 *
+	 *		LOCK();
+	 *		...
+	 *		UNLOCK();
+	 *	}
+	 *
+	 * Note how if the loop continues we unlock and then immediately lock.
+	 * For N iterations of the loop, this code does 2N+1 locks and 2N+1
+	 * unlocks.  Also note that the lock is not held when the while
+	 * condition is tested, which may or may not be important, depending
+	 * on the expression.
+	 *
+	 * As written:
+	 *
+	 *	LOCK();
+	 *	while (expression) {
+	 *		...
+	 *		UNLOCK();
+	 *
+	 *	       	Unlocked part here...
+	 *
+	 *		LOCK();
+	 *		...
+	 *	}
+	 *	UNLOCK();
+	 *
+	 * For N iterations of the loop, this code does N+1 locks and N+1
+	 * unlocks.  The while expression is always protected by the lock.
+	 */
+
+	LOCK(&manager->lock);
+	while (!FINISHED(manager)) {
+		/*
+		 * For reasons similar to those given in the comment in
+		 * task_send_event() above, it is safe for us to dequeue
+		 * the task while only holding the manager lock, and then
+		 * change the task to running state while only holding the
+		 * task lock.
+		 */
+		while (EMPTY(manager->ready_tasks) && !FINISHED(manager)) {
+			printf("wait %p\n", pthread_self());
+			WAIT(&manager->work_available, &manager->lock);
+			printf("awake %p\n", pthread_self());
+		}
+		printf("working %p\n", pthread_self());
+
+		task = HEAD(manager->ready_tasks);
+		if (task != NULL) {
+			u_int32_t dispatch_count = 0;
+			boolean_t done = FALSE;
+			boolean_t requeue = FALSE;
+			boolean_t wants_shutdown;
+			boolean_t free_task = FALSE;
+			generic_event_t event;
+			event_action_t action;
+			event_list_t remaining_events;
+			boolean_t discard_remaining = FALSE;
+
+			INSIST(VALID_TASK(task));
+
+			/*
+			 * Note we only unlock the manager lock if we actually
+			 * have a task to do.  We must reacquire the manager
+			 * lock before exiting the 'if (task != NULL)' block.
+			 */
+			DEQUEUE(manager->ready_tasks, task, ready_link);
+			UNLOCK(&manager->lock);
+
+			LOCK(&task->lock);
+			task->state = task_state_running;
+			while (!done) {
+				INSIST(task->shutdown_pending ||
+				       !EMPTY(task->events));
+				if (task->shutdown_pending &&
+				    EMPTY(task->events)) {
+					event = NULL;
+					action = task->shutdown_action;
+				} else {
+					event = HEAD(task->events);
+					action = event->action;
+					DEQUEUE(task->events, event, link);
+				}
+				UNLOCK(&task->lock);
+
+				printf("dispatch %p\n", pthread_self());
+				/*
+				 * Execute the event action.
+				 */
+				if (action != NULL)
+					wants_shutdown = (*action)(event);
+				else
+					wants_shutdown = FALSE;
+
+				/*
+				 * Count the dispatch so that the quantum
+				 * check below can actually expire.
+				 */
+				dispatch_count++;
+
+				/*
+				 * If this wasn't a shutdown event, we
+				 * need to free it.
+				 */
+				if (event != NULL)
+					mem_put(event->mctx, event,
+						sizeof *event);
+				else
+					wants_shutdown = TRUE;
+
+				LOCK(&task->lock);
+				if (wants_shutdown) {
+					printf("wants shutdown\n");
+					if (!EMPTY(task->events)) {
+						remaining_events =
+							task->events;
+						INIT_LIST(task->events);
+						discard_remaining = TRUE;
+					}
+					if (task->references == 0)
+						free_task = TRUE;
+					task->state = task_state_zombie;
+					done = TRUE;
+				} else if (EMPTY(task->events) &&
+					   !task->shutdown_pending) {
+					task->state = task_state_idle;
+					done = TRUE;
+				} else if (dispatch_count >= task->quantum) {
+					/*
+					 * Our quantum has expired, but
+					 * there is more work to be done.
+					 * We'll requeue it to the ready
+					 * queue later.
+					 *
+					 * We don't check quantum until
+					 * dispatching at least one event,
+					 * so the minimum quantum is one.
+					 */
+					task->state = task_state_ready;
+					requeue = TRUE;
+					done = TRUE;
+				}
+			}
+			UNLOCK(&task->lock);
+
+			if (discard_remaining) {
+				generic_event_t next_event;
+
+				for (event = HEAD(remaining_events);
+				     event != NULL;
+				     event = next_event) {
+					next_event = NEXT(event, link);
+					mem_put(event->mctx, event,
+						sizeof *event);
+				}
+			}
+
+			if (free_task)
+				task_free(task);
+
+			LOCK(&manager->lock);
+			if (requeue) {
+				/*
+				 * We know we're awake, so we don't have
+				 * to wakeup any sleeping threads if the
+				 * ready queue is empty before we requeue.
+				 *
+				 * A possible optimization if the queue is
+				 * empty is to 'goto' the 'if (task != NULL)'
+				 * block, avoiding the ENQUEUE of the task
+				 * and the subsequent immediate DEQUEUE
+				 * (since it is the only executable task).
+				 * We don't do this because then we'd be
+				 * skipping the exit_requested check.  The
+				 * cost of ENQUEUE is low anyway, especially
+				 * when you consider that we'd have to do
+				 * an extra EMPTY check to see if we could
+				 * do the optimization.  If the ready queue
+				 * were usually nonempty, the 'optimization'
+				 * might even hurt rather than help.
+				 */
+				ENQUEUE(manager->ready_tasks, task,
+					ready_link);
+			}
+		}
+	}
+	INSIST(manager->workers > 0);
+	manager->workers--;
+	/*
+	 * Signal while still holding the lock.  As soon as we unlock with
+	 * workers == 0, task_manager_destroy() may free the manager, so the
+	 * manager must not be touched after the UNLOCK below.
+	 */
+	if (manager->workers == 0)
+		BROADCAST(&manager->no_workers);
+	UNLOCK(&manager->lock);
+
+	printf("exit %p\n", pthread_self());
+
+	return (NULL);
+}
+
+/*
+ * Destroy the manager's synchronization objects and release its memory.
+ */
+static void
+manager_free(task_manager_t manager) {
+	os_condition_destroy(&manager->work_available);
+	os_condition_destroy(&manager->no_workers);
+	os_mutex_destroy(&manager->lock);
+	manager->magic = 0;
+	mem_put(manager->mctx, manager, sizeof *manager);
+}
+
+/*
+ * Create a task manager using memory from 'mctx' and try to start
+ * 'workers' worker threads.  A 'default_quantum' of zero selects
+ * DEFAULT_DEFAULT_QUANTUM.
+ *
+ * Returns the number of worker threads started.  If that is zero, no
+ * manager is created and '*managerp' is not modified; otherwise the new
+ * manager is stored in '*managerp'.
+ */
+u_int32_t
+task_manager_create(mem_context_t mctx, int workers, int default_quantum,
+		    task_manager_t *managerp) {
+	int i;
+	u_int32_t started = 0;
+	task_manager_t manager;
+	os_thread_t thread;
+
+	REQUIRE(managerp != NULL);
+
+	manager = mem_get(mctx, sizeof *manager);
+	if (manager == NULL)
+		return (0);
+	manager->magic = TASK_MANAGER_MAGIC;
+	manager->mctx = mctx;
+	os_mutex_init(&manager->lock);
+	if (default_quantum == 0)
+		default_quantum = DEFAULT_DEFAULT_QUANTUM;
+	manager->default_quantum = default_quantum;
+	INIT_LIST(manager->tasks);
+	INIT_LIST(manager->ready_tasks);
+	os_condition_init(&manager->work_available);
+	manager->exiting = FALSE;
+	manager->workers = 0;
+	os_condition_init(&manager->no_workers);
+
+	/*
+	 * The lock is held while the workers are started, so the worker
+	 * count is complete before any worker can get past its first LOCK.
+	 */
+	LOCK(&manager->lock);
+	/*
+	 * Start workers.
+	 */
+	for (i = 0; i < workers; i++) {
+		if (os_thread_create(task_manager_run, manager, &thread)) {
+			manager->workers++;
+			started++;
+			os_thread_detach(thread);
+		}
+	}
+	UNLOCK(&manager->lock);
+
+	if (started == 0) {
+		manager_free(manager);
+		return (0);
+	}
+
+	*managerp = manager;
+
+	return (started);
+}
+
+/*
+ * Shut down every task, wait for all worker threads to exit, free the
+ * manager and set '*managerp' to NULL.  Always returns TRUE.
+ */
+boolean_t
+task_manager_destroy(task_manager_t *managerp) {
+	task_manager_t manager;
+	task_t task;
+
+	REQUIRE(managerp != NULL);
+	manager = *managerp;
+	REQUIRE(VALID_MANAGER(manager));
+
+	printf("task_manager_destroy %p\n", pthread_self());
+	/*
+	 * Only one non-worker thread may ever call this routine.
+	 * If a worker thread wants to initiate shutdown of the
+	 * task manager, it should ask some non-worker thread to call
+	 * task_manager_destroy(), e.g. by signalling a condition variable
+	 * that the startup thread is sleeping on.
+	 */
+
+	/*
+	 * Unlike elsewhere, we're going to hold this lock a long time.
+	 * We need to do so, because otherwise the list of tasks could
+	 * change while we were traversing it.
+	 *
+	 * This is also the only function where we will hold both the
+	 * task manager lock and a task lock at the same time.
+	 */
+
+	LOCK(&manager->lock);
+
+	/*
+	 * Make sure we only get called once.
+	 */
+	INSIST(!manager->exiting);
+	manager->exiting = TRUE;
+
+	/*
+	 * Post a shutdown event to every task.
+	 */
+	for (task = HEAD(manager->tasks);
+	     task != NULL;
+	     task = NEXT(task, link)) {
+		LOCK(&task->lock);
+		task->shutdown_pending = TRUE;
+		if (task->state == task_state_idle) {
+			task->state = task_state_ready;
+			ENQUEUE(manager->ready_tasks, task, ready_link);
+		}
+		UNLOCK(&task->lock);
+	}
+
+	/*
+	 * Wake up any sleeping workers.  This ensures we get work done if
+	 * there's work left to do, and if there are already no tasks left
+	 * it will cause the workers to see manager->exiting.
+	 */
+	BROADCAST(&manager->work_available);
+
+	/*
+	 * Wait for all the worker threads to exit.
+	 */
+	while (manager->workers > 0)
+		WAIT(&manager->no_workers, &manager->lock);
+
+	UNLOCK(&manager->lock);
+
+	manager_free(manager);
+
+	*managerp = NULL;
+
+	return (TRUE);
+}