diff --git a/lib/isc/netmgr/netmgr-int.h b/lib/isc/netmgr/netmgr-int.h index 5b63dfc98d..4b56b1b772 100644 --- a/lib/isc/netmgr/netmgr-int.h +++ b/lib/isc/netmgr/netmgr-int.h @@ -172,6 +172,11 @@ typedef enum isc__netievent_type { netievent_stop, netievent_pause, + netievent_connectcb, + netievent_acceptcb, + netievent_readcb, + netievent_sendcb, + netievent_prio = 0xff, /* event type values higher than this * will be treated as high-priority * events, which can be processed @@ -187,6 +192,7 @@ typedef union { isc_nm_recv_cb_t recv; isc_nm_cb_t send; isc_nm_cb_t connect; + isc_nm_accept_cb_t accept; } isc__nm_cb_t; /* @@ -260,6 +266,18 @@ typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t; typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t; typedef isc__netievent__socket_req_t isc__netievent_tcpdnssend_t; +typedef struct isc__netievent__socket_req_result { + isc__netievent_type type; + isc_nmsocket_t *sock; + isc__nm_uvreq_t *req; + isc_result_t result; +} isc__netievent__socket_req_result_t; + +typedef isc__netievent__socket_req_result_t isc__netievent_connectcb_t; +typedef isc__netievent__socket_req_result_t isc__netievent_acceptcb_t; +typedef isc__netievent__socket_req_result_t isc__netievent_readcb_t; +typedef isc__netievent__socket_req_result_t isc__netievent_sendcb_t; + typedef struct isc__netievent__socket_streaminfo_quota { isc__netievent_type type; isc_nmsocket_t *sock; @@ -746,6 +764,48 @@ isc__nmsocket_clearcb(isc_nmsocket_t *sock); * Clear the recv and accept callbacks in 'sock'. */ +void +isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult); +void +isc__nm_async_connectcb(isc__networker_t *worker, isc__netievent_t *ev0); +/*%< + * Issue a connect callback on the socket, used to call the callback + * on failed conditions when the event can't be scheduled on the uv loop. + */ + +void +isc__nm_acceptcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult); +void +isc__nm_async_acceptcb(isc__networker_t *worker, isc__netievent_t *ev0); +/*%< + * Issue a accept callback on the socket, used to call the callback + * on failed conditions when the event can't be scheduled on the uv loop. + */ + +void +isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult); +void +isc__nm_async_readcb(isc__networker_t *worker, isc__netievent_t *ev0); + +/*%< + * Issue a read callback on the socket, used to call the callback + * on failed conditions when the event can't be scheduled on the uv loop. + * + */ + +void +isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult); +void +isc__nm_async_sendcb(isc__networker_t *worker, isc__netievent_t *ev0); +/*%< + * Issue a write callback on the socket, used to call the callback + * on failed conditions when the event can't be scheduled on the uv loop. + */ + void isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0); /*%< diff --git a/lib/isc/netmgr/netmgr.c b/lib/isc/netmgr/netmgr.c index b7a8a29d8c..632b636733 100644 --- a/lib/isc/netmgr/netmgr.c +++ b/lib/isc/netmgr/netmgr.c @@ -700,9 +700,22 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) { isc__nm_async_tls_do_bio(worker, ievent); break; + case netievent_connectcb: + isc__nm_async_connectcb(worker, ievent); + break; + case netievent_acceptcb: + isc__nm_async_acceptcb(worker, ievent); + break; + case netievent_readcb: + isc__nm_async_readcb(worker, ievent); + break; + case netievent_sendcb: + isc__nm_async_sendcb(worker, ievent); + break; case netievent_closecb: isc__nm_async_closecb(worker, ievent); break; + case netievent_detach: isc__nm_async_detach(worker, ievent); break; @@ -1645,17 +1658,194 @@ isc_nm_stoplistening(isc_nmsocket_t *sock) { } void -isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0) { - isc__netievent_closecb_t *ievent = (isc__netievent_closecb_t *)ev0; +isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult) { + isc__netievent_connectcb_t *ievent = + isc__nm_get_ievent(sock->mgr, netievent_connectcb); - REQUIRE(VALID_NMSOCK(ievent->sock)); - REQUIRE(ievent->sock->tid == isc_nm_tid()); - REQUIRE(ievent->sock->closehandle_cb != NULL); + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + + ievent->sock = sock; + ievent->req = uvreq; + ievent->result = eresult; + + if (eresult == ISC_R_SUCCESS) { + isc__nm_async_connectcb(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + isc__nm_put_ievent(sock->mgr, ievent); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + } +} + +void +isc__nm_async_connectcb(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_connectcb_t *ievent = (isc__netievent_connectcb_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + isc__nm_uvreq_t *uvreq = ievent->req; + isc_result_t eresult = ievent->result; UNUSED(worker); - ievent->sock->closehandle_cb(ievent->sock); - isc__nmsocket_detach(&ievent->sock); + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + REQUIRE(ievent->sock->tid == isc_nm_tid()); + REQUIRE(uvreq->cb.connect != NULL); + + uvreq->cb.connect(uvreq->handle, eresult, uvreq->cbarg); + + isc__nm_uvreq_put(&uvreq, sock); +} + +void +isc__nm_acceptcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult) { + isc__netievent_acceptcb_t *ievent = + isc__nm_get_ievent(sock->mgr, netievent_acceptcb); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + + ievent->sock = sock; + ievent->req = uvreq; + ievent->result = eresult; + + if (eresult == ISC_R_SUCCESS) { + isc__nm_async_acceptcb(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + isc__nm_put_ievent(sock->mgr, ievent); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + } +} + +void +isc__nm_async_acceptcb(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_acceptcb_t *ievent = (isc__netievent_acceptcb_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + isc__nm_uvreq_t *uvreq = ievent->req; + isc_result_t eresult = ievent->result; + + UNUSED(worker); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + REQUIRE(sock->tid == isc_nm_tid()); + REQUIRE(uvreq->cb.accept != NULL); + + uvreq->cb.accept(uvreq->handle, eresult, uvreq->cbarg); + + isc__nm_uvreq_put(&uvreq, sock); +} + +void +isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult) { + isc__netievent_readcb_t *ievent = isc__nm_get_ievent(sock->mgr, + netievent_readcb); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + + ievent->sock = sock; + ievent->req = uvreq; + ievent->result = eresult; + + if (eresult == ISC_R_SUCCESS) { + isc__nm_async_readcb(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + isc__nm_put_ievent(sock->mgr, ievent); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + } +} + +void +isc__nm_async_readcb(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_readcb_t *ievent = (isc__netievent_readcb_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + isc__nm_uvreq_t *uvreq = ievent->req; + isc_result_t eresult = ievent->result; + isc_region_t region = { .base = (unsigned char *)uvreq->uvbuf.base, + .length = uvreq->uvbuf.len }; + + UNUSED(worker); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + REQUIRE(sock->tid == isc_nm_tid()); + + uvreq->cb.recv(uvreq->handle, eresult, ®ion, uvreq->cbarg); + + isc__nm_uvreq_put(&uvreq, sock); +} + +void +isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq, + isc_result_t eresult) { + isc__netievent_sendcb_t *ievent = isc__nm_get_ievent(sock->mgr, + netievent_sendcb); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + + ievent->sock = sock; + ievent->req = uvreq; + ievent->result = eresult; + + if (eresult == ISC_R_SUCCESS) { + isc__nm_async_sendcb(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + isc__nm_put_ievent(sock->mgr, ievent); + } else { + isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid], + (isc__netievent_t *)ievent); + } +} + +void +isc__nm_async_sendcb(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_sendcb_t *ievent = (isc__netievent_sendcb_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + isc__nm_uvreq_t *uvreq = ievent->req; + isc_result_t eresult = ievent->result; + + UNUSED(worker); + + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(uvreq)); + REQUIRE(VALID_NMHANDLE(uvreq->handle)); + REQUIRE(sock->tid == isc_nm_tid()); + + uvreq->cb.send(uvreq->handle, eresult, uvreq->cbarg); + + isc__nm_uvreq_put(&uvreq, sock); +} + +void +isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0) { + isc__netievent_closecb_t *ievent = (isc__netievent_closecb_t *)ev0; + isc_nmsocket_t *sock = ievent->sock; + + REQUIRE(VALID_NMSOCK(ievent->sock)); + REQUIRE(sock->tid == isc_nm_tid()); + REQUIRE(sock->closehandle_cb != NULL); + + UNUSED(worker); + + ievent->sock->closehandle_cb(sock); + isc__nmsocket_detach(&sock); } void diff --git a/lib/isc/netmgr/tcp.c b/lib/isc/netmgr/tcp.c index 83236e4aeb..4997d6f8de 100644 --- a/lib/isc/netmgr/tcp.c +++ b/lib/isc/netmgr/tcp.c @@ -49,7 +49,7 @@ can_log_tcp_quota(void) { return (false); } -static int +static isc_result_t tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req); static void @@ -80,6 +80,17 @@ quota_accept_cb(isc_quota_t *quota, void *sock0); static void failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult); +static void +failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_result_t eresult); + +static bool +inactive(isc_nmsocket_t *sock) { + return (!isc__nmsocket_active(sock) || + atomic_load(&sock->mgr->closing) || + (sock->server != NULL && !isc__nmsocket_active(sock->server))); +} + static void failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) { /* @@ -126,20 +137,17 @@ failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, } if (!atomic_load(&sock->connecting)) { + isc__nm_uvreq_put(&req, sock); return; } - atomic_store(&sock->connecting, false); isc__nmsocket_clearcb(sock); if (req->cb.connect != NULL) { - req->cb.connect(NULL, eresult, req->cbarg); + isc__nm_connectcb(sock, req, eresult); + } else { + isc__nm_uvreq_put(&req, sock); } - req->cb.connect = NULL; - req->cbarg = NULL; - - isc__nm_uvreq_put(&req, sock); - isc__nmsocket_detach(&sock); } static void @@ -147,16 +155,22 @@ connecttimeout_cb(uv_timer_t *handle) { isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)handle); isc_nmsocket_t *sock = req->sock; + REQUIRE(VALID_UVREQ(req)); + REQUIRE(VALID_NMHANDLE(req->handle)); REQUIRE(sock->tid == isc_nm_tid()); failed_connect_cb(sock, req, ISC_R_TIMEDOUT); + isc__nmsocket_detach(&sock); } -static int +static isc_result_t tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__networker_t *worker = NULL; int r; + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(req)); + REQUIRE(isc__nm_in_netthread()); REQUIRE(sock->tid == isc_nm_tid()); @@ -169,11 +183,8 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); atomic_store(&sock->closing, true); atomic_store(&sock->closed, true); - atomic_store(&sock->result, isc__nm_uverr2result(r)); - atomic_store(&sock->connect_error, true); - failed_connect_cb(sock, req, isc__nm_uverr2result(r)); atomic_store(&sock->active, false); - return (r); + return (isc__nm_uverr2result(r)); } if (req->local.length != 0) { @@ -181,12 +192,9 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); - atomic_store(&sock->result, isc__nm_uverr2result(r)); - atomic_store(&sock->connect_error, true); - failed_connect_cb(sock, req, isc__nm_uverr2result(r)); atomic_store(&sock->active, false); isc__nm_tcp_close(sock); - return (r); + return (isc__nm_uverr2result(r)); } } @@ -203,12 +211,9 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { if (r != 0) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECTFAIL]); - atomic_store(&sock->result, isc__nm_uverr2result(r)); - atomic_store(&sock->connect_error, true); - failed_connect_cb(sock, req, isc__nm_uverr2result(r)); atomic_store(&sock->active, false); isc__nm_tcp_close(sock); - return (r); + return (isc__nm_uverr2result(r)); } isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); @@ -216,7 +221,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { 0); sock->timer_running = true; - return (0); + return (ISC_R_SUCCESS); } void @@ -225,22 +230,28 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { (isc__netievent_tcpconnect_t *)ev0; isc_nmsocket_t *sock = ievent->sock; isc__nm_uvreq_t *req = ievent->req; - int r; - - REQUIRE(VALID_NMSOCK(sock)); - REQUIRE(sock->tid == isc_nm_tid()); + isc_result_t result = ISC_R_SUCCESS; UNUSED(worker); - r = tcp_connect_direct(sock, req); - if (r != 0) { - LOCK(&sock->lock); - SIGNAL(&sock->cond); - UNLOCK(&sock->lock); - return; - } + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(sock->type == isc_nm_tcpsocket); + REQUIRE(sock->iface != NULL); + REQUIRE(sock->parent == NULL); + REQUIRE(sock->tid == isc_nm_tid()); - atomic_store(&sock->connected, true); + req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr); + result = tcp_connect_direct(sock, req); + atomic_store(&sock->result, result); + if (result == ISC_R_SUCCESS) { + atomic_store(&sock->connected, true); + /* uvreq will be freed in tcp_connect_cb */ + /* socket will be detached in tcp_connect_cb */ + } else { + atomic_store(&sock->connect_error, true); + isc__nm_uvreq_put(&req, sock); + isc__nmsocket_detach(&ievent->sock); + } LOCK(&sock->lock); SIGNAL(&sock->cond); @@ -250,28 +261,32 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { static void tcp_connect_cb(uv_connect_t *uvreq, int status) { isc_result_t result; - isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)uvreq); + isc__nm_uvreq_t *req = NULL; isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)uvreq->handle); struct sockaddr_storage ss; - isc_nmhandle_t *handle = NULL; int r; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); + /* We timed out */ + if (!atomic_load(&sock->connecting)) { + return; + } + + req = uv_handle_get_data((uv_handle_t *)uvreq); + + REQUIRE(VALID_UVREQ(req)); + REQUIRE(VALID_NMHANDLE(req->handle)); + if (sock->timer_running) { uv_timer_stop(&sock->timer); sock->timer_running = false; } - if (!atomic_load(&sock->connecting)) { - return; - } - - REQUIRE(VALID_UVREQ(req)); - if (status != 0) { failed_connect_cb(sock, req, isc__nm_uverr2result(status)); + isc__nmsocket_detach(&sock); return; } @@ -280,6 +295,7 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) { &(int){ sizeof(ss) }); if (r != 0) { failed_connect_cb(sock, req, isc__nm_uverr2result(r)); + isc__nmsocket_detach(&sock); return; } @@ -288,21 +304,12 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) { result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss); RUNTIME_CHECK(result == ISC_R_SUCCESS); - handle = isc__nmhandle_get(sock, NULL, NULL); - req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg); - - isc__nm_uvreq_put(&req, sock); + isc__nm_connectcb(sock, req, ISC_R_SUCCESS); /* * The sock is now attached to the handle. */ isc__nmsocket_detach(&sock); - - /* - * The connect callback should have attached to the handle. - * If it didn't, the socket will be closed now. - */ - isc_nmhandle_detach(&handle); } isc_result_t @@ -310,7 +317,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, isc_nm_cb_t cb, void *cbarg, unsigned int timeout, size_t extrahandlesize) { isc_result_t result = ISC_R_SUCCESS; - isc_nmsocket_t *nsock = NULL, *tmp = NULL; + isc_nmsocket_t *sock = NULL, *tmp = NULL; isc__netievent_tcpconnect_t *ievent = NULL; isc__nm_uvreq_t *req = NULL; @@ -318,50 +325,50 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, REQUIRE(local != NULL); REQUIRE(peer != NULL); - nsock = isc_mem_get(mgr->mctx, sizeof(*nsock)); - isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local); + sock = isc_mem_get(mgr->mctx, sizeof(*sock)); + isc__nmsocket_init(sock, mgr, isc_nm_tcpsocket, local); - nsock->extrahandlesize = extrahandlesize; - nsock->connect_timeout = timeout; + sock->extrahandlesize = extrahandlesize; + sock->connect_timeout = timeout; - atomic_init(&nsock->result, ISC_R_SUCCESS); - atomic_init(&nsock->client, true); + atomic_init(&sock->result, ISC_R_SUCCESS); + atomic_init(&sock->client, true); - req = isc__nm_uvreq_get(mgr, nsock); + req = isc__nm_uvreq_get(mgr, sock); req->cb.connect = cb; req->cbarg = cbarg; req->peer = peer->addr; req->local = local->addr; ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect); - ievent->sock = nsock; + ievent->sock = sock; ievent->req = req; /* * Async callbacks can dereference the socket in the meantime, * we need to hold an additional reference to it. */ - isc__nmsocket_attach(nsock, &tmp); + isc__nmsocket_attach(sock, &tmp); if (isc__nm_in_netthread()) { - nsock->tid = isc_nm_tid(); - isc__nm_async_tcpconnect(&mgr->workers[nsock->tid], + sock->tid = isc_nm_tid(); + isc__nm_async_tcpconnect(&mgr->workers[sock->tid], (isc__netievent_t *)ievent); isc__nm_put_ievent(mgr, ievent); } else { - nsock->tid = isc_random_uniform(mgr->nworkers); - isc__nm_enqueue_ievent(&mgr->workers[nsock->tid], + sock->tid = isc_random_uniform(mgr->nworkers); + isc__nm_enqueue_ievent(&mgr->workers[sock->tid], (isc__netievent_t *)ievent); - LOCK(&nsock->lock); - while (!atomic_load(&nsock->connected) && - !atomic_load(&nsock->connect_error)) { - WAIT(&nsock->cond, &nsock->lock); + LOCK(&sock->lock); + while (!atomic_load(&sock->connected) && + !atomic_load(&sock->connect_error)) { + WAIT(&sock->cond, &sock->lock); } - UNLOCK(&nsock->lock); + UNLOCK(&sock->lock); } - result = atomic_load(&nsock->result); + result = atomic_load(&sock->result); isc__nmsocket_detach(&tmp); @@ -581,13 +588,11 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_tcpchildaccept_t *ievent = (isc__netievent_tcpchildaccept_t *)ev0; isc_nmsocket_t *sock = ievent->sock; - isc_nmhandle_t *handle; isc_result_t result; + isc__nm_uvreq_t *req = NULL; struct sockaddr_storage ss; isc_sockaddr_t local; int r; - isc_nm_accept_cb_t accept_cb; - void *accept_cbarg; REQUIRE(isc__nm_in_netthread()); REQUIRE(sock->tid == isc_nm_tid()); @@ -650,25 +655,22 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) { } sock->accepting = false; - handle = isc__nmhandle_get(sock, NULL, &local); - INSIST(sock->accept_cb != NULL); - accept_cb = sock->accept_cb; - accept_cbarg = sock->accept_cbarg; sock->read_timeout = sock->mgr->init; - accept_cb(handle, ISC_R_SUCCESS, accept_cbarg); + + req = isc__nm_uvreq_get(sock->mgr, sock); + req->handle = isc__nmhandle_get(sock, NULL, &local); + req->cb.accept = sock->accept_cb; + req->cbarg = sock->accept_cbarg; + + isc__nm_acceptcb(sock, req, ISC_R_SUCCESS); /* * sock is now attached to the handle. */ isc__nmsocket_detach(&sock); - /* - * The accept callback should have attached to the handle. - * If it didn't, the socket will be closed now. - */ - isc_nmhandle_detach(&handle); return; error: @@ -733,9 +735,6 @@ tcp_listenclose_cb(uv_handle_t *handle) { static void failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { - isc_nm_recv_cb_t cb; - void *cbarg = NULL; - REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->statichandle != NULL); @@ -750,12 +749,28 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { uv_read_stop(&sock->uv_handle.stream); - cb = sock->recv_cb; - cbarg = sock->recv_cbarg; - isc__nmsocket_clearcb(sock); + if (sock->recv_cb != NULL) { + isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr, sock); + isc_nmhandle_attach(sock->statichandle, &req->handle); + req->cb.recv = sock->recv_cb; + req->cbarg = sock->recv_cbarg; - if (cb != NULL) { - cb(sock->statichandle, result, NULL, cbarg); + isc__nmsocket_clearcb(sock); + + isc__nm_readcb(sock, req, result); + } +} + +static void +failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_result_t eresult) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(req)); + + if (req->cb.send != NULL) { + isc__nm_sendcb(sock, req, eresult); + } else { + isc__nm_uvreq_put(&req, sock); } } @@ -790,29 +805,17 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMSOCK(handle->sock)); - if (!isc__nmsocket_active(sock)) { + sock->recv_cb = cb; + sock->recv_cbarg = cbarg; + + if (inactive(sock)) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]); - cb(handle, ISC_R_CANCELED, NULL, cbarg); - return; - } - - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, NULL, cbarg); - return; - } - - if (atomic_load(&sock->mgr->closing)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, NULL, cbarg); + failed_read_cb(sock, ISC_R_CANCELED); return; } REQUIRE(sock->tid == isc_nm_tid()); - sock->recv_cb = cb; - sock->recv_cbarg = cbarg; - sock->read_timeout = (atomic_load(&sock->keepalive) ? sock->mgr->keepalive : sock->mgr->idle); @@ -864,17 +867,7 @@ isc__nm_async_tcp_startread(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(worker->id == isc_nm_tid()); - if (!isc__nmsocket_active(sock)) { - failed_read_cb(sock, ISC_R_CANCELED); - return; - } - - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - failed_read_cb(sock, ISC_R_CANCELED); - return; - } - - if (atomic_load(&sock->mgr->closing)) { + if (inactive(sock)) { failed_read_cb(sock, ISC_R_CANCELED); return; } @@ -982,13 +975,22 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) { REQUIRE(buf != NULL); if (nread >= 0) { - isc_region_t region = { .base = (unsigned char *)buf->base, - .length = nread }; - isc_nm_recv_cb_t cb = sock->recv_cb; - void *cbarg = sock->recv_cbarg; + if (sock->recv_cb != NULL) { + isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr, + sock); + req->cb.recv = sock->recv_cb; + req->cbarg = sock->recv_cbarg; + isc_nmhandle_attach(sock->statichandle, &req->handle); - if (cb != NULL) { - cb(sock->statichandle, ISC_R_SUCCESS, ®ion, cbarg); + /* + * The callback will be called synchronously because the + * result is ISC_R_SUCCESS, so we don't need to retain + * the buffer + */ + req->uvbuf.base = buf->base; + req->uvbuf.len = nread; + + isc__nm_readcb(sock, req, ISC_R_SUCCESS); } if (sock->timer_initialized && sock->read_timeout != 0) { @@ -1183,24 +1185,6 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, REQUIRE(sock->type == isc_nm_tcpsocket); - if (!isc__nmsocket_active(sock)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, cbarg); - return; - } - - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, cbarg); - return; - } - - if (atomic_load(&sock->mgr->closing)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, cbarg); - return; - } - uvreq = isc__nm_uvreq_get(sock->mgr, sock); uvreq->uvbuf.base = (char *)region->base; uvreq->uvbuf.len = region->length; @@ -1210,6 +1194,12 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, uvreq->cb.send = cb; uvreq->cbarg = cbarg; + if (inactive(sock)) { + isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); + failed_send_cb(sock, uvreq, ISC_R_CANCELED); + return; + } + if (sock->tid == isc_nm_tid()) { /* * If we're in the same thread as the socket we can send the @@ -1219,8 +1209,7 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, if (result != ISC_R_SUCCESS) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); - isc__nm_uvreq_put(&uvreq, sock); + failed_send_cb(sock, uvreq, result); } } else { /* @@ -1284,13 +1273,7 @@ tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->type == isc_nm_tcpsocket); - if (!isc__nmsocket_active(sock)) { - return (ISC_R_CANCELED); - } - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - return (ISC_R_CANCELED); - } - if (atomic_load(&sock->mgr->closing)) { + if (inactive(sock)) { return (ISC_R_CANCELED); } @@ -1408,11 +1391,6 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) { } if (atomic_load(&sock->connecting)) { - if (sock->timer_initialized) { - isc__nm_uvreq_t *req = - uv_handle_get_data((uv_handle_t *)&sock->timer); - failed_connect_cb(sock, req, ISC_R_CANCELED); - } return; } diff --git a/lib/isc/netmgr/tcpdns.c b/lib/isc/netmgr/tcpdns.c index 2a1596d0e8..f742c64a65 100644 --- a/lib/isc/netmgr/tcpdns.c +++ b/lib/isc/netmgr/tcpdns.c @@ -787,11 +787,6 @@ tcpdnsconnect_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) { isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn)); - if (result != ISC_R_SUCCESS) { - cb(NULL, result, cbarg); - return; - } - dnssock = isc_mem_get(handle->sock->mgr->mctx, sizeof(*dnssock)); isc__nmsocket_init(dnssock, handle->sock->mgr, isc_nm_tcpdnssocket, handle->sock->iface); @@ -807,6 +802,13 @@ tcpdnsconnect_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) { readhandle = isc__nmhandle_get(dnssock, NULL, NULL); + if (result != ISC_R_SUCCESS) { + cb(readhandle, result, cbarg); + isc__nmsocket_detach(&dnssock); + isc_nmhandle_detach(&readhandle); + return; + } + INSIST(dnssock->statichandle != NULL); INSIST(dnssock->statichandle == readhandle); INSIST(readhandle->sock == dnssock); @@ -838,20 +840,26 @@ isc_result_t isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, isc_nm_cb_t cb, void *cbarg, unsigned int timeout, size_t extrahandlesize) { - tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(tcpconnect_t)); + isc_result_t result = ISC_R_SUCCESS; + tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(*conn)); *conn = (tcpconnect_t){ .cb = cb, .cbarg = cbarg, .extrahandlesize = extrahandlesize }; isc_mem_attach(mgr->mctx, &conn->mctx); - return (isc_nm_tcpconnect(mgr, local, peer, tcpdnsconnect_cb, conn, - timeout, 0)); + result = isc_nm_tcpconnect(mgr, local, peer, tcpdnsconnect_cb, conn, + timeout, 0); + if (result != ISC_R_SUCCESS) { + isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn)); + } + return (result); } isc_result_t isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, isc_nm_cb_t cb, void *cbarg, unsigned int timeout, size_t extrahandlesize) { + isc_result_t result = ISC_R_SUCCESS; tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(tcpconnect_t)); SSL_CTX *ctx = NULL; @@ -861,9 +869,12 @@ isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, isc_mem_attach(mgr->mctx, &conn->mctx); ctx = SSL_CTX_new(SSLv23_client_method()); - isc_result_t result = isc_nm_tlsconnect( - mgr, local, peer, tcpdnsconnect_cb, conn, ctx, timeout, 0); + result = isc_nm_tlsconnect(mgr, local, peer, tcpdnsconnect_cb, conn, + ctx, timeout, 0); SSL_CTX_free(ctx); + if (result != ISC_R_SUCCESS) { + isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn)); + } return (result); } diff --git a/lib/isc/netmgr/tls.c b/lib/isc/netmgr/tls.c index acaaeb1ad5..fdface5aa2 100644 --- a/lib/isc/netmgr/tls.c +++ b/lib/isc/netmgr/tls.c @@ -79,6 +79,7 @@ tls_senddone(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) { UNUSED(handle); /* XXXWPK TODO */ UNUSED(eresult); + isc_mem_put(sock->mgr->mctx, sock->tls.senddata.base, sock->tls.senddata.length); sock->tls.senddata = (isc_region_t){ NULL, 0 }; @@ -701,7 +702,7 @@ tls_connect_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { REQUIRE(VALID_NMSOCK(tlssock)); if (result != ISC_R_SUCCESS) { - tlssock->connect_cb(NULL, result, tlssock->connect_cbarg); + tlssock->connect_cb(handle, result, tlssock->connect_cbarg); atomic_store(&tlssock->result, result); atomic_store(&tlssock->connect_error, true); tls_close_direct(tlssock); @@ -714,7 +715,7 @@ tls_connect_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) { isc_nmhandle_attach(handle, &tlssock->outerhandle); result = initialize_tls(tlssock, false); if (result != ISC_R_SUCCESS) { - tlssock->connect_cb(NULL, result, tlssock->connect_cbarg); + tlssock->connect_cb(handle, result, tlssock->connect_cbarg); atomic_store(&tlssock->result, result); atomic_store(&tlssock->connect_error, true); tls_close_direct(tlssock); @@ -742,6 +743,7 @@ isc__nm_async_tlsconnect(isc__networker_t *worker, isc__netievent_t *ev0) { tls_connect_cb, tlssock, tlssock->connect_timeout, 0); if (result != ISC_R_SUCCESS) { + /* FIXME: We need to pass valid handle */ tlssock->connect_cb(NULL, result, tlssock->connect_cbarg); atomic_store(&tlssock->result, result); atomic_store(&tlssock->connect_error, true); diff --git a/lib/isc/netmgr/udp.c b/lib/isc/netmgr/udp.c index 9f8a37340e..ecaf66046a 100644 --- a/lib/isc/netmgr/udp.c +++ b/lib/isc/netmgr/udp.c @@ -51,8 +51,15 @@ static void failed_read_cb(isc_nmsocket_t *sock, isc_result_t result); static void -failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, - isc_result_t eresult); +failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_result_t eresult); + +static bool +inactive(isc_nmsocket_t *sock) { + return (!isc__nmsocket_active(sock) || + atomic_load(&sock->mgr->closing) || + (sock->server != NULL && !isc__nmsocket_active(sock->server))); +} isc_result_t isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, @@ -339,14 +346,11 @@ static void udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) { isc_result_t result; - isc_nmhandle_t *nmhandle = NULL; isc_sockaddr_t sockaddr; isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); - isc_region_t region; + isc__nm_uvreq_t *req = NULL; uint32_t maxudp; bool free_buf; - isc_nm_recv_cb_t cb; - void *cbarg; REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->tid == isc_nm_tid()); @@ -385,38 +389,35 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, goto done; } - region.base = (unsigned char *)buf->base; - region.length = nrecv; - - cb = sock->recv_cb; - cbarg = sock->recv_cbarg; - if (sock->timer_running) { uv_timer_stop(&sock->timer); sock->timer_running = false; } + req = isc__nm_uvreq_get(sock->mgr, sock); + req->cb.recv = sock->recv_cb; + req->cbarg = sock->recv_cbarg; + /* + * The callback will be called synchronously, because result is + * ISC_R_SUCCESS. + */ + req->uvbuf.base = buf->base; + req->uvbuf.len = nrecv; + if (atomic_load(&sock->client)) { if (nrecv < 0) { failed_read_cb(sock, isc__nm_uverr2result(nrecv)); return; } - cb(sock->statichandle, ISC_R_SUCCESS, ®ion, cbarg); + isc_nmhandle_attach(sock->statichandle, &req->handle); } else { result = isc_sockaddr_fromsockaddr(&sockaddr, addr); RUNTIME_CHECK(result == ISC_R_SUCCESS); - nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL); - - cb(nmhandle, ISC_R_SUCCESS, ®ion, cbarg); - - /* - * If the recv callback wants to hold on to the handle, - * it needs to attach to it. - */ - isc_nmhandle_detach(&nmhandle); + req->handle = isc__nmhandle_get(sock, &sockaddr, NULL); } + isc__nm_readcb(sock, req, ISC_R_SUCCESS); done: if (free_buf) { @@ -440,21 +441,18 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, uint32_t maxudp = atomic_load(&sock->mgr->maxudp); int ntid; - if (!isc__nmsocket_active(sock)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, cbarg); - return; - } + uvreq = isc__nm_uvreq_get(sock->mgr, sock); + uvreq->uvbuf.base = (char *)region->base; + uvreq->uvbuf.len = region->length; - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, cbarg); - return; - } + isc_nmhandle_attach(handle, &uvreq->handle); - if (atomic_load(&sock->mgr->closing)) { + uvreq->cb.send = cb; + uvreq->cbarg = cbarg; + + if (inactive(sock)) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, cbarg); + failed_send_cb(sock, uvreq, ISC_R_CANCELED); return; } @@ -467,6 +465,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, * we need to do so here. */ if (maxudp != 0 && region->length > maxudp) { + isc__nm_uvreq_put(&uvreq, sock); isc_nmhandle_detach(&handle); return; } @@ -499,15 +498,6 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, rsock = &psock->children[ntid]; } - uvreq = isc__nm_uvreq_get(sock->mgr, sock); - uvreq->uvbuf.base = (char *)region->base; - uvreq->uvbuf.len = region->length; - - isc_nmhandle_attach(handle, &uvreq->handle); - - uvreq->cb.send = cb; - uvreq->cbarg = cbarg; - if (isc_nm_tid() == rsock->tid) { /* * If we're in the same thread as the socket we can send @@ -518,8 +508,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb, if (result != ISC_R_SUCCESS) { isc__nm_incstats(rsock->mgr, rsock->statsindex[STATID_SENDFAIL]); - uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); - isc__nm_uvreq_put(&uvreq, sock); + failed_send_cb(rsock, uvreq, result); } } else { /* @@ -549,23 +538,21 @@ isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) { REQUIRE(worker->id == sock->tid); if (!isc__nmsocket_active(ievent->sock)) { - uvreq->cb.send(uvreq->handle, ISC_R_CANCELED, uvreq->cbarg); - isc__nm_uvreq_put(&uvreq, sock); + failed_send_cb(sock, uvreq, ISC_R_CANCELED); return; } result = udp_send_direct(sock, uvreq, &ievent->peer); if (result != ISC_R_SUCCESS) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); - isc__nm_uvreq_put(&uvreq, sock); + failed_send_cb(sock, uvreq, result); } } static void udp_send_cb(uv_udp_send_t *req, int status) { isc_result_t result = ISC_R_SUCCESS; - isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data; + isc__nm_uvreq_t *uvreq = uv_handle_get_data((uv_handle_t *)req); isc_nmsocket_t *sock = uvreq->sock; REQUIRE(VALID_UVREQ(uvreq)); @@ -576,8 +563,7 @@ udp_send_cb(uv_udp_send_t *req, int status) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); } - uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); - isc__nm_uvreq_put(&uvreq, uvreq->sock); + isc__nm_sendcb(sock, uvreq, result); } /* @@ -595,13 +581,7 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->type == isc_nm_udpsocket); - if (!isc__nmsocket_active(sock)) { - return (ISC_R_CANCELED); - } - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - return (ISC_R_CANCELED); - } - if (atomic_load(&sock->mgr->closing)) { + if (inactive(sock)) { return (ISC_R_CANCELED); } @@ -625,7 +605,7 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, return (ISC_R_SUCCESS); } -static int +static isc_result_t udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__networker_t *worker = NULL; int uv_bind_flags = UV_UDP_REUSEADDR; @@ -644,11 +624,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { /* Socket was never opened; no need for isc__nm_udp_close() */ atomic_store(&sock->closing, true); atomic_store(&sock->closed, true); - atomic_store(&sock->result, isc__nm_uverr2result(r)); atomic_store(&sock->connect_error, true); - failed_connect_cb(sock, req, isc__nm_uverr2result(r)); atomic_store(&sock->active, false); - return (r); + return (isc__nm_uverr2result(r)); } r = uv_udp_open(&sock->uv_handle.udp, sock->fd); @@ -656,10 +634,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); atomic_store(&sock->connect_error, true); atomic_store(&sock->result, isc__nm_uverr2result(r)); - failed_connect_cb(sock, req, isc__nm_uverr2result(r)); atomic_store(&sock->active, false); isc__nm_udp_close(sock); - return (r); + return (isc__nm_uverr2result(r)); } isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]); @@ -673,10 +650,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); atomic_store(&sock->connect_error, true); atomic_store(&sock->result, isc__nm_uverr2result(r)); - failed_connect_cb(sock, req, isc__nm_uverr2result(r)); atomic_store(&sock->active, false); isc__nm_udp_close(sock); - return (r); + return (isc__nm_uverr2result(r)); } uv_handle_set_data(&sock->uv_handle.handle, sock); @@ -687,10 +663,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { sock->statsindex[STATID_CONNECTFAIL]); atomic_store(&sock->connect_error, true); atomic_store(&sock->result, isc__nm_uverr2result(r)); - failed_connect_cb(sock, req, isc__nm_uverr2result(r)); atomic_store(&sock->active, false); isc__nm_udp_close(sock); - return (r); + return (isc__nm_uverr2result(r)); } isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); atomic_store(&sock->connecting, false); @@ -703,7 +678,7 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { uv_send_buffer_size(&sock->uv_handle.handle, &(int){ ISC_SEND_BUFFER_SIZE }); #endif - return (0); + return (ISC_R_SUCCESS); } /* @@ -716,37 +691,27 @@ isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { (isc__netievent_udpconnect_t *)ev0; isc_nmsocket_t *sock = ievent->sock; isc__nm_uvreq_t *req = ievent->req; - isc_nmhandle_t *handle = NULL; - isc_nm_cb_t cb; - void *cbarg; - int r; isc_result_t result; UNUSED(worker); + REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(sock->iface != NULL); REQUIRE(sock->parent == NULL); REQUIRE(sock->tid == isc_nm_tid()); - cb = sock->connect_cb; - cbarg = sock->connect_cbarg; - - r = udp_connect_direct(sock, req); - if (r != 0) { - LOCK(&sock->lock); - SIGNAL(&sock->cond); - UNLOCK(&sock->lock); - return; + req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr); + result = udp_connect_direct(sock, req); + atomic_store(&sock->result, result); + if (result == ISC_R_SUCCESS) { + atomic_store(&sock->connected, true); + isc__nm_connectcb(sock, req, ISC_R_SUCCESS); + } else { + atomic_store(&sock->connect_error, true); + isc__nm_uvreq_put(&req, sock); } - atomic_store(&sock->connected, true); - atomic_store(&sock->result, ISC_R_SUCCESS); - result = atomic_load(&sock->result); - - handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr); - cb(handle, result, cbarg); - LOCK(&sock->lock); SIGNAL(&sock->cond); UNLOCK(&sock->lock); @@ -755,12 +720,6 @@ isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) { * The sock is now attached to the handle. */ isc__nmsocket_detach(&sock); - - /* - * The connect callback should have attached to the handle. - * If it didn't, the socket will be closed now. - */ - isc_nmhandle_detach(&handle); } isc_result_t @@ -834,7 +793,6 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, isc__nm_async_udpconnect(&mgr->workers[sock->tid], (isc__netievent_t *)event); isc__nm_put_ievent(mgr, event); - isc__nm_uvreq_put(&req, sock); } else { sock->tid = isc_random_uniform(mgr->nworkers); isc__nm_enqueue_ievent(&mgr->workers[sock->tid], @@ -846,7 +804,6 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, WAIT(&sock->cond, &sock->lock); } UNLOCK(&sock->lock); - isc__nm_uvreq_put(&req, sock); } result = atomic_load(&sock->result); @@ -867,9 +824,6 @@ udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, static void failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { - isc_nm_recv_cb_t cb; - void *cbarg = NULL; - REQUIRE(VALID_NMSOCK(sock)); REQUIRE(sock->statichandle != NULL); @@ -880,12 +834,28 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { uv_udp_recv_stop(&sock->uv_handle.udp); - cb = sock->recv_cb; - cbarg = sock->recv_cbarg; - isc__nmsocket_clearcb(sock); + if (sock->recv_cb != NULL) { + isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr, sock); + isc_nmhandle_attach(sock->statichandle, &req->handle); + req->cb.recv = sock->recv_cb; + req->cbarg = sock->recv_cbarg; - if (cb != NULL) { - cb(sock->statichandle, result, NULL, cbarg); + isc__nmsocket_clearcb(sock); + + isc__nm_readcb(sock, req, result); + } +} + +static void +failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, + isc_result_t eresult) { + REQUIRE(VALID_NMSOCK(sock)); + REQUIRE(VALID_UVREQ(req)); + + if (req->cb.send != NULL) { + isc__nm_sendcb(sock, req, eresult); + } else { + isc__nm_uvreq_put(&req, sock); } } @@ -911,17 +881,7 @@ isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) { isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0; isc_nmsocket_t *sock = ievent->sock; - if (!isc__nmsocket_active(sock)) { - failed_read_cb(sock, ISC_R_CANCELED); - return; - } - - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - failed_read_cb(sock, ISC_R_CANCELED); - return; - } - - if (atomic_load(&sock->mgr->closing)) { + if (inactive(sock)) { failed_read_cb(sock, ISC_R_CANCELED); return; } @@ -950,24 +910,12 @@ isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) { REQUIRE(VALID_NMSOCK(handle->sock)); REQUIRE(handle->sock->type == isc_nm_udpsocket); - if (!isc__nmsocket_active(sock)) { + if (inactive(sock)) { isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]); cb(handle, ISC_R_CANCELED, NULL, cbarg); return; } - if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, NULL, cbarg); - return; - } - - if (atomic_load(&sock->mgr->closing)) { - isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); - cb(handle, ISC_R_CANCELED, NULL, cbarg); - return; - } - REQUIRE(sock->tid == isc_nm_tid()); sock->recv_cb = cb; sock->recv_cbarg = cbarg; @@ -1059,35 +1007,6 @@ isc__nm_udp_close(isc_nmsocket_t *sock) { } } -static void -failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, - isc_result_t eresult) { - REQUIRE(sock->tid == isc_nm_tid()); - - if (sock->timer_running) { - uv_timer_stop(&sock->timer); - sock->timer_running = false; - } - - if (!atomic_load(&sock->connecting)) { - return; - } - - atomic_store(&sock->connecting, false); - - INSIST(req != NULL); - - isc__nmsocket_clearcb(sock); - - if (req->cb.connect != NULL) { - req->cb.connect(NULL, eresult, req->cbarg); - } - req->cb.connect = NULL; - req->cbarg = NULL; - - isc__nmsocket_detach(&sock); -} - void isc__nm_udp_shutdown(isc_nmsocket_t *sock) { REQUIRE(VALID_NMSOCK(sock)); @@ -1098,11 +1017,6 @@ isc__nm_udp_shutdown(isc_nmsocket_t *sock) { } if (atomic_load(&sock->connecting)) { - if (sock->timer_initialized) { - isc__nm_uvreq_t *req = - uv_handle_get_data((uv_handle_t *)&sock->timer); - failed_connect_cb(sock, req, ISC_R_CANCELED); - } return; }