2
0
mirror of https://gitlab.isc.org/isc-projects/bind9 synced 2025-09-01 15:05:23 +00:00

Turn all the callback to be always asynchronous

When calling the high level netmgr functions, the callback would be
sometimes called synchronously if we catch the failure directly, or
asynchronously if it happens later.  The synchronous call to the
callback could create deadlocks as the caller would not expect the
failed callback to be executed directly.
This commit is contained in:
Ondřej Surý
2020-11-11 10:46:33 +01:00
parent fece7a4881
commit a49d88568f
6 changed files with 497 additions and 342 deletions

View File

@@ -172,6 +172,11 @@ typedef enum isc__netievent_type {
netievent_stop, netievent_stop,
netievent_pause, netievent_pause,
netievent_connectcb,
netievent_acceptcb,
netievent_readcb,
netievent_sendcb,
netievent_prio = 0xff, /* event type values higher than this netievent_prio = 0xff, /* event type values higher than this
* will be treated as high-priority * will be treated as high-priority
* events, which can be processed * events, which can be processed
@@ -187,6 +192,7 @@ typedef union {
isc_nm_recv_cb_t recv; isc_nm_recv_cb_t recv;
isc_nm_cb_t send; isc_nm_cb_t send;
isc_nm_cb_t connect; isc_nm_cb_t connect;
isc_nm_accept_cb_t accept;
} isc__nm_cb_t; } isc__nm_cb_t;
/* /*
@@ -260,6 +266,18 @@ typedef isc__netievent__socket_req_t isc__netievent_tcplisten_t;
typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t; typedef isc__netievent__socket_req_t isc__netievent_tcpsend_t;
typedef isc__netievent__socket_req_t isc__netievent_tcpdnssend_t; typedef isc__netievent__socket_req_t isc__netievent_tcpdnssend_t;
typedef struct isc__netievent__socket_req_result {
isc__netievent_type type;
isc_nmsocket_t *sock;
isc__nm_uvreq_t *req;
isc_result_t result;
} isc__netievent__socket_req_result_t;
typedef isc__netievent__socket_req_result_t isc__netievent_connectcb_t;
typedef isc__netievent__socket_req_result_t isc__netievent_acceptcb_t;
typedef isc__netievent__socket_req_result_t isc__netievent_readcb_t;
typedef isc__netievent__socket_req_result_t isc__netievent_sendcb_t;
typedef struct isc__netievent__socket_streaminfo_quota { typedef struct isc__netievent__socket_streaminfo_quota {
isc__netievent_type type; isc__netievent_type type;
isc_nmsocket_t *sock; isc_nmsocket_t *sock;
@@ -746,6 +764,48 @@ isc__nmsocket_clearcb(isc_nmsocket_t *sock);
* Clear the recv and accept callbacks in 'sock'. * Clear the recv and accept callbacks in 'sock'.
*/ */
void
isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
isc_result_t eresult);
void
isc__nm_async_connectcb(isc__networker_t *worker, isc__netievent_t *ev0);
/*%<
* Issue a connect callback on the socket, used to call the callback
* on failed conditions when the event can't be scheduled on the uv loop.
*/
void
isc__nm_acceptcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
isc_result_t eresult);
void
isc__nm_async_acceptcb(isc__networker_t *worker, isc__netievent_t *ev0);
/*%<
* Issue a accept callback on the socket, used to call the callback
* on failed conditions when the event can't be scheduled on the uv loop.
*/
void
isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
isc_result_t eresult);
void
isc__nm_async_readcb(isc__networker_t *worker, isc__netievent_t *ev0);
/*%<
* Issue a read callback on the socket, used to call the callback
* on failed conditions when the event can't be scheduled on the uv loop.
*
*/
void
isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
isc_result_t eresult);
void
isc__nm_async_sendcb(isc__networker_t *worker, isc__netievent_t *ev0);
/*%<
* Issue a write callback on the socket, used to call the callback
* on failed conditions when the event can't be scheduled on the uv loop.
*/
void void
isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0); isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0);
/*%< /*%<

View File

@@ -700,9 +700,22 @@ process_queue(isc__networker_t *worker, isc_queue_t *queue) {
isc__nm_async_tls_do_bio(worker, ievent); isc__nm_async_tls_do_bio(worker, ievent);
break; break;
case netievent_connectcb:
isc__nm_async_connectcb(worker, ievent);
break;
case netievent_acceptcb:
isc__nm_async_acceptcb(worker, ievent);
break;
case netievent_readcb:
isc__nm_async_readcb(worker, ievent);
break;
case netievent_sendcb:
isc__nm_async_sendcb(worker, ievent);
break;
case netievent_closecb: case netievent_closecb:
isc__nm_async_closecb(worker, ievent); isc__nm_async_closecb(worker, ievent);
break; break;
case netievent_detach: case netievent_detach:
isc__nm_async_detach(worker, ievent); isc__nm_async_detach(worker, ievent);
break; break;
@@ -1645,17 +1658,194 @@ isc_nm_stoplistening(isc_nmsocket_t *sock) {
} }
void void
isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0) { isc__nm_connectcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
isc__netievent_closecb_t *ievent = (isc__netievent_closecb_t *)ev0; isc_result_t eresult) {
isc__netievent_connectcb_t *ievent =
isc__nm_get_ievent(sock->mgr, netievent_connectcb);
REQUIRE(VALID_NMSOCK(ievent->sock)); REQUIRE(VALID_NMSOCK(sock));
REQUIRE(ievent->sock->tid == isc_nm_tid()); REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(ievent->sock->closehandle_cb != NULL); REQUIRE(VALID_NMHANDLE(uvreq->handle));
ievent->sock = sock;
ievent->req = uvreq;
ievent->result = eresult;
if (eresult == ISC_R_SUCCESS) {
isc__nm_async_connectcb(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
isc__nm_put_ievent(sock->mgr, ievent);
} else {
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
}
}
void
isc__nm_async_connectcb(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_connectcb_t *ievent = (isc__netievent_connectcb_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
isc__nm_uvreq_t *uvreq = ievent->req;
isc_result_t eresult = ievent->result;
UNUSED(worker); UNUSED(worker);
ievent->sock->closehandle_cb(ievent->sock); REQUIRE(VALID_NMSOCK(sock));
isc__nmsocket_detach(&ievent->sock); REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(VALID_NMHANDLE(uvreq->handle));
REQUIRE(ievent->sock->tid == isc_nm_tid());
REQUIRE(uvreq->cb.connect != NULL);
uvreq->cb.connect(uvreq->handle, eresult, uvreq->cbarg);
isc__nm_uvreq_put(&uvreq, sock);
}
void
isc__nm_acceptcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
isc_result_t eresult) {
isc__netievent_acceptcb_t *ievent =
isc__nm_get_ievent(sock->mgr, netievent_acceptcb);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(VALID_NMHANDLE(uvreq->handle));
ievent->sock = sock;
ievent->req = uvreq;
ievent->result = eresult;
if (eresult == ISC_R_SUCCESS) {
isc__nm_async_acceptcb(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
isc__nm_put_ievent(sock->mgr, ievent);
} else {
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
}
}
void
isc__nm_async_acceptcb(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_acceptcb_t *ievent = (isc__netievent_acceptcb_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
isc__nm_uvreq_t *uvreq = ievent->req;
isc_result_t eresult = ievent->result;
UNUSED(worker);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(VALID_NMHANDLE(uvreq->handle));
REQUIRE(sock->tid == isc_nm_tid());
REQUIRE(uvreq->cb.accept != NULL);
uvreq->cb.accept(uvreq->handle, eresult, uvreq->cbarg);
isc__nm_uvreq_put(&uvreq, sock);
}
void
isc__nm_readcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
isc_result_t eresult) {
isc__netievent_readcb_t *ievent = isc__nm_get_ievent(sock->mgr,
netievent_readcb);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(VALID_NMHANDLE(uvreq->handle));
ievent->sock = sock;
ievent->req = uvreq;
ievent->result = eresult;
if (eresult == ISC_R_SUCCESS) {
isc__nm_async_readcb(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
isc__nm_put_ievent(sock->mgr, ievent);
} else {
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
}
}
void
isc__nm_async_readcb(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_readcb_t *ievent = (isc__netievent_readcb_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
isc__nm_uvreq_t *uvreq = ievent->req;
isc_result_t eresult = ievent->result;
isc_region_t region = { .base = (unsigned char *)uvreq->uvbuf.base,
.length = uvreq->uvbuf.len };
UNUSED(worker);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(VALID_NMHANDLE(uvreq->handle));
REQUIRE(sock->tid == isc_nm_tid());
uvreq->cb.recv(uvreq->handle, eresult, &region, uvreq->cbarg);
isc__nm_uvreq_put(&uvreq, sock);
}
void
isc__nm_sendcb(isc_nmsocket_t *sock, isc__nm_uvreq_t *uvreq,
isc_result_t eresult) {
isc__netievent_sendcb_t *ievent = isc__nm_get_ievent(sock->mgr,
netievent_sendcb);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(VALID_NMHANDLE(uvreq->handle));
ievent->sock = sock;
ievent->req = uvreq;
ievent->result = eresult;
if (eresult == ISC_R_SUCCESS) {
isc__nm_async_sendcb(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
isc__nm_put_ievent(sock->mgr, ievent);
} else {
isc__nm_enqueue_ievent(&sock->mgr->workers[sock->tid],
(isc__netievent_t *)ievent);
}
}
void
isc__nm_async_sendcb(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_sendcb_t *ievent = (isc__netievent_sendcb_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
isc__nm_uvreq_t *uvreq = ievent->req;
isc_result_t eresult = ievent->result;
UNUSED(worker);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(uvreq));
REQUIRE(VALID_NMHANDLE(uvreq->handle));
REQUIRE(sock->tid == isc_nm_tid());
uvreq->cb.send(uvreq->handle, eresult, uvreq->cbarg);
isc__nm_uvreq_put(&uvreq, sock);
}
void
isc__nm_async_closecb(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_closecb_t *ievent = (isc__netievent_closecb_t *)ev0;
isc_nmsocket_t *sock = ievent->sock;
REQUIRE(VALID_NMSOCK(ievent->sock));
REQUIRE(sock->tid == isc_nm_tid());
REQUIRE(sock->closehandle_cb != NULL);
UNUSED(worker);
ievent->sock->closehandle_cb(sock);
isc__nmsocket_detach(&sock);
} }
void void

View File

@@ -49,7 +49,7 @@ can_log_tcp_quota(void) {
return (false); return (false);
} }
static int static isc_result_t
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req); tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req);
static void static void
@@ -80,6 +80,17 @@ quota_accept_cb(isc_quota_t *quota, void *sock0);
static void static void
failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult); failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult);
static void
failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
isc_result_t eresult);
static bool
inactive(isc_nmsocket_t *sock) {
return (!isc__nmsocket_active(sock) ||
atomic_load(&sock->mgr->closing) ||
(sock->server != NULL && !isc__nmsocket_active(sock->server)));
}
static void static void
failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) { failed_accept_cb(isc_nmsocket_t *sock, isc_result_t eresult) {
/* /*
@@ -126,20 +137,17 @@ failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
} }
if (!atomic_load(&sock->connecting)) { if (!atomic_load(&sock->connecting)) {
isc__nm_uvreq_put(&req, sock);
return; return;
} }
atomic_store(&sock->connecting, false); atomic_store(&sock->connecting, false);
isc__nmsocket_clearcb(sock); isc__nmsocket_clearcb(sock);
if (req->cb.connect != NULL) { if (req->cb.connect != NULL) {
req->cb.connect(NULL, eresult, req->cbarg); isc__nm_connectcb(sock, req, eresult);
} else {
isc__nm_uvreq_put(&req, sock);
} }
req->cb.connect = NULL;
req->cbarg = NULL;
isc__nm_uvreq_put(&req, sock);
isc__nmsocket_detach(&sock);
} }
static void static void
@@ -147,16 +155,22 @@ connecttimeout_cb(uv_timer_t *handle) {
isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)handle); isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)handle);
isc_nmsocket_t *sock = req->sock; isc_nmsocket_t *sock = req->sock;
REQUIRE(VALID_UVREQ(req));
REQUIRE(VALID_NMHANDLE(req->handle));
REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->tid == isc_nm_tid());
failed_connect_cb(sock, req, ISC_R_TIMEDOUT); failed_connect_cb(sock, req, ISC_R_TIMEDOUT);
isc__nmsocket_detach(&sock);
} }
static int static isc_result_t
tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__networker_t *worker = NULL; isc__networker_t *worker = NULL;
int r; int r;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(req));
REQUIRE(isc__nm_in_netthread()); REQUIRE(isc__nm_in_netthread());
REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->tid == isc_nm_tid());
@@ -169,11 +183,8 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
atomic_store(&sock->closing, true); atomic_store(&sock->closing, true);
atomic_store(&sock->closed, true); atomic_store(&sock->closed, true);
atomic_store(&sock->result, isc__nm_uverr2result(r));
atomic_store(&sock->connect_error, true);
failed_connect_cb(sock, req, isc__nm_uverr2result(r));
atomic_store(&sock->active, false); atomic_store(&sock->active, false);
return (r); return (isc__nm_uverr2result(r));
} }
if (req->local.length != 0) { if (req->local.length != 0) {
@@ -181,12 +192,9 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
if (r != 0) { if (r != 0) {
isc__nm_incstats(sock->mgr, isc__nm_incstats(sock->mgr,
sock->statsindex[STATID_BINDFAIL]); sock->statsindex[STATID_BINDFAIL]);
atomic_store(&sock->result, isc__nm_uverr2result(r));
atomic_store(&sock->connect_error, true);
failed_connect_cb(sock, req, isc__nm_uverr2result(r));
atomic_store(&sock->active, false); atomic_store(&sock->active, false);
isc__nm_tcp_close(sock); isc__nm_tcp_close(sock);
return (r); return (isc__nm_uverr2result(r));
} }
} }
@@ -203,12 +211,9 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
if (r != 0) { if (r != 0) {
isc__nm_incstats(sock->mgr, isc__nm_incstats(sock->mgr,
sock->statsindex[STATID_CONNECTFAIL]); sock->statsindex[STATID_CONNECTFAIL]);
atomic_store(&sock->result, isc__nm_uverr2result(r));
atomic_store(&sock->connect_error, true);
failed_connect_cb(sock, req, isc__nm_uverr2result(r));
atomic_store(&sock->active, false); atomic_store(&sock->active, false);
isc__nm_tcp_close(sock); isc__nm_tcp_close(sock);
return (r); return (isc__nm_uverr2result(r));
} }
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
@@ -216,7 +221,7 @@ tcp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
0); 0);
sock->timer_running = true; sock->timer_running = true;
return (0); return (ISC_R_SUCCESS);
} }
void void
@@ -225,22 +230,28 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
(isc__netievent_tcpconnect_t *)ev0; (isc__netievent_tcpconnect_t *)ev0;
isc_nmsocket_t *sock = ievent->sock; isc_nmsocket_t *sock = ievent->sock;
isc__nm_uvreq_t *req = ievent->req; isc__nm_uvreq_t *req = ievent->req;
int r; isc_result_t result = ISC_R_SUCCESS;
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid());
UNUSED(worker); UNUSED(worker);
r = tcp_connect_direct(sock, req); REQUIRE(VALID_NMSOCK(sock));
if (r != 0) { REQUIRE(sock->type == isc_nm_tcpsocket);
LOCK(&sock->lock); REQUIRE(sock->iface != NULL);
SIGNAL(&sock->cond); REQUIRE(sock->parent == NULL);
UNLOCK(&sock->lock); REQUIRE(sock->tid == isc_nm_tid());
return;
}
atomic_store(&sock->connected, true); req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr);
result = tcp_connect_direct(sock, req);
atomic_store(&sock->result, result);
if (result == ISC_R_SUCCESS) {
atomic_store(&sock->connected, true);
/* uvreq will be freed in tcp_connect_cb */
/* socket will be detached in tcp_connect_cb */
} else {
atomic_store(&sock->connect_error, true);
isc__nm_uvreq_put(&req, sock);
isc__nmsocket_detach(&ievent->sock);
}
LOCK(&sock->lock); LOCK(&sock->lock);
SIGNAL(&sock->cond); SIGNAL(&sock->cond);
@@ -250,28 +261,32 @@ isc__nm_async_tcpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
static void static void
tcp_connect_cb(uv_connect_t *uvreq, int status) { tcp_connect_cb(uv_connect_t *uvreq, int status) {
isc_result_t result; isc_result_t result;
isc__nm_uvreq_t *req = uv_handle_get_data((uv_handle_t *)uvreq); isc__nm_uvreq_t *req = NULL;
isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)uvreq->handle); isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
struct sockaddr_storage ss; struct sockaddr_storage ss;
isc_nmhandle_t *handle = NULL;
int r; int r;
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->tid == isc_nm_tid());
/* We timed out */
if (!atomic_load(&sock->connecting)) {
return;
}
req = uv_handle_get_data((uv_handle_t *)uvreq);
REQUIRE(VALID_UVREQ(req));
REQUIRE(VALID_NMHANDLE(req->handle));
if (sock->timer_running) { if (sock->timer_running) {
uv_timer_stop(&sock->timer); uv_timer_stop(&sock->timer);
sock->timer_running = false; sock->timer_running = false;
} }
if (!atomic_load(&sock->connecting)) {
return;
}
REQUIRE(VALID_UVREQ(req));
if (status != 0) { if (status != 0) {
failed_connect_cb(sock, req, isc__nm_uverr2result(status)); failed_connect_cb(sock, req, isc__nm_uverr2result(status));
isc__nmsocket_detach(&sock);
return; return;
} }
@@ -280,6 +295,7 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
&(int){ sizeof(ss) }); &(int){ sizeof(ss) });
if (r != 0) { if (r != 0) {
failed_connect_cb(sock, req, isc__nm_uverr2result(r)); failed_connect_cb(sock, req, isc__nm_uverr2result(r));
isc__nmsocket_detach(&sock);
return; return;
} }
@@ -288,21 +304,12 @@ tcp_connect_cb(uv_connect_t *uvreq, int status) {
result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss); result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss);
RUNTIME_CHECK(result == ISC_R_SUCCESS); RUNTIME_CHECK(result == ISC_R_SUCCESS);
handle = isc__nmhandle_get(sock, NULL, NULL); isc__nm_connectcb(sock, req, ISC_R_SUCCESS);
req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg);
isc__nm_uvreq_put(&req, sock);
/* /*
* The sock is now attached to the handle. * The sock is now attached to the handle.
*/ */
isc__nmsocket_detach(&sock); isc__nmsocket_detach(&sock);
/*
* The connect callback should have attached to the handle.
* If it didn't, the socket will be closed now.
*/
isc_nmhandle_detach(&handle);
} }
isc_result_t isc_result_t
@@ -310,7 +317,7 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
isc_nm_cb_t cb, void *cbarg, unsigned int timeout, isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
size_t extrahandlesize) { size_t extrahandlesize) {
isc_result_t result = ISC_R_SUCCESS; isc_result_t result = ISC_R_SUCCESS;
isc_nmsocket_t *nsock = NULL, *tmp = NULL; isc_nmsocket_t *sock = NULL, *tmp = NULL;
isc__netievent_tcpconnect_t *ievent = NULL; isc__netievent_tcpconnect_t *ievent = NULL;
isc__nm_uvreq_t *req = NULL; isc__nm_uvreq_t *req = NULL;
@@ -318,50 +325,50 @@ isc_nm_tcpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
REQUIRE(local != NULL); REQUIRE(local != NULL);
REQUIRE(peer != NULL); REQUIRE(peer != NULL);
nsock = isc_mem_get(mgr->mctx, sizeof(*nsock)); sock = isc_mem_get(mgr->mctx, sizeof(*sock));
isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local); isc__nmsocket_init(sock, mgr, isc_nm_tcpsocket, local);
nsock->extrahandlesize = extrahandlesize; sock->extrahandlesize = extrahandlesize;
nsock->connect_timeout = timeout; sock->connect_timeout = timeout;
atomic_init(&nsock->result, ISC_R_SUCCESS); atomic_init(&sock->result, ISC_R_SUCCESS);
atomic_init(&nsock->client, true); atomic_init(&sock->client, true);
req = isc__nm_uvreq_get(mgr, nsock); req = isc__nm_uvreq_get(mgr, sock);
req->cb.connect = cb; req->cb.connect = cb;
req->cbarg = cbarg; req->cbarg = cbarg;
req->peer = peer->addr; req->peer = peer->addr;
req->local = local->addr; req->local = local->addr;
ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect); ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect);
ievent->sock = nsock; ievent->sock = sock;
ievent->req = req; ievent->req = req;
/* /*
* Async callbacks can dereference the socket in the meantime, * Async callbacks can dereference the socket in the meantime,
* we need to hold an additional reference to it. * we need to hold an additional reference to it.
*/ */
isc__nmsocket_attach(nsock, &tmp); isc__nmsocket_attach(sock, &tmp);
if (isc__nm_in_netthread()) { if (isc__nm_in_netthread()) {
nsock->tid = isc_nm_tid(); sock->tid = isc_nm_tid();
isc__nm_async_tcpconnect(&mgr->workers[nsock->tid], isc__nm_async_tcpconnect(&mgr->workers[sock->tid],
(isc__netievent_t *)ievent); (isc__netievent_t *)ievent);
isc__nm_put_ievent(mgr, ievent); isc__nm_put_ievent(mgr, ievent);
} else { } else {
nsock->tid = isc_random_uniform(mgr->nworkers); sock->tid = isc_random_uniform(mgr->nworkers);
isc__nm_enqueue_ievent(&mgr->workers[nsock->tid], isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
(isc__netievent_t *)ievent); (isc__netievent_t *)ievent);
LOCK(&nsock->lock); LOCK(&sock->lock);
while (!atomic_load(&nsock->connected) && while (!atomic_load(&sock->connected) &&
!atomic_load(&nsock->connect_error)) { !atomic_load(&sock->connect_error)) {
WAIT(&nsock->cond, &nsock->lock); WAIT(&sock->cond, &sock->lock);
} }
UNLOCK(&nsock->lock); UNLOCK(&sock->lock);
} }
result = atomic_load(&nsock->result); result = atomic_load(&sock->result);
isc__nmsocket_detach(&tmp); isc__nmsocket_detach(&tmp);
@@ -581,13 +588,11 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_tcpchildaccept_t *ievent = isc__netievent_tcpchildaccept_t *ievent =
(isc__netievent_tcpchildaccept_t *)ev0; (isc__netievent_tcpchildaccept_t *)ev0;
isc_nmsocket_t *sock = ievent->sock; isc_nmsocket_t *sock = ievent->sock;
isc_nmhandle_t *handle;
isc_result_t result; isc_result_t result;
isc__nm_uvreq_t *req = NULL;
struct sockaddr_storage ss; struct sockaddr_storage ss;
isc_sockaddr_t local; isc_sockaddr_t local;
int r; int r;
isc_nm_accept_cb_t accept_cb;
void *accept_cbarg;
REQUIRE(isc__nm_in_netthread()); REQUIRE(isc__nm_in_netthread());
REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->tid == isc_nm_tid());
@@ -650,25 +655,22 @@ isc__nm_async_tcpchildaccept(isc__networker_t *worker, isc__netievent_t *ev0) {
} }
sock->accepting = false; sock->accepting = false;
handle = isc__nmhandle_get(sock, NULL, &local);
INSIST(sock->accept_cb != NULL); INSIST(sock->accept_cb != NULL);
accept_cb = sock->accept_cb;
accept_cbarg = sock->accept_cbarg;
sock->read_timeout = sock->mgr->init; sock->read_timeout = sock->mgr->init;
accept_cb(handle, ISC_R_SUCCESS, accept_cbarg);
req = isc__nm_uvreq_get(sock->mgr, sock);
req->handle = isc__nmhandle_get(sock, NULL, &local);
req->cb.accept = sock->accept_cb;
req->cbarg = sock->accept_cbarg;
isc__nm_acceptcb(sock, req, ISC_R_SUCCESS);
/* /*
* sock is now attached to the handle. * sock is now attached to the handle.
*/ */
isc__nmsocket_detach(&sock); isc__nmsocket_detach(&sock);
/*
* The accept callback should have attached to the handle.
* If it didn't, the socket will be closed now.
*/
isc_nmhandle_detach(&handle);
return; return;
error: error:
@@ -733,9 +735,6 @@ tcp_listenclose_cb(uv_handle_t *handle) {
static void static void
failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
isc_nm_recv_cb_t cb;
void *cbarg = NULL;
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->statichandle != NULL); REQUIRE(sock->statichandle != NULL);
@@ -750,12 +749,28 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
uv_read_stop(&sock->uv_handle.stream); uv_read_stop(&sock->uv_handle.stream);
cb = sock->recv_cb; if (sock->recv_cb != NULL) {
cbarg = sock->recv_cbarg; isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr, sock);
isc__nmsocket_clearcb(sock); isc_nmhandle_attach(sock->statichandle, &req->handle);
req->cb.recv = sock->recv_cb;
req->cbarg = sock->recv_cbarg;
if (cb != NULL) { isc__nmsocket_clearcb(sock);
cb(sock->statichandle, result, NULL, cbarg);
isc__nm_readcb(sock, req, result);
}
}
static void
failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
isc_result_t eresult) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(req));
if (req->cb.send != NULL) {
isc__nm_sendcb(sock, req, eresult);
} else {
isc__nm_uvreq_put(&req, sock);
} }
} }
@@ -790,29 +805,17 @@ isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
REQUIRE(VALID_NMHANDLE(handle)); REQUIRE(VALID_NMHANDLE(handle));
REQUIRE(VALID_NMSOCK(handle->sock)); REQUIRE(VALID_NMSOCK(handle->sock));
if (!isc__nmsocket_active(sock)) { sock->recv_cb = cb;
sock->recv_cbarg = cbarg;
if (inactive(sock)) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
cb(handle, ISC_R_CANCELED, NULL, cbarg); failed_read_cb(sock, ISC_R_CANCELED);
return;
}
if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
cb(handle, ISC_R_CANCELED, NULL, cbarg);
return;
}
if (atomic_load(&sock->mgr->closing)) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
cb(handle, ISC_R_CANCELED, NULL, cbarg);
return; return;
} }
REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->tid == isc_nm_tid());
sock->recv_cb = cb;
sock->recv_cbarg = cbarg;
sock->read_timeout = (atomic_load(&sock->keepalive) sock->read_timeout = (atomic_load(&sock->keepalive)
? sock->mgr->keepalive ? sock->mgr->keepalive
: sock->mgr->idle); : sock->mgr->idle);
@@ -864,17 +867,7 @@ isc__nm_async_tcp_startread(isc__networker_t *worker, isc__netievent_t *ev0) {
REQUIRE(worker->id == isc_nm_tid()); REQUIRE(worker->id == isc_nm_tid());
if (!isc__nmsocket_active(sock)) { if (inactive(sock)) {
failed_read_cb(sock, ISC_R_CANCELED);
return;
}
if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
failed_read_cb(sock, ISC_R_CANCELED);
return;
}
if (atomic_load(&sock->mgr->closing)) {
failed_read_cb(sock, ISC_R_CANCELED); failed_read_cb(sock, ISC_R_CANCELED);
return; return;
} }
@@ -982,13 +975,22 @@ read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) {
REQUIRE(buf != NULL); REQUIRE(buf != NULL);
if (nread >= 0) { if (nread >= 0) {
isc_region_t region = { .base = (unsigned char *)buf->base, if (sock->recv_cb != NULL) {
.length = nread }; isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr,
isc_nm_recv_cb_t cb = sock->recv_cb; sock);
void *cbarg = sock->recv_cbarg; req->cb.recv = sock->recv_cb;
req->cbarg = sock->recv_cbarg;
isc_nmhandle_attach(sock->statichandle, &req->handle);
if (cb != NULL) { /*
cb(sock->statichandle, ISC_R_SUCCESS, &region, cbarg); * The callback will be called synchronously because the
* result is ISC_R_SUCCESS, so we don't need to retain
* the buffer
*/
req->uvbuf.base = buf->base;
req->uvbuf.len = nread;
isc__nm_readcb(sock, req, ISC_R_SUCCESS);
} }
if (sock->timer_initialized && sock->read_timeout != 0) { if (sock->timer_initialized && sock->read_timeout != 0) {
@@ -1183,24 +1185,6 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
REQUIRE(sock->type == isc_nm_tcpsocket); REQUIRE(sock->type == isc_nm_tcpsocket);
if (!isc__nmsocket_active(sock)) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
cb(handle, ISC_R_CANCELED, cbarg);
return;
}
if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
cb(handle, ISC_R_CANCELED, cbarg);
return;
}
if (atomic_load(&sock->mgr->closing)) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
cb(handle, ISC_R_CANCELED, cbarg);
return;
}
uvreq = isc__nm_uvreq_get(sock->mgr, sock); uvreq = isc__nm_uvreq_get(sock->mgr, sock);
uvreq->uvbuf.base = (char *)region->base; uvreq->uvbuf.base = (char *)region->base;
uvreq->uvbuf.len = region->length; uvreq->uvbuf.len = region->length;
@@ -1210,6 +1194,12 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
uvreq->cb.send = cb; uvreq->cb.send = cb;
uvreq->cbarg = cbarg; uvreq->cbarg = cbarg;
if (inactive(sock)) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
failed_send_cb(sock, uvreq, ISC_R_CANCELED);
return;
}
if (sock->tid == isc_nm_tid()) { if (sock->tid == isc_nm_tid()) {
/* /*
* If we're in the same thread as the socket we can send the * If we're in the same thread as the socket we can send the
@@ -1219,8 +1209,7 @@ isc__nm_tcp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
if (result != ISC_R_SUCCESS) { if (result != ISC_R_SUCCESS) {
isc__nm_incstats(sock->mgr, isc__nm_incstats(sock->mgr,
sock->statsindex[STATID_SENDFAIL]); sock->statsindex[STATID_SENDFAIL]);
uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); failed_send_cb(sock, uvreq, result);
isc__nm_uvreq_put(&uvreq, sock);
} }
} else { } else {
/* /*
@@ -1284,13 +1273,7 @@ tcp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->tid == isc_nm_tid());
REQUIRE(sock->type == isc_nm_tcpsocket); REQUIRE(sock->type == isc_nm_tcpsocket);
if (!isc__nmsocket_active(sock)) { if (inactive(sock)) {
return (ISC_R_CANCELED);
}
if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
return (ISC_R_CANCELED);
}
if (atomic_load(&sock->mgr->closing)) {
return (ISC_R_CANCELED); return (ISC_R_CANCELED);
} }
@@ -1408,11 +1391,6 @@ isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
} }
if (atomic_load(&sock->connecting)) { if (atomic_load(&sock->connecting)) {
if (sock->timer_initialized) {
isc__nm_uvreq_t *req =
uv_handle_get_data((uv_handle_t *)&sock->timer);
failed_connect_cb(sock, req, ISC_R_CANCELED);
}
return; return;
} }

View File

@@ -787,11 +787,6 @@ tcpdnsconnect_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn)); isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn));
if (result != ISC_R_SUCCESS) {
cb(NULL, result, cbarg);
return;
}
dnssock = isc_mem_get(handle->sock->mgr->mctx, sizeof(*dnssock)); dnssock = isc_mem_get(handle->sock->mgr->mctx, sizeof(*dnssock));
isc__nmsocket_init(dnssock, handle->sock->mgr, isc_nm_tcpdnssocket, isc__nmsocket_init(dnssock, handle->sock->mgr, isc_nm_tcpdnssocket,
handle->sock->iface); handle->sock->iface);
@@ -807,6 +802,13 @@ tcpdnsconnect_cb(isc_nmhandle_t *handle, isc_result_t result, void *arg) {
readhandle = isc__nmhandle_get(dnssock, NULL, NULL); readhandle = isc__nmhandle_get(dnssock, NULL, NULL);
if (result != ISC_R_SUCCESS) {
cb(readhandle, result, cbarg);
isc__nmsocket_detach(&dnssock);
isc_nmhandle_detach(&readhandle);
return;
}
INSIST(dnssock->statichandle != NULL); INSIST(dnssock->statichandle != NULL);
INSIST(dnssock->statichandle == readhandle); INSIST(dnssock->statichandle == readhandle);
INSIST(readhandle->sock == dnssock); INSIST(readhandle->sock == dnssock);
@@ -838,20 +840,26 @@ isc_result_t
isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, isc_nm_tcpdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
isc_nm_cb_t cb, void *cbarg, unsigned int timeout, isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
size_t extrahandlesize) { size_t extrahandlesize) {
tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(tcpconnect_t)); isc_result_t result = ISC_R_SUCCESS;
tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(*conn));
*conn = (tcpconnect_t){ .cb = cb, *conn = (tcpconnect_t){ .cb = cb,
.cbarg = cbarg, .cbarg = cbarg,
.extrahandlesize = extrahandlesize }; .extrahandlesize = extrahandlesize };
isc_mem_attach(mgr->mctx, &conn->mctx); isc_mem_attach(mgr->mctx, &conn->mctx);
return (isc_nm_tcpconnect(mgr, local, peer, tcpdnsconnect_cb, conn, result = isc_nm_tcpconnect(mgr, local, peer, tcpdnsconnect_cb, conn,
timeout, 0)); timeout, 0);
if (result != ISC_R_SUCCESS) {
isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn));
}
return (result);
} }
isc_result_t isc_result_t
isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer, isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
isc_nm_cb_t cb, void *cbarg, unsigned int timeout, isc_nm_cb_t cb, void *cbarg, unsigned int timeout,
size_t extrahandlesize) { size_t extrahandlesize) {
isc_result_t result = ISC_R_SUCCESS;
tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(tcpconnect_t)); tcpconnect_t *conn = isc_mem_get(mgr->mctx, sizeof(tcpconnect_t));
SSL_CTX *ctx = NULL; SSL_CTX *ctx = NULL;
@@ -861,9 +869,12 @@ isc_nm_tlsdnsconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
isc_mem_attach(mgr->mctx, &conn->mctx); isc_mem_attach(mgr->mctx, &conn->mctx);
ctx = SSL_CTX_new(SSLv23_client_method()); ctx = SSL_CTX_new(SSLv23_client_method());
isc_result_t result = isc_nm_tlsconnect( result = isc_nm_tlsconnect(mgr, local, peer, tcpdnsconnect_cb, conn,
mgr, local, peer, tcpdnsconnect_cb, conn, ctx, timeout, 0); ctx, timeout, 0);
SSL_CTX_free(ctx); SSL_CTX_free(ctx);
if (result != ISC_R_SUCCESS) {
isc_mem_putanddetach(&conn->mctx, conn, sizeof(*conn));
}
return (result); return (result);
} }

View File

@@ -79,6 +79,7 @@ tls_senddone(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
UNUSED(handle); UNUSED(handle);
/* XXXWPK TODO */ /* XXXWPK TODO */
UNUSED(eresult); UNUSED(eresult);
isc_mem_put(sock->mgr->mctx, sock->tls.senddata.base, isc_mem_put(sock->mgr->mctx, sock->tls.senddata.base,
sock->tls.senddata.length); sock->tls.senddata.length);
sock->tls.senddata = (isc_region_t){ NULL, 0 }; sock->tls.senddata = (isc_region_t){ NULL, 0 };
@@ -701,7 +702,7 @@ tls_connect_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
REQUIRE(VALID_NMSOCK(tlssock)); REQUIRE(VALID_NMSOCK(tlssock));
if (result != ISC_R_SUCCESS) { if (result != ISC_R_SUCCESS) {
tlssock->connect_cb(NULL, result, tlssock->connect_cbarg); tlssock->connect_cb(handle, result, tlssock->connect_cbarg);
atomic_store(&tlssock->result, result); atomic_store(&tlssock->result, result);
atomic_store(&tlssock->connect_error, true); atomic_store(&tlssock->connect_error, true);
tls_close_direct(tlssock); tls_close_direct(tlssock);
@@ -714,7 +715,7 @@ tls_connect_cb(isc_nmhandle_t *handle, isc_result_t result, void *cbarg) {
isc_nmhandle_attach(handle, &tlssock->outerhandle); isc_nmhandle_attach(handle, &tlssock->outerhandle);
result = initialize_tls(tlssock, false); result = initialize_tls(tlssock, false);
if (result != ISC_R_SUCCESS) { if (result != ISC_R_SUCCESS) {
tlssock->connect_cb(NULL, result, tlssock->connect_cbarg); tlssock->connect_cb(handle, result, tlssock->connect_cbarg);
atomic_store(&tlssock->result, result); atomic_store(&tlssock->result, result);
atomic_store(&tlssock->connect_error, true); atomic_store(&tlssock->connect_error, true);
tls_close_direct(tlssock); tls_close_direct(tlssock);
@@ -742,6 +743,7 @@ isc__nm_async_tlsconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
tls_connect_cb, tlssock, tls_connect_cb, tlssock,
tlssock->connect_timeout, 0); tlssock->connect_timeout, 0);
if (result != ISC_R_SUCCESS) { if (result != ISC_R_SUCCESS) {
/* FIXME: We need to pass valid handle */
tlssock->connect_cb(NULL, result, tlssock->connect_cbarg); tlssock->connect_cb(NULL, result, tlssock->connect_cbarg);
atomic_store(&tlssock->result, result); atomic_store(&tlssock->result, result);
atomic_store(&tlssock->connect_error, true); atomic_store(&tlssock->connect_error, true);

View File

@@ -51,8 +51,15 @@ static void
failed_read_cb(isc_nmsocket_t *sock, isc_result_t result); failed_read_cb(isc_nmsocket_t *sock, isc_result_t result);
static void static void
failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req, failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
isc_result_t eresult); isc_result_t eresult);
static bool
inactive(isc_nmsocket_t *sock) {
return (!isc__nmsocket_active(sock) ||
atomic_load(&sock->mgr->closing) ||
(sock->server != NULL && !isc__nmsocket_active(sock->server)));
}
isc_result_t isc_result_t
isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb, isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
@@ -339,14 +346,11 @@ static void
udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf, udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
const struct sockaddr *addr, unsigned flags) { const struct sockaddr *addr, unsigned flags) {
isc_result_t result; isc_result_t result;
isc_nmhandle_t *nmhandle = NULL;
isc_sockaddr_t sockaddr; isc_sockaddr_t sockaddr;
isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle); isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
isc_region_t region; isc__nm_uvreq_t *req = NULL;
uint32_t maxudp; uint32_t maxudp;
bool free_buf; bool free_buf;
isc_nm_recv_cb_t cb;
void *cbarg;
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->tid == isc_nm_tid());
@@ -385,38 +389,35 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
goto done; goto done;
} }
region.base = (unsigned char *)buf->base;
region.length = nrecv;
cb = sock->recv_cb;
cbarg = sock->recv_cbarg;
if (sock->timer_running) { if (sock->timer_running) {
uv_timer_stop(&sock->timer); uv_timer_stop(&sock->timer);
sock->timer_running = false; sock->timer_running = false;
} }
req = isc__nm_uvreq_get(sock->mgr, sock);
req->cb.recv = sock->recv_cb;
req->cbarg = sock->recv_cbarg;
/*
* The callback will be called synchronously, because result is
* ISC_R_SUCCESS.
*/
req->uvbuf.base = buf->base;
req->uvbuf.len = nrecv;
if (atomic_load(&sock->client)) { if (atomic_load(&sock->client)) {
if (nrecv < 0) { if (nrecv < 0) {
failed_read_cb(sock, isc__nm_uverr2result(nrecv)); failed_read_cb(sock, isc__nm_uverr2result(nrecv));
return; return;
} }
cb(sock->statichandle, ISC_R_SUCCESS, &region, cbarg); isc_nmhandle_attach(sock->statichandle, &req->handle);
} else { } else {
result = isc_sockaddr_fromsockaddr(&sockaddr, addr); result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
RUNTIME_CHECK(result == ISC_R_SUCCESS); RUNTIME_CHECK(result == ISC_R_SUCCESS);
nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL); req->handle = isc__nmhandle_get(sock, &sockaddr, NULL);
cb(nmhandle, ISC_R_SUCCESS, &region, cbarg);
/*
* If the recv callback wants to hold on to the handle,
* it needs to attach to it.
*/
isc_nmhandle_detach(&nmhandle);
} }
isc__nm_readcb(sock, req, ISC_R_SUCCESS);
done: done:
if (free_buf) { if (free_buf) {
@@ -440,21 +441,18 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
uint32_t maxudp = atomic_load(&sock->mgr->maxudp); uint32_t maxudp = atomic_load(&sock->mgr->maxudp);
int ntid; int ntid;
if (!isc__nmsocket_active(sock)) { uvreq = isc__nm_uvreq_get(sock->mgr, sock);
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); uvreq->uvbuf.base = (char *)region->base;
cb(handle, ISC_R_CANCELED, cbarg); uvreq->uvbuf.len = region->length;
return;
}
if (sock->server != NULL && !isc__nmsocket_active(sock->server)) { isc_nmhandle_attach(handle, &uvreq->handle);
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
cb(handle, ISC_R_CANCELED, cbarg);
return;
}
if (atomic_load(&sock->mgr->closing)) { uvreq->cb.send = cb;
uvreq->cbarg = cbarg;
if (inactive(sock)) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
cb(handle, ISC_R_CANCELED, cbarg); failed_send_cb(sock, uvreq, ISC_R_CANCELED);
return; return;
} }
@@ -467,6 +465,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
* we need to do so here. * we need to do so here.
*/ */
if (maxudp != 0 && region->length > maxudp) { if (maxudp != 0 && region->length > maxudp) {
isc__nm_uvreq_put(&uvreq, sock);
isc_nmhandle_detach(&handle); isc_nmhandle_detach(&handle);
return; return;
} }
@@ -499,15 +498,6 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
rsock = &psock->children[ntid]; rsock = &psock->children[ntid];
} }
uvreq = isc__nm_uvreq_get(sock->mgr, sock);
uvreq->uvbuf.base = (char *)region->base;
uvreq->uvbuf.len = region->length;
isc_nmhandle_attach(handle, &uvreq->handle);
uvreq->cb.send = cb;
uvreq->cbarg = cbarg;
if (isc_nm_tid() == rsock->tid) { if (isc_nm_tid() == rsock->tid) {
/* /*
* If we're in the same thread as the socket we can send * If we're in the same thread as the socket we can send
@@ -518,8 +508,7 @@ isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
if (result != ISC_R_SUCCESS) { if (result != ISC_R_SUCCESS) {
isc__nm_incstats(rsock->mgr, isc__nm_incstats(rsock->mgr,
rsock->statsindex[STATID_SENDFAIL]); rsock->statsindex[STATID_SENDFAIL]);
uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); failed_send_cb(rsock, uvreq, result);
isc__nm_uvreq_put(&uvreq, sock);
} }
} else { } else {
/* /*
@@ -549,23 +538,21 @@ isc__nm_async_udpsend(isc__networker_t *worker, isc__netievent_t *ev0) {
REQUIRE(worker->id == sock->tid); REQUIRE(worker->id == sock->tid);
if (!isc__nmsocket_active(ievent->sock)) { if (!isc__nmsocket_active(ievent->sock)) {
uvreq->cb.send(uvreq->handle, ISC_R_CANCELED, uvreq->cbarg); failed_send_cb(sock, uvreq, ISC_R_CANCELED);
isc__nm_uvreq_put(&uvreq, sock);
return; return;
} }
result = udp_send_direct(sock, uvreq, &ievent->peer); result = udp_send_direct(sock, uvreq, &ievent->peer);
if (result != ISC_R_SUCCESS) { if (result != ISC_R_SUCCESS) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); failed_send_cb(sock, uvreq, result);
isc__nm_uvreq_put(&uvreq, sock);
} }
} }
static void static void
udp_send_cb(uv_udp_send_t *req, int status) { udp_send_cb(uv_udp_send_t *req, int status) {
isc_result_t result = ISC_R_SUCCESS; isc_result_t result = ISC_R_SUCCESS;
isc__nm_uvreq_t *uvreq = (isc__nm_uvreq_t *)req->data; isc__nm_uvreq_t *uvreq = uv_handle_get_data((uv_handle_t *)req);
isc_nmsocket_t *sock = uvreq->sock; isc_nmsocket_t *sock = uvreq->sock;
REQUIRE(VALID_UVREQ(uvreq)); REQUIRE(VALID_UVREQ(uvreq));
@@ -576,8 +563,7 @@ udp_send_cb(uv_udp_send_t *req, int status) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
} }
uvreq->cb.send(uvreq->handle, result, uvreq->cbarg); isc__nm_sendcb(sock, uvreq, result);
isc__nm_uvreq_put(&uvreq, uvreq->sock);
} }
/* /*
@@ -595,13 +581,7 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->tid == isc_nm_tid());
REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(sock->type == isc_nm_udpsocket);
if (!isc__nmsocket_active(sock)) { if (inactive(sock)) {
return (ISC_R_CANCELED);
}
if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
return (ISC_R_CANCELED);
}
if (atomic_load(&sock->mgr->closing)) {
return (ISC_R_CANCELED); return (ISC_R_CANCELED);
} }
@@ -625,7 +605,7 @@ udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
return (ISC_R_SUCCESS); return (ISC_R_SUCCESS);
} }
static int static isc_result_t
udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) { udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__networker_t *worker = NULL; isc__networker_t *worker = NULL;
int uv_bind_flags = UV_UDP_REUSEADDR; int uv_bind_flags = UV_UDP_REUSEADDR;
@@ -644,11 +624,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
/* Socket was never opened; no need for isc__nm_udp_close() */ /* Socket was never opened; no need for isc__nm_udp_close() */
atomic_store(&sock->closing, true); atomic_store(&sock->closing, true);
atomic_store(&sock->closed, true); atomic_store(&sock->closed, true);
atomic_store(&sock->result, isc__nm_uverr2result(r));
atomic_store(&sock->connect_error, true); atomic_store(&sock->connect_error, true);
failed_connect_cb(sock, req, isc__nm_uverr2result(r));
atomic_store(&sock->active, false); atomic_store(&sock->active, false);
return (r); return (isc__nm_uverr2result(r));
} }
r = uv_udp_open(&sock->uv_handle.udp, sock->fd); r = uv_udp_open(&sock->uv_handle.udp, sock->fd);
@@ -656,10 +634,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPENFAIL]);
atomic_store(&sock->connect_error, true); atomic_store(&sock->connect_error, true);
atomic_store(&sock->result, isc__nm_uverr2result(r)); atomic_store(&sock->result, isc__nm_uverr2result(r));
failed_connect_cb(sock, req, isc__nm_uverr2result(r));
atomic_store(&sock->active, false); atomic_store(&sock->active, false);
isc__nm_udp_close(sock); isc__nm_udp_close(sock);
return (r); return (isc__nm_uverr2result(r));
} }
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_OPEN]);
@@ -673,10 +650,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_BINDFAIL]);
atomic_store(&sock->connect_error, true); atomic_store(&sock->connect_error, true);
atomic_store(&sock->result, isc__nm_uverr2result(r)); atomic_store(&sock->result, isc__nm_uverr2result(r));
failed_connect_cb(sock, req, isc__nm_uverr2result(r));
atomic_store(&sock->active, false); atomic_store(&sock->active, false);
isc__nm_udp_close(sock); isc__nm_udp_close(sock);
return (r); return (isc__nm_uverr2result(r));
} }
uv_handle_set_data(&sock->uv_handle.handle, sock); uv_handle_set_data(&sock->uv_handle.handle, sock);
@@ -687,10 +663,9 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
sock->statsindex[STATID_CONNECTFAIL]); sock->statsindex[STATID_CONNECTFAIL]);
atomic_store(&sock->connect_error, true); atomic_store(&sock->connect_error, true);
atomic_store(&sock->result, isc__nm_uverr2result(r)); atomic_store(&sock->result, isc__nm_uverr2result(r));
failed_connect_cb(sock, req, isc__nm_uverr2result(r));
atomic_store(&sock->active, false); atomic_store(&sock->active, false);
isc__nm_udp_close(sock); isc__nm_udp_close(sock);
return (r); return (isc__nm_uverr2result(r));
} }
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
atomic_store(&sock->connecting, false); atomic_store(&sock->connecting, false);
@@ -703,7 +678,7 @@ udp_connect_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req) {
uv_send_buffer_size(&sock->uv_handle.handle, uv_send_buffer_size(&sock->uv_handle.handle,
&(int){ ISC_SEND_BUFFER_SIZE }); &(int){ ISC_SEND_BUFFER_SIZE });
#endif #endif
return (0); return (ISC_R_SUCCESS);
} }
/* /*
@@ -716,37 +691,27 @@ isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
(isc__netievent_udpconnect_t *)ev0; (isc__netievent_udpconnect_t *)ev0;
isc_nmsocket_t *sock = ievent->sock; isc_nmsocket_t *sock = ievent->sock;
isc__nm_uvreq_t *req = ievent->req; isc__nm_uvreq_t *req = ievent->req;
isc_nmhandle_t *handle = NULL;
isc_nm_cb_t cb;
void *cbarg;
int r;
isc_result_t result; isc_result_t result;
UNUSED(worker); UNUSED(worker);
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->type == isc_nm_udpsocket); REQUIRE(sock->type == isc_nm_udpsocket);
REQUIRE(sock->iface != NULL); REQUIRE(sock->iface != NULL);
REQUIRE(sock->parent == NULL); REQUIRE(sock->parent == NULL);
REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->tid == isc_nm_tid());
cb = sock->connect_cb; req->handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr);
cbarg = sock->connect_cbarg; result = udp_connect_direct(sock, req);
atomic_store(&sock->result, result);
r = udp_connect_direct(sock, req); if (result == ISC_R_SUCCESS) {
if (r != 0) { atomic_store(&sock->connected, true);
LOCK(&sock->lock); isc__nm_connectcb(sock, req, ISC_R_SUCCESS);
SIGNAL(&sock->cond); } else {
UNLOCK(&sock->lock); atomic_store(&sock->connect_error, true);
return; isc__nm_uvreq_put(&req, sock);
} }
atomic_store(&sock->connected, true);
atomic_store(&sock->result, ISC_R_SUCCESS);
result = atomic_load(&sock->result);
handle = isc__nmhandle_get(sock, &req->peer, &sock->iface->addr);
cb(handle, result, cbarg);
LOCK(&sock->lock); LOCK(&sock->lock);
SIGNAL(&sock->cond); SIGNAL(&sock->cond);
UNLOCK(&sock->lock); UNLOCK(&sock->lock);
@@ -755,12 +720,6 @@ isc__nm_async_udpconnect(isc__networker_t *worker, isc__netievent_t *ev0) {
* The sock is now attached to the handle. * The sock is now attached to the handle.
*/ */
isc__nmsocket_detach(&sock); isc__nmsocket_detach(&sock);
/*
* The connect callback should have attached to the handle.
* If it didn't, the socket will be closed now.
*/
isc_nmhandle_detach(&handle);
} }
isc_result_t isc_result_t
@@ -834,7 +793,6 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
isc__nm_async_udpconnect(&mgr->workers[sock->tid], isc__nm_async_udpconnect(&mgr->workers[sock->tid],
(isc__netievent_t *)event); (isc__netievent_t *)event);
isc__nm_put_ievent(mgr, event); isc__nm_put_ievent(mgr, event);
isc__nm_uvreq_put(&req, sock);
} else { } else {
sock->tid = isc_random_uniform(mgr->nworkers); sock->tid = isc_random_uniform(mgr->nworkers);
isc__nm_enqueue_ievent(&mgr->workers[sock->tid], isc__nm_enqueue_ievent(&mgr->workers[sock->tid],
@@ -846,7 +804,6 @@ isc_nm_udpconnect(isc_nm_t *mgr, isc_nmiface_t *local, isc_nmiface_t *peer,
WAIT(&sock->cond, &sock->lock); WAIT(&sock->cond, &sock->lock);
} }
UNLOCK(&sock->lock); UNLOCK(&sock->lock);
isc__nm_uvreq_put(&req, sock);
} }
result = atomic_load(&sock->result); result = atomic_load(&sock->result);
@@ -867,9 +824,6 @@ udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
static void static void
failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) { failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
isc_nm_recv_cb_t cb;
void *cbarg = NULL;
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->statichandle != NULL); REQUIRE(sock->statichandle != NULL);
@@ -880,12 +834,28 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
uv_udp_recv_stop(&sock->uv_handle.udp); uv_udp_recv_stop(&sock->uv_handle.udp);
cb = sock->recv_cb; if (sock->recv_cb != NULL) {
cbarg = sock->recv_cbarg; isc__nm_uvreq_t *req = isc__nm_uvreq_get(sock->mgr, sock);
isc__nmsocket_clearcb(sock); isc_nmhandle_attach(sock->statichandle, &req->handle);
req->cb.recv = sock->recv_cb;
req->cbarg = sock->recv_cbarg;
if (cb != NULL) { isc__nmsocket_clearcb(sock);
cb(sock->statichandle, result, NULL, cbarg);
isc__nm_readcb(sock, req, result);
}
}
static void
failed_send_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
isc_result_t eresult) {
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(VALID_UVREQ(req));
if (req->cb.send != NULL) {
isc__nm_sendcb(sock, req, eresult);
} else {
isc__nm_uvreq_put(&req, sock);
} }
} }
@@ -911,17 +881,7 @@ isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) {
isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0; isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0;
isc_nmsocket_t *sock = ievent->sock; isc_nmsocket_t *sock = ievent->sock;
if (!isc__nmsocket_active(sock)) { if (inactive(sock)) {
failed_read_cb(sock, ISC_R_CANCELED);
return;
}
if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
failed_read_cb(sock, ISC_R_CANCELED);
return;
}
if (atomic_load(&sock->mgr->closing)) {
failed_read_cb(sock, ISC_R_CANCELED); failed_read_cb(sock, ISC_R_CANCELED);
return; return;
} }
@@ -950,24 +910,12 @@ isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
REQUIRE(VALID_NMSOCK(handle->sock)); REQUIRE(VALID_NMSOCK(handle->sock));
REQUIRE(handle->sock->type == isc_nm_udpsocket); REQUIRE(handle->sock->type == isc_nm_udpsocket);
if (!isc__nmsocket_active(sock)) { if (inactive(sock)) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]); isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
cb(handle, ISC_R_CANCELED, NULL, cbarg); cb(handle, ISC_R_CANCELED, NULL, cbarg);
return; return;
} }
if (sock->server != NULL && !isc__nmsocket_active(sock->server)) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
cb(handle, ISC_R_CANCELED, NULL, cbarg);
return;
}
if (atomic_load(&sock->mgr->closing)) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_SENDFAIL]);
cb(handle, ISC_R_CANCELED, NULL, cbarg);
return;
}
REQUIRE(sock->tid == isc_nm_tid()); REQUIRE(sock->tid == isc_nm_tid());
sock->recv_cb = cb; sock->recv_cb = cb;
sock->recv_cbarg = cbarg; sock->recv_cbarg = cbarg;
@@ -1059,35 +1007,6 @@ isc__nm_udp_close(isc_nmsocket_t *sock) {
} }
} }
static void
failed_connect_cb(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
isc_result_t eresult) {
REQUIRE(sock->tid == isc_nm_tid());
if (sock->timer_running) {
uv_timer_stop(&sock->timer);
sock->timer_running = false;
}
if (!atomic_load(&sock->connecting)) {
return;
}
atomic_store(&sock->connecting, false);
INSIST(req != NULL);
isc__nmsocket_clearcb(sock);
if (req->cb.connect != NULL) {
req->cb.connect(NULL, eresult, req->cbarg);
}
req->cb.connect = NULL;
req->cbarg = NULL;
isc__nmsocket_detach(&sock);
}
void void
isc__nm_udp_shutdown(isc_nmsocket_t *sock) { isc__nm_udp_shutdown(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock)); REQUIRE(VALID_NMSOCK(sock));
@@ -1098,11 +1017,6 @@ isc__nm_udp_shutdown(isc_nmsocket_t *sock) {
} }
if (atomic_load(&sock->connecting)) { if (atomic_load(&sock->connecting)) {
if (sock->timer_initialized) {
isc__nm_uvreq_t *req =
uv_handle_get_data((uv_handle_t *)&sock->timer);
failed_connect_cb(sock, req, ISC_R_CANCELED);
}
return; return;
} }