diff --git a/lib/dns/dispatch.c b/lib/dns/dispatch.c index 9ce2a00d4e..50d6f0b612 100644 --- a/lib/dns/dispatch.c +++ b/lib/dns/dispatch.c @@ -33,6 +33,7 @@ #include #include #include +#include #include "../isc/util.h" @@ -92,6 +93,7 @@ struct dns_dispatch { unsigned int buffers; /* allocated buffers */ ISC_LIST(dns_dispentry_t) rq_handlers; /* request handler list */ ISC_LIST(dns_dispatchevent_t) rq_events; /* holder for rq events */ + dns_tcpmsg_t tcpmsg; /* for tcp streams */ isc_int32_t qid_state; /* state generator info */ unsigned int qid_hashsize; /* hash table size */ unsigned int qid_mask; /* mask for hash table */ @@ -120,6 +122,9 @@ destroy(dns_dispatch_t *); static void udp_recv(isc_task_t *, isc_event_t *); +static void +tcp_recv(isc_task_t *, isc_event_t *); + static void startrecv(dns_dispatch_t *); @@ -236,6 +241,8 @@ destroy(dns_dispatch_t *disp) disp->magic = 0; + dns_tcpmsg_invalidate(&disp->tcpmsg); + isc_socket_detach(&disp->socket); isc_task_detach(&disp->task); @@ -292,16 +299,31 @@ bucket_search(dns_dispatch_t *disp, isc_sockaddr_t *dest, dns_messageid_t id, static void free_buffer(dns_dispatch_t *disp, void *buf, unsigned int len) { + isc_sockettype_t socktype; + INSIST(disp->buffers > 0); disp->buffers--; - XDEBUG(("Freeing buffer %p, length %d, into %s, %d remain\n", - buf, len, (len == disp->buffersize ? "mempool" : "mctx"), - disp->buffers)); - if (len == disp->buffersize) - isc_mempool_put(disp->bpool, buf); - else + socktype = isc_socket_gettype(disp->socket); + + switch (socktype) { + case isc_socket_tcp: isc_mem_put(disp->mctx, buf, len); + break; + case isc_socket_udp: + XDEBUG(("Freeing buffer %p, length %d, into %s, %d remain\n", + buf, len, + (len == disp->buffersize ? "mempool" : "mctx"), + disp->buffers)); + if (len == disp->buffersize) + isc_mempool_put(disp->bpool, buf); + else + isc_mem_put(disp->mctx, buf, len); + break; + default: + INSIST(1); + break; + } } static void * @@ -527,6 +549,178 @@ udp_recv(isc_task_t *task, isc_event_t *ev_in) isc_event_free(&ev_in); } +/* + * General flow: + * + * If I/O result == CANCELED, free the buffer and notify everyone as + * the various queues drain. + * + * If I/O is error (not canceled and not success) log it, free the buffer, + * and restart. + * + * If query: + * if no listeners: free the buffer, restart. + * if listener: allocate event, fill in details. + * If cannot allocate, free buffer, restart. + * if rq event queue is not empty, queue. else, send. + * restart. + * + * If response: + * Allocate event, fill in details. + * If cannot allocate, free buffer, restart. + * find target. If not found, free buffer, restart. + * if event queue is not empty, queue. else, send. + * restart. + */ +static void +tcp_recv(isc_task_t *task, isc_event_t *ev_in) +{ + dns_dispatch_t *disp = ev_in->arg; + dns_tcpmsg_t *tcpmsg = &disp->tcpmsg; + dns_messageid_t id; + dns_result_t dres; + unsigned int flags; + dns_dispentry_t *resp; + dns_dispatchevent_t *rev; + unsigned int bucket; + isc_boolean_t killit; + isc_boolean_t queue_request; + isc_boolean_t queue_response; + + (void)task; /* shut up compiler */ + + XDEBUG(("Got TCP packet!\n")); + + LOCK(&disp->lock); + + INSIST(disp->recvs > 0); + disp->recvs--; + + switch (tcpmsg->result) { + case ISC_R_SUCCESS: + break; + + case ISC_R_EOF: + XDEBUG(("Shutting down on EOF\n")); + disp->shutdown_why = ISC_R_EOF; + disp->shutting_down = 1; + do_cancel(disp, NULL); + /* FALLTHROUGH */ + case ISC_R_CANCELED: + /* + * If the recv() was canceled pass the word on. + */ + killit = ISC_FALSE; + if (disp->recvs == 0 && disp->refcount == 0) + killit = ISC_TRUE; + + UNLOCK(&disp->lock); + + if (killit) + destroy(disp); + + isc_event_free(&ev_in); + return; + + default: + /* + * otherwise, on strange error, log it and restart. + * XXXMLG + */ + goto restart; + } + + XDEBUG(("result %d, length == %d, addr = %p\n", + tcpmsg->result, + tcpmsg->buffer.length, tcpmsg->buffer.base)); + + /* + * Peek into the buffer to see what we can see. + */ + dres = dns_message_peekheader(&tcpmsg->buffer, &id, &flags); + if (dres != DNS_R_SUCCESS) { + XDEBUG(("dns_message_peekheader(): %s\n", + isc_result_totext(dres))); + /* XXXMLG log something here... */ + goto restart; + } + + XDEBUG(("Got valid DNS message header, /QR %c, id %d\n", + ((flags & DNS_MESSAGEFLAG_QR) ? '1' : '0'), id)); + + /* + * Allocate an event to send to the query or response client, and + * allocate a new buffer for our use. + */ + + /* + * Look at flags. If query, check to see if we have someone handling + * them. If response, look to see where it goes. + */ + queue_request = ISC_FALSE; + queue_response = ISC_FALSE; + if ((flags & DNS_MESSAGEFLAG_QR) == 0) { + resp = ISC_LIST_HEAD(disp->rq_handlers); + while (resp != NULL) { + if (resp->item_out == ISC_FALSE) + break; + resp = ISC_LIST_NEXT(resp, link); + } + if (resp == NULL) + queue_request = ISC_TRUE; + rev = allocate_event(disp); + if (rev == NULL) + goto restart; + /* query */ + } else { + /* response */ + bucket = hash(disp, &tcpmsg->address, id); + resp = bucket_search(disp, &tcpmsg->address, id, bucket); + XDEBUG(("Search for response in bucket %d: %s\n", + bucket, (resp == NULL ? "NOT FOUND" : "FOUND"))); + + if (resp == NULL) + goto restart; + queue_response = resp->item_out; + rev = allocate_event(disp); + if (rev == NULL) + goto restart; + } + + /* + * At this point, rev contains the event we want to fill in, and + * resp contains the information on the place to send it to. + * Send the event off. + */ + dns_tcpmsg_keepbuffer(tcpmsg, &rev->buffer); + disp->buffers++; + rev->result = DNS_R_SUCCESS; + rev->id = id; + rev->addr = tcpmsg->address; + if (queue_request) { + ISC_LIST_APPEND(disp->rq_events, rev, link); + } else if (queue_response) { + ISC_LIST_APPEND(resp->items, rev, link); + } else { + ISC_EVENT_INIT(rev, sizeof(*rev), 0, NULL, DNS_EVENT_DISPATCH, + resp->action, resp->arg, resp, NULL, NULL); + XDEBUG(("Sent event for buffer %p (len %d) to task %p\n", + rev->buffer.base, rev->buffer.length, resp->task)); + resp->item_out = ISC_TRUE; + ISC_TASK_SEND(resp->task, (isc_event_t **)&rev); + } + + /* + * Restart recv() to get the next packet. + */ + restart: + startrecv(disp); + + UNLOCK(&disp->lock); + + isc_event_free(&ev_in); +} + /* * disp must be locked */ @@ -576,7 +770,16 @@ startrecv(dns_dispatch_t *disp) break; case isc_socket_tcp: - INSIST(1); /* XXXMLG */ + XDEBUG(("Starting tcp receive\n")); + res = dns_tcpmsg_readmessage(&disp->tcpmsg, + disp->task, tcp_recv, + disp); + if (res != ISC_R_SUCCESS) { + disp->shutdown_why = res; + do_cancel(disp, NULL); + return; + } + disp->recvs++; break; } } @@ -713,6 +916,8 @@ dns_dispatch_create(isc_mem_t *mctx, isc_socket_t *sock, isc_task_t *task, disp->socket = NULL; isc_socket_attach(sock, &disp->socket); + dns_tcpmsg_init(disp->mctx, disp->socket, &disp->tcpmsg); + *dispp = disp; return (DNS_R_SUCCESS); @@ -1115,8 +1320,10 @@ do_cancel(dns_dispatch_t *disp, dns_dispentry_t *resp) { dns_dispatchevent_t *ev; - if (disp->shutdown_out == 1) + if (disp->shutdown_out == 1) { + XDEBUG(("do_cancel() call ignored\n")); return; + } /* * If no target given, find the first request handler. If @@ -1125,7 +1332,7 @@ do_cancel(dns_dispatch_t *disp, dns_dispentry_t *resp) */ if (resp == NULL) { resp = ISC_LIST_HEAD(disp->rq_handlers); - if (resp != NULL && resp->item_out == ISC_FALSE) + if (resp != NULL && resp->item_out == ISC_TRUE) resp = NULL; } @@ -1157,9 +1364,11 @@ do_cancel(dns_dispatch_t *disp, dns_dispentry_t *resp) ev = disp->failsafe_ev; ISC_EVENT_INIT(ev, sizeof (*ev), 0, NULL, DNS_EVENT_DISPATCH, resp->action, resp->arg, resp, NULL, NULL); - ev->result = ISC_R_CANCELED; + ev->result = disp->shutdown_why; ev->buffer.base = NULL; ev->buffer.length = 0; + disp->shutdown_out = 1; + XDEBUG(("Sending failsafe event to task %p\n", resp->task)); ISC_TASK_SEND(resp->task, (isc_event_t **)&ev); }