mirror of
https://gitlab.isc.org/isc-projects/bind9
synced 2025-08-31 22:45:39 +00:00
Add isc_helper API that adds 1:1 thread for each loop
Add an extra thread that can be used to offload operations that would affect latency, but are not long-running tasks; those are handled by isc_work API. Each isc_loop now has matching isc_helper thread that also built on top of uv_loop. In fact, it matches most of the isc_loop functionality, but only the `isc_helper_run()` asynchronous call is exposed.
This commit is contained in:
@@ -31,6 +31,7 @@ libisc_la_HEADERS = \
|
|||||||
include/isc/hash.h \
|
include/isc/hash.h \
|
||||||
include/isc/hashmap.h \
|
include/isc/hashmap.h \
|
||||||
include/isc/heap.h \
|
include/isc/heap.h \
|
||||||
|
include/isc/helper.h \
|
||||||
include/isc/hex.h \
|
include/isc/hex.h \
|
||||||
include/isc/histo.h \
|
include/isc/histo.h \
|
||||||
include/isc/hmac.h \
|
include/isc/hmac.h \
|
||||||
@@ -135,6 +136,7 @@ libisc_la_SOURCES = \
|
|||||||
hash.c \
|
hash.c \
|
||||||
hashmap.c \
|
hashmap.c \
|
||||||
heap.c \
|
heap.c \
|
||||||
|
helper.c \
|
||||||
hex.c \
|
hex.c \
|
||||||
histo.c \
|
histo.c \
|
||||||
hmac.c \
|
hmac.c \
|
||||||
|
68
lib/isc/helper.c
Normal file
68
lib/isc/helper.c
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
||||||
|
*
|
||||||
|
* SPDX-License-Identifier: MPL-2.0
|
||||||
|
*
|
||||||
|
* This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
*
|
||||||
|
* See the COPYRIGHT file distributed with this work for additional
|
||||||
|
* information regarding copyright ownership.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include <isc/atomic.h>
|
||||||
|
#include <isc/barrier.h>
|
||||||
|
#include <isc/condition.h>
|
||||||
|
#include <isc/helper.h>
|
||||||
|
#include <isc/job.h>
|
||||||
|
#include <isc/loop.h>
|
||||||
|
#include <isc/magic.h>
|
||||||
|
#include <isc/mem.h>
|
||||||
|
#include <isc/mutex.h>
|
||||||
|
#include <isc/refcount.h>
|
||||||
|
#include <isc/result.h>
|
||||||
|
#include <isc/signal.h>
|
||||||
|
#include <isc/strerr.h>
|
||||||
|
#include <isc/thread.h>
|
||||||
|
#include <isc/util.h>
|
||||||
|
#include <isc/uv.h>
|
||||||
|
#include <isc/work.h>
|
||||||
|
|
||||||
|
#include "async_p.h"
|
||||||
|
#include "job_p.h"
|
||||||
|
#include "loop_p.h"
|
||||||
|
|
||||||
|
void
|
||||||
|
isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
|
||||||
|
REQUIRE(VALID_LOOP(loop));
|
||||||
|
REQUIRE(cb != NULL);
|
||||||
|
|
||||||
|
isc_loop_t *helper = &loop->loopmgr->helpers[loop->tid];
|
||||||
|
|
||||||
|
isc_job_t *job = isc_mem_get(helper->mctx, sizeof(*job));
|
||||||
|
*job = (isc_job_t){
|
||||||
|
.cb = cb,
|
||||||
|
.cbarg = cbarg,
|
||||||
|
};
|
||||||
|
|
||||||
|
cds_wfcq_node_init(&job->wfcq_node);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* cds_wfcq_enqueue() is non-blocking and enqueues the job to async
|
||||||
|
* queue.
|
||||||
|
*
|
||||||
|
* The function returns 'false' in case the queue was empty - in such
|
||||||
|
* case we need to trigger the async callback.
|
||||||
|
*/
|
||||||
|
if (!cds_wfcq_enqueue(&helper->async_jobs.head,
|
||||||
|
&helper->async_jobs.tail, &job->wfcq_node))
|
||||||
|
{
|
||||||
|
int r = uv_async_send(&helper->async_trigger);
|
||||||
|
UV_RUNTIME_CHECK(uv_async_send, r);
|
||||||
|
}
|
||||||
|
}
|
45
lib/isc/include/isc/helper.h
Normal file
45
lib/isc/include/isc/helper.h
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (C) Internet Systems Consortium, Inc. ("ISC")
|
||||||
|
*
|
||||||
|
* SPDX-License-Identifier: MPL-2.0
|
||||||
|
*
|
||||||
|
* This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||||
|
* file, you can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
*
|
||||||
|
* See the COPYRIGHT file distributed with this work for additional
|
||||||
|
* information regarding copyright ownership.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*! \file isc/helper.h */
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <inttypes.h>
|
||||||
|
|
||||||
|
#include <isc/job.h>
|
||||||
|
#include <isc/lang.h>
|
||||||
|
#include <isc/loop.h>
|
||||||
|
#include <isc/mem.h>
|
||||||
|
#include <isc/types.h>
|
||||||
|
|
||||||
|
ISC_LANG_BEGINDECLS
|
||||||
|
|
||||||
|
void
|
||||||
|
isc_helper_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg);
|
||||||
|
/*%<
|
||||||
|
* Schedule the job callback 'cb' to be run on the 'loop' event loop.
|
||||||
|
*
|
||||||
|
* Requires:
|
||||||
|
*
|
||||||
|
*\li 'loop' is a valid isc event loop
|
||||||
|
*\li 'cb' is a callback function, must be non-NULL
|
||||||
|
*\li 'cbarg' is passed to the 'cb' as the only argument, may be NULL
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define isc_helper_current(cb, cbarg) isc_async_run(isc_loop(), cb, cbarg)
|
||||||
|
/*%<
|
||||||
|
* Helper macro to run the job on the current loop
|
||||||
|
*/
|
||||||
|
|
||||||
|
ISC_LANG_ENDDECLS
|
@@ -186,7 +186,8 @@ shutdown_cb(uv_async_t *handle) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
|
loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid,
|
||||||
|
const char *kind) {
|
||||||
*loop = (isc_loop_t){
|
*loop = (isc_loop_t){
|
||||||
.tid = tid,
|
.tid = tid,
|
||||||
.loopmgr = loopmgr,
|
.loopmgr = loopmgr,
|
||||||
@@ -225,7 +226,7 @@ loop_init(isc_loop_t *loop, isc_loopmgr_t *loopmgr, uint32_t tid) {
|
|||||||
uv_handle_set_data(&loop->quiescent, loop);
|
uv_handle_set_data(&loop->quiescent, loop);
|
||||||
|
|
||||||
char name[16];
|
char name[16];
|
||||||
snprintf(name, sizeof(name), "loop-%08" PRIx32, tid);
|
snprintf(name, sizeof(name), "%s-%08" PRIx32, kind, tid);
|
||||||
isc_mem_create(&loop->mctx);
|
isc_mem_create(&loop->mctx);
|
||||||
isc_mem_setname(loop->mctx, name);
|
isc_mem_setname(loop->mctx, name);
|
||||||
|
|
||||||
@@ -249,6 +250,16 @@ quiescent_cb(uv_prepare_t *handle) {
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
helper_close(isc_loop_t *loop) {
|
||||||
|
int r = uv_loop_close(&loop->loop);
|
||||||
|
UV_RUNTIME_CHECK(uv_loop_close, r);
|
||||||
|
|
||||||
|
INSIST(cds_wfcq_empty(&loop->async_jobs.head, &loop->async_jobs.tail));
|
||||||
|
|
||||||
|
isc_mem_detach(&loop->mctx);
|
||||||
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
loop_close(isc_loop_t *loop) {
|
loop_close(isc_loop_t *loop) {
|
||||||
int r = uv_loop_close(&loop->loop);
|
int r = uv_loop_close(&loop->loop);
|
||||||
@@ -262,9 +273,32 @@ loop_close(isc_loop_t *loop) {
|
|||||||
isc_mem_detach(&loop->mctx);
|
isc_mem_detach(&loop->mctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void *
|
||||||
|
helper_thread(void *arg) {
|
||||||
|
isc_loop_t *helper = (isc_loop_t *)arg;
|
||||||
|
|
||||||
|
int r = uv_prepare_start(&helper->quiescent, quiescent_cb);
|
||||||
|
UV_RUNTIME_CHECK(uv_prepare_start, r);
|
||||||
|
|
||||||
|
isc_barrier_wait(&helper->loopmgr->starting);
|
||||||
|
|
||||||
|
r = uv_run(&helper->loop, UV_RUN_DEFAULT);
|
||||||
|
UV_RUNTIME_CHECK(uv_run, r);
|
||||||
|
|
||||||
|
/* Invalidate the helper early */
|
||||||
|
helper->magic = 0;
|
||||||
|
|
||||||
|
isc_barrier_wait(&helper->loopmgr->stopping);
|
||||||
|
|
||||||
|
return (NULL);
|
||||||
|
}
|
||||||
|
|
||||||
static void *
|
static void *
|
||||||
loop_thread(void *arg) {
|
loop_thread(void *arg) {
|
||||||
isc_loop_t *loop = (isc_loop_t *)arg;
|
isc_loop_t *loop = (isc_loop_t *)arg;
|
||||||
|
isc_loopmgr_t *loopmgr = loop->loopmgr;
|
||||||
|
isc_loop_t *helper = &loopmgr->helpers[loop->tid];
|
||||||
|
char name[32];
|
||||||
/* Initialize the thread_local variables*/
|
/* Initialize the thread_local variables*/
|
||||||
|
|
||||||
REQUIRE(isc__loop_local == NULL || isc__loop_local == loop);
|
REQUIRE(isc__loop_local == NULL || isc__loop_local == loop);
|
||||||
@@ -272,10 +306,15 @@ loop_thread(void *arg) {
|
|||||||
|
|
||||||
isc__tid_init(loop->tid);
|
isc__tid_init(loop->tid);
|
||||||
|
|
||||||
|
/* Start the helper thread */
|
||||||
|
isc_thread_create(helper_thread, helper, &helper->thread);
|
||||||
|
snprintf(name, sizeof(name), "isc-helper-%04" PRIu32, loop->tid);
|
||||||
|
isc_thread_setname(helper->thread, name);
|
||||||
|
|
||||||
int r = uv_prepare_start(&loop->quiescent, quiescent_cb);
|
int r = uv_prepare_start(&loop->quiescent, quiescent_cb);
|
||||||
UV_RUNTIME_CHECK(uv_prepare_start, r);
|
UV_RUNTIME_CHECK(uv_prepare_start, r);
|
||||||
|
|
||||||
isc_barrier_wait(&loop->loopmgr->starting);
|
isc_barrier_wait(&loopmgr->starting);
|
||||||
|
|
||||||
enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
|
enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
|
||||||
&loop->async_jobs.head, &loop->async_jobs.tail,
|
&loop->async_jobs.head, &loop->async_jobs.tail,
|
||||||
@@ -293,7 +332,11 @@ loop_thread(void *arg) {
|
|||||||
/* Invalidate the loop early */
|
/* Invalidate the loop early */
|
||||||
loop->magic = 0;
|
loop->magic = 0;
|
||||||
|
|
||||||
isc_barrier_wait(&loop->loopmgr->stopping);
|
/* Shutdown the helper thread */
|
||||||
|
r = uv_async_send(&helper->shutdown_trigger);
|
||||||
|
UV_RUNTIME_CHECK(uv_async_send, r);
|
||||||
|
|
||||||
|
isc_barrier_wait(&loopmgr->stopping);
|
||||||
|
|
||||||
return (NULL);
|
return (NULL);
|
||||||
}
|
}
|
||||||
@@ -342,16 +385,24 @@ isc_loopmgr_create(isc_mem_t *mctx, uint32_t nloops, isc_loopmgr_t **loopmgrp) {
|
|||||||
|
|
||||||
isc_mem_attach(mctx, &loopmgr->mctx);
|
isc_mem_attach(mctx, &loopmgr->mctx);
|
||||||
|
|
||||||
isc_barrier_init(&loopmgr->pausing, loopmgr->nloops);
|
/* We need to double the number for loops and helpers */
|
||||||
isc_barrier_init(&loopmgr->resuming, loopmgr->nloops);
|
isc_barrier_init(&loopmgr->pausing, loopmgr->nloops * 2);
|
||||||
isc_barrier_init(&loopmgr->starting, loopmgr->nloops);
|
isc_barrier_init(&loopmgr->resuming, loopmgr->nloops * 2);
|
||||||
isc_barrier_init(&loopmgr->stopping, loopmgr->nloops);
|
isc_barrier_init(&loopmgr->starting, loopmgr->nloops * 2);
|
||||||
|
isc_barrier_init(&loopmgr->stopping, loopmgr->nloops * 2);
|
||||||
|
|
||||||
loopmgr->loops = isc_mem_cget(loopmgr->mctx, loopmgr->nloops,
|
loopmgr->loops = isc_mem_cget(loopmgr->mctx, loopmgr->nloops,
|
||||||
sizeof(loopmgr->loops[0]));
|
sizeof(loopmgr->loops[0]));
|
||||||
for (size_t i = 0; i < loopmgr->nloops; i++) {
|
for (size_t i = 0; i < loopmgr->nloops; i++) {
|
||||||
isc_loop_t *loop = &loopmgr->loops[i];
|
isc_loop_t *loop = &loopmgr->loops[i];
|
||||||
loop_init(loop, loopmgr, i);
|
loop_init(loop, loopmgr, i, "loop");
|
||||||
|
}
|
||||||
|
|
||||||
|
loopmgr->helpers = isc_mem_cget(loopmgr->mctx, loopmgr->nloops,
|
||||||
|
sizeof(loopmgr->helpers[0]));
|
||||||
|
for (size_t i = 0; i < loopmgr->nloops; i++) {
|
||||||
|
isc_loop_t *loop = &loopmgr->helpers[i];
|
||||||
|
loop_init(loop, loopmgr, i, "helper");
|
||||||
}
|
}
|
||||||
|
|
||||||
loopmgr->sigint = isc_signal_new(loopmgr, isc__loopmgr_signal, loopmgr,
|
loopmgr->sigint = isc_signal_new(loopmgr, isc__loopmgr_signal, loopmgr,
|
||||||
@@ -465,6 +516,7 @@ isc_loopmgr_run(isc_loopmgr_t *loopmgr) {
|
|||||||
void
|
void
|
||||||
isc_loopmgr_pause(isc_loopmgr_t *loopmgr) {
|
isc_loopmgr_pause(isc_loopmgr_t *loopmgr) {
|
||||||
REQUIRE(VALID_LOOPMGR(loopmgr));
|
REQUIRE(VALID_LOOPMGR(loopmgr));
|
||||||
|
REQUIRE(isc_tid() != ISC_TID_UNKNOWN);
|
||||||
|
|
||||||
if (isc_log_wouldlog(ISC_LOG_DEBUG(1))) {
|
if (isc_log_wouldlog(ISC_LOG_DEBUG(1))) {
|
||||||
isc_log_write(ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_OTHER,
|
isc_log_write(ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_OTHER,
|
||||||
@@ -472,6 +524,13 @@ isc_loopmgr_pause(isc_loopmgr_t *loopmgr) {
|
|||||||
"loop exclusive mode: starting");
|
"loop exclusive mode: starting");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (size_t i = 0; i < loopmgr->nloops; i++) {
|
||||||
|
isc_loop_t *helper = &loopmgr->helpers[i];
|
||||||
|
|
||||||
|
int r = uv_async_send(&helper->pause_trigger);
|
||||||
|
UV_RUNTIME_CHECK(uv_async_send, r);
|
||||||
|
}
|
||||||
|
|
||||||
for (size_t i = 0; i < loopmgr->nloops; i++) {
|
for (size_t i = 0; i < loopmgr->nloops; i++) {
|
||||||
isc_loop_t *loop = &loopmgr->loops[i];
|
isc_loop_t *loop = &loopmgr->loops[i];
|
||||||
|
|
||||||
@@ -526,6 +585,12 @@ isc_loopmgr_destroy(isc_loopmgr_t **loopmgrp) {
|
|||||||
RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->running,
|
RUNTIME_CHECK(atomic_compare_exchange_strong(&loopmgr->running,
|
||||||
&(bool){ true }, false));
|
&(bool){ true }, false));
|
||||||
|
|
||||||
|
/* Wait for all helpers to finish */
|
||||||
|
for (size_t i = 0; i < loopmgr->nloops; i++) {
|
||||||
|
isc_loop_t *helper = &loopmgr->helpers[i];
|
||||||
|
isc_thread_join(helper->thread, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
/* First wait for all loops to finish */
|
/* First wait for all loops to finish */
|
||||||
for (size_t i = 1; i < loopmgr->nloops; i++) {
|
for (size_t i = 1; i < loopmgr->nloops; i++) {
|
||||||
isc_loop_t *loop = &loopmgr->loops[i];
|
isc_loop_t *loop = &loopmgr->loops[i];
|
||||||
@@ -534,6 +599,13 @@ isc_loopmgr_destroy(isc_loopmgr_t **loopmgrp) {
|
|||||||
|
|
||||||
loopmgr->magic = 0;
|
loopmgr->magic = 0;
|
||||||
|
|
||||||
|
for (size_t i = 0; i < loopmgr->nloops; i++) {
|
||||||
|
isc_loop_t *helper = &loopmgr->helpers[i];
|
||||||
|
helper_close(helper);
|
||||||
|
}
|
||||||
|
isc_mem_cput(loopmgr->mctx, loopmgr->helpers, loopmgr->nloops,
|
||||||
|
sizeof(loopmgr->helpers[0]));
|
||||||
|
|
||||||
for (size_t i = 0; i < loopmgr->nloops; i++) {
|
for (size_t i = 0; i < loopmgr->nloops; i++) {
|
||||||
isc_loop_t *loop = &loopmgr->loops[i];
|
isc_loop_t *loop = &loopmgr->loops[i];
|
||||||
loop_close(loop);
|
loop_close(loop);
|
||||||
|
@@ -110,6 +110,7 @@ struct isc_loopmgr {
|
|||||||
|
|
||||||
/* per-thread objects */
|
/* per-thread objects */
|
||||||
isc_loop_t *loops;
|
isc_loop_t *loops;
|
||||||
|
isc_loop_t *helpers;
|
||||||
};
|
};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
Reference in New Issue
Block a user