From 1d066e4bc5dfcd72a8d82c5318247efb9ed86aae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Sur=C3=BD?= Date: Wed, 2 Dec 2020 15:37:18 +0100 Subject: [PATCH] Distribute queries among threads even on platforms without lb sockets On platforms without load-balancing sockets, all the queries would be handled by a single thread. Currently, the support for load-balanced sockets is present in Linux with SO_REUSEPORT and FreeBSD 12 with SO_REUSEPORT_LB. This commit adds a workaround for platforms without load-balanced sockets that: 1. sets up a single shared listening socket for all listening nmthreads for UDP, TCP and TCPDNS netmgr transports 2. calls uv_udp_bind/uv_tcp_bind on the underlying socket just once and, for the rest of the nmthreads, only copies the internal libuv flags (should be just UV_HANDLE_BOUND and optionally UV_HANDLE_IPV6) 3. starts reading on the UDP socket or listening on the TCP socket The load distribution among the nmthreads is uneven, but it's still better than utilizing just one thread for processing all the incoming queries. --- lib/isc/netmgr/netmgr-int.h | 4 ++ lib/isc/netmgr/netmgr.c | 24 -------- lib/isc/netmgr/tcp.c | 91 +++++++++++++++++++----------- lib/isc/netmgr/tcpdns.c | 101 +++++++++++++++++++++------------- lib/isc/netmgr/udp.c | 85 ++++++++++++++++++---------- lib/isc/netmgr/uverr2result.c | 2 + 6 files changed, 182 insertions(+), 125 deletions(-) diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 4ffe61a1b8..d5f048b2d0 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -52,6 +52,10 @@ #define ISC_NETMGR_RECVBUF_SIZE (65536) #endif +#if defined(SO_REUSEPORT_LB) || (defined(SO_REUSEPORT) && defined(__linux__)) +#define HAVE_REUSEPORT_LB 1 +#endif + /* * Define NETMGR_TRACE to activate tracing of handles and sockets. 
* This will impair performance but enables us to quickly determine, diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index a4e54b3f5c..9ed966a6ed 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -169,22 +169,6 @@ isc__nm_in_netthread(void) { return (isc__nm_tid_v >= 0); } -static bool -isc__nm_test_lb_socket(sa_family_t sa_family, int protocol) { - isc_result_t result; - uv_os_sock_t fd = -1; - - result = isc__nm_socket(sa_family, protocol, 0, &fd); - REQUIRE(result == ISC_R_SUCCESS); - - result = isc__nm_socket_reuse_lb(fd); - REQUIRE(result == ISC_R_SUCCESS || result == ISC_R_NOTIMPLEMENTED); - - isc__nm_closesocket(fd); - - return (result == ISC_R_SUCCESS); -} - #ifdef WIN32 static void isc__nm_winsock_initialize(void) { @@ -231,14 +215,6 @@ isc_nm_start(isc_mem_t *mctx, uint32_t workers) { isc__nm_tls_initialize(); - if (!isc__nm_test_lb_socket(AF_INET, SOCK_DGRAM) || - !isc__nm_test_lb_socket(AF_INET, SOCK_STREAM) || - !isc__nm_test_lb_socket(AF_INET6, SOCK_DGRAM) || - !isc__nm_test_lb_socket(AF_INET6, SOCK_STREAM)) - { - workers = 1; - } - mgr = isc_mem_get(mctx, sizeof(*mgr)); *mgr = (isc_nm_t){ .nworkers = workers }; diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index b6c25dafad..02b276c874 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -378,27 +378,27 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, return (result); } -static isc_result_t -isc__nm_tcp_lb_socket(sa_family_t sa_family, uv_os_sock_t *sockp) { +static uv_os_sock_t +isc__nm_tcp_lb_socket(sa_family_t sa_family) { isc_result_t result; uv_os_sock_t sock; result = isc__nm_socket(sa_family, SOCK_STREAM, 0, &sock); - REQUIRE(result == ISC_R_SUCCESS); + RUNTIME_CHECK(result == ISC_R_SUCCESS); (void)isc__nm_socket_incoming_cpu(sock); /* FIXME: set mss */ result = isc__nm_socket_reuse(sock); - REQUIRE(result == ISC_R_SUCCESS || result == ISC_R_NOTIMPLEMENTED); + RUNTIME_CHECK(result == ISC_R_SUCCESS); +#if 
HAVE_SO_REUSEPORT_LB result = isc__nm_socket_reuse_lb(sock); - REQUIRE(result == ISC_R_SUCCESS || result == ISC_R_NOTIMPLEMENTED); + RUNTIME_CHECK(result == ISC_R_SUCCESS); +#endif - *sockp = sock; - - return (result); + return (sock); } isc_result_t @@ -410,6 +410,7 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t *sock = NULL; sa_family_t sa_family = iface->addr.type.sa.sa_family; size_t children_size = 0; + uv_os_sock_t fd = -1; REQUIRE(VALID_NM(mgr)); @@ -417,7 +418,11 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc__nmsocket_init(sock, mgr, isc_nm_tcplistener, iface); sock->rchildren = 0; +#if defined(WIN32) + sock->nchildren = 1; +#else sock->nchildren = mgr->nworkers; +#endif children_size = sock->nchildren * sizeof(sock->children[0]); sock->children = isc_mem_get(mgr->mctx, children_size); memset(sock->children, 0, children_size); @@ -426,6 +431,10 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, sock->tid = isc_random_uniform(mgr->nworkers); sock->fd = -1; +#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) + fd = isc__nm_tcp_lb_socket(sa_family); +#endif + for (size_t i = 0; i < mgr->nworkers; i++) { isc__netievent_tcplisten_t *ievent = NULL; isc_nmsocket_t *csock = &sock->children[i]; @@ -444,9 +453,11 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, csock->pquota = quota; isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock); - result = isc__nm_tcp_lb_socket(sa_family, &csock->fd); - REQUIRE(result == ISC_R_SUCCESS || - result == ISC_R_NOTIMPLEMENTED); +#if HAVE_SO_REUSEPORT_LB || defined(WIN32) + csock->fd = isc__nm_tcp_lb_socket(sa_family); +#else + csock->fd = dup(fd); +#endif REQUIRE(csock->fd >= 0); ievent = isc__nm_get_netievent_tcplisten(mgr, csock); @@ -454,6 +465,10 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, (isc__netievent_t *)ievent); } +#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) + isc__nm_closesocket(fd); +#endif + LOCK(&sock->lock); while (sock->rchildren != mgr->nworkers) { 
WAIT(&sock->cond, &sock->lock); @@ -479,11 +494,12 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, void isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcplisten_t *ievent = (isc__netievent_tcplisten_t *)ev0; - isc_nmiface_t *iface; + isc_nmiface_t *iface = NULL; sa_family_t sa_family; int r; int flags = 0; isc_nmsocket_t *sock = NULL; + isc_result_t result; REQUIRE(VALID_NMSOCK(ievent->sock)); REQUIRE(ievent->sock->tid == isc_nm_tid()); @@ -502,14 +518,18 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); RUNTIME_CHECK(r == 0); + uv_handle_set_data(&sock->uv_handle.handle, sock); /* This keeps the socket alive after everything else is gone */ isc__nmsocket_attach(sock, &(isc_nmsocket_t *){ NULL }); r = uv_timer_init(&worker->loop, &sock->timer); RUNTIME_CHECK(r == 0); + uv_handle_set_data((uv_handle_t *)&sock->timer, sock); + LOCK(&sock->parent->lock); + r = uv_tcp_open(&sock->uv_handle.tcp, sock->fd); if (r < 0) { isc__nm_closesocket(sock->fd); @@ -522,12 +542,29 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { flags = UV_TCP_IPV6ONLY; } +#if HAVE_SO_REUSEPORT_LB || defined(WIN32) r = isc_uv_tcp_freebind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, flags); - if (r < 0 && r != UV_EINVAL) { + if (r < 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); goto failure; } +#else + if (sock->parent->fd == -1) { + r = isc_uv_tcp_freebind(&sock->uv_handle.tcp, + &sock->iface->addr.type.sa, flags); + if (r < 0) { + isc__nm_incstats(sock->mgr, + sock->statsindex[STATID_BINDFAIL]); + goto failure; + } + sock->parent->uv_handle.tcp.flags = sock->uv_handle.tcp.flags; + sock->parent->fd = sock->fd; + } else { + /* The socket is already bound, just copy the flags */ + sock->uv_handle.tcp.flags = sock->parent->uv_handle.tcp.flags; + } +#endif /* * The callback will run in the same thread uv_listen() 
was called @@ -535,7 +572,7 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { */ r = uv_listen((uv_stream_t *)&sock->uv_handle.tcp, sock->backlog, tcp_connection_cb); - if (r < 0) { + if (r != 0) { isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, "uv_listen failed: %s", @@ -546,27 +583,15 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { atomic_store(&sock->listening, true); - LOCK(&sock->parent->lock); - sock->parent->rchildren += 1; - if (sock->parent->result == ISC_R_DEFAULT) { - sock->parent->result = ISC_R_SUCCESS; - } - SIGNAL(&sock->parent->cond); - if (!atomic_load(&sock->parent->active)) { - WAIT(&sock->parent->scond, &sock->parent->lock); - } - INSIST(atomic_load(&sock->parent->active)); - UNLOCK(&sock->parent->lock); - - return; - failure: - sock->pquota = NULL; + result = isc__nm_uverr2result(r); + if (result != ISC_R_SUCCESS) { + sock->pquota = NULL; + } - LOCK(&sock->parent->lock); sock->parent->rchildren += 1; if (sock->parent->result == ISC_R_DEFAULT) { - sock->parent->result = isc__nm_uverr2result(r); + sock->parent->result = result; } SIGNAL(&sock->parent->cond); if (!atomic_load(&sock->parent->active)) { @@ -926,7 +951,7 @@ isc__nm_tcp_resumeread(isc_nmhandle_t *handle) { static void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)stream); - isc__nm_uvreq_t *req; + isc__nm_uvreq_t *req = NULL; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); @@ -1027,7 +1052,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { isc_result_t result; struct sockaddr_storage ss; isc_sockaddr_t local; - isc_nmhandle_t *handle; + isc_nmhandle_t *handle = NULL; REQUIRE(VALID_NMSOCK(ssock)); REQUIRE(ssock->tid == isc_nm_tid()); diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 9d55f9412d..dcd24316ca 100644 --- a/lib/isc/netmgr/tcpdns.c +++ 
b/lib/isc/netmgr/tcpdns.c @@ -421,27 +421,27 @@ isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, return (result); } -static isc_result_t -isc__nm_tcpdns_lb_socket(sa_family_t sa_family, uv_os_sock_t *sockp) { +static uv_os_sock_t +isc__nm_tcpdns_lb_socket(sa_family_t sa_family) { isc_result_t result; uv_os_sock_t sock; result = isc__nm_socket(sa_family, SOCK_STREAM, 0, &sock); - REQUIRE(result == ISC_R_SUCCESS); + RUNTIME_CHECK(result == ISC_R_SUCCESS); (void)isc__nm_socket_incoming_cpu(sock); /* FIXME: set mss */ result = isc__nm_socket_reuse(sock); - REQUIRE(result == ISC_R_SUCCESS || result == ISC_R_NOTIMPLEMENTED); + RUNTIME_CHECK(result == ISC_R_SUCCESS); +#if HAVE_SO_REUSEPORT_LB result = isc__nm_socket_reuse_lb(sock); - REQUIRE(result == ISC_R_SUCCESS || result == ISC_R_NOTIMPLEMENTED); + RUNTIME_CHECK(result == ISC_R_SUCCESS); +#endif - *sockp = sock; - - return (result); + return (sock); } isc_result_t @@ -454,6 +454,7 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nmsocket_t *sock = NULL; sa_family_t sa_family = iface->addr.type.sa.sa_family; size_t children_size = 0; + uv_os_sock_t fd = -1; REQUIRE(VALID_NM(mgr)); @@ -461,7 +462,11 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, isc__nmsocket_init(sock, mgr, isc_nm_tcpdnslistener, iface); sock->rchildren = 0; +#if defined(WIN32) + sock->nchildren = 1; +#else sock->nchildren = mgr->nworkers; +#endif children_size = sock->nchildren * sizeof(sock->children[0]); sock->children = isc_mem_get(mgr->mctx, children_size); memset(sock->children, 0, children_size); @@ -470,6 +475,10 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, sock->tid = isc_random_uniform(mgr->nworkers); sock->fd = -1; +#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) + fd = isc__nm_tcpdns_lb_socket(sa_family); +#endif + for (size_t i = 0; i < mgr->nworkers; i++) { isc__netievent_tcpdnslisten_t *ievent = NULL; isc_nmsocket_t *csock = &sock->children[i]; @@ -490,9 +499,11 @@ 
isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, csock->pquota = quota; isc_quota_cb_init(&csock->quotacb, quota_accept_cb, csock); - result = isc__nm_tcpdns_lb_socket(sa_family, &csock->fd); - REQUIRE(result == ISC_R_SUCCESS || - result == ISC_R_NOTIMPLEMENTED); +#if HAVE_SO_REUSEPORT_LB || defined(WIN32) + csock->fd = isc__nm_tcpdns_lb_socket(sa_family); +#else + csock->fd = dup(fd); +#endif REQUIRE(csock->fd >= 0); ievent = isc__nm_get_netievent_tcpdnslisten(mgr, csock); @@ -500,6 +511,10 @@ isc_nm_listentcpdns(isc_nm_t *mgr, isc_nmiface_t *iface, (isc__netievent_t *)ievent); } +#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) + isc__nm_closesocket(fd); +#endif + LOCK(&sock->lock); while (sock->rchildren != mgr->nworkers) { WAIT(&sock->cond, &sock->lock); @@ -526,11 +541,12 @@ void isc__nm_async_tcpdnslisten(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcpdnslisten_t *ievent = (isc__netievent_tcpdnslisten_t *)ev0; - isc_nmiface_t *iface; + isc_nmiface_t *iface = NULL; sa_family_t sa_family; int r; int flags = 0; isc_nmsocket_t *sock = NULL; + isc_result_t result = ISC_R_DEFAULT; REQUIRE(VALID_NMSOCK(ievent->sock)); REQUIRE(ievent->sock->tid == isc_nm_tid()); @@ -557,6 +573,8 @@ isc__nm_async_tcpdnslisten(isc__networker_t *worker, isc__netievent_t *ev0) { RUNTIME_CHECK(r == 0); uv_handle_set_data((uv_handle_t *)&sock->timer, sock); + LOCK(&sock->parent->lock); + r = uv_tcp_open(&sock->uv_handle.tcp, sock->fd); if (r < 0) { isc__nm_closesocket(sock->fd); @@ -569,12 +587,29 @@ isc__nm_async_tcpdnslisten(isc__networker_t *worker, isc__netievent_t *ev0) { flags = UV_TCP_IPV6ONLY; } +#if HAVE_SO_REUSEPORT_LB || defined(WIN32) r = isc_uv_tcp_freebind(&sock->uv_handle.tcp, &sock->iface->addr.type.sa, flags); - if (r < 0 && r != UV_EINVAL) { + if (r < 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); goto failure; } +#else + if (sock->parent->fd == -1) { + r = isc_uv_tcp_freebind(&sock->uv_handle.tcp, + 
&sock->iface->addr.type.sa, flags); + if (r < 0) { + isc__nm_incstats(sock->mgr, + sock->statsindex[STATID_BINDFAIL]); + goto failure; + } + sock->parent->uv_handle.tcp.flags = sock->uv_handle.tcp.flags; + sock->parent->fd = sock->fd; + } else { + /* The socket is already bound, just copy the flags */ + sock->uv_handle.tcp.flags = sock->parent->uv_handle.tcp.flags; + } +#endif /* * The callback will run in the same thread uv_listen() was called @@ -582,7 +617,7 @@ isc__nm_async_tcpdnslisten(isc__networker_t *worker, isc__netievent_t *ev0) { */ r = uv_listen((uv_stream_t *)&sock->uv_handle.tcp, sock->backlog, tcpdns_connection_cb); - if (r < 0) { + if (r != 0) { isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, "uv_listen failed: %s", @@ -593,27 +628,15 @@ isc__nm_async_tcpdnslisten(isc__networker_t *worker, isc__netievent_t *ev0) { atomic_store(&sock->listening, true); - LOCK(&sock->parent->lock); - sock->parent->rchildren += 1; - if (sock->parent->result == ISC_R_DEFAULT) { - sock->parent->result = ISC_R_SUCCESS; - } - SIGNAL(&sock->parent->cond); - if (!atomic_load(&sock->parent->active)) { - WAIT(&sock->parent->scond, &sock->parent->lock); - } - INSIST(atomic_load(&sock->parent->active)); - UNLOCK(&sock->parent->lock); - - return; - failure: - sock->pquota = NULL; + result = isc__nm_uverr2result(r); + if (result != ISC_R_SUCCESS) { + sock->pquota = NULL; + } - LOCK(&sock->parent->lock); sock->parent->rchildren += 1; if (sock->parent->result == ISC_R_DEFAULT) { - sock->parent->result = isc__nm_uverr2result(r); + sock->parent->result = result; } SIGNAL(&sock->parent->cond); if (!atomic_load(&sock->parent->active)) { @@ -921,7 +944,7 @@ isc__nm_async_tcpdnsread(isc__networker_t *worker, isc__netievent_t *ev0) { static isc_result_t processbuffer(isc_nmsocket_t *sock) { size_t len; - isc__nm_uvreq_t *req; + isc__nm_uvreq_t *req = NULL; isc_nmhandle_t *handle = NULL; REQUIRE(VALID_NMSOCK(sock)); @@ -1049,14 +1072,15 @@ free: static 
void quota_accept_cb(isc_quota_t *quota, void *sock0) { isc_nmsocket_t *sock = (isc_nmsocket_t *)sock0; - isc__netievent_tcpdnsaccept_t *ievent = NULL; REQUIRE(VALID_NMSOCK(sock)); /* * Create a tcpdnsaccept event and pass it using the async channel. */ - ievent = isc__nm_get_netievent_tcpdnsaccept(sock->mgr, sock, quota); + + isc__netievent_tcpdnsaccept_t *ievent = + isc__nm_get_netievent_tcpdnsaccept(sock->mgr, sock, quota); isc__nm_maybe_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *)ievent); } @@ -1068,15 +1092,14 @@ void isc__nm_async_tcpdnsaccept(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcpdnsaccept_t *ievent = (isc__netievent_tcpdnsaccept_t *)ev0; - isc_nmsocket_t *sock = ievent->sock; isc_result_t result; UNUSED(worker); - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->tid == isc_nm_tid()); + REQUIRE(VALID_NMSOCK(ievent->sock)); + REQUIRE(ievent->sock->tid == isc_nm_tid()); - result = accept_connection(sock, ievent->quota); + result = accept_connection(ievent->sock, ievent->quota); if (result != ISC_R_SUCCESS && result != ISC_R_NOCONN) { if ((result != ISC_R_QUOTA && result != ISC_R_SOFTQUOTA) || can_log_tcpdns_quota()) @@ -1098,7 +1121,7 @@ accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { struct sockaddr_storage peer_ss; struct sockaddr_storage local_ss; isc_sockaddr_t local; - isc_nmhandle_t *handle; + isc_nmhandle_t *handle = NULL; REQUIRE(VALID_NMSOCK(ssock)); REQUIRE(ssock->tid == isc_nm_tid()); diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index 726efd6f0f..22f96d135d 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -77,27 +77,26 @@ inactive(isc_nmsocket_t *sock) { (sock->server != NULL && !isc__nmsocket_active(sock->server))); } -static isc_result_t -isc__nm_udp_lb_socket(sa_family_t sa_family, uv_os_sock_t *sockp) { +static uv_os_sock_t +isc__nm_udp_lb_socket(sa_family_t sa_family) { isc_result_t result; uv_os_sock_t sock; result = isc__nm_socket(sa_family, 
SOCK_DGRAM, 0, &sock); - REQUIRE(result == ISC_R_SUCCESS); + RUNTIME_CHECK(result == ISC_R_SUCCESS); (void)isc__nm_socket_incoming_cpu(sock); - (void)isc__nm_socket_dontfrag(sock, sa_family); result = isc__nm_socket_reuse(sock); - REQUIRE(result == ISC_R_SUCCESS); + RUNTIME_CHECK(result == ISC_R_SUCCESS); +#if HAVE_SO_REUSEPORT_LB result = isc__nm_socket_reuse_lb(sock); - REQUIRE(result == ISC_R_SUCCESS || result == ISC_R_NOTIMPLEMENTED); + RUNTIME_CHECK(result == ISC_R_SUCCESS); +#endif - *sockp = sock; - - return (result); + return (sock); } isc_result_t @@ -107,6 +106,7 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, isc_nmsocket_t *sock = NULL; sa_family_t sa_family = iface->addr.type.sa.sa_family; size_t children_size = 0; + uv_os_sock_t fd = -1; REQUIRE(VALID_NM(mgr)); @@ -118,7 +118,12 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, isc__nmsocket_init(sock, mgr, isc_nm_udplistener, iface); sock->rchildren = 0; +#if defined(WIN32) + sock->nchildren = 1; +#else sock->nchildren = mgr->nworkers; +#endif + children_size = sock->nchildren * sizeof(sock->children[0]); sock->children = isc_mem_get(mgr->mctx, children_size); memset(sock->children, 0, children_size); @@ -130,6 +135,10 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, sock->tid = isc_random_uniform(mgr->nworkers); sock->fd = -1; +#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) + fd = isc__nm_udp_lb_socket(sa_family); +#endif + for (size_t i = 0; i < mgr->nworkers; i++) { isc__netievent_udplisten_t *ievent = NULL; isc_nmsocket_t *csock = &sock->children[i]; @@ -143,7 +152,11 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, csock->extrahandlesize = sock->extrahandlesize; csock->tid = i; - (void)isc__nm_udp_lb_socket(sa_family, &csock->fd); +#if HAVE_SO_REUSEPORT_LB || defined(WIN32) + csock->fd = isc__nm_udp_lb_socket(sa_family); +#else + csock->fd = dup(fd); +#endif REQUIRE(csock->fd >= 0); 
ievent = isc__nm_get_netievent_udplisten(mgr, csock); @@ -151,6 +164,10 @@ isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, (isc__netievent_t *)ievent); } +#if !HAVE_SO_REUSEPORT_LB && !defined(WIN32) + isc__nm_closesocket(fd); +#endif + LOCK(&sock->lock); while (sock->rchildren != mgr->nworkers) { WAIT(&sock->cond, &sock->lock); @@ -205,11 +222,12 @@ udp_alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) { void isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udplisten_t *ievent = (isc__netievent_udplisten_t *)ev0; - isc_nmiface_t *iface; + isc_nmiface_t *iface = NULL; isc_nmsocket_t *sock = NULL; int r, uv_bind_flags = 0; int uv_init_flags = 0; sa_family_t sa_family; + isc_result_t result = ISC_R_DEFAULT; REQUIRE(VALID_NMSOCK(ievent->sock)); REQUIRE(ievent->sock->tid == isc_nm_tid()); @@ -237,6 +255,8 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) { RUNTIME_CHECK(r == 0); uv_handle_set_data((uv_handle_t *)&sock->timer, sock); + LOCK(&sock->parent->lock); + r = uv_udp_open(&sock->uv_handle.udp, sock->fd); if (r < 0) { isc__nm_closesocket(sock->fd); @@ -249,13 +269,33 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) { uv_bind_flags |= UV_UDP_IPV6ONLY; } +#if HAVE_SO_REUSEPORT_LB || WIN32 r = isc_uv_udp_freebind(&sock->uv_handle.udp, &sock->parent->iface->addr.type.sa, uv_bind_flags); - if (r < 0 && r != UV_EINVAL) { + if (r < 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); goto failure; } +#else + if (sock->parent->fd == -1) { + /* This thread is first, bind the socket */ + r = isc_uv_udp_freebind(&sock->uv_handle.udp, + &sock->parent->iface->addr.type.sa, + uv_bind_flags); + if (r < 0) { + isc__nm_incstats(sock->mgr, + sock->statsindex[STATID_BINDFAIL]); + goto failure; + } + sock->parent->uv_handle.udp.flags = sock->uv_handle.udp.flags; + sock->parent->fd = sock->fd; + } else { + /* The socket is already bound, 
just copy the flags */ + sock->uv_handle.udp.flags = sock->parent->uv_handle.udp.flags; + } +#endif + #ifdef ISC_RECV_BUFFER_SIZE uv_recv_buffer_size(&sock->uv_handle.handle, &(int){ ISC_RECV_BUFFER_SIZE }); @@ -272,24 +312,11 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) { atomic_store(&sock->listening, true); - LOCK(&sock->parent->lock); - sock->parent->rchildren += 1; - if (sock->parent->result == ISC_R_DEFAULT) { - sock->parent->result = ISC_R_SUCCESS; - } - SIGNAL(&sock->parent->cond); - if (!atomic_load(&sock->parent->active)) { - WAIT(&sock->parent->scond, &sock->parent->lock); - } - INSIST(atomic_load(&sock->parent->active)); - UNLOCK(&sock->parent->lock); - - return; failure: - LOCK(&sock->parent->lock); + result = isc__nm_uverr2result(r); sock->parent->rchildren += 1; if (sock->parent->result == ISC_R_DEFAULT) { - sock->parent->result = isc__nm_uverr2result(r); + sock->parent->result = result; } SIGNAL(&sock->parent->cond); if (!atomic_load(&sock->parent->active)) { @@ -359,7 +386,7 @@ static void udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); - isc__nm_uvreq_t *req; + isc__nm_uvreq_t *req = NULL; uint32_t maxudp; bool free_buf; isc_sockaddr_t sockaddr; @@ -1224,7 +1251,7 @@ isc__nm_udp_cancelread(isc_nmhandle_t *handle) { void isc__nm_async_udpcancel(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udpcancel_t *ievent = (isc__netievent_udpcancel_t *)ev0; - isc_nmsocket_t *sock; + isc_nmsocket_t *sock = NULL; UNUSED(worker); diff --git a/lib/isc/netmgr/uverr2result.c b/lib/isc/netmgr/uverr2result.c index 3cd34a5e1f..8a5c8f6699 100644 --- a/lib/isc/netmgr/uverr2result.c +++ b/lib/isc/netmgr/uverr2result.c @@ -29,6 +29,8 @@ isc_result_t isc___nm_uverr2result(int uverr, bool dolog, const char *file, unsigned int line, const char *func) { switch (uverr) { + case 0: + return 
(ISC_R_SUCCESS); case UV_ENOTDIR: case UV_ELOOP: case UV_EINVAL: /* XXX sometimes this is not for files */