diff --git a/lib/dns/dispatch.c b/lib/dns/dispatch.c index 3a39296957..a7fdde6300 100644 --- a/lib/dns/dispatch.c +++ b/lib/dns/dispatch.c @@ -35,6 +35,14 @@ #include "../isc/util.h" +/* + * If we cannot send to this task, the application is broken. + */ +#define ISC_TASK_SEND(a, b) do { \ + RUNTIME_CHECK(isc_task_send(a, b) == ISC_R_SUCCESS); \ +} while (0) + + struct dns_dispentry { unsigned int magic; dns_messageid_t id; @@ -43,8 +51,9 @@ struct dns_dispentry { isc_task_t *task; isc_taskaction_t action; void *arg; + isc_boolean_t item_out; ISC_LIST(dns_dispatchevent_t) items; - ISC_LINK(dns_dispentry_t) link; + ISC_LINK(dns_dispentry_t) link; }; #define INVALID_BUCKET (0xffffdead) @@ -63,6 +72,12 @@ struct dns_dispatch { isc_mempool_t *epool; /* memory pool for events */ isc_mempool_t *bpool; /* memory pool for buffers */ isc_mempool_t *rpool; /* memory pool request/reply */ + dns_dispatchevent_t *failsafe_ev; /* failsafe cancel event */ + unsigned int recvs; /* recv() calls outstanding */ + unsigned int recvs_wanted; /* recv() calls wanted */ + unsigned int shutting_down : 1, + shutdown_out : 1; + dns_result_t shutdown_why; ISC_LIST(dns_dispentry_t) rq_handlers; /* request handler list */ ISC_LIST(dns_dispatchevent_t) rq_events; /* holder for rq events */ isc_int32_t qid_state; /* state generator info */ @@ -83,13 +98,93 @@ struct dns_dispatch { /* * statics. */ -static dns_dispentry_t *bucket_search(dns_dispatch_t *, isc_sockaddr_t *, - dns_messageid_t, unsigned int); -static void destroy(dns_dispatch_t *); -static void udp_recv(isc_task_t *, isc_event_t *); -static dns_result_t startrecv(dns_dispatch_t *); -static dns_messageid_t randomid(dns_dispatch_t *); -static unsigned int hash(dns_dispatch_t *, isc_sockaddr_t *, dns_messageid_t); +static dns_dispentry_t * +bucket_search(dns_dispatch_t *, isc_sockaddr_t *, + dns_messageid_t, unsigned int); + +static void +destroy(dns_dispatch_t *); + +static void +udp_recv(isc_task_t *, isc_event_t *); + +static void +startrecv(dns_dispatch_t *); + +static dns_messageid_t +randomid(dns_dispatch_t *); + +static unsigned int +hash(dns_dispatch_t *, isc_sockaddr_t *, dns_messageid_t); + +static void +free_buffer(dns_dispatch_t *disp, void *buf, unsigned int len); + +static void * +allocate_buffer(dns_dispatch_t *disp, unsigned int len); + +static inline void +free_event(dns_dispatch_t *disp, dns_dispatchevent_t *ev); + +static inline dns_dispatchevent_t * +allocate_event(dns_dispatch_t *disp); + +static inline isc_boolean_t +ok_to_kill(dns_dispatch_t *disp); + +static void +do_next_request(dns_dispatch_t *disp, dns_dispentry_t *resp); + +static void +do_next_response(dns_dispatch_t *disp, dns_dispentry_t *resp); + +static void +do_cancel(dns_dispatch_t *disp, dns_dispentry_t *resp); + +static dns_dispentry_t * +linear_first(dns_dispatch_t *disp); + +static dns_dispentry_t * +linear_next(dns_dispatch_t *disp, dns_dispentry_t *resp); + +static dns_dispentry_t * +linear_first(dns_dispatch_t *disp) +{ + dns_dispentry_t *ret; + unsigned int bucket; + + bucket = 0; + + while (bucket < disp->qid_hashsize) { + ret = ISC_LIST_HEAD(disp->qid_table[bucket]); + if (ret != NULL) + return (ret); + bucket++; + } + + return (NULL); +} + +static dns_dispentry_t * +linear_next(dns_dispatch_t *disp, dns_dispentry_t *resp) +{ + dns_dispentry_t *ret; + unsigned int bucket; + + ret = ISC_LIST_NEXT(resp, link); + if (ret != NULL) + return (ret); + + bucket = resp->bucket; + while (bucket < disp->qid_hashsize) { + ret = ISC_LIST_HEAD(disp->qid_table[bucket]); + if (ret != NULL) + return (ret); + bucket++; + } + + return (NULL); +} /* * Return a hash of the destination and message id. For now, just return @@ -120,17 +215,45 @@ randomid(dns_dispatch_t *disp) return ((dns_messageid_t)disp->qid_state); } +static inline isc_boolean_t +ok_to_kill(dns_dispatch_t *disp) +{ + if (disp->recvs > 0) + return (ISC_FALSE); + + if (disp->refcount > 0) + return (ISC_FALSE); + + return (ISC_TRUE); +} + /* * Called when refcount reaches 0 at any time. */ static void destroy(dns_dispatch_t *disp) { + dns_dispatchevent_t *ev; + disp->magic = 0; isc_task_detach(&disp->task); isc_socket_detach(&disp->socket); + /* + * Final cleanup of packets on the request list. + */ + ev = ISC_LIST_HEAD(disp->rq_events); + while (ev != NULL) { + ISC_LIST_UNLINK(disp->rq_events, ev, link); + free_buffer(disp, ev->buffer.base, ev->buffer.length); + free_event(disp, ev); + ev = ISC_LIST_HEAD(disp->rq_events); + } + + isc_mempool_put(disp->epool, disp->failsafe_ev); + disp->failsafe_ev = NULL; + isc_mempool_destroy(&disp->rpool); isc_mempool_destroy(&disp->bpool); isc_mempool_destroy(&disp->epool); @@ -158,6 +281,75 @@ bucket_search(dns_dispatch_t *disp, isc_sockaddr_t *dest, dns_messageid_t id, return (NULL); } +static void +free_buffer(dns_dispatch_t *disp, void *buf, unsigned int len) +{ + if (len == disp->buffersize) + isc_mempool_put(disp->bpool, buf); + else + isc_mem_put(disp->mctx, buf, len); +} + +static void * +allocate_buffer(dns_dispatch_t *disp, unsigned int len) +{ + void *temp; + + INSIST(len > 0); + + if (len == disp->buffersize) + temp = isc_mempool_get(disp->bpool); + else + temp = isc_mem_get(disp->mctx, len); + + return (temp); +} + +static inline void +free_event(dns_dispatch_t *disp, dns_dispatchevent_t *ev) +{ + if (disp->failsafe_ev == ev) { + INSIST(disp->shutdown_out == 1); + disp->shutdown_out = 0; + return; + } + + isc_mempool_put(disp->epool, ev); +} + +static inline dns_dispatchevent_t * +allocate_event(dns_dispatch_t *disp) +{ + dns_dispatchevent_t *ev; + + ev = isc_mempool_get(disp->epool); + + return (ev); +} + +/* + * General flow: + * + * If I/O result == CANCELED, free the buffer and notify everyone as + * the various queues drain. + * + * If I/O is error (not canceled and not success) log it, free the buffer, + * and restart. + * + * If query: + * if no listeners: free the buffer, restart. + * if listener: allocate event, fill in details. + * If cannot allocate, free buffer, restart. + * if rq event queue is not empty, queue. else, send. + * restart. + * + * If response: + * Allocate event, fill in details. + * If cannot allocate, free buffer, restart. + * find target. If not found, free buffer, restart. + * if event queue is not empty, queue. else, send. + * restart. + */ static void udp_recv(isc_task_t *task, isc_event_t *ev_in) { @@ -167,19 +359,29 @@ udp_recv(isc_task_t *task, isc_event_t *ev_in) dns_result_t dres; isc_buffer_t source; unsigned int flags; + dns_dispentry_t *resp; + dns_dispatchevent_t *rev; + unsigned int bucket; (void)task; /* shut up compiler */ LOCK(&disp->lock); if (ev->result != ISC_R_SUCCESS) { - /* * If the recv() was canceled pass the word on. - * XXXMLG */ if (ev->result == ISC_R_CANCELED) { + free_buffer(disp, ev->region.base, ev->region.length); isc_event_free(&ev_in); + + INSIST(disp->recvs > 0); + disp->recvs--; + if (disp->recvs == 0 && disp->shutting_down == 0) { + disp->shutdown_why = ISC_R_CANCELED; + disp->shutting_down = 1; + do_cancel(disp, NULL); + } return; } @@ -187,6 +389,7 @@ udp_recv(isc_task_t *task, isc_event_t *ev_in) * otherwise, on strange error, log it and restart. * XXXMLG */ + free_buffer(disp, ev->region.base, ev->region.length); goto restart; } @@ -197,28 +400,52 @@ udp_recv(isc_task_t *task, isc_event_t *ev_in) ISC_BUFFERTYPE_BINARY); dres = dns_message_peekheader(&source, &id, &flags); if (dres != DNS_R_SUCCESS) { + free_buffer(disp, ev->region.base, ev->region.length); /* XXXMLG log something here... */ goto restart; } + /* + * Allocate an event to send to the query or response client, and + * allocate a new buffer for our use. + */ + /* * Look at flags. If query, check to see if we have someone handling * them. If response, look to see where it goes. */ if ((flags & DNS_MESSAGEFLAG_QR) == 0) { - /* XXXLMG query */ + resp = ISC_LIST_HEAD(disp->rq_handlers); + if (resp == NULL) { + free_buffer(disp, ev->region.base, ev->region.length); + goto restart; + } + rev = allocate_event(disp); + if (rev == NULL) { + free_buffer(disp, ev->region.base, ev->region.length); + goto restart; + } + /* query */ } else { - /* XXXMLG response */ + /* response */ + rev = allocate_event(disp); + if (rev == NULL) { + free_buffer(disp, ev->region.base, ev->region.length); + goto restart; + } + bucket = hash(disp, &ev->address, id); + resp = bucket_search(disp, &ev->address, id, bucket); + if (resp == NULL) { + free_buffer(disp, ev->region.base, ev->region.length); + goto restart; + } } /* * Restart recv() to get the next packet. */ restart: - dres = startrecv(disp); - if (dres != DNS_R_SUCCESS) { - /* XXXMLG kill all people listening, try again? */ - } + startrecv(disp); UNLOCK(&disp->lock); @@ -228,35 +455,46 @@ udp_recv(isc_task_t *task, isc_event_t *ev_in) /* * disp must be locked */ -static dns_result_t +static void startrecv(dns_dispatch_t *disp) { isc_sockettype_t socktype; isc_result_t res; isc_region_t region; + if (disp->shutting_down == 1) + return; + + if (disp->recvs >= disp->recvs_wanted) + return; + socktype = isc_socket_gettype(disp->socket); - switch (socktype) { - /* - * UDP reads are always maximal. - */ - case isc_socket_udp: - region.length = disp->buffersize; - region.base = isc_mempool_get(disp->bpool); - if (region.base == NULL) - return (DNS_R_NOMEMORY); - res = isc_socket_recv(disp->socket, ®ion, ISC_TRUE, - disp->task, udp_recv, disp); - if (res != ISC_R_SUCCESS) - return (res); - break; - case isc_socket_tcp: - INSIST(1); /* XXXMLG */ - break; - } + while (disp->recvs < disp->recvs_wanted) { + switch (socktype) { + /* + * UDP reads are always maximal. + */ + case isc_socket_udp: + region.length = disp->buffersize; + region.base = allocate_buffer(disp, disp->buffersize); + if (region.base == NULL) + return; + res = isc_socket_recv(disp->socket, ®ion, ISC_TRUE, + disp->task, udp_recv, disp); + if (res != ISC_R_SUCCESS) { + disp->shutdown_why = res; + do_cancel(disp, NULL); + return; + } + disp->recvs++; + break; - return (DNS_R_SUCCESS); + case isc_socket_tcp: + INSIST(1); /* XXXMLG */ + break; + } + } } /* @@ -298,6 +536,11 @@ dns_dispatch_create(isc_mem_t *mctx, isc_socket_t *sock, isc_task_t *task, disp->socket = NULL; /* set below */ disp->buffersize = maxbuffersize; disp->refcount = 1; + disp->recvs = 0; + disp->recvs_wanted = 1; + disp->shutting_down = 0; + disp->shutdown_out = 0; + disp->shutdown_why = ISC_R_UNEXPECTED; ISC_LIST_INIT(disp->rq_handlers); ISC_LIST_INIT(disp->rq_events); @@ -329,7 +572,6 @@ dns_dispatch_create(isc_mem_t *mctx, isc_socket_t *sock, isc_task_t *task, res = DNS_R_NOMEMORY; goto out4; } - isc_mempool_setfreemax(disp->bpool, maxbuffers); if (isc_mempool_create(mctx, sizeof(dns_dispentry_t), &disp->rpool) != ISC_R_SUCCESS) { @@ -337,9 +579,32 @@ dns_dispatch_create(isc_mem_t *mctx, isc_socket_t *sock, isc_task_t *task, goto out5; } + /* + * Keep some number of items around. This should be a config + * option. For now, keep 8, but later keep at least two even + * if the caller wants less. This allows us to ensure certain + * things, like an event can be "freed" and the next allocation + * will always succeed. + * + * Note that if limits are placed on anything here, we use one + * event internally, so the actual limit should be "wanted + 1." + * + * XXXMLG + */ + isc_mempool_setfreemax(disp->epool, 8); + isc_mempool_setfreemax(disp->bpool, 8); + isc_mempool_setfreemax(disp->rpool, 8); + + disp->failsafe_ev = allocate_event(disp); + if (disp->failsafe_ev == NULL) { + res = DNS_R_NOMEMORY; + goto out6; + } + /* * should initialize qid_state here XXXMLG */ + disp->qid_state = (unsigned int)disp; disp->magic = DISPATCH_MAGIC; @@ -353,10 +618,8 @@ dns_dispatch_create(isc_mem_t *mctx, isc_socket_t *sock, isc_task_t *task, /* * error returns */ -#if 0 /* enable when needed */ out6: - isc_mempool_destroy(&disp->respool); -#endif + isc_mempool_destroy(&disp->rpool); out5: isc_mempool_destroy(&disp->bpool); out4: @@ -382,14 +645,11 @@ dns_dispatch_destroy(dns_dispatch_t **dispp) disp = *dispp; *dispp = NULL; - killit = ISC_FALSE; - LOCK(&disp->lock); INSIST(disp->refcount > 0); disp->refcount--; - if (disp->refcount == 0) - killit = ISC_TRUE; + killit = ok_to_kill(disp); UNLOCK(&disp->lock); @@ -451,6 +711,7 @@ dns_dispatch_addresponse(dns_dispatch_t *disp, isc_sockaddr_t *dest, res->task = task; res->action = action; res->arg = arg; + res->item_out = ISC_FALSE; ISC_LIST_INIT(res->items); ISC_LINK_INIT(res, link); ISC_LIST_APPEND(disp->qid_table[bucket], res, link); @@ -460,6 +721,8 @@ dns_dispatch_addresponse(dns_dispatch_t *disp, isc_sockaddr_t *dest, *idp = id; *resp = res; + startrecv(disp); + return (DNS_R_SUCCESS); } @@ -479,8 +742,6 @@ dns_dispatch_removeresponse(dns_dispatch_t *disp, dns_dispentry_t **resp, res = *resp; *resp = NULL; - killit = ISC_FALSE; - if (sockevent != NULL) { REQUIRE(*sockevent != NULL); ev = *sockevent; @@ -493,8 +754,7 @@ dns_dispatch_removeresponse(dns_dispatch_t *disp, dns_dispentry_t **resp, INSIST(disp->refcount > 0); disp->refcount--; - if (disp->refcount == 0) - killit = ISC_TRUE; + killit = ok_to_kill(disp); res->magic = 0; bucket = res->bucket; @@ -502,6 +762,14 @@ dns_dispatch_removeresponse(dns_dispatch_t *disp, dns_dispentry_t **resp, ISC_LIST_UNLINK(disp->qid_table[bucket], res, link); isc_mempool_put(disp->rpool, res); + if (ev != NULL) { + free_buffer(disp, ev->buffer.base, ev->buffer.length); + free_event(disp, ev); + } + if (disp->shutting_down == 1) + do_cancel(disp, NULL); + + startrecv(disp); UNLOCK(&disp->lock); @@ -533,6 +801,7 @@ dns_dispatch_addrequest(dns_dispatch_t *disp, res->task = task; res->action = action; res->arg = arg; + res->item_out = ISC_FALSE; ISC_LIST_INIT(res->items); ISC_LINK_INIT(res, link); ISC_LIST_APPEND(disp->rq_handlers, res, link); @@ -541,6 +810,8 @@ dns_dispatch_addrequest(dns_dispatch_t *disp, *resp = res; + startrecv(disp); + return (DNS_R_SUCCESS); } @@ -559,8 +830,6 @@ dns_dispatch_removerequest(dns_dispatch_t *disp, dns_dispentry_t **resp, res = *resp; *resp = NULL; - killit = ISC_FALSE; - if (sockevent != NULL) { REQUIRE(*sockevent != NULL); ev = *sockevent; @@ -573,17 +842,150 @@ dns_dispatch_removerequest(dns_dispatch_t *disp, dns_dispentry_t **resp, INSIST(disp->refcount > 0); disp->refcount--; - if (disp->refcount == 0) - killit = ISC_TRUE; + killit = ok_to_kill(disp); res->magic = 0; ISC_LIST_UNLINK(disp->rq_handlers, res, link); isc_mempool_put(disp->rpool, res); + if (ev != NULL) { + if (ev->buffer.length != 0) + free_buffer(disp, ev->buffer.base, ev->buffer.length); + free_event(disp, ev); + } + + startrecv(disp); UNLOCK(&disp->lock); if (killit) destroy(disp); } + +void +dns_dispatch_freeevent(dns_dispatch_t *disp, dns_dispentry_t *resp, + dns_dispatchevent_t **sockevent) +{ + dns_dispatchevent_t *ev; + isc_boolean_t response; + + REQUIRE(VALID_DISPATCH(disp)); + REQUIRE(sockevent != NULL && *sockevent != NULL); + + ev = *sockevent; + *sockevent = NULL; + + response = ISC_FALSE; + if (VALID_RESPONSE(resp)) { + response = ISC_TRUE; + } else { + REQUIRE(VALID_RESPONSE(resp) || VALID_REQUEST(resp)); + } + + LOCK(&disp->lock); + REQUIRE(ev != disp->failsafe_ev); + + free_buffer(disp, ev->buffer.base, ev->buffer.length); + free_event(disp, ev); + + if (response) + do_next_response(disp, resp); + else + do_next_request(disp, resp); + + startrecv(disp); + + UNLOCK(&disp->lock); +} + +static void +do_next_response(dns_dispatch_t *disp, dns_dispentry_t *resp) +{ + dns_dispatchevent_t *ev; + + INSIST(resp->item_out == ISC_FALSE); + + ev = ISC_LIST_HEAD(resp->items); + if (ev == NULL) { + if (disp->shutting_down == 1) + do_cancel(disp, resp); + return; + } + + ISC_LIST_UNLINK(disp->rq_events, ev, link); + + ev->action = resp->action; + ev->arg = resp->arg; + ev->sender = resp; + resp->item_out = ISC_TRUE; + ISC_TASK_SEND(resp->task, (isc_event_t **)&ev); +} + +static void +do_next_request(dns_dispatch_t *disp, dns_dispentry_t *resp) +{ + dns_dispatchevent_t *ev; + + INSIST(resp->item_out == ISC_FALSE); + + ev = ISC_LIST_HEAD(disp->rq_events); + if (ev == NULL) { + if (disp->shutting_down == 1) + do_cancel(disp, resp); + return; + } + + ISC_LIST_UNLINK(disp->rq_events, ev, link); + + ev->action = resp->action; + ev->arg = resp->arg; + ev->sender = resp; + resp->item_out = ISC_TRUE; + ISC_TASK_SEND(resp->task, (isc_event_t **)&ev); +} + +static void +do_cancel(dns_dispatch_t *disp, dns_dispentry_t *resp) +{ + if (disp->shutdown_out == 1) + return; + + /* + * If no target given, find the first request handler. If + * there are packets waiting for any handler, however, don't + * kill them. + */ + if (resp == NULL) { + resp = ISC_LIST_HEAD(disp->rq_handlers); + if (resp != NULL && resp->item_out == ISC_FALSE) + resp = NULL; + } + + /* + * Search for the first responce handler without packets outstanding. + */ + if (resp == NULL) { + resp = linear_first(disp); /* no first item? */ + if (resp == NULL) + return; + + do { + if (resp->item_out == ISC_FALSE) + break; + + resp = linear_next(disp, resp); + } while (resp != NULL); + + /* + * No one to send the cancel event to, so nothing to do. + */ + if (resp == NULL) + return; + } + + /* + * Send the shutdown failsafe event to this response critter IFF + * the queue is empty. If it is not empty, defer. + */ +} diff --git a/lib/dns/include/dns/dispatch.h b/lib/dns/include/dns/dispatch.h index 844da0541d..17c174b125 100644 --- a/lib/dns/include/dns/dispatch.h +++ b/lib/dns/include/dns/dispatch.h @@ -67,17 +67,11 @@ struct dns_dispatchevent { dns_result_t result; /* result code */ isc_int16_t id; /* message id */ isc_sockaddr_t addr; /* address recv'd from */ - unsigned int lattributes; /* some private, some public */ isc_buffer_t buffer; /* data buffer */ }; typedef struct dns_dispentry dns_dispentry_t; -/* - * Private attributes of events - */ -#define DNS_DISPATCHATTR_MPOOL 0x00010000 /* allocated via mpool */ - dns_result_t dns_dispatch_create(isc_mem_t *mctx, isc_socket_t *sock, isc_task_t *task, unsigned int maxbuffersize, @@ -222,7 +216,8 @@ dns_dispatch_removerequest(dns_dispatch_t *disp, dns_dispentry_t **resp, */ void -dns_dispatch_freeevent(dns_dispatch_t *disp, dns_dispatchevent_t **sockevent); +dns_dispatch_freeevent(dns_dispatch_t *disp, dns_dispentry_t *resp, + dns_dispatchevent_t **sockevent); /* * Return a dispatchevent and associated buffer to the dispatch. This needs * to be called if more events are desired but a particular event is fully @@ -242,12 +237,30 @@ void dns_dispatch_attach(dns_dispatch_t *disp, dns_dispatch_t **dispp); /* * Attach to a dispatch handle. + * + * Requires: + * < mumble > + * + * Ensures: + * < mumble > + * + * Returns: + * < mumble > */ void dns_dispatch_detach(dns_dispatch_t **dispp); /* * Detach from a dispatch handle. + * + * Requires: + * < mumble > + * + * Ensures: + * < mumble > + * + * Returns: + * < mumble > */ ISC_LANG_ENDDECLS