diff --git a/lib/isc/include/isc/quota.h b/lib/isc/include/isc/quota.h index 6f593c0ba9..44353b38c4 100644 --- a/lib/isc/include/isc/quota.h +++ b/lib/isc/include/isc/quota.h @@ -120,11 +120,13 @@ isc_quota_attach_cb(isc_quota_t *quota, isc_quota_t **p, isc_quota_cb_t *cb); * * Like isc_quota_attach(), but if there's no quota left then cb->cb_func will * be called when we are attached to quota. - * Note: It's the callee responsibility to make sure that we don't end up with - * extremely huge number of callbacks waiting - making it easy to create a - * resource exhaustion attack. For example in case of TCP listening we simply - * don't accept new connections - so the number of callbacks waiting in the - * queue is limited by listen() backlog. + * + * Note: It's the caller's responsibility to make sure that we don't end up + * with a huge number of callbacks waiting, making it easy to create a + * resource exhaustion attack. For example, in the case of TCP listening, + * we simply don't accept new connections when the quota is exceeded, so + * the number of callbacks waiting in the queue will be limited by the + * listen() backlog. 
* * Returns: * \li #ISC_R_SUCCESS Success diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index c825a5854d..b15e6fac29 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -131,8 +132,8 @@ typedef enum isc__netievent_type { netievent_tcprecv, netievent_tcpstartread, netievent_tcppauseread, - netievent_tcpchildlisten, - netievent_tcpchildstop, + netievent_tcpchildaccept, + netievent_tcpaccept, netievent_tcpstop, netievent_tcpclose, netievent_tcpdnsclose, @@ -211,7 +212,6 @@ typedef struct isc__netievent__socket { typedef isc__netievent__socket_t isc__netievent_udplisten_t; typedef isc__netievent__socket_t isc__netievent_udpstop_t; typedef isc__netievent__socket_t isc__netievent_tcpstop_t; -typedef isc__netievent__socket_t isc__netievent_tcpchildstop_t; typedef isc__netievent__socket_t isc__netievent_tcpclose_t; typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t; typedef isc__netievent__socket_t isc__netievent_startread_t; @@ -228,13 +228,15 @@ typedef isc__netievent__socket_req_t isc__netievent_tcpconnect_t; typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t; typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t; -typedef struct isc__netievent__socket_streaminfo { +typedef struct isc__netievent__socket_streaminfo_quota { isc__netievent_type type; isc_nmsocket_t *sock; isc_uv_stream_info_t streaminfo; -} isc__netievent__socket_streaminfo_t; + isc_quota_t *quota; +} isc__netievent__socket_streaminfo_quota_t; -typedef isc__netievent__socket_streaminfo_t isc__netievent_tcpchildlisten_t; +typedef isc__netievent__socket_streaminfo_quota_t + isc__netievent_tcpchildaccept_t; typedef struct isc__netievent__socket_handle { isc__netievent_type type; @@ -242,6 +244,14 @@ typedef struct isc__netievent__socket_handle { isc_nmhandle_t *handle; } isc__netievent__socket_handle_t; +typedef struct isc__netievent__socket_quota 
{ + isc__netievent_type type; + isc_nmsocket_t *sock; + isc_quota_t *quota; +} isc__netievent__socket_quota_t; + +typedef isc__netievent__socket_quota_t isc__netievent_tcpaccept_t; + typedef struct isc__netievent_udpsend { isc__netievent_type type; isc_nmsocket_t *sock; @@ -261,7 +271,8 @@ typedef union { isc__netievent__socket_t nis; isc__netievent__socket_req_t nisr; isc__netievent_udpsend_t nius; - isc__netievent__socket_streaminfo_t niss; + isc__netievent__socket_quota_t nisq; + isc__netievent__socket_streaminfo_quota_t nissq; } isc__netievent_storage_t; /* @@ -324,7 +335,6 @@ typedef enum isc_nmsocket_type { isc_nm_udplistener, /* Aggregate of nm_udpsocks */ isc_nm_tcpsocket, isc_nm_tcplistener, - isc_nm_tcpchildlistener, isc_nm_tcpdnslistener, isc_nm_tcpdnssocket } isc_nmsocket_type; @@ -370,16 +380,7 @@ struct isc_nmsocket { */ isc_quota_t *quota; isc_quota_t *pquota; - - /*% - * How many connections we have not accepted due to quota? - * When we close a connection we need to accept a new one. - */ - int overquota; - /*% - * How many active connections we have? 
- */ - int conns; + isc_quota_cb_t quotacb; /*% * Socket statistics @@ -704,12 +705,12 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0); void isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ev0); +isc__nm_async_tcpaccept(isc__networker_t *worker, isc__netievent_t *ev0); +void +isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0); void isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0); void -isc__nm_async_tcpchildstop(isc__networker_t *worker, isc__netievent_t *ev0); -void isc__nm_async_tcpsend(isc__networker_t *worker, isc__netievent_t *ev0); void isc__nm_async_startread(isc__networker_t *worker, isc__netievent_t *ev0); diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index 66ddeb8e9c..538022a9ae 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -603,8 +603,11 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { case netievent_tcplisten: isc__nm_async_tcplisten(worker, ievent); break; - case netievent_tcpchildlisten: - isc__nm_async_tcpchildlisten(worker, ievent); + case netievent_tcpchildaccept: + isc__nm_async_tcpchildaccept(worker, ievent); + break; + case netievent_tcpaccept: + isc__nm_async_tcpaccept(worker, ievent); break; case netievent_tcpstartread: isc__nm_async_tcp_startread(worker, ievent); @@ -618,9 +621,6 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { case netievent_tcpstop: isc__nm_async_tcpstop(worker, ievent); break; - case netievent_tcpchildstop: - isc__nm_async_tcpchildstop(worker, ievent); - break; case netievent_tcpclose: isc__nm_async_tcpclose(worker, ievent); break; @@ -922,6 +922,7 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type, sock->ah_size * sizeof(size_t)); sock->ah_handles = isc_mem_allocate( mgr->mctx, sock->ah_size * sizeof(isc_nmhandle_t *)); + 
ISC_LINK_INIT(&sock->quotacb, link); for (size_t i = 0; i < 32; i++) { sock->ah_frees[i] = i; sock->ah_handles[i] = NULL; @@ -939,7 +940,6 @@ isc__nmsocket_init(isc_nmsocket_t *sock, isc_nm_t *mgr, isc_nmsocket_type type, break; case isc_nm_tcpsocket: case isc_nm_tcplistener: - case isc_nm_tcpchildlistener: if (family == AF_INET) { sock->statsindex = tcp4statsindex; } else { diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 4f73620d8e..7bd91db8ce 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -68,10 +68,10 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); static void tcp_close_cb(uv_handle_t *uvhandle); -static void -stoplistening(isc_nmsocket_t *sock); static void tcp_listenclose_cb(uv_handle_t *handle); +static isc_result_t +accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota); static int tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { @@ -167,11 +167,6 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb, nsock = isc_mem_get(mgr->mctx, sizeof(*nsock)); isc__nmsocket_init(nsock, mgr, isc_nm_tcplistener, iface); - nsock->nchildren = mgr->nworkers; - atomic_init(&nsock->rchildren, mgr->nworkers); - nsock->children = isc_mem_get(mgr->mctx, - mgr->nworkers * sizeof(*nsock)); - memset(nsock->children, 0, mgr->nworkers * sizeof(*nsock)); nsock->rcb.accept = cb; nsock->rcbarg = cbarg; nsock->extrahandlesize = extrahandlesize; @@ -216,15 +211,10 @@ isc_nm_listentcp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_cb_t cb, } /* - * For multi-threaded TCP listening, we create a single "parent" socket, - * bind to it, and then pass its uv_handle to a set of child sockets, one - * per worker. For thread safety, the passing of the socket's uv_handle has - * to be done via IPC socket. - * - * This design pattern is ugly but it's what's recommended by the libuv - * documentation. 
(A prior version of libuv had uv_export() and - * uv_import() functions which would have simplified this greatly, but - * they have been deprecated and removed.) + * For multi-threaded TCP listening, we create a single socket, + * bind to it, and start listening. On an incoming connection we accept + * it, and then pass the accepted socket using the uv_export/uv_import + * mechanism to a child thread. */ void isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { @@ -236,24 +226,6 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(isc__nm_in_netthread()); REQUIRE(sock->type == isc_nm_tcplistener); - /* Initialize children now to make cleaning up easier */ - for (int i = 0; i < sock->nchildren; i++) { - isc_nmsocket_t *csock = &sock->children[i]; - - isc__nmsocket_init(csock, sock->mgr, isc_nm_tcpchildlistener, - sock->iface); - csock->parent = sock; - csock->tid = i; - csock->pquota = sock->pquota; - csock->backlog = sock->backlog; - csock->extrahandlesize = sock->extrahandlesize; - - INSIST(csock->rcb.recv == NULL && csock->rcbarg == NULL); - csock->rcb.accept = sock->rcb.accept; - csock->rcbarg = sock->rcbarg; - csock->fd = -1; - } - r = uv_tcp_init(&worker->loop, &sock->uv_handle.tcp); if (r != 0) { /* It was never opened */ @@ -295,39 +267,25 @@ isc__nm_async_tcplisten(isc__networker_t *worker, isc__netievent_t *ev0) { goto done; } - uv_handle_set_data(&sock->uv_handle.handle, sock); - /* - * For each worker, we send a 'tcpchildlisten' event with - * the exported socket. + * The callback will run in the same thread uv_listen() was called + * from, so a race with tcp_connection_cb() isn't possible. 
*/ - for (int i = 0; i < sock->nchildren; i++) { - isc_nmsocket_t *csock = &sock->children[i]; - isc__netievent_tcpchildlisten_t *event = NULL; - - event = isc__nm_get_ievent(csock->mgr, - netievent_tcpchildlisten); - r = isc_uv_export(&sock->uv_handle.stream, &event->streaminfo); - if (r != 0) { - isc_log_write( - isc_lctx, ISC_LOGCATEGORY_GENERAL, - ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, - "uv_export failed: %s", - isc_result_totext(isc__nm_uverr2result(r))); - isc__nm_put_ievent(sock->mgr, event); - continue; - } - event->sock = csock; - if (csock->tid == isc_nm_tid()) { - isc__nm_async_tcpchildlisten(&sock->mgr->workers[i], - (isc__netievent_t *)event); - isc__nm_put_ievent(sock->mgr, event); - } else { - isc__nm_enqueue_ievent(&sock->mgr->workers[i], - (isc__netievent_t *)event); - } + r = uv_listen((uv_stream_t *)&sock->uv_handle.tcp, sock->backlog, + tcp_connection_cb); + if (r != 0) { + isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, + "uv_listen failed: %s", + isc_result_totext(isc__nm_uverr2result(r))); + uv_close(&sock->uv_handle.handle, tcp_close_cb); + sock->result = isc__nm_uverr2result(r); + atomic_store(&sock->listen_error, true); + goto done; } + uv_handle_set_data(&sock->uv_handle.handle, sock); + atomic_store(&sock->listening, true); done: @@ -337,43 +295,113 @@ done: return; } -/* - * Connect to the parent socket and be ready to receive the uv_handle - * for the socket we'll be listening on. 
- */ +static void +tcp_connection_cb(uv_stream_t *server, int status) { + isc_nmsocket_t *psock = uv_handle_get_data((uv_handle_t *)server); + isc_result_t result; + + UNUSED(status); + + result = accept_connection(psock, NULL); + if (result != ISC_R_SUCCESS && result != ISC_R_NOCONN) { + if ((result != ISC_R_QUOTA && result != ISC_R_SOFTQUOTA) || + can_log_tcp_quota()) { + isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, + ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, + "TCP connection failed: %s", + isc_result_totext(result)); + } + } +} + void -isc__nm_async_tcpchildlisten(isc__networker_t *worker, isc__netievent_t *ev0) { - isc__netievent_tcpchildlisten_t *ievent = - (isc__netievent_tcpchildlisten_t *)ev0; - isc_nmsocket_t *sock = ievent->sock; +isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_tcpchildaccept_t *ievent = + (isc__netievent_tcpchildaccept_t *)ev0; + isc_nmsocket_t *ssock = ievent->sock; + isc_nmsocket_t *csock = NULL; + isc_nmhandle_t *handle; + isc_result_t result; + struct sockaddr_storage ss; + isc_sockaddr_t local; int r; REQUIRE(isc__nm_in_netthread()); - REQUIRE(sock->type == isc_nm_tcpchildlistener); + REQUIRE(ssock->type == isc_nm_tcplistener); - worker = &sock->mgr->workers[isc_nm_tid()]; + csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t)); + isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface); + csock->tid = isc_nm_tid(); + csock->extrahandlesize = ssock->extrahandlesize; - uv_tcp_init(&worker->loop, (uv_tcp_t *)&sock->uv_handle.tcp); - uv_handle_set_data(&sock->uv_handle.handle, sock); - r = isc_uv_import(&sock->uv_handle.stream, &ievent->streaminfo); + csock->quota = ievent->quota; + ievent->quota = NULL; + + worker = &ssock->mgr->workers[isc_nm_tid()]; + uv_tcp_init(&worker->loop, &csock->uv_handle.tcp); + + r = isc_uv_import(&csock->uv_handle.stream, &ievent->streaminfo); if (r != 0) { isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR, 
ISC_LOG_ERROR, "uv_import failed: %s", isc_result_totext(isc__nm_uverr2result(r))); - return; + result = isc__nm_uverr2result(r); + goto error; } - r = uv_listen((uv_stream_t *)&sock->uv_handle.tcp, sock->backlog, - tcp_connection_cb); - + r = uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *)&ss, + &(int){ sizeof(ss) }); if (r != 0) { - isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, - ISC_LOGMODULE_NETMGR, ISC_LOG_ERROR, - "uv_listen failed: %s", - isc_result_totext(isc__nm_uverr2result(r))); - return; + result = isc__nm_uverr2result(r); + goto error; } + + result = isc_sockaddr_fromsockaddr(&csock->peer, + (struct sockaddr *)&ss); + if (result != ISC_R_SUCCESS) { + goto error; + } + + r = uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *)&ss, + &(int){ sizeof(ss) }); + if (r != 0) { + result = isc__nm_uverr2result(r); + goto error; + } + + result = isc_sockaddr_fromsockaddr(&local, (struct sockaddr *)&ss); + if (result != ISC_R_SUCCESS) { + goto error; + } + + isc_nmsocket_attach(ssock, &csock->server); + + handle = isc__nmhandle_get(csock, NULL, &local); + + INSIST(ssock->rcb.accept != NULL); + csock->read_timeout = ssock->mgr->init; + ssock->rcb.accept(handle, ISC_R_SUCCESS, ssock->rcbarg); + isc_nmsocket_detach(&csock); + return; + +error: + /* + * Detach the quota early to make room for other connections; + * otherwise it'd be detached later asynchronously, and clog + * the quota unnecessarily. + */ + if (csock->quota != NULL) { + isc_quota_detach(&csock->quota); + } + isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR, + ISC_LOG_ERROR, "Accepting TCP connection failed: %s", + isc_result_totext(result)); + + /* + * Detach the socket properly to make sure uv_close() is called. 
+ */ + isc_nmsocket_detach(&csock); } void @@ -411,83 +439,24 @@ isc__nm_async_tcpstop(isc__networker_t *worker, isc__netievent_t *ev0) { isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], (isc__netievent_t *)event); } else { - stoplistening(sock); + uv_close((uv_handle_t *)&sock->uv_handle.tcp, + tcp_listenclose_cb); isc__nm_drop_interlocked(sock->mgr); } } -static void -stoplistening(isc_nmsocket_t *sock) { - for (int i = 0; i < sock->nchildren; i++) { - isc__netievent_tcpchildstop_t *event = NULL; - - /* - * We can ignore the overhead of event allocation because - * stoplistening is a rare event, and doing it this way - * simplifies sock reference counting. - */ - event = isc__nm_get_ievent(sock->mgr, netievent_tcpchildstop); - isc_nmsocket_attach(&sock->children[i], &event->sock); - - if (isc_nm_tid() == sock->children[i].tid) { - isc__nm_async_tcpchildstop(&sock->mgr->workers[i], - (isc__netievent_t *)event); - isc__nm_put_ievent(sock->mgr, event); - } else { - isc__nm_enqueue_ievent(&sock->mgr->workers[i], - (isc__netievent_t *)event); - } - } - - LOCK(&sock->lock); - while (atomic_load_relaxed(&sock->rchildren) > 0) { - WAIT(&sock->cond, &sock->lock); - } - UNLOCK(&sock->lock); - uv_close((uv_handle_t *)&sock->uv_handle.tcp, tcp_listenclose_cb); -} - -void -isc__nm_async_tcpchildstop(isc__networker_t *worker, isc__netievent_t *ev0) { - isc__netievent_tcpchildstop_t *ievent = - (isc__netievent_tcpchildstop_t *)ev0; - isc_nmsocket_t *sock = ievent->sock; - - UNUSED(worker); - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(isc_nm_tid() == sock->tid); - REQUIRE(sock->type == isc_nm_tcpchildlistener); - REQUIRE(sock->parent != NULL); - - /* - * rchildren is atomic, but we still need to change it - * under a lock because the parent is waiting on conditional - * and without it we might deadlock. 
- */ - LOCK(&sock->parent->lock); - atomic_fetch_sub(&sock->parent->rchildren, 1); - UNLOCK(&sock->parent->lock); - - uv_close((uv_handle_t *)&sock->uv_handle.tcp, tcp_listenclose_cb); - BROADCAST(&sock->parent->cond); -} - /* - * This callback is used for closing both child and parent listening - * sockets; that's why we need to choose the proper lock. + * This callback is used for closing listening sockets. */ static void tcp_listenclose_cb(uv_handle_t *handle) { isc_nmsocket_t *sock = uv_handle_get_data(handle); - isc_mutex_t *lock = ((sock->parent != NULL) ? &sock->parent->lock - : &sock->lock); - LOCK(lock); + LOCK(&sock->lock); atomic_store(&sock->closed, true); atomic_store(&sock->listening, false); sock->pquota = NULL; - UNLOCK(lock); + UNLOCK(&sock->lock); isc_nmsocket_detach(&sock); } @@ -683,130 +652,34 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { */ } -static isc_result_t -accept_connection(isc_nmsocket_t *ssock) { - isc_result_t result; - isc_quota_t *quota = NULL; - isc_nmsocket_t *csock = NULL; - isc__networker_t *worker = NULL; - isc_nmhandle_t *handle = NULL; - struct sockaddr_storage ss; - isc_sockaddr_t local; - int r; - bool overquota = false; +static void +quota_accept_cb(isc_quota_t *quota, void *sock0) { + isc_nmsocket_t *sock = (isc_nmsocket_t *)sock0; + isc__netievent_tcpaccept_t *ievent = NULL; - REQUIRE(VALID_NMSOCK(ssock)); - REQUIRE(ssock->tid == isc_nm_tid()); + REQUIRE(VALID_NMSOCK(sock)); - if (!atomic_load_relaxed(&ssock->active) || - atomic_load_relaxed(&ssock->mgr->closing)) - { - /* We're closing, bail */ - return (ISC_R_CANCELED); - } - - if (ssock->pquota != NULL) { - result = isc_quota_attach(ssock->pquota, &quota); - - /* - * We share the quota between all TCP sockets. Others - * may have used up all the quota slots, in which case - * this socket could starve. So we only fail here if we - * already had at least one active connection on this - * socket.
This guarantees that we'll maintain some level - * of service while over quota, and will resume normal - * service when the quota comes back down. - */ - if (result != ISC_R_SUCCESS) { - ssock->overquota++; - overquota = true; - if (ssock->conns > 0) { - isc__nm_incstats( - ssock->mgr, - ssock->statsindex[STATID_ACCEPTFAIL]); - return (result); - } - } - } - - isc__nm_incstats(ssock->mgr, ssock->statsindex[STATID_ACCEPT]); - - csock = isc_mem_get(ssock->mgr->mctx, sizeof(isc_nmsocket_t)); - isc__nmsocket_init(csock, ssock->mgr, isc_nm_tcpsocket, ssock->iface); - csock->tid = isc_nm_tid(); - csock->extrahandlesize = ssock->extrahandlesize; - csock->quota = quota; - quota = NULL; - - worker = &ssock->mgr->workers[isc_nm_tid()]; - uv_tcp_init(&worker->loop, &csock->uv_handle.tcp); - - r = uv_accept(&ssock->uv_handle.stream, &csock->uv_handle.stream); - if (r != 0) { - result = isc__nm_uverr2result(r); - goto error; - } - - r = uv_tcp_getpeername(&csock->uv_handle.tcp, (struct sockaddr *)&ss, - &(int){ sizeof(ss) }); - if (r != 0) { - result = isc__nm_uverr2result(r); - goto error; - } - - result = isc_sockaddr_fromsockaddr(&csock->peer, - (struct sockaddr *)&ss); - if (result != ISC_R_SUCCESS) { - goto error; - } - - r = uv_tcp_getsockname(&csock->uv_handle.tcp, (struct sockaddr *)&ss, - &(int){ sizeof(ss) }); - if (r != 0) { - result = isc__nm_uverr2result(r); - goto error; - } - result = isc_sockaddr_fromsockaddr(&local, (struct sockaddr *)&ss); - if (result != ISC_R_SUCCESS) { - goto error; - } - - isc_nmsocket_attach(ssock, &csock->server); - ssock->conns++; - - handle = isc__nmhandle_get(csock, NULL, &local); - - INSIST(ssock->rcb.accept != NULL); - csock->read_timeout = ssock->mgr->init; - ssock->rcb.accept(handle, ISC_R_SUCCESS, ssock->rcbarg); - isc_nmsocket_detach(&csock); - - return (ISC_R_SUCCESS); - -error: /* - * Detach it early to make room for other connections, otherwise - * it'd be detached later asynchronously clogging the quota. 
+ * Create a tcpaccept event and pass it using the async channel. */ - if (csock->quota != NULL) { - isc_quota_detach(&csock->quota); - } - if (overquota) { - ssock->overquota--; - } - /* We need to detach it properly to make sure uv_close is called. */ - isc_nmsocket_detach(&csock); - return (result); + ievent = isc__nm_get_ievent(sock->mgr, netievent_tcpaccept); + ievent->sock = sock; + ievent->quota = quota; + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); } -static void -tcp_connection_cb(uv_stream_t *server, int status) { - isc_nmsocket_t *ssock = uv_handle_get_data((uv_handle_t *)server); +/* + * This is called after we get a quota_accept_cb() callback. + */ +void +isc__nm_async_tcpaccept(isc__networker_t *worker, isc__netievent_t *ev0) { isc_result_t result; + isc__netievent_tcpaccept_t *ievent = (isc__netievent_tcpaccept_t *)ev0; - UNUSED(status); + REQUIRE(worker->id == ievent->sock->tid); - result = accept_connection(ssock); + result = accept_connection(ievent->sock, ievent->quota); if (result != ISC_R_SUCCESS && result != ISC_R_NOCONN) { if ((result != ISC_R_QUOTA && result != ISC_R_SOFTQUOTA) || can_log_tcp_quota()) { @@ -816,6 +689,112 @@ tcp_connection_cb(uv_stream_t *server, int status) { isc_result_totext(result)); } } + + /* + * The socket was attached just before we called isc_quota_attach_cb(). + */ + isc_nmsocket_detach(&ievent->sock); +} + +/* + * Close callback for uv_tcp_t structures created in accept_connection().
+ */ +static void +free_uvtcpt(uv_handle_t *uvs) { + isc_mem_t *mctx = (isc_mem_t *)uv_handle_get_data(uvs); + isc_mem_putanddetach(&mctx, uvs, sizeof(uv_tcp_t)); +} + +static isc_result_t +accept_connection(isc_nmsocket_t *ssock, isc_quota_t *quota) { + isc_result_t result; + isc__netievent_tcpchildaccept_t *event = NULL; + isc__networker_t *worker = NULL; + uv_tcp_t *uvstream = NULL; + isc_mem_t *mctx = NULL; + int r, w; + + REQUIRE(VALID_NMSOCK(ssock)); + REQUIRE(ssock->tid == isc_nm_tid()); + + if (!atomic_load_relaxed(&ssock->active) || + atomic_load_relaxed(&ssock->mgr->closing)) + { + /* We're closing, bail */ + if (quota != NULL) { + isc_quota_detach(&quota); + } + return (ISC_R_CANCELED); + } + + /* We can be called directly or as a callback from quota */ + if (ssock->pquota != NULL && quota == NULL) { + /* + * We need to attach to ssock, because it might be queued + * waiting for a TCP quota slot. If so, then we'll detach it + * later when the connection is accepted. (XXX: This may be + * suboptimal, it might be better to attach unless + * we need to.) + */ + isc_nmsocket_t *tsock = NULL; + isc_nmsocket_attach(ssock, &tsock); + isc_quota_cb_init(&ssock->quotacb, quota_accept_cb, tsock); + result = isc_quota_attach_cb(ssock->pquota, &quota, + &ssock->quotacb); + if (result == ISC_R_QUOTA) { + isc__nm_incstats(ssock->mgr, + ssock->statsindex[STATID_ACCEPTFAIL]); + return (result); + } + + /* + * We're under quota, so there's no need to wait; + * clear the quota callback and detach the socket.
+ */ + isc_quota_cb_init(&ssock->quotacb, NULL, NULL); + isc_nmsocket_detach(&tsock); + } + + isc__nm_incstats(ssock->mgr, ssock->statsindex[STATID_ACCEPT]); + + worker = &ssock->mgr->workers[isc_nm_tid()]; + uvstream = isc_mem_get(ssock->mgr->mctx, sizeof(uv_tcp_t)); + + isc_mem_attach(ssock->mgr->mctx, &mctx); + uv_handle_set_data((uv_handle_t *)uvstream, mctx); + mctx = NULL; /* Detached later in free_uvtcpt() */ + + uv_tcp_init(&worker->loop, uvstream); + + r = uv_accept(&ssock->uv_handle.stream, (uv_stream_t *)uvstream); + if (r != 0) { + result = isc__nm_uverr2result(r); + uv_close((uv_handle_t *)uvstream, free_uvtcpt); + isc_quota_detach(&quota); + return (result); + } + + /* We have an accepted TCP socket, pass it to a random worker */ + w = isc_random_uniform(ssock->mgr->nworkers); + event = isc__nm_get_ievent(ssock->mgr, netievent_tcpchildaccept); + event->sock = ssock; + event->quota = quota; + + r = isc_uv_export((uv_stream_t *)uvstream, &event->streaminfo); + RUNTIME_CHECK(r == 0); + + uv_close((uv_handle_t *)uvstream, free_uvtcpt); + + if (w == isc_nm_tid()) { + isc__nm_async_tcpchildaccept(&ssock->mgr->workers[w], + (isc__netievent_t *)event); + isc__nm_put_ievent(ssock->mgr, event); + } else { + isc__nm_enqueue_ievent(&ssock->mgr->workers[w], + (isc__netievent_t *)event); + } + + return (ISC_R_SUCCESS); } isc_result_t @@ -943,30 +922,9 @@ tcp_close_direct(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->type == isc_nm_tcpsocket); - isc_nmsocket_t *ssock = sock->server; - if (sock->quota != NULL) { isc_quota_detach(&sock->quota); } - if (ssock != NULL) { - ssock->conns--; - while (ssock->conns == 0 && ssock->overquota > 0) { - ssock->overquota--; - isc_result_t result = accept_connection(ssock); - if (result == ISC_R_SUCCESS || result == ISC_R_NOCONN) { - continue; - } - if ((result != ISC_R_QUOTA && - result != ISC_R_SOFTQUOTA) || - can_log_tcp_quota()) { - isc_log_write(isc_lctx,
ISC_LOGCATEGORY_GENERAL, - ISC_LOGMODULE_NETMGR, - ISC_LOG_ERROR, - "TCP connection failed: %s", - isc_result_totext(result)); - } - } - } if (sock->timer_initialized) { sock->timer_initialized = false; uv_timer_stop(&sock->timer);