2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-08-31 22:45:39 +00:00

Refactor isc_job_run to not-make any allocations

Change the isc_job_run() to not-make any allocations.  The caller must
make sure that it allocates isc_job_t - usually as part of the argument
passed to the callback.

For simple jobs, using isc_async_run() is advised as it allocates its
own separate isc_job_t.
This commit is contained in:
Ondřej Surý
2023-03-27 22:40:57 +02:00
parent 639d5065a3
commit 1844590ad9
25 changed files with 294 additions and 282 deletions

View File

@@ -25,6 +25,7 @@
#include <sys/types.h>
#include <unistd.h>
#include <isc/async.h>
#include <isc/attributes.h>
#include <isc/base64.h>
#include <isc/buffer.h>
@@ -2168,7 +2169,8 @@ run_server(void *arg) {
ns_client_request, ifp, accept_cb, ifp, 10,
NULL, NULL, &ifp->tcplistensocket));
ifp->flags |= NS_INTERFACEFLAG_LISTENING;
isc_job_run(loopmgr, sendquery, ifp->tcplistensocket);
isc_async_run(isc_loop_current(loopmgr), sendquery,
ifp->tcplistensocket);
return;

View File

@@ -16,11 +16,11 @@
#include <stdlib.h>
#include <unistd.h>
#include <isc/async.h>
#include <isc/attributes.h>
#include <isc/buffer.h>
#include <isc/commandline.h>
#include <isc/condition.h>
#include <isc/job.h>
#include <isc/loop.h>
#include <isc/netaddr.h>
#include <isc/parseint.h>
@@ -889,12 +889,13 @@ start_next_command(void);
static void
process_next_command(void *arg __attribute__((__unused__))) {
isc_loop_t *loop = isc_loop_main(loopmgr);
if (cmdline == NULL) {
in_use = false;
} else {
do_next_command(cmdline);
if (ISC_LIST_HEAD(lookup_list) != NULL) {
isc_job_run(loopmgr, run_loop, NULL);
isc_async_run(loop, run_loop, NULL);
return;
}
}

View File

@@ -33,6 +33,7 @@
#include <time.h>
#include <unistd.h>
#include <isc/async.h>
#include <isc/atomic.h>
#include <isc/attributes.h>
#include <isc/base32.h>
@@ -41,7 +42,6 @@
#include <isc/file.h>
#include <isc/hash.h>
#include <isc/hex.h>
#include <isc/job.h>
#include <isc/loop.h>
#include <isc/managers.h>
#include <isc/md.h>
@@ -1667,7 +1667,7 @@ assignwork(void *arg) {
lock_and_dumpnode(dns_fixedname_name(&fname), node);
dns_db_detachnode(gdb, &node);
isc_job_run(loopmgr, assignwork, NULL);
isc_async_run(isc_loop_current(loopmgr), assignwork, NULL);
}
/*%

View File

@@ -21,6 +21,7 @@
#include <stdlib.h>
#include <unistd.h>
#include <isc/async.h>
#include <isc/attributes.h>
#include <isc/base64.h>
#include <isc/buffer.h>
@@ -28,7 +29,6 @@
#include <isc/file.h>
#include <isc/getaddresses.h>
#include <isc/hash.h>
#include <isc/job.h>
#include <isc/lex.h>
#include <isc/log.h>
#include <isc/loop.h>
@@ -2424,7 +2424,7 @@ static void
done_update(void) {
ddebug("done_update()");
isc_job_run(loopmgr, getinput, NULL);
isc_async_run(isc_loop_main(loopmgr), getinput, NULL);
}
static void

View File

@@ -74,7 +74,7 @@
struct dns_validator {
unsigned int magic;
dns_view_t *view;
isc_loopmgr_t *loopmgr;
isc_loop_t *loop;
uint32_t tid;
isc_refcount_t references;
@@ -160,7 +160,7 @@ isc_result_t
dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type,
dns_rdataset_t *rdataset, dns_rdataset_t *sigrdataset,
dns_message_t *message, unsigned int options,
isc_loopmgr_t *loop, isc_job_cb cb, void *arg,
isc_loop_t *loop, isc_job_cb cb, void *arg,
dns_validator_t **validatorp);
/*%<
* Start a DNSSEC validation.

View File

@@ -977,7 +977,7 @@ valcreate(fetchctx_t *fctx, dns_message_t *message, dns_adbaddrinfo_t *addrinfo,
result = dns_validator_create(
fctx->res->view, name, type, rdataset, sigrdataset, message,
valoptions, fctx->res->loopmgr, validated, valarg, &validator);
valoptions, fctx->loop, validated, valarg, &validator);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
inc_stats(fctx->res, dns_resstatscounter_val);
if ((valoptions & DNS_VALIDATOR_DEFER) == 0) {
@@ -10420,7 +10420,7 @@ dns_resolver_createfetch(dns_resolver_t *res, const dns_name_t *name,
if (new_fctx) {
fetchctx_ref(fctx);
isc_job_run(res->loopmgr, (isc_job_cb)fctx_start, fctx);
isc_async_run(fctx->loop, (isc_job_cb)fctx_start, fctx);
}
unlock:

View File

@@ -227,7 +227,7 @@ validator_done(dns_validator_t *val, isc_result_t result) {
val->result = result;
dns_validator_ref(val);
isc_job_run(val->loopmgr, validator_done_cb, val);
isc_async_run(val->loop, validator_done_cb, val);
}
/*%
@@ -950,8 +950,8 @@ create_fetch(dns_validator_t *val, dns_name_t *name, dns_rdatatype_t type,
dns_validator_ref(val);
return (dns_resolver_createfetch(
val->view->resolver, name, type, NULL, NULL, NULL, NULL, 0,
fopts, 0, NULL, isc_loop_current(val->loopmgr), callback, val,
&val->frdataset, &val->fsigrdataset, &val->fetch));
fopts, 0, NULL, val->loop, callback, val, &val->frdataset,
&val->fsigrdataset, &val->fetch));
}
/*%
@@ -981,7 +981,7 @@ create_validator(dns_validator_t *val, dns_name_t *name, dns_rdatatype_t type,
validator_logcreate(val, name, type, caller, "validator");
result = dns_validator_create(val->view, name, type, rdataset, sig,
NULL, vopts, val->loopmgr, cb, val,
NULL, vopts, val->loop, cb, val,
&val->subvalidator);
if (result == ISC_R_SUCCESS) {
dns_validator_attach(val, &val->subvalidator->parent);
@@ -2978,7 +2978,7 @@ isc_result_t
dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type,
dns_rdataset_t *rdataset, dns_rdataset_t *sigrdataset,
dns_message_t *message, unsigned int options,
isc_loopmgr_t *loopmgr, isc_job_cb cb, void *arg,
isc_loop_t *loop, isc_job_cb cb, void *arg,
dns_validator_t **validatorp) {
isc_result_t result = ISC_R_FAILURE;
dns_validator_t *val = NULL;
@@ -2997,7 +2997,7 @@ dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type,
.type = type,
.options = options,
.link = ISC_LINK_INITIALIZER,
.loopmgr = loopmgr,
.loop = loop,
.cb = cb,
.arg = arg };
@@ -3024,7 +3024,7 @@ dns_validator_create(dns_view_t *view, dns_name_t *name, dns_rdatatype_t type,
if ((options & DNS_VALIDATOR_DEFER) == 0) {
dns_validator_ref(val);
isc_job_run(val->loopmgr, validator_start, val);
isc_async_run(val->loop, validator_start, val);
}
*validatorp = val;
@@ -3050,7 +3050,7 @@ dns_validator_send(dns_validator_t *validator) {
validator->options &= ~DNS_VALIDATOR_DEFER;
dns_validator_ref(validator);
isc_job_run(validator->loopmgr, validator_start, validator);
isc_async_run(validator->loop, validator_start, validator);
}
void

View File

@@ -115,6 +115,7 @@ libisc_la_SOURCES = \
ascii.c \
assertions.c \
async.c \
async_p.h \
backtrace.c \
base32.c \
base64.c \

View File

@@ -34,28 +34,62 @@
#include <isc/uv.h>
#include <isc/work.h>
#include "async_p.h"
#include "job_p.h"
#include "loop_p.h"
void
isc_async_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
int r;
isc_job_t *job = NULL;
REQUIRE(VALID_LOOP(loop));
REQUIRE(cb != NULL);
job = isc__job_new(loop, cb, cbarg);
isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
*job = (isc_job_t){
.link = ISC_LINK_INITIALIZER,
.cb = cb,
.cbarg = cbarg,
};
/*
* Now send the half-initialized job to the loop queue.
*
* The ISC_ASTACK_PUSH is counterintuitive here, but uv_idle
* drains its queue backwards, so if there's more than one event
* to be processed then they need to be in reverse order.
*/
ISC_ASTACK_PUSH(loop->queue_jobs, job, link);
ISC_ASTACK_PUSH(loop->async_jobs, job, link);
r = uv_async_send(&loop->queue_trigger);
int r = uv_async_send(&loop->async_trigger);
UV_RUNTIME_CHECK(uv_async_send, r);
}
void
isc__async_cb(uv_async_t *handle) {
isc_loop_t *loop = uv_handle_get_data(handle);
REQUIRE(VALID_LOOP(loop));
ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->async_jobs);
ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER;
isc_job_t *job = ISC_STACK_POP(drain, link);
isc_job_t *next = NULL;
while (job != NULL) {
ISC_LIST_PREPEND(jobs, job, link);
job = ISC_STACK_POP(drain, link);
}
for (job = ISC_LIST_HEAD(jobs),
next = (job ? ISC_LIST_NEXT(job, link) : NULL);
job != NULL;
job = next, next = (job ? ISC_LIST_NEXT(job, link) : NULL))
{
job->cb(job->cbarg);
isc_mem_put(loop->mctx, job, sizeof(*job));
}
}
void
isc__async_close(uv_handle_t *handle) {
isc_loop_t *loop = uv_handle_get_data(handle);
isc__async_cb(&loop->async_trigger);
}

29
lib/isc/async_p.h Normal file
View File

@@ -0,0 +1,29 @@
/*
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
*
* SPDX-License-Identifier: MPL-2.0
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
*
* See the COPYRIGHT file distributed with this work for additional
* information regarding copyright ownership.
*/
#pragma once
#include <isc/async.h>
#include <isc/job.h>
#include <isc/loop.h>
#include <isc/mem.h>
#include <isc/stack.h>
#include <isc/uv.h>
typedef ISC_ASTACK(isc_job_t) isc_asyncstack_t;
void
isc__async_cb(uv_async_t *handle);
void
isc__async_close(uv_handle_t *handle);

View File

@@ -15,7 +15,7 @@
* \brief The isc_async unit provides a way to schedule jobs on any isc
* event loop (isc_loop unit)
*
* The unit is built around the uv_async_t primitive and locked list with
* The unit is built around the uv_async_t primitive and lock-free stack with
* isc_job_cb. Jobs are first scheduled onto the locked list, then the
* uv_async_send() is called and the uv_async_t callback processes the enqueued
* jobs are scheduled to be run on the isc event loop.

View File

@@ -30,23 +30,31 @@
#include <isc/types.h>
typedef void (*isc_job_cb)(void *);
typedef struct isc_job isc_job_t;
struct isc_job {
isc_job_cb cb;
void *cbarg;
ISC_LINK(isc_job_t) link;
};
#define ISC_JOB_INITIALIZER \
{ \
.link = ISC_LINK_INITIALIZER \
}
ISC_LANG_BEGINDECLS
void
isc_job_run(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg);
isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg);
/*%<
* Schedule the job callback 'cb' to be run on the currently
* running event loop.
*
* Note: Because of the design of uv_idle_start(), if more than one
* job is posted at once, the jobs will be pushed onto a stack and
* executed in last-in-first-out order. To post events that are
* executed in order posted, use isc_async_run() instead.
*
* Requires:
*
*\li 'loopmgr' is the active loop manager.
*\li 'loop' is the current loop
*\li 'job' is initialized
*\li 'cb' is a callback function, must be non-NULL
*\li 'cbarg' is passed to the 'cb' as the only argument, may be NULL
*/

View File

@@ -15,6 +15,7 @@
#include <inttypes.h>
#include <isc/job.h>
#include <isc/lang.h>
#include <isc/mem.h>
#include <isc/refcount.h>

View File

@@ -48,7 +48,6 @@ typedef struct isc_httpdurl isc_httpdurl_t; /*%< HTTP URL */
typedef void(isc_httpdondestroy_t)(void *); /*%< Callback on destroying httpd */
typedef struct isc_interface isc_interface_t; /*%< Interface */
typedef struct isc_interfaceiter isc_interfaceiter_t; /*%< Interface Iterator */
typedef struct isc_job isc_job_t;
typedef struct isc_lex isc_lex_t; /*%< Lex */
typedef struct isc_log isc_log_t; /*%< Log */
typedef struct isc_logcategory isc_logcategory_t; /*%< Log Category */

View File

@@ -36,100 +36,51 @@
#include "job_p.h"
#include "loop_p.h"
#define JOB_MAGIC ISC_MAGIC('J', 'O', 'B', ' ')
#define VALID_JOB(t) ISC_MAGIC_VALID(t, JOB_MAGIC)
/*
* Private: static
*/
static void
isc__job_close_cb(uv_handle_t *handle) {
isc_job_t *job = uv_handle_get_data(handle);
isc_loop_t *loop = job->loop;
REQUIRE(loop == isc_loop_current(job->loop->loopmgr));
isc_mem_put(loop->mctx, job, sizeof(*job));
isc_loop_detach(&loop);
}
static void
isc__job_destroy(isc_job_t *job) {
REQUIRE(VALID_JOB(job));
REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr));
job->magic = 0;
uv_close(&job->idle, isc__job_close_cb);
}
static void
isc__job_cb(uv_idle_t *idle) {
isc_job_t *job = uv_handle_get_data(idle);
int r;
REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr));
job->cb(job->cbarg);
r = uv_idle_stop(idle);
UV_RUNTIME_CHECK(uv_idle_stop, r);
isc__job_destroy(job);
}
/*
* Public: #include <isc/job.h>
*/
void
isc_job_run(isc_loopmgr_t *loopmgr, isc_job_cb cb, void *cbarg) {
isc_loop_t *loop = isc_loop_current(loopmgr);
isc_job_t *job = isc__job_new(loop, cb, cbarg);
isc__job_init(loop, job);
isc__job_run(job);
isc_job_run(isc_loop_t *loop, isc_job_t *job, isc_job_cb cb, void *cbarg) {
if (ISC_LIST_EMPTY(loop->run_jobs)) {
uv_idle_start(&loop->run_trigger, isc__job_cb);
}
job->cb = cb;
job->cbarg = cbarg;
ISC_LIST_APPEND(loop->run_jobs, job, link);
}
/*
* Protected: #include <job_p.h>
*/
isc_job_t *
isc__job_new(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
isc_job_t *job = NULL;
void
isc__job_cb(uv_idle_t *handle) {
isc_loop_t *loop = uv_handle_get_data(handle);
ISC_LIST(isc_job_t) jobs = ISC_LIST_INITIALIZER;
REQUIRE(VALID_LOOP(loop));
REQUIRE(cb != NULL);
ISC_LIST_MOVE(jobs, loop->run_jobs);
job = isc_mem_get(loop->mctx, sizeof(*job));
*job = (isc_job_t){
.magic = JOB_MAGIC,
.cb = cb,
.cbarg = cbarg,
.link = ISC_LINK_INITIALIZER,
};
isc_job_t *job, *next;
for (job = ISC_LIST_HEAD(jobs),
next = (job != NULL) ? ISC_LIST_NEXT(job, link) : NULL;
job != NULL;
job = next, next = job ? ISC_LIST_NEXT(job, link) : NULL)
{
ISC_LIST_UNLINK(jobs, job, link);
job->cb(job->cbarg);
}
isc_loop_attach(loop, &job->loop);
return (job);
if (ISC_LIST_EMPTY(loop->run_jobs)) {
uv_idle_stop(&loop->run_trigger);
}
}
void
isc__job_init(isc_loop_t *loop, isc_job_t *job) {
int r = uv_idle_init(&loop->loop, &job->idle);
UV_RUNTIME_CHECK(uv_idle_init, r);
uv_handle_set_data(&job->idle, job);
}
void
isc__job_run(isc_job_t *job) {
int r;
REQUIRE(VALID_JOB(job));
REQUIRE(job->loop == isc_loop_current(job->loop->loopmgr));
r = uv_idle_start(&job->idle, isc__job_cb);
UV_RUNTIME_CHECK(uv_idle_start, r);
isc__job_close(uv_handle_t *handle) {
isc_loop_t *loop = uv_handle_get_data(handle);
isc__job_cb(&loop->run_trigger);
}

View File

@@ -15,12 +15,12 @@
#include <isc/job.h>
#include <isc/loop.h>
#include <isc/uv.h>
isc_job_t *
isc__job_new(isc_loop_t *loop, isc_job_cb cb, void *cbarg);
typedef ISC_LIST(isc_job_t) isc_joblist_t;
void
isc__job_init(isc_loop_t *loop, isc_job_t *job);
isc__job_cb(uv_idle_t *handle);
void
isc__job_run(isc_job_t *job);
isc__job_close(uv_handle_t *handle);

View File

@@ -38,6 +38,7 @@
#include <isc/uv.h>
#include <isc/work.h>
#include "async_p.h"
#include "job_p.h"
#include "loop_p.h"
@@ -140,8 +141,10 @@ static void
destroy_cb(uv_async_t *handle) {
isc_loop_t *loop = uv_handle_get_data(handle);
/* Again, the first close callback here is called last */
uv_close(&loop->async_trigger, isc__async_close);
uv_close(&loop->run_trigger, isc__job_close);
uv_close(&loop->destroy_trigger, NULL);
uv_close(&loop->queue_trigger, NULL);
uv_close(&loop->pause_trigger, NULL);
uv_close(&loop->wakeup_trigger, NULL);
uv_close(&loop->quiescent, NULL);
@@ -175,29 +178,14 @@ shutdown_cb(uv_async_t *handle) {
isc_job_t *prev = ISC_LIST_PREV(job, link);
ISC_LIST_UNLINK(loop->teardown_jobs, job, link);
isc__job_run(job);
job->cb(job->cbarg);
isc_mem_put(loop->mctx, job, sizeof(*job));
job = prev;
}
}
static void
queue_cb(uv_async_t *handle) {
isc_loop_t *loop = uv_handle_get_data(handle);
REQUIRE(VALID_LOOP(loop));
ISC_STACK(isc_job_t) drain = ISC_ASTACK_TO_STACK(loop->queue_jobs);
isc_job_t *job = ISC_STACK_POP(drain, link);
while (job != NULL) {
isc__job_init(loop, job);
isc__job_run(job);
job = ISC_STACK_POP(drain, link);
}
}
static void
wakeup_cb(uv_async_t *handle) {
/* we only woke up to make the loop take a spin */
@@ -209,7 +197,8 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
*loop = (isc_loop_t){
.tid = tid,
.loopmgr = loopmgr,
.queue_jobs = ISC_ASTACK_INITIALIZER,
.async_jobs = ISC_ASTACK_INITIALIZER,
.run_jobs = ISC_LIST_INITIALIZER,
.setup_jobs = ISC_LIST_INITIALIZER,
.teardown_jobs = ISC_LIST_INITIALIZER,
};
@@ -225,9 +214,13 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
UV_RUNTIME_CHECK(uv_async_init, r);
uv_handle_set_data(&loop->shutdown_trigger, loop);
r = uv_async_init(&loop->loop, &loop->queue_trigger, queue_cb);
r = uv_async_init(&loop->loop, &loop->async_trigger, isc__async_cb);
UV_RUNTIME_CHECK(uv_async_init, r);
uv_handle_set_data(&loop->queue_trigger, loop);
uv_handle_set_data(&loop->async_trigger, loop);
r = uv_idle_init(&loop->loop, &loop->run_trigger);
UV_RUNTIME_CHECK(uv_idle_init, r);
uv_handle_set_data(&loop->run_trigger, loop);
r = uv_async_init(&loop->loop, &loop->destroy_trigger, destroy_cb);
UV_RUNTIME_CHECK(uv_async_init, r);
@@ -251,25 +244,31 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
}
static void
loop_run(isc_loop_t *loop) {
int r;
isc_job_t *job;
setup_jobs_cb(void *arg) {
isc_loop_t *loop = arg;
isc_job_t *job = ISC_LIST_HEAD(loop->setup_jobs);
job = ISC_LIST_HEAD(loop->setup_jobs);
while (job != NULL) {
isc_job_t *next = ISC_LIST_NEXT(job, link);
ISC_LIST_UNLINK(loop->setup_jobs, job, link);
isc__job_run(job);
job->cb(job->cbarg);
isc_mem_put(loop->mctx, job, sizeof(*job));
job = next;
}
}
r = uv_prepare_start(&loop->quiescent, isc__qsbr_quiescent_cb);
static void
loop_run(isc_loop_t *loop) {
int r = uv_prepare_start(&loop->quiescent, isc__qsbr_quiescent_cb);
UV_RUNTIME_CHECK(uv_prepare_start, r);
isc_barrier_wait(&loop->loopmgr->starting);
isc_async_run(loop, setup_jobs_cb, loop);
r = uv_run(&loop->loop, UV_RUN_DEFAULT);
UV_RUNTIME_CHECK(uv_run, r);
@@ -281,7 +280,8 @@ loop_close(isc_loop_t *loop) {
int r = uv_loop_close(&loop->loop);
UV_RUNTIME_CHECK(uv_loop_close, r);
INSIST(ISC_ASTACK_EMPTY(loop->queue_jobs));
INSIST(ISC_ASTACK_EMPTY(loop->async_jobs));
INSIST(ISC_LIST_EMPTY(loop->run_jobs));
loop->magic = 0;
@@ -382,53 +382,41 @@ isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops, isc_loopmgr_t **loopmgrp) {
isc_job_t *
isc_loop_setup(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
isc_loopmgr_t *loopmgr = NULL;
isc_job_t *job = NULL;
REQUIRE(VALID_LOOP(loop));
REQUIRE(cb != NULL);
loopmgr = loop->loopmgr;
isc_loopmgr_t *loopmgr = loop->loopmgr;
isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
*job = (isc_job_t){
.cb = cb,
.cbarg = cbarg,
.link = ISC_LINK_INITIALIZER,
};
REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
atomic_load(&loopmgr->paused));
job = isc__job_new(loop, cb, cbarg);
isc__job_init(loop, job);
/*
* The ISC_LIST_PREPEND is counterintuitive here, but actually, the
* uv_idle_start() puts the item on the HEAD of the internal list, so we
* want to store items here in reverse order, so on the uv_loop, they
* are scheduled in the correct order
*/
ISC_LIST_PREPEND(loop->setup_jobs, job, link);
ISC_LIST_APPEND(loop->setup_jobs, job, link);
return (job);
}
isc_job_t *
isc_loop_teardown(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
isc_loopmgr_t *loopmgr = NULL;
isc_job_t *job = NULL;
REQUIRE(VALID_LOOP(loop));
loopmgr = loop->loopmgr;
isc_loopmgr_t *loopmgr = loop->loopmgr;
isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
*job = (isc_job_t){
.cb = cb,
.cbarg = cbarg,
.link = ISC_LINK_INITIALIZER,
};
REQUIRE(loop->tid == isc_tid() || !atomic_load(&loopmgr->running) ||
atomic_load(&loopmgr->paused));
job = isc__job_new(loop, cb, cbarg);
isc__job_init(loop, job);
/*
* The ISC_LIST_PREPEND is counterintuitive here, but actually, the
* uv_idle_start() puts the item on the HEAD of the internal list, so we
* want to store items here in reverse order, so on the uv_loop, they
* are scheduled in the correct order
*/
ISC_LIST_PREPEND(loop->teardown_jobs, job, link);
ISC_LIST_APPEND(loop->teardown_jobs, job, link);
return (job);
}

View File

@@ -16,6 +16,7 @@
#include <inttypes.h>
#include <isc/barrier.h>
#include <isc/job.h>
#include <isc/lang.h>
#include <isc/loop.h>
#include <isc/magic.h>
@@ -30,15 +31,15 @@
#include <isc/uv.h>
#include <isc/work.h>
#include "async_p.h"
#include "job_p.h"
/*
* Per-thread loop
*/
#define LOOP_MAGIC ISC_MAGIC('L', 'O', 'O', 'P')
#define VALID_LOOP(t) ISC_MAGIC_VALID(t, LOOP_MAGIC)
typedef ISC_LIST(isc_job_t) isc_joblist_t;
typedef ISC_ASTACK(isc_job_t) isc_jobstack_t;
struct isc_loop {
int magic;
isc_refcount_t references;
@@ -56,8 +57,12 @@ struct isc_loop {
bool shuttingdown;
/* Async queue */
uv_async_t queue_trigger;
isc_jobstack_t queue_jobs;
uv_async_t async_trigger;
isc_asyncstack_t async_jobs;
/* Jobs queue */
uv_idle_t run_trigger;
isc_joblist_t run_jobs;
/* Pause */
uv_async_t pause_trigger;
@@ -130,15 +135,6 @@ struct isc_signal {
#define JOB_MAGIC ISC_MAGIC('J', 'O', 'B', ' ')
#define VALID_JOB(t) ISC_MAGIC_VALID(t, JOB_MAGIC)
struct isc_job {
int magic;
uv_idle_t idle;
isc_loop_t *loop;
isc_job_cb cb;
void *cbarg;
ISC_LINK(isc_job_t) link;
};
/*
* Work to be offloaded to an external thread.
*/

View File

@@ -252,7 +252,6 @@ typedef union {
isc_nm_recv_cb_t recv;
isc_nm_cb_t send;
isc_nm_cb_t connect;
isc_nm_accept_cb_t accept;
} isc__nm_cb_t;
/*
@@ -278,7 +277,6 @@ struct isc__nm_uvreq {
isc_nm_timer_t *timer; /* TCP write timer */
int connect_tries; /* connect retries */
isc_result_t result;
uv_idle_t idle;
union {
uv_handle_t handle;
@@ -293,6 +291,8 @@ struct isc__nm_uvreq {
} uv_req;
ISC_LINK(isc__nm_uvreq_t) link;
ISC_LINK(isc__nm_uvreq_t) active_link;
isc_job_t job;
};
/*

View File

@@ -120,9 +120,6 @@ nmsocket_maybe_destroy(isc_nmsocket_t *sock FLARG);
static void
nmhandle_free(isc_nmsocket_t *sock, isc_nmhandle_t *handle);
static void
uvreq_free(uv_handle_t *handle);
/*%<
* Issue a 'handle closed' callback on the socket.
*/
@@ -1021,8 +1018,8 @@ nmhandle_destroy(isc_nmhandle_t *handle) {
* call it now asynchronously.
*/
if (sock->closehandle_cb != NULL) {
isc_job_run(sock->worker->netmgr->loopmgr,
isc__nm_closehandle_job, sock);
isc_async_run(sock->worker->loop, isc__nm_closehandle_job,
sock);
} else {
isc___nmsocket_detach(&sock FLARG_PASS);
}
@@ -1600,14 +1597,11 @@ isc___nm_uvreq_get(isc_nmsocket_t *sock FLARG) {
.connect_tries = 3,
.link = ISC_LINK_INITIALIZER,
.active_link = ISC_LINK_INITIALIZER,
.job = ISC_JOB_INITIALIZER,
.magic = UVREQ_MAGIC,
};
uv_handle_set_data(&req->uv_req.handle, req);
int r = uv_idle_init(&worker->loop->loop, &req->idle);
UV_RUNTIME_CHECK(uv_idle_init, r);
uv_handle_set_data(&req->idle, req);
isc___nmsocket_attach(sock, &req->sock FLARG_PASS);
ISC_LIST_APPEND(sock->active_uvreqs, req, active_link);
@@ -1615,16 +1609,6 @@ isc___nm_uvreq_get(isc_nmsocket_t *sock FLARG) {
return (req);
}
static void
uvreq_free(uv_handle_t *handle) {
isc__nm_uvreq_t *req = uv_handle_get_data(handle);
isc_nmsocket_t *sock = req->sock;
isc_mempool_put(sock->worker->uvreq_pool, req);
isc___nmsocket_detach(&sock FLARG_PASS);
}
void
isc___nm_uvreq_put(isc__nm_uvreq_t **reqp FLARG) {
REQUIRE(reqp != NULL && VALID_UVREQ(*reqp));
@@ -1644,7 +1628,9 @@ isc___nm_uvreq_put(isc__nm_uvreq_t **reqp FLARG) {
isc__nmhandle_detach(&handle FLARG_PASS);
}
uv_close(&req->idle, uvreq_free);
isc_mempool_put(sock->worker->uvreq_pool, req);
isc___nmsocket_detach(&sock FLARG_PASS);
}
void
@@ -1838,12 +1824,10 @@ isc__nmsocket_barrier_init(isc_nmsocket_t *listener) {
}
static void
isc___nm_connectcb(uv_idle_t *handle) {
isc__nm_uvreq_t *uvreq = uv_handle_get_data(handle);
isc___nm_connectcb(void *arg) {
isc__nm_uvreq_t *uvreq = arg;
uvreq->cb.connect(uvreq->handle, uvreq->result, uvreq->cbarg);
uv_idle_stop(handle);
isc__nm_uvreq_put(&uvreq);
}
@@ -1855,28 +1839,25 @@ isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
REQUIRE(VALID_NMHANDLE(uvreq->handle));
REQUIRE(uvreq->cb.connect != NULL);
uvreq->result = eresult;
if (!async) {
uvreq->cb.connect(uvreq->handle, eresult, uvreq->cbarg);
isc__nm_uvreq_put(&uvreq);
isc___nm_connectcb(uvreq);
return;
}
uvreq->result = eresult;
uv_idle_start(&uvreq->idle, isc___nm_connectcb);
isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_connectcb, uvreq);
}
static void
isc___nm_readcb(uv_idle_t *handle) {
isc__nm_uvreq_t *uvreq = uv_handle_get_data(handle);
isc___nm_readcb(void *arg) {
isc__nm_uvreq_t *uvreq = arg;
isc_region_t region;
region.base = (unsigned char *)uvreq->uvbuf.base;
region.length = uvreq->uvbuf.len;
uvreq->cb.recv(uvreq->handle, uvreq->result, &region, uvreq->cbarg);
uv_idle_stop(handle);
isc__nm_uvreq_put(&uvreq);
}
@@ -1887,29 +1868,21 @@ isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(VALID_NMHANDLE(uvreq->handle));
uvreq->result = eresult;
if (!async) {
isc_region_t region;
region.base = (unsigned char *)uvreq->uvbuf.base;
region.length = uvreq->uvbuf.len;
uvreq->cb.recv(uvreq->handle, eresult, &region, uvreq->cbarg);
isc__nm_uvreq_put(&uvreq);
isc___nm_readcb(uvreq);
return;
}
uvreq->result = eresult;
uv_idle_start(&uvreq->idle, isc___nm_readcb);
isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_readcb, uvreq);
}
static void
isc___nm_sendcb(uv_idle_t *handle) {
isc__nm_uvreq_t *uvreq = uv_handle_get_data(handle);
isc___nm_sendcb(void *arg) {
isc__nm_uvreq_t *uvreq = arg;
uvreq->cb.send(uvreq->handle, uvreq->result, uvreq->cbarg);
uv_idle_stop(handle);
isc__nm_uvreq_put(&uvreq);
}
@@ -1919,15 +1892,15 @@ isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(VALID_NMHANDLE(uvreq->handle));
uvreq->result = eresult;
if (!async) {
uvreq->cb.send(uvreq->handle, uvreq->result, uvreq->cbarg);
isc__nm_uvreq_put(&uvreq);
isc___nm_sendcb(uvreq);
return;
}
uv_idle_start(&uvreq->idle, isc___nm_sendcb);
isc_job_run(sock->worker->loop, &uvreq->job, isc___nm_sendcb, uvreq);
}
static void

View File

@@ -702,7 +702,8 @@ doh_receive_send_reply_cb(isc_nmhandle_t *handle, isc_result_t eresult,
assert_true(eresult == ISC_R_SUCCESS);
}
isc_job_run(loopmgr, doh_connect_thread, connect_nm);
isc_async_run(isc_loop_current(loopmgr), doh_connect_thread,
connect_nm);
}
if (sends <= 0) {
isc_loopmgr_shutdown(loopmgr);

View File

@@ -39,37 +39,55 @@ static atomic_uint executed;
#define MAX_EXECUTED 1000000
struct test_arg {
isc_job_t job;
union {
int n;
void *ptr;
} arg;
};
static void
shutdown_cb(void *arg) {
UNUSED(arg);
struct test_arg *ta = arg;
isc_mem_put(mctx, ta, sizeof(*ta));
isc_loopmgr_shutdown(loopmgr);
}
static void
job_cb(void *arg __attribute__((__unused__))) {
job_cb(void *arg) {
struct test_arg *ta = arg;
unsigned int n = atomic_fetch_add(&executed, 1);
if (n <= MAX_EXECUTED) {
atomic_fetch_add(&scheduled, 1);
isc_job_run(loopmgr, job_cb, loopmgr);
isc_job_run(isc_loop_current(loopmgr), &ta->job, job_cb, ta);
} else {
isc_job_run(loopmgr, shutdown_cb, loopmgr);
isc_job_run(isc_loop_current(loopmgr), &ta->job, shutdown_cb,
ta);
}
}
static void
job_run_cb(void *arg __attribute__((__unused__))) {
job_run_cb(void *arg) {
struct test_arg *ta = arg;
atomic_fetch_add(&scheduled, 1);
isc_job_run(loopmgr, job_cb, loopmgr);
if (arg == NULL) {
ta = isc_mem_get(mctx, sizeof(*ta));
*ta = (struct test_arg){ .job = ISC_JOB_INITIALIZER };
}
isc_job_run(isc_loop_current(loopmgr), &ta->job, job_cb, ta);
}
ISC_RUN_TEST_IMPL(isc_job_run) {
atomic_init(&scheduled, 0);
atomic_init(&executed, 0);
isc_loopmgr_setup(loopmgr, job_run_cb, loopmgr);
isc_loopmgr_setup(loopmgr, job_run_cb, NULL);
isc_loopmgr_run(loopmgr);
@@ -77,12 +95,18 @@ ISC_RUN_TEST_IMPL(isc_job_run) {
}
static char string[32] = "";
int n1 = 1, n2 = 2, n3 = 3, n4 = 4, n5 = 5;
struct test_arg n1 = { .job = ISC_JOB_INITIALIZER, .arg.n = 1 };
struct test_arg n2 = { .job = ISC_JOB_INITIALIZER, .arg.n = 2 };
struct test_arg n3 = { .job = ISC_JOB_INITIALIZER, .arg.n = 3 };
struct test_arg n4 = { .job = ISC_JOB_INITIALIZER, .arg.n = 4 };
struct test_arg n5 = { .job = ISC_JOB_INITIALIZER, .arg.n = 5 };
static void
append(void *arg) {
struct test_arg *ta = arg;
char value[32];
sprintf(value, "%d", *(int *)arg);
sprintf(value, "%d", ta->arg.n);
strlcat(string, value, 10);
}
@@ -90,12 +114,12 @@ static void
job_multiple(void *arg) {
UNUSED(arg);
/* These will be processed in reverse order */
isc_job_run(loopmgr, append, &n1);
isc_job_run(loopmgr, append, &n2);
isc_job_run(loopmgr, append, &n3);
isc_job_run(loopmgr, append, &n4);
isc_job_run(loopmgr, append, &n5);
/* These will be processed in normal order */
isc_job_run(mainloop, &n1.job, append, &n1);
isc_job_run(mainloop, &n2.job, append, &n2);
isc_job_run(mainloop, &n3.job, append, &n3);
isc_job_run(mainloop, &n4.job, append, &n4);
isc_job_run(mainloop, &n5.job, append, &n5);
isc_loopmgr_shutdown(loopmgr);
}
@@ -103,7 +127,7 @@ ISC_RUN_TEST_IMPL(isc_job_multiple) {
string[0] = '\0';
isc_loop_setup(isc_loop_main(loopmgr), job_multiple, loopmgr);
isc_loopmgr_run(loopmgr);
assert_string_equal(string, "54321");
assert_string_equal(string, "12345");
}
ISC_TEST_LIST_START

View File

@@ -66,11 +66,12 @@ ISC_RUN_TEST_IMPL(isc_loopmgr) {
static void
runjob(void *arg __attribute__((__unused__))) {
if (isc_tid() == 0) {
isc_job_run(loopmgr, shutdown_loopmgr, loopmgr);
}
isc_async_run(isc_loop_current(loopmgr), count, loopmgr);
isc_job_run(loopmgr, count, loopmgr);
if (isc_tid() == 0) {
isc_async_run(isc_loop_current(loopmgr), shutdown_loopmgr,
loopmgr);
}
}
ISC_RUN_TEST_IMPL(isc_loopmgr_runjob) {

View File

@@ -368,7 +368,8 @@ connect_connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
if (have_expected_cconnects(atomic_fetch_add(&cconnects, 1) + 1)) {
do_cconnects_shutdown(loopmgr);
} else if (do_send) {
isc_job_run(loopmgr, stream_recv_send_connect,
isc_async_run(isc_loop_current(loopmgr),
stream_recv_send_connect,
(cbarg == NULL ? get_stream_connect_function()
: (stream_connect_function)cbarg));
}

View File

@@ -540,7 +540,8 @@ ISC_TEARDOWN_TEST_IMPL(udp_shutdown_connect) {
ISC_LOOP_TEST_IMPL(udp_shutdown_connect) {
isc_loopmgr_shutdown(loopmgr);
isc_refcount_increment0(&active_cconnects);
isc_job_run(loopmgr, udp_connect_udpconnect, netmgr);
isc_async_run(isc_loop_current(loopmgr), udp_connect_udpconnect,
netmgr);
}
static void
@@ -864,7 +865,8 @@ udp__connect_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
{
do_cconnects_shutdown(loopmgr);
} else if (do_send) {
isc_job_run(loopmgr, udp__connect, cbarg);
isc_async_run(isc_loop_current(loopmgr), udp__connect,
cbarg);
}
isc_refcount_increment0(&active_creads);