From 8c17b6f1e4ab18f857ff013a9f6146cea4d984f3 Mon Sep 17 00:00:00 2001 From: Danny Mayer Date: Tue, 6 Aug 2002 03:32:53 +0000 Subject: [PATCH] Updated code to support more than 63 accepts and connects by adding capability to add more event_wait threads on demand --- lib/isc/win32/socket.c | 499 ++++++++++++++++++++++++++++------------- 1 file changed, 342 insertions(+), 157 deletions(-) diff --git a/lib/isc/win32/socket.c b/lib/isc/win32/socket.c index 0ee23a47fe..25c00101a2 100644 --- a/lib/isc/win32/socket.c +++ b/lib/isc/win32/socket.c @@ -15,7 +15,40 @@ * WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ -/* $Id: socket.c,v 1.20 2002/08/02 03:45:56 mayer Exp $ */ +/* $Id: socket.c,v 1.21 2002/08/06 03:32:53 mayer Exp $ */ + +/* This code has been rewritten to take advantage of Windows Sockets + * I/O Completion Ports and Events. I/O Completion Ports is ONLY + * available on Windows NT, Windows 2000 and Windows XP series of + * the Windows Operating Systems. In CANNOT run on Windows 95, Windows 98 + * or the follow-ons to those Systems. + * + * This code is by nature multithreaded and takes advantage of various + * features to pass on information through the completion port for + * when I/O is completed. All sends and receives are completed through + * the completion port. Due to an implementation bug in Windows 2000, + * Service Pack 2 must installed on the system for this code to run correctly. + * For details on this problem see Knowledge base article Q263823. + * The code checks for this. The number of Completion Port Worker threads + * used is the total number of CPU's + 1. This increases the likelihood that + * a Worker Thread is available for processing a completed request. + * + * All accepts and connects are accomplished through the WSAEventSelect() + * function and the event_wait loop. Events are added to and deleted from + * each event_wait thread via a common event_update stack owned by the socket + * manager. If the event_wait thread runs out of array space in the events + * array it will look for another event_wait thread to add the event. If it + * fails to find another one it will create a new thread to handle the + * outstanding event. + * + * A future enhancement is to use AcceptEx to take avantage of Overlapped + * I/O which allows for enhanced performance of TCP connections. + * This will also reduce the number of events that are waited on by the + * event_wait threads to just the connect sockets and reduce the number + * additional threads required. + * + * XXXPDM 5 August, 2002 + */ #define MAKE_EXTERNAL 1 #include @@ -173,9 +206,11 @@ struct isc_socket { /* Pointers to scatter/gather buffers */ WSABUF iov[ISC_SOCKET_MAXSCATTERGATHER]; size_t totalBytes; - int iEvent; /* Index into Event Array */ WSAEVENT hEvent; /* Event Handle */ long wait_type; /* Events to wait on */ + WSAEVENT hAlert; /* Alert Event Handle */ + DWORD evthread_id; /* Event Thread Id for socket */ + /* Locked by socket lock. */ ISC_LINK(isc_socket_t) link; @@ -223,7 +258,8 @@ typedef struct IoCompletionInfo { /* * Define a maximum number of I/O Completion Port worker threads - * to handle the load on the Completion Port + * to handle the load on the Completion Port. The actual number + * used is the number of CPU's + 1. */ #define MAX_IOCPTHREADS 20 @@ -235,7 +271,8 @@ typedef struct event_change event_change_t; struct event_change { isc_socket_t *sock; - int iEvent; + WSAEVENT hEvent; + DWORD evthread_id; SOCKET fd; unsigned int action; ISC_LINK(event_change_t) link; @@ -243,6 +280,8 @@ struct event_change { /* * Note: We are using an array here since *WaitForMultiple* wants an array + * WARNING: This value may not be greater than 64 since the + * WSAWaitForMultipleEvents function is limited to 64 events. */ #define MAX_EVENTS 64 @@ -255,29 +294,41 @@ typedef struct sock_event_list { int total_events; isc_socket_t *aSockList[MAX_EVENTS]; WSAEVENT aEventList[MAX_EVENTS]; - isc_mutex_t EventLock; - ISC_LIST(event_change_t) event_updates; } sock_event_list; +/* + * Thread Event structure for managing the threads handling events + */ +typedef struct events_thread events_thread_t; + +struct events_thread { + isc_thread_t thread_handle; /* Thread's handle */ + DWORD thread_id; /* Thread's id */ + sock_event_list sockev_list; + isc_socketmgr_t *manager; + ISC_LINK(events_thread_t) link; +}; + #define SOCKET_MANAGER_MAGIC ISC_MAGIC('I', 'O', 'm', 'g') #define VALID_MANAGER(m) ISC_MAGIC_VALID(m, SOCKET_MANAGER_MAGIC) struct isc_socketmgr { /* Not locked. */ - unsigned int magic; - isc_mem_t *mctx; - isc_mutex_t lock; + unsigned int magic; + isc_mem_t *mctx; + isc_mutex_t lock; /* Locked by manager lock. */ - ISC_LIST(isc_socket_t) socklist; - sock_event_list sockev_list; - int event_written; - isc_boolean_t bShutdown; - isc_thread_t watcher; - isc_condition_t shutdown_ok; - HANDLE hIoCompletionPort; - int maxIOCPThreads; - HANDLE hIOCPThreads[MAX_IOCPTHREADS]; - DWORD dwIOCPThreadIds[MAX_IOCPTHREADS]; + ISC_LIST(event_change_t) event_updates; + ISC_LIST(isc_socket_t) socklist; + int event_written; + WSAEVENT prime_alert; + isc_boolean_t bShutdown; + ISC_LIST(events_thread_t) ev_threads; + isc_condition_t shutdown_ok; + HANDLE hIoCompletionPort; + int maxIOCPThreads; + HANDLE hIOCPThreads[MAX_IOCPTHREADS]; + DWORD dwIOCPThreadIds[MAX_IOCPTHREADS]; }; #define CLOSED 0 /* this one must be zero */ @@ -290,6 +341,7 @@ struct isc_socketmgr { #define MAXSCATTERGATHER_SEND (ISC_SOCKET_MAXSCATTERGATHER) #define MAXSCATTERGATHER_RECV (ISC_SOCKET_MAXSCATTERGATHER) +static isc_threadresult_t WINAPI event_wait(void *uap); static isc_threadresult_t WINAPI SocketIoThread(LPVOID ThreadContext); static void free_socket(isc_socket_t **); @@ -311,7 +363,7 @@ enum { #if defined(ISC_SOCKET_DEBUG) /* - * This is used to duump the contents of the sock structure + * This is used to dump the contents of the sock structure * You should make sure that the sock is locked before * dumping it. Since the code uses simple printf() statements * it should only be used interactively. @@ -372,8 +424,10 @@ signal_iocompletionport_exit(isc_socketmgr_t *manager) { int errval; char strbuf[ISC_STRERRORSIZE]; + REQUIRE(VALID_MANAGER(manager)); for (i = 0; i < manager->maxIOCPThreads; i++) { - if (!PostQueuedCompletionStatus(manager->hIoCompletionPort, 0, 0, 0)) { + if (!PostQueuedCompletionStatus(manager->hIoCompletionPort, + 0, 0, 0)) { errval = GetLastError(); isc__strerror(errval, strbuf, sizeof(strbuf)); FATAL_ERROR(__FILE__, __LINE__, @@ -392,8 +446,10 @@ void iocompletionport_createthreads(int total_threads, isc_socketmgr_t *manager) { int errval; char strbuf[ISC_STRERRORSIZE]; - int i = 0; + int i; + INSIST(total_threads > 0); + REQUIRE(VALID_MANAGER(manager)); /* * We need at least one */ @@ -420,6 +476,8 @@ void iocompletionport_init(isc_socketmgr_t *manager) { int errval; char strbuf[ISC_STRERRORSIZE]; + + REQUIRE(VALID_MANAGER(manager)); /* * Create a private heap to handle the socket overlapped structure * The miniumum number of structures is 10, there is no maximum @@ -453,6 +511,8 @@ iocompletionport_init(isc_socketmgr_t *manager) { void iocompletionport_exit(isc_socketmgr_t *manager) { + + REQUIRE(VALID_MANAGER(manager)); if (manager->hIoCompletionPort != NULL) { /* Get each of the service threads to exit */ @@ -465,8 +525,9 @@ iocompletionport_exit(isc_socketmgr_t *manager) { */ void iocompletionport_update(isc_socket_t *sock) { - HANDLE hiocp; + + REQUIRE(sock != NULL); if(sock->iocp == 0) { sock->iocp = 1; hiocp = CreateIoCompletionPort((HANDLE) sock->fd, @@ -482,6 +543,7 @@ socket_event_minit(sock_event_list *evlist) { BOOL bReset; int i; + REQUIRE(evlist != NULL); /* Initialize the Event List */ evlist->max_event = 0; evlist->total_events = 0; @@ -490,109 +552,213 @@ socket_event_minit(sock_event_list *evlist) { evlist->aEventList[i] = (WSAEVENT) 0; } - /* - * initialize the lock - */ - if (isc_mutex_init(&(evlist->EventLock)) != ISC_R_SUCCESS) { - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_mutex_init() %s Event Lock", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - } evlist->aEventList[0] = WSACreateEvent(); (evlist->max_event)++; bReset = WSAResetEvent(evlist->aEventList[0]); - - ISC_LIST_INIT(evlist->event_updates); } - /* - * Note that the eventLock is already locked before calling this function + * Event Thread Initialization */ isc_result_t -socket_eventlist_add(isc_socket_t *sock, sock_event_list *evlist) { - int max_event; +event_thread_create(events_thread_t **evthreadp, isc_socketmgr_t *manager) { + events_thread_t *evthread; + REQUIRE(VALID_MANAGER(manager)); + REQUIRE(evthreadp != NULL && *evthreadp == NULL); + + evthread = isc_mem_get(manager->mctx, sizeof(*evthread)); + socket_event_minit(&evthread->sockev_list); + ISC_LINK_INIT(evthread, link); + evthread->manager = manager; + + ISC_LIST_APPEND(manager->ev_threads, evthread, link); + + /* + * Start up the event wait thread. + */ + if (isc_thread_create(event_wait, evthread, &evthread->thread_handle) != + ISC_R_SUCCESS) { + isc_mem_put(manager->mctx, evthread, sizeof(*evthread)); + UNEXPECTED_ERROR(__FILE__, __LINE__, + "isc_thread_create() %s", + isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, + ISC_MSG_FAILED, "failed")); + return (ISC_R_UNEXPECTED); + } + *evthreadp = evthread; + return (ISC_R_SUCCESS); +} +/* + * Locate a thread with space for additional events or create one if + * necessary. The manager is locked at this point so the information + * cannot be changed by another thread while we are searching. + */ +void +locate_available_thread(isc_socketmgr_t *manager) { + events_thread_t *evthread; + DWORD threadid = GetCurrentThreadId(); + + evthread = ISC_LIST_HEAD(manager->ev_threads); + while (evthread != NULL) { + /* + * We need to find a thread with space to add an event + * If we find it, alert it to process the event change + * list + */ + if(threadid != evthread->thread_id && + evthread->sockev_list.max_event < MAX_EVENTS) { + WSASetEvent(evthread->sockev_list.aEventList[0]); + return; + } + evthread = ISC_LIST_NEXT(evthread, link); + } + /* + * We need to create a new thread as other threads are full. + * If we succeed in creating the thread, alert it to + * process the event change list since it will have space. + * If we are unable to create one, the event will stay on the + * list and the next event_wait thread will try again to add + * the event. It will call here again if it has no space. + */ + if (event_thread_create(&evthread, manager) == ISC_R_SUCCESS) { + WSASetEvent(evthread->sockev_list.aEventList[0]); + } + +} + +isc_boolean_t +socket_eventlist_add(event_change_t *evchange, sock_event_list *evlist, + isc_socketmgr_t *manager) { + int max_event; + isc_socket_t *sock; + REQUIRE(evchange != NULL); + + sock = evchange->sock; REQUIRE(sock != NULL); REQUIRE(sock->hEvent != NULL); REQUIRE(evlist != NULL); max_event = evlist->max_event; if(max_event >= MAX_EVENTS) { - return(ISC_R_NOSPACE); + locate_available_thread(manager); + return (ISC_FALSE); } - sock->iEvent = max_event; evlist->aSockList[max_event] = sock; evlist->aEventList[max_event] = sock->hEvent; evlist->max_event++; evlist->total_events++; - return (ISC_R_SUCCESS); + sock->hAlert = evlist->aEventList[0]; + sock->evthread_id = GetCurrentThreadId(); + return (ISC_TRUE); } /* * Note that the eventLock is locked before calling this function - * All Events and associated sockets are closed here + * All Events and associated sockes are closed here */ -void -socket_eventlist_delete(isc_socket_t *sock, SOCKET fd, int iEvent, - sock_event_list *evlist) { +isc_boolean_t +socket_eventlist_delete(event_change_t *evchange, sock_event_list *evlist) { int i; WSAEVENT hEvent; + int iEvent = -1; + + REQUIRE(evchange != NULL); + /* Make sure this is the right thread from which to delete the event */ + if(evchange->evthread_id != GetCurrentThreadId()) + return (ISC_FALSE); REQUIRE(evlist != NULL); - REQUIRE(iEvent > 0); + REQUIRE(evchange->hEvent != NULL); + hEvent = evchange->hEvent; - hEvent = evlist->aEventList[iEvent]; - if (hEvent != NULL) - WSACloseEvent(hEvent); + /* Find the Event */ + for (i = 1; i < evlist->max_event; i++) { + if (evlist->aEventList[i] == hEvent) { + iEvent = i; + break; + } + } + /* Actual event start at 1 */ + if (iEvent < 1) + return (ISC_FALSE); - - for(i = iEvent; i < evlist->max_event; i++) { + for(i = iEvent; i < (evlist->max_event - 1); i++) { evlist->aEventList[i] = evlist->aEventList[i + 1]; evlist->aSockList[i] = evlist->aSockList[i + 1]; - if(evlist->aSockList[i] != NULL) - evlist->aSockList[i]->iEvent = i; } - evlist->aEventList[evlist->max_event] = 0; - evlist->aSockList[evlist->max_event] = NULL; - if(sock != NULL) { - sock->iEvent = 0; - sock->pending_close = 1; - } - if (fd >= 0) - closesocket(fd); + evlist->aEventList[evlist->max_event - 1] = 0; + evlist->aSockList[evlist->max_event - 1] = NULL; + + /* Cleanup */ + WSACloseEvent(hEvent); + if (evchange->fd >= 0) + closesocket(evchange->fd); evlist->max_event--; evlist->total_events--; + return (ISC_TRUE); } /* * Get the event changes off of the list and apply the - * requested changes + * requested changes. The manager lock is taken out at + * the start of this function to prevent other event_wait + * threads processing the same information at the same + * time. The queue may not be empty on exit since other + * threads may be involved in processing the queue. + * + * The deletes are done first in order that there be space + * available for the events being added in the same thread + * in case the event list is almost full. This reduces the + * probability of having to create another thread which would + * increase overhead costs. */ isc_result_t process_eventlist(sock_event_list *evlist, isc_socketmgr_t *manager) { event_change_t *evchange; + event_change_t *next; + isc_boolean_t del; - evchange = ISC_LIST_HEAD(evlist->event_updates); + REQUIRE(evlist != NULL); + + LOCK(&manager->lock); + + /* First the deletes */ + evchange = ISC_LIST_HEAD(manager->event_updates); while (evchange != NULL) { - switch (evchange->action) { - case EVENT_ADD: - socket_eventlist_add(evchange->sock, evlist); - break; - case EVENT_DELETE: - socket_eventlist_delete(evchange->sock, evchange->fd, - evchange->iEvent, evlist); - break; - default: - break; - } + next = ISC_LIST_NEXT(evchange, link); + del = ISC_FALSE; + if(evchange->action == EVENT_DELETE) { + del = socket_eventlist_delete(evchange, evlist); - ISC_LIST_DEQUEUE(evlist->event_updates, evchange, link); - HeapFree(hHeapHandle, 0, evchange); - manager->event_written--; - evchange = ISC_LIST_HEAD(evlist->event_updates); + /* Delete only if this thread's socket list was updated */ + if (del) { + ISC_LIST_DEQUEUE(manager->event_updates, + evchange, link); + HeapFree(hHeapHandle, 0, evchange); + manager->event_written--; + } + } + evchange = next; } - manager->event_written = 0; + /* Now the adds */ + evchange = ISC_LIST_HEAD(manager->event_updates); + while (evchange != NULL) { + next = ISC_LIST_NEXT(evchange, link); + del = ISC_FALSE; + if(evchange->action == EVENT_ADD) { + del = socket_eventlist_add(evchange, evlist, manager); + + /* Delete only if this thread's socket list was updated */ + if (del) { + ISC_LIST_DEQUEUE(manager->event_updates, + evchange, link); + HeapFree(hHeapHandle, 0, evchange); + manager->event_written--; + } + } + evchange = next; + } + UNLOCK(&manager->lock); return (ISC_R_SUCCESS); } /* @@ -600,24 +766,32 @@ process_eventlist(sock_event_list *evlist, isc_socketmgr_t *manager) { * event loop */ static void -notify_eventlist(isc_socket_t *sock, sock_event_list *evlist, +notify_eventlist(isc_socket_t *sock, isc_socketmgr_t *manager, unsigned int action) { event_change_t *evchange; + + REQUIRE(VALID_MANAGER(manager)); + REQUIRE(sock != NULL); evchange = HeapAlloc(hHeapHandle, HEAP_ZERO_MEMORY, sizeof(event_change_t)); evchange->sock = sock; evchange->action = action; - evchange->iEvent = sock->iEvent; + evchange->hEvent = sock->hEvent; evchange->fd = sock->fd; + evchange->evthread_id = sock->evthread_id; - LOCK(&evlist->EventLock); - ISC_LIST_APPEND(evlist->event_updates, evchange, link); - + LOCK(&manager->lock); + ISC_LIST_APPEND(manager->event_updates, evchange, link); sock->manager->event_written++; - UNLOCK(&evlist->EventLock); - WSASetEvent(evlist->aEventList[0]); /* Alert the Wait List */ + UNLOCK(&manager->lock); + + /* Alert the Wait List */ + if (sock->hAlert != NULL) + WSASetEvent(sock->hAlert); + else + WSASetEvent(manager->prime_alert); } /* * Note that the socket is already locked before calling this function @@ -626,10 +800,8 @@ isc_result_t socket_event_add(isc_socket_t *sock, long type) { int stat; WSAEVENT hEvent; - sock_event_list *evlist; REQUIRE(sock != NULL); - evlist = &(sock->manager->sockev_list); hEvent = WSACreateEvent(); if (hEvent == WSA_INVALID_EVENT) { @@ -644,7 +816,7 @@ socket_event_add(isc_socket_t *sock, long type) { sock->hEvent = hEvent; sock->wait_type = type; - notify_eventlist(sock, evlist, EVENT_ADD); + notify_eventlist(sock, sock->manager, EVENT_ADD); return (ISC_R_SUCCESS); } /* @@ -652,17 +824,17 @@ socket_event_add(isc_socket_t *sock, long type) { */ void socket_event_delete(isc_socket_t *sock) { - sock_event_list *evlist; REQUIRE(sock != NULL); REQUIRE(sock->hEvent != NULL); - evlist = &(sock->manager->sockev_list); if (sock->hEvent != NULL) { sock->wait_type = 0; sock->pending_close = 1; - notify_eventlist(sock, evlist, EVENT_DELETE); + notify_eventlist(sock, sock->manager, EVENT_DELETE); sock->hEvent = NULL; + sock->hAlert = NULL; + sock->evthread_id = 0; } } @@ -675,6 +847,8 @@ socket_event_delete(isc_socket_t *sock) { */ void socket_close(isc_socket_t *sock) { + + REQUIRE(sock != NULL); sock->pending_close = 1; if (sock->hEvent != NULL) socket_event_delete(sock); @@ -701,8 +875,6 @@ BOOL InitSockets() { err = WSAStartup(wVersionRequested, &wsaData); if ( err != 0 ) { /* Tell the user that we could not find a usable Winsock DLL */ - NTReportError("named", - "Application Requires Winsock 2.0 or later. Exiting"); return(FALSE); } return(TRUE); @@ -894,7 +1066,7 @@ connection_reset_fix(SOCKET fd) { if(isc_win32os_majorversion() < 5) return (ISC_R_SUCCESS); /* NT 4.0 has no problem */ - /* disable new behavior using IOCTL: SIO_UDP_CONNRESET */ + /* disable bad behavior using IOCTL: SIO_UDP_CONNRESET */ status = WSAIoctl(fd, SIO_UDP_CONNRESET, &bNewBehavior, sizeof(bNewBehavior), NULL, 0, &dwBytesReturned, NULL, NULL); @@ -1243,7 +1415,7 @@ completeio_recv(isc_socket_t *sock, isc_socketevent_t *dev, /* * If we read less than we expected, update counters, - * and let the upper layer poke the descriptor. + * and let the upper layer handle it. */ if (((size_t)cc != sock->totalBytes) && (dev->n < dev->minimum)) return (DOIO_SOFT); @@ -1468,6 +1640,8 @@ destroy_socket(isc_socket_t **sockp) { isc_socket_t *sock = *sockp; isc_socketmgr_t *manager = sock->manager; + REQUIRE(sock != NULL); + socket_log(sock, NULL, CREATION, isc_msgcat, ISC_MSGSET_SOCKET, ISC_MSG_DESTROYING, "destroying socket %d", sock->fd); @@ -1475,7 +1649,6 @@ destroy_socket(isc_socket_t **sockp) { INSIST(ISC_LIST_EMPTY(sock->recv_list)); INSIST(ISC_LIST_EMPTY(sock->send_list)); INSIST(sock->connect_ev == NULL); - REQUIRE(sock->fd >= 0); LOCK(&manager->lock); @@ -1537,8 +1710,9 @@ allocate_socket(isc_socketmgr_t *manager, isc_sockettype_t type, sock->connected = 0; sock->connecting = 0; sock->bound = 0; - sock->iEvent = 0; sock->hEvent = NULL; + sock->hAlert = NULL; + sock->evthread_id = 0; sock->wait_type = 0; /* @@ -1867,7 +2041,7 @@ internal_accept(isc_socket_t *sock, int accept_errno) { INSIST(VALID_MANAGER(manager)); INSIST(sock->listener); - INSIST(sock->iEvent); + INSIST(sock->hEvent != NULL); INSIST(sock->pending_accept == 1); sock->pending_accept = 0; @@ -1891,8 +2065,8 @@ internal_accept(isc_socket_t *sock, int accept_errno) { /* * Try to accept the new connection. If the accept fails with - * EAGAIN or EINTR, simply poke the watcher to watch this socket - * again. + * EAGAIN or EINTR, the event wait will be notified again since + * the event will be reset on return to caller. */ addrlen = sizeof dev->newsocket->address.type; memset(&dev->newsocket->address.type.sa, 0, addrlen); @@ -2212,6 +2386,11 @@ internal_send(isc_socket_t *sock, isc_socketevent_t *dev, struct msghdr *message UNLOCK(&sock->lock); } +/* + * This is the I/O Completion Port Worker Function. It loops forever + * waiting for I/O to complete and then forwards them for further + * processing. There are a number of these in separate threads. + */ static isc_threadresult_t WINAPI SocketIoThread(LPVOID ThreadContext) { isc_socketmgr_t *manager = ThreadContext; @@ -2228,6 +2407,8 @@ SocketIoThread(LPVOID ThreadContext) { char strbuf[ISC_STRERRORSIZE]; int errstatus; + REQUIRE(VALID_MANAGER(manager)); + /* Set the thread priority high enough so I/O will * preempt normal recv packet processing, but not * higher than the timer sync thread. @@ -2268,8 +2449,6 @@ SocketIoThread(LPVOID ThreadContext) { */ WSAGetOverlappedResult(sock->fd, (LPWSAOVERLAPPED) &lpo, &tbytes, FALSE, &tflags); - errstatus = WSAGetLastError(); - isc__strerror(errstatus, strbuf, sizeof(strbuf)); dev = lpo->dev; } @@ -2309,7 +2488,8 @@ SocketIoThread(LPVOID ThreadContext) { */ static isc_threadresult_t WINAPI event_wait(void *uap) { - isc_socketmgr_t *manager = uap; + events_thread_t *evthread = uap; + isc_socketmgr_t *manager = evthread->manager; int cc; int event_errno; char strbuf[ISC_STRERRORSIZE]; @@ -2320,12 +2500,17 @@ event_wait(void *uap) { WSANETWORKEVENTS NetworkEvents; int err; - evlist = &(manager->sockev_list); + REQUIRE(evthread != NULL); + REQUIRE(VALID_MANAGER(manager)); - /* - * Get the Event Lock here. - */ - LOCK(&evlist->EventLock); + /* We need to know the Id of the thread */ + evthread->thread_id = GetCurrentThreadId(); + + evlist = &(evthread->sockev_list); + + /* See if there's anything waiting to add to the event list */ + if (manager->event_written > 0) + process_eventlist(evlist, manager); while (!manager->bShutdown) { do { @@ -2334,9 +2519,8 @@ event_wait(void *uap) { event_errno = 0; WSAResetEvent(evlist->aEventList[0]); - UNLOCK(&evlist->EventLock); - cc = WSAWaitForMultipleEvents(max_event, - evlist->aEventList,FALSE, WSA_INFINITE, + cc = WSAWaitForMultipleEvents(max_event, + evlist->aEventList, FALSE, WSA_INFINITE, FALSE); if (cc == WSA_WAIT_FAILED) { event_errno = WSAGetLastError(); @@ -2344,7 +2528,7 @@ event_wait(void *uap) { isc__strerror(event_errno, strbuf, sizeof(strbuf)); FATAL_ERROR(__FILE__, __LINE__, - "WSAWaitForMultipleEvents() %s: %s", + "WSAWaitForMultipleEvents() %s: %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, @@ -2353,7 +2537,6 @@ event_wait(void *uap) { } } - LOCK(&evlist->EventLock); } while (cc < 0 && !manager->bShutdown && manager->event_written == 0); @@ -2379,8 +2562,13 @@ event_wait(void *uap) { continue; if (WSAEnumNetworkEvents( wsock->fd, 0, - &NetworkEvents) == SOCKET_ERROR) + &NetworkEvents) == SOCKET_ERROR) { err = WSAGetLastError(); + isc__strerror(err, strbuf, sizeof(strbuf)); + UNEXPECTED_ERROR(__FILE__, __LINE__, + "event_wait: WSAEnumNetworkEvents() %s", + strbuf); + } if(NetworkEvents.lNetworkEvents == 0 ) { WSAResetEvent(wsock->hEvent); @@ -2412,7 +2600,6 @@ event_wait(void *uap) { isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_EXITING, "event_wait exiting")); - UNLOCK(&evlist->EventLock); return ((isc_threadresult_t)0); } /* @@ -2421,10 +2608,11 @@ event_wait(void *uap) { isc_result_t isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) { isc_socketmgr_t *manager; + events_thread_t *evthread = NULL; REQUIRE(managerp != NULL && *managerp == NULL); - manager = isc_mem_get(mctx, sizeof *manager); + manager = isc_mem_get(mctx, sizeof(*manager)); if (manager == NULL) return (ISC_R_NOMEMORY); @@ -2432,7 +2620,7 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) { manager->mctx = NULL; ISC_LIST_INIT(manager->socklist); if (isc_mutex_init(&manager->lock) != ISC_R_SUCCESS) { - isc_mem_put(mctx, manager, sizeof *manager); + isc_mem_put(mctx, manager, sizeof(*manager)); UNEXPECTED_ERROR(__FILE__, __LINE__, "isc_mutex_init() %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, @@ -2441,7 +2629,7 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) { } if (isc_condition_init(&manager->shutdown_ok) != ISC_R_SUCCESS) { DESTROYLOCK(&manager->lock); - isc_mem_put(mctx, manager, sizeof *manager); + isc_mem_put(mctx, manager, sizeof (*manager)); UNEXPECTED_ERROR(__FILE__, __LINE__, "isc_condition_init() %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, @@ -2449,26 +2637,30 @@ isc_socketmgr_create(isc_mem_t *mctx, isc_socketmgr_t **managerp) { return (ISC_R_UNEXPECTED); } + isc_mem_attach(mctx, &manager->mctx); + iocompletionport_init(manager); /* Create the Completion Ports */ - socket_event_minit(&manager->sockev_list); + /* + * Event Wait Thread Initialization + */ + ISC_LIST_INIT(manager->ev_threads); + + /* + * Start up the initial event wait thread. + */ + if (event_thread_create(&evthread, manager) != ISC_R_SUCCESS) { + DESTROYLOCK(&manager->lock); + isc_mem_put(mctx, manager, sizeof(*manager)); + return (ISC_R_UNEXPECTED); + } + + manager->prime_alert = evthread->sockev_list.aEventList[0]; manager->event_written = 0; manager->bShutdown = ISC_FALSE; - /* - * Start up the select/poll thread. - */ - if (isc_thread_create(event_wait, manager, &manager->watcher) != - ISC_R_SUCCESS) { - DESTROYLOCK(&manager->lock); - isc_mem_put(mctx, manager, sizeof *manager); - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_thread_create() %s", - isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, - ISC_MSG_FAILED, "failed")); - return (ISC_R_UNEXPECTED); - } - isc_mem_attach(mctx, &manager->mctx); + /* Initialize the event update list */ + ISC_LIST_INIT(manager->event_updates); *managerp = manager; @@ -2480,6 +2672,7 @@ isc_socketmgr_destroy(isc_socketmgr_t **managerp) { isc_socketmgr_t *manager; int i; isc_mem_t *mctx; + events_thread_t *evthread; /* * Destroy a socket manager. @@ -2510,17 +2703,27 @@ isc_socketmgr_destroy(isc_socketmgr_t **managerp) { */ signal_iocompletionport_exit(manager); manager->bShutdown = ISC_TRUE; - WSASetEvent(manager->sockev_list.aEventList[0]); /* - * Wait for thread to exit. + * Wait for threads to exit. */ - if (isc_thread_join(manager->watcher, NULL) != ISC_R_SUCCESS) - UNEXPECTED_ERROR(__FILE__, __LINE__, - "isc_thread_join() %s", + /* + * Shut down the event wait threads + */ + evthread = ISC_LIST_HEAD(manager->ev_threads); + while (evthread != NULL) { + WSASetEvent(evthread->sockev_list.aEventList[0]); + if (isc_thread_join(evthread->thread_handle, NULL) != ISC_R_SUCCESS) + UNEXPECTED_ERROR(__FILE__, __LINE__, + "isc_thread_join() for event_wait %s", isc_msgcat_get(isc_msgcat, ISC_MSGSET_GENERAL, ISC_MSG_FAILED, "failed")); + ISC_LIST_DEQUEUE(manager->ev_threads, evthread, link); + isc_mem_put(manager->mctx, evthread, sizeof(*evthread)); + evthread = ISC_LIST_HEAD(manager->ev_threads); + } + /* * Now the I/O Completion Port Worker Threads */ @@ -2749,7 +2952,6 @@ socket_send(isc_socket_t *sock, isc_socketevent_t *dev, isc_task_t *task, * We couldn't send all or part of the request right now, so * queue it unless ISC_SOCKFLAG_NORETRY is set. */ -// if ((flags & ISC_SOCKFLAG_NORETRY) == 0) { isc_task_attach(task, &ntask); dev->attributes |= ISC_SOCKEVENTATTR_ATTACHED; @@ -2770,7 +2972,6 @@ socket_send(isc_socket_t *sock, isc_socketevent_t *dev, isc_task_t *task, if ((flags & ISC_SOCKFLAG_IMMEDIATE) != 0) result = ISC_R_INPROGRESS; break; -// } case DOIO_SUCCESS: break; @@ -3286,14 +3487,6 @@ isc_socket_cancel(isc_socket_t *sock, isc_task_t *task, unsigned int how) { dev->result = ISC_R_CANCELED; send_recvdone_event(sock, &dev); } -/* if (sock->iocp == 1) { - CancelRequest = (IoCompletionInfo *) HeapAlloc(hHeapHandle, - HEAP_ZERO_MEMORY, sizeof(IoCompletionInfo)); - CancelRequest->request_type = SOCKET_CANCEL; - CancelRequest->dev = dev; - PostQueuedCompletionStatus(hIoCompletionPort, 0, - (DWORD) sock, &CancelRequest->overlapped); - } */ dev = next; } } @@ -3313,14 +3506,6 @@ isc_socket_cancel(isc_socket_t *sock, isc_task_t *task, unsigned int how) { dev->result = ISC_R_CANCELED; send_senddone_event(sock, &dev); } -/* if (sock->iocp == 1) { - CancelRequest = (IoCompletionInfo *) HeapAlloc(hHeapHandle, - HEAP_ZERO_MEMORY, sizeof(IoCompletionInfo)); - CancelRequest->request_type = SOCKET_CANCEL; - CancelRequest->dev = dev; - PostQueuedCompletionStatus(hIoCompletionPort, 0, - (DWORD) sock, &CancelRequest->overlapped); - } */ dev = next; } }