diff options
-rw-r--r-- | tests/server/sockfilt.c | 246 |
1 files changed, 163 insertions, 83 deletions
diff --git a/tests/server/sockfilt.c b/tests/server/sockfilt.c index 5403d7c42..9c43f90e4 100644 --- a/tests/server/sockfilt.c +++ b/tests/server/sockfilt.c @@ -532,12 +532,14 @@ static void lograw(unsigned char *buffer, ssize_t len) */ struct select_ws_wait_data { HANDLE handle; /* actual handle to wait for during select */ - HANDLE event; /* internal event to abort waiting thread */ + HANDLE signal; /* internal event to signal handle trigger */ + HANDLE abort; /* internal event to abort waiting thread */ + HANDLE mutex; /* mutex to prevent event race-condition */ }; static DWORD WINAPI select_ws_wait_thread(LPVOID lpParameter) { struct select_ws_wait_data *data; - HANDLE handle, handles[2]; + HANDLE mutex, signal, handle, handles[2]; INPUT_RECORD inputrecord; LARGE_INTEGER size, pos; DWORD type, length; @@ -546,8 +548,10 @@ static DWORD WINAPI select_ws_wait_thread(LPVOID lpParameter) data = (struct select_ws_wait_data *) lpParameter; if(data) { handle = data->handle; - handles[0] = data->event; + handles[0] = data->abort; handles[1] = handle; + signal = data->signal; + mutex = data->mutex; free(data); } else @@ -567,30 +571,41 @@ static DWORD WINAPI select_ws_wait_thread(LPVOID lpParameter) */ while(WaitForMultipleObjectsEx(1, handles, FALSE, 0, FALSE) == WAIT_TIMEOUT) { - /* get total size of file */ - length = 0; - size.QuadPart = 0; - size.LowPart = GetFileSize(handle, &length); - if((size.LowPart != INVALID_FILE_SIZE) || - (GetLastError() == NO_ERROR)) { - size.HighPart = length; - /* get the current position within the file */ - pos.QuadPart = 0; - pos.LowPart = SetFilePointer(handle, 0, &pos.HighPart, - FILE_CURRENT); - if((pos.LowPart != INVALID_SET_FILE_POINTER) || - (GetLastError() == NO_ERROR)) { - /* compare position with size, abort if not equal */ - if(size.QuadPart == pos.QuadPart) { - /* sleep and continue waiting */ - SleepEx(0, FALSE); - continue; + length = WaitForSingleObjectEx(mutex, 0, FALSE); + if(length == WAIT_OBJECT_0) { + /* get total size of file */ + length = 0; + size.QuadPart = 0; + size.LowPart = GetFileSize(handle, &length); + if((size.LowPart != INVALID_FILE_SIZE) || + (GetLastError() == NO_ERROR)) { + size.HighPart = length; + /* get the current position within the file */ + pos.QuadPart = 0; + pos.LowPart = SetFilePointer(handle, 0, &pos.HighPart, + FILE_CURRENT); + if((pos.LowPart != INVALID_SET_FILE_POINTER) || + (GetLastError() == NO_ERROR)) { + /* compare position with size, abort if not equal */ + if(size.QuadPart == pos.QuadPart) { + /* sleep and continue waiting */ + SleepEx(0, FALSE); + ReleaseMutex(mutex); + continue; + } } } + /* there is some data available, stop waiting */ + logmsg("[select_ws_wait_thread] data available, DISK: %p", handle); + SetEvent(signal); + ReleaseMutex(mutex); + break; + } + else if(length == WAIT_ABANDONED) { + /* we are not allowed to process this event, because select_ws + is post-processing the signalled events and we must exit. */ + break; } - /* there is some data available, stop waiting */ - logmsg("[select_ws_wait_thread] data available on DISK: %p", handle); - break; } break; @@ -604,23 +619,34 @@ static DWORD WINAPI select_ws_wait_thread(LPVOID lpParameter) */ while(WaitForMultipleObjectsEx(2, handles, FALSE, INFINITE, FALSE) == WAIT_OBJECT_0 + 1) { - /* check if this is an actual console handle */ - length = 0; - if(GetConsoleMode(handle, &length)) { - /* retrieve an event from the console buffer */ + length = WaitForSingleObjectEx(mutex, 0, FALSE); + if(length == WAIT_OBJECT_0) { + /* check if this is an actual console handle */ length = 0; - if(PeekConsoleInput(handle, &inputrecord, 1, &length)) { - /* check if the event is not an actual key-event */ - if(length == 1 && inputrecord.EventType != KEY_EVENT) { - /* purge the non-key-event and continue waiting */ - ReadConsoleInput(handle, &inputrecord, 1, &length); - continue; + if(GetConsoleMode(handle, &length)) { + /* retrieve an event from the console buffer */ + length = 0; + if(PeekConsoleInput(handle, &inputrecord, 1, &length)) { + /* check if the event is not an actual key-event */ + if(length == 1 && inputrecord.EventType != KEY_EVENT) { + /* purge the non-key-event and continue waiting */ + ReadConsoleInput(handle, &inputrecord, 1, &length); + ReleaseMutex(mutex); + continue; + } } } + /* there is some data available, stop waiting */ + logmsg("[select_ws_wait_thread] data available, CHAR: %p", handle); + SetEvent(signal); + ReleaseMutex(mutex); + break; + } + else if(length == WAIT_ABANDONED) { + /* we are not allowed to process this event, because select_ws + is post-processing the signalled events and we must exit. */ + break; } - /* there is some data available, stop waiting */ - logmsg("[select_ws_wait_thread] data available on CHAR: %p", handle); - break; } break; @@ -634,43 +660,62 @@ static DWORD WINAPI select_ws_wait_thread(LPVOID lpParameter) */ while(WaitForMultipleObjectsEx(1, handles, FALSE, 0, FALSE) == WAIT_TIMEOUT) { - /* peek into the pipe and retrieve the amount of data available */ - length = 0; - if(PeekNamedPipe(handle, NULL, 0, NULL, &length, NULL)) { - /* if there is no data available, sleep and continue waiting */ - if(length == 0) { - SleepEx(0, FALSE); - continue; + length = WaitForSingleObjectEx(mutex, 0, FALSE); + if(length == WAIT_OBJECT_0) { + /* peek into the pipe and retrieve the amount of data available */ + length = 0; + if(PeekNamedPipe(handle, NULL, 0, NULL, &length, NULL)) { + /* if there is no data available, sleep and continue waiting */ + if(length == 0) { + SleepEx(0, FALSE); + ReleaseMutex(mutex); + continue; + } + else { + logmsg("[select_ws_wait_thread] PeekNamedPipe len: %d", length); + } } else { - logmsg("[select_ws_wait_thread] PeekNamedPipe len: %d", length); + /* if the pipe has been closed, sleep and continue waiting */ + length = GetLastError(); + logmsg("[select_ws_wait_thread] PeekNamedPipe error: %d", length); + if(length == ERROR_BROKEN_PIPE) { + SleepEx(0, FALSE); + ReleaseMutex(mutex); + continue; + } } + /* there is some data available, stop waiting */ + logmsg("[select_ws_wait_thread] data available, PIPE: %p", handle); + SetEvent(signal); + ReleaseMutex(mutex); + break; } - else { - /* if the pipe has been closed, sleep and continue waiting */ - length = GetLastError(); - logmsg("[select_ws_wait_thread] PeekNamedPipe error: %d", length); - if(length == ERROR_BROKEN_PIPE) { - SleepEx(0, FALSE); - continue; - } + else if(length == WAIT_ABANDONED) { + /* we are not allowed to process this event, because select_ws + is post-processing the signalled events and we must exit. */ + break; } - /* there is some data available, stop waiting */ - logmsg("[select_ws_wait_thread] data available on PIPE: %p", handle); - break; } break; default: /* The handle has an unknown type, try to wait on it */ - WaitForMultipleObjectsEx(2, handles, FALSE, INFINITE, FALSE); - logmsg("[select_ws_wait_thread] data available on HANDLE: %p", handle); + if(WaitForMultipleObjectsEx(2, handles, FALSE, INFINITE, FALSE) + == WAIT_OBJECT_0 + 1) { + if(WaitForSingleObjectEx(mutex, 0, FALSE) == WAIT_OBJECT_0) { + logmsg("[select_ws_wait_thread] data available, HANDLE: %p", handle); + SetEvent(signal); + ReleaseMutex(mutex); + } + } break; } return 0; } -static HANDLE select_ws_wait(HANDLE handle, HANDLE event) +static HANDLE select_ws_wait(HANDLE handle, HANDLE signal, + HANDLE abort, HANDLE mutex) { struct select_ws_wait_data *data; HANDLE thread = NULL; @@ -679,7 +724,9 @@ static HANDLE select_ws_wait(HANDLE handle, HANDLE event) data = malloc(sizeof(struct select_ws_wait_data)); if(data) { data->handle = handle; - data->event = event; + data->signal = signal; + data->abort = abort; + data->mutex = mutex; /* launch waiting thread */ thread = CreateThread(NULL, 0, @@ -695,21 +742,21 @@ static HANDLE select_ws_wait(HANDLE handle, HANDLE event) return thread; } struct select_ws_data { - curl_socket_t fd; /* the original input handle (indexed by fds) */ - curl_socket_t wsasock; /* the internal socket handle (indexed by wsa) */ - WSAEVENT wsaevent; /* the internal WINSOCK2 event (indexed by wsa) */ - HANDLE thread; /* the internal threads handle (indexed by thd) */ + curl_socket_t fd; /* the original input handle (indexed by fds/idx) */ + curl_socket_t wsasock; /* the internal socket handle (indexed by wsa) */ + WSAEVENT wsaevent; /* the internal WINSOCK event (indexed by wsa) */ + HANDLE signal; /* the internal signal handle (indexed by thd) */ + HANDLE thread; /* the internal thread handle (indexed by thd) */ }; static int select_ws(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout) { + HANDLE abort, mutex, signal, handle, *handles; DWORD milliseconds, wait, idx; WSANETWORKEVENTS wsanetevents; struct select_ws_data *data; - HANDLE handle, *handles; WSAEVENT wsaevent; int error, fds; - HANDLE waitevent = NULL; DWORD nfd = 0, thd = 0, wsa = 0; int ret = 0; @@ -725,9 +772,17 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds, return 0; } - /* create internal event to signal waiting threads */ - waitevent = CreateEvent(NULL, TRUE, FALSE, NULL); - if(!waitevent) { + /* create internal event to abort waiting threads */ + abort = CreateEvent(NULL, TRUE, FALSE, NULL); + if(!abort) { + errno = ENOMEM; + return -1; + } + + /* create internal mutex to lock event handling in threads */ + mutex = CreateMutex(NULL, FALSE, NULL); + if(!mutex) { + CloseHandle(abort); errno = ENOMEM; return -1; } @@ -735,7 +790,8 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds, /* allocate internal array for the internal data */ data = calloc(nfds, sizeof(struct select_ws_data)); if(data == NULL) { - CloseHandle(waitevent); + CloseHandle(abort); + CloseHandle(mutex); errno = ENOMEM; return -1; } @@ -743,7 +799,8 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds, /* allocate internal array for the internal event handles */ handles = calloc(nfds, sizeof(HANDLE)); if(handles == NULL) { - CloseHandle(waitevent); + CloseHandle(abort); + CloseHandle(mutex); free(data); errno = ENOMEM; return -1; @@ -768,10 +825,19 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds, data[nfd].fd = curlx_sitosk(fds); if(fds == fileno(stdin)) { handle = GetStdHandle(STD_INPUT_HANDLE); - handle = select_ws_wait(handle, waitevent); - handles[nfd] = handle; - data[thd].thread = handle; - thd++; + signal = CreateEvent(NULL, TRUE, FALSE, NULL); + if(signal) { + handle = select_ws_wait(handle, signal, abort, mutex); + if(handle) { + handles[nfd] = signal; + data[thd].signal = signal; + data[thd].thread = handle; + thd++; + } + else { + CloseHandle(signal); + } + } } else if(fds == fileno(stdout)) { handles[nfd] = GetStdHandle(STD_OUTPUT_HANDLE); @@ -794,10 +860,19 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds, curl_socket_t socket = curlx_sitosk(fds); WSACloseEvent(wsaevent); handle = (HANDLE) socket; - handle = select_ws_wait(handle, waitevent); - handles[nfd] = handle; - data[thd].thread = handle; - thd++; + signal = CreateEvent(NULL, TRUE, FALSE, NULL); + if(signal) { + handle = select_ws_wait(handle, signal, abort, mutex); + if(handle) { + handles[nfd] = signal; + data[thd].signal = signal; + data[thd].thread = handle; + thd++; + } + else { + CloseHandle(signal); + } + } } } } @@ -816,8 +891,8 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds, /* wait for one of the internal handles to trigger */ wait = WaitForMultipleObjectsEx(nfd, handles, FALSE, milliseconds, FALSE); - /* signal the event handle for the waiting threads */ - SetEvent(waitevent); + /* wait for internal mutex to lock event handling in threads */ + WaitForSingleObjectEx(mutex, INFINITE, FALSE); /* loop over the internal handles returned in the descriptors */ for(idx = 0; idx < nfd; idx++) { @@ -881,6 +956,9 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds, } } + /* signal the event handle for the other waiting threads */ + SetEvent(abort); + for(fds = 0; fds < nfds; fds++) { if(FD_ISSET(fds, readfds)) logmsg("select_ws: %d is readable", fds); @@ -898,11 +976,13 @@ static int select_ws(int nfds, fd_set *readfds, fd_set *writefds, } for(idx = 0; idx < thd; idx++) { - WaitForSingleObject(data[idx].thread, INFINITE); + WaitForSingleObjectEx(data[idx].thread, INFINITE, FALSE); CloseHandle(data[idx].thread); + CloseHandle(data[idx].signal); } - CloseHandle(waitevent); + CloseHandle(abort); + CloseHandle(mutex); free(handles); free(data); |