2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-09-02 07:35:26 +00:00

Add asynchronous work API to the network manager

The libuv has a support for running long running tasks in the dedicated
threadpools, so it doesn't affect networking IO.

This commit adds isc_nm_work_enqueue() wrapper that would wraps around
the libuv API and runs it on top of associated worker loop.

The only limitation is that the function must be called from inside
network manager thread, so the call to the function should be wrapped
inside a (bound) task.
This commit is contained in:
Ondřej Surý
2021-05-27 09:45:07 +02:00
committed by Ondřej Surý
parent 211bfefbaa
commit 87fe97ed91
4 changed files with 125 additions and 4 deletions

View File

@@ -68,6 +68,12 @@ typedef void (*isc_nm_opaquecb_t)(void *arg);
* callbacks.
*/
typedef void (*isc_nm_workcb_t)(void *arg);
typedef void (*isc_nm_after_workcb_t)(void *arg, isc_result_t result);
/*%<
* Callback functions for libuv threadpool work (see uv_work_t)
*/
void
isc_nm_attach(isc_nm_t *mgr, isc_nm_t **dst);
void
@@ -529,6 +535,19 @@ isc_nm_task_enqueue(isc_nm_t *mgr, isc_task_t *task, int threadid);
* maximum number of 'workers' as specifed in isc_nm_start()
*/
void
isc_nm_work_offload(isc_nm_t *mgr, isc_nm_workcb_t work_cb,
isc_nm_after_workcb_t after_work_cb, void *data);
/*%<
* Schedules a job to be handled by the libuv thread pool (see uv_work_t).
* The function specified in `work_cb` will be run by a thread in the
* thread pool; when complete, the `after_work_cb` function will run.
*
* Requires:
* \li 'mgr' is a valid netmgr object.
* \li We are currently running in a network manager thread.
*/
void
isc__nm_force_tid(int tid);
/*%<

View File

@@ -646,6 +646,17 @@ typedef union {
isc__netievent_tlsconnect_t nitc;
} isc__netievent_storage_t;
/*
* Work item for a uv_work threadpool.
*/
typedef struct isc__nm_work {
isc_nm_t *netmgr;
uv_work_t req;
isc_nm_workcb_t cb;
isc_nm_after_workcb_t after_cb;
void *data;
} isc__nm_work_t;
/*
* Network manager
*/

View File

@@ -40,6 +40,7 @@
#include "netmgr-int.h"
#include "netmgr_p.h"
#include "openssl_shim.h"
#include "trampoline_p.h"
#include "uv-compat.h"
/*%
@@ -199,6 +200,14 @@ static void
isc__nm_async_detach(isc__networker_t *worker, isc__netievent_t *ev0);
static void
isc__nm_async_close(isc__networker_t *worker, isc__netievent_t *ev0);
static void
isc__nm_threadpool_initialize(uint32_t workers);
static void
isc__nm_work_cb(uv_work_t *req);
static void
isc__nm_after_work_cb(uv_work_t *req, int status);
/*%<
* Issue a 'handle closed' callback on the socket.
*/
@@ -256,6 +265,17 @@ isc__nm_winsock_destroy(void) {
}
#endif /* WIN32 */
static void
isc__nm_threadpool_initialize(uint32_t workers) {
char buf[11];
int r = uv_os_getenv("UV_THREADPOOL_SIZE", buf,
&(size_t){ sizeof(buf) });
if (r == UV_ENOENT) {
snprintf(buf, sizeof(buf), "%" PRIu32, workers);
uv_os_setenv("UV_THREADPOOL_SIZE", buf);
}
}
void
isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
isc_nm_t *mgr = NULL;
@@ -267,6 +287,8 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
isc__nm_winsock_initialize();
#endif /* WIN32 */
isc__nm_threadpool_initialize(workers);
mgr = isc_mem_get(mctx, sizeof(*mgr));
*mgr = (isc_nm_t){ .nworkers = workers };
@@ -280,8 +302,6 @@ isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
atomic_init(&mgr->workers_paused, 0);
atomic_init(&mgr->paused, false);
atomic_init(&mgr->closing, false);
atomic_init(&mgr->idle, false);
atomic_init(&mgr->keepalive, false);
atomic_init(&mgr->recv_tcp_buffer_size, 0);
atomic_init(&mgr->send_tcp_buffer_size, 0);
atomic_init(&mgr->recv_udp_buffer_size, 0);
@@ -818,7 +838,8 @@ async_cb(uv_async_t *handle) {
isc__networker_t *worker = (isc__networker_t *)handle->loop->data;
if (process_all_queues(worker)) {
/* If we didn't process all the events, we need to enqueue
/*
* If we didn't process all the events, we need to enqueue
* async_cb to be run in the next iteration of the uv_loop
*/
uv_async_send(handle);
@@ -3242,6 +3263,73 @@ isc__nm_set_network_buffers(isc_nm_t *nm, uv_handle_t *handle) {
}
}
static isc_threadresult_t
isc__nm_work_run(isc_threadarg_t arg) {
isc__nm_work_t *work = (isc__nm_work_t *)arg;
work->cb(work->data);
return ((isc_threadresult_t)0);
}
static void
isc__nm_work_cb(uv_work_t *req) {
isc__nm_work_t *work = uv_req_get_data((uv_req_t *)req);
if (isc_tid_v == SIZE_MAX) {
isc__trampoline_t *trampoline_arg =
isc__trampoline_get(isc__nm_work_run, work);
(void)isc__trampoline_run(trampoline_arg);
} else {
(void)isc__nm_work_run((isc_threadarg_t)work);
}
}
static void
isc__nm_after_work_cb(uv_work_t *req, int status) {
isc_result_t result = ISC_R_SUCCESS;
isc__nm_work_t *work = uv_req_get_data((uv_req_t *)req);
isc_nm_t *netmgr = work->netmgr;
if (status != 0) {
result = isc__nm_uverr2result(status);
}
work->after_cb(work->data, result);
isc_mem_put(netmgr->mctx, work, sizeof(*work));
isc_nm_detach(&netmgr);
}
void
isc_nm_work_offload(isc_nm_t *netmgr, isc_nm_workcb_t work_cb,
isc_nm_after_workcb_t after_work_cb, void *data) {
isc__networker_t *worker = NULL;
isc__nm_work_t *work = NULL;
int r;
REQUIRE(isc__nm_in_netthread());
REQUIRE(VALID_NM(netmgr));
worker = &netmgr->workers[isc_nm_tid()];
work = isc_mem_get(netmgr->mctx, sizeof(*work));
*work = (isc__nm_work_t){
.cb = work_cb,
.after_cb = after_work_cb,
.data = data,
};
isc_nm_attach(netmgr, &work->netmgr);
uv_req_set_data((uv_req_t *)&work->req, work);
r = uv_queue_work(&worker->loop, &work->req, isc__nm_work_cb,
isc__nm_after_work_cb);
RUNTIME_CHECK(r == 0);
}
#ifdef NETMGR_TRACE
/*
* Dump all active sockets in netmgr. We output to stderr
@@ -3302,7 +3390,8 @@ nmsocket_dump(isc_nmsocket_t *sock) {
nmsocket_type_totext(sock->type),
isc_refcount_current(&sock->references));
fprintf(stderr,
"Parent %p, listener %p, server %p, statichandle = %p\n",
"Parent %p, listener %p, server %p, statichandle = "
"%p\n",
sock->parent, sock->listener, sock->server, sock->statichandle);
fprintf(stderr, "Flags:%s%s%s%s%s\n",
atomic_load(&sock->active) ? " active" : "",

View File

@@ -474,11 +474,13 @@ isc_nm_tcpdnsconnect
isc_nm_gettimeouts
isc_nm_setnetbuffers
isc_nm_settimeouts
isc_nm_task_enqueue
isc_nm_tcpdns_keepalive
isc_nm_tcpdns_sequential
isc_nm_tid
isc_nm_tlsdnsconnect
isc_nm_udpconnect
isc_nm_work_offload
isc_nmsocket_close
isc__nm_acquire_interlocked
isc__nm_drop_interlocked