I can't believe how long it was since I posted the first version of this. However, I have finally put the time into this to rework it. Differences from previous version: - Don't use variable sized messages - Use existing code in ntdll - Use existing code for thread wait queue in server ChangeLog: - Add server (and minimal client) support for I/O completion ports License: - LGPL Rob
Index: dlls/ntdll/Makefile.in =================================================================== RCS file: /home/wine/wine/dlls/ntdll/Makefile.in,v retrieving revision 1.55 diff -u -r1.55 Makefile.in --- dlls/ntdll/Makefile.in 12 Apr 2003 00:10:13 -0000 1.55 +++ dlls/ntdll/Makefile.in 21 Apr 2003 14:44:19 -0000 @@ -78,6 +78,7 @@ error.c \ file.c \ heap.c \ + iocompletion.c \ large_int.c \ loader.c \ misc.c \ Index: dlls/ntdll/error.c =================================================================== RCS file: /home/wine/wine/dlls/ntdll/error.c,v retrieving revision 1.8 diff -u -r1.8 error.c --- dlls/ntdll/error.c 2 Jan 2003 17:50:48 -0000 1.8 +++ dlls/ntdll/error.c 21 Apr 2003 14:44:25 -0000 @@ -69,8 +69,9 @@ /* conversion tables */ -static const DWORD table_00000103[31] = +static const DWORD table_00000102[32] = { + ERROR_TIMEOUT, /* 00000102 (STATUS_TIMEOUT) */ ERROR_IO_PENDING, /* 00000103 (STATUS_PENDING) */ ERROR_MR_MID_NOT_FOUND, /* 00000104 */ ERROR_MORE_DATA, /* 00000105 (STATUS_MORE_ENTRIES) */ @@ -1333,7 +1334,7 @@ static const struct error_table error_table[] = { - { 0x00000103, 0x00000122, table_00000103 }, + { 0x00000102, 0x00000122, table_00000102 }, { 0x40000002, 0x4000000e, table_40000002 }, { 0x40000370, 0x40000371, table_40000370 }, { 0x40020056, 0x40020057, table_40020056 }, Index: dlls/ntdll/ntdll.spec =================================================================== RCS file: /home/wine/wine/dlls/ntdll/ntdll.spec,v retrieving revision 1.105 diff -u -r1.105 ntdll.spec --- dlls/ntdll/ntdll.spec 12 Apr 2003 00:10:13 -0000 1.105 +++ dlls/ntdll/ntdll.spec 21 Apr 2003 14:44:30 -0000 @@ -79,7 +79,7 @@ @ stdcall NtCreateEvent(long long long long long) @ stub NtCreateEventPair @ stdcall NtCreateFile(ptr long ptr ptr long long long ptr long long ptr) -@ stub NtCreateIoCompletion +@ stdcall NtCreateIoCompletion(ptr long long long) @ stdcall NtCreateKey(long long long long long long long) @ stdcall NtCreateMailslotFile(long long long long long long long long) @ stub NtCreateMutant @@ -192,7 +192,7 @@ @ stub NtReleaseMutant @ stub NtReleaseProcessMutant @ stdcall NtReleaseSemaphore(long long ptr) -@ stub NtRemoveIoCompletion +@ stdcall NtRemoveIoCompletion(ptr ptr ptr ptr ptr) @ stdcall NtReplaceKey(ptr long ptr) @ stub NtReplyPort @ stdcall NtReplyWaitReceivePort(ptr ptr ptr ptr) @@ -220,7 +220,7 @@ @ stdcall NtSetInformationThread(long long long long) @ stub NtSetInformationToken @ stdcall NtSetIntervalProfile(long long) -@ stub NtSetIoCompletion +@ stdcall NtSetIoCompletion(ptr long ptr long long) @ stub NtSetLdtEntries @ stub NtSetLowEventPair @ stub NtSetLowWaitHighEventPair Index: dlls/ntdll/sync.c =================================================================== RCS file: /home/wine/wine/dlls/ntdll/sync.c,v retrieving revision 1.27 diff -u -r1.27 sync.c --- dlls/ntdll/sync.c 4 Apr 2003 22:26:34 -0000 1.27 +++ dlls/ntdll/sync.c 21 Apr 2003 14:44:33 -0000 @@ -312,11 +312,11 @@ /*********************************************************************** - * wait_reply + * NTDLL_wait_reply * * Wait for a reply on the waiting pipe of the current thread. */ -static int wait_reply( void *cookie ) +int NTDLL_wait_reply( void *cookie ) { int signaled; struct wake_up_reply reply; @@ -329,7 +329,7 @@ if (!reply.cookie) break; /* thread got killed */ if (reply.cookie == cookie) return reply.signaled; /* we stole another reply, wait for the real one */ - signaled = wait_reply( cookie ); + signaled = NTDLL_wait_reply( cookie ); /* and now put the wrong one back in the pipe */ for (;;) { @@ -430,7 +430,7 @@ ret = wine_server_call( req ); } SERVER_END_REQ; - if (ret == STATUS_PENDING) ret = wait_reply( &cookie ); + if (ret == STATUS_PENDING) ret = NTDLL_wait_reply( &cookie ); if (ret != STATUS_USER_APC) break; call_apcs( alertable ); if (alertable) break; Index: include/winternl.h =================================================================== RCS file: /home/wine/wine/include/winternl.h,v retrieving revision 1.25 diff -u -r1.25 winternl.h --- include/winternl.h 12 Apr 2003 00:10:13 -0000 1.25 +++ include/winternl.h 21 Apr 2003 14:46:45 -0000 @@ -1272,6 +1274,22 @@ NTSTATUS WINAPI LdrShutdownThread(void); NTSTATUS WINAPI LdrUnloadDll(HMODULE); NTSTATUS WINAPI LdrUnlockLoaderLock(ULONG,ULONG); + +/************************************************************************* + * I/O completion functions and structures. + * + * Those are not part of standard Winternl.h + */ +typedef struct _FILE_COMPLETION_INFORMATION { + HANDLE CompletionPort; + ULONG_PTR CompletionKey; +} FILE_COMPLETION_INFORMATION; +typedef FILE_COMPLETION_INFORMATION *PFILE_COMPLETION_INFORMATION; + +NTSTATUS WINAPI NtCreateIoCompletion(PHANDLE,ACCESS_MASK,ULONG_PTR,ULONG); +NTSTATUS WINAPI NtSetIoCompletion(HANDLE,ULONG_PTR,LPOVERLAPPED,ULONG,ULONG); +NTSTATUS WINAPI NtRemoveIoCompletion(HANDLE,PULONG_PTR,LPOVERLAPPED*,PIO_STATUS_BLOCK,PLARGE_INTEGER); +NTSTATUS WINAPI NtSetInformationFile(HANDLE,PIO_STATUS_BLOCK,PVOID,ULONG,FILE_INFORMATION_CLASS); #ifdef __cplusplus } /* extern "C" */ Index: include/wine/server_protocol.h =================================================================== RCS file: /home/wine/wine/include/wine/server_protocol.h,v retrieving revision 1.65 diff -u -r1.65 server_protocol.h --- include/wine/server_protocol.h 4 Apr 2003 22:26:34 -0000 1.65 +++ include/wine/server_protocol.h 21 Apr 2003 14:47:04 -0000 @@ -3040,6 +3040,63 @@ }; +struct create_io_completion_request +{ + struct request_header __header; + unsigned int access; + unsigned int concurrent_threads; +}; +struct create_io_completion_reply +{ + struct reply_header __header; + user_handle_t handle; +}; + + +struct set_io_completion_request +{ + struct request_header __header; + user_handle_t handle; + unsigned int bytes_transferred; + void* completion_key; + void* overlapped; +}; +struct set_io_completion_reply +{ + struct reply_header __header; +}; + + +struct remove_io_completion_request +{ + struct request_header __header; + user_handle_t handle; + void* cookie; + abs_time_t timeout; +}; +struct remove_io_completion_reply +{ + struct reply_header __header; + unsigned int bytes_transferred; + void* completion_key; + void* overlapped; +}; + + +struct remove_io_completion_assigned_request +{ + struct request_header __header; + user_handle_t handle; +}; +struct remove_io_completion_assigned_reply +{ + struct reply_header __header; + unsigned int bytes_transferred; + void* completion_key; + void* overlapped; +}; + + enum request { REQ_new_process, @@ -3217,6 +3274,10 @@ REQ_start_hook_chain, REQ_finish_hook_chain, REQ_get_next_hook, + REQ_create_io_completion, + REQ_set_io_completion, + REQ_remove_io_completion, + REQ_remove_io_completion_assigned, REQ_NB_REQUESTS }; @@ -3399,6 +3460,10 @@ struct start_hook_chain_request start_hook_chain_request; struct finish_hook_chain_request finish_hook_chain_request; struct get_next_hook_request get_next_hook_request; + struct create_io_completion_request create_io_completion_request; + struct set_io_completion_request set_io_completion_request; + struct remove_io_completion_request remove_io_completion_request; + struct remove_io_completion_assigned_request remove_io_completion_assigned_request; }; union generic_reply { @@ -3579,8 +3644,12 @@ struct start_hook_chain_reply start_hook_chain_reply; struct finish_hook_chain_reply finish_hook_chain_reply; struct get_next_hook_reply get_next_hook_reply; + struct create_io_completion_reply create_io_completion_reply; + struct set_io_completion_reply set_io_completion_reply; + struct remove_io_completion_reply remove_io_completion_reply; + struct remove_io_completion_assigned_reply remove_io_completion_assigned_reply; }; -#define SERVER_PROTOCOL_VERSION 105 +#define SERVER_PROTOCOL_VERSION 106 #endif /* __WINE_WINE_SERVER_PROTOCOL_H */ Index: server/Makefile.in =================================================================== RCS file: /home/wine/wine/server/Makefile.in,v retrieving revision 1.43 diff -u -r1.43 Makefile.in --- server/Makefile.in 26 Mar 2003 01:32:18 -0000 1.43 +++ server/Makefile.in 21 Apr 2003 14:47:18 -0000 @@ -20,6 +20,7 @@ file.c \ handle.c \ hook.c \ + iocompletion.c \ main.c \ mapping.c \ mutex.c \ Index: server/protocol.def =================================================================== RCS file: /home/wine/wine/server/protocol.def,v retrieving revision 1.65 diff -u -r1.65 protocol.def --- server/protocol.def 4 Apr 2003 22:26:34 -0000 1.65 +++ server/protocol.def 21 Apr 2003 14:47:23 -0000 @@ -2119,3 +2119,39 @@ int next_unicode; /* is the next a unicode hook? */ VARARG(module,unicode_str); /* module name */ @END + +/* Create an I/O completion port */ +@REQ(create_io_completion) + unsigned int access; /* access desired by client */ + unsigned int concurrent_threads; /* the number of concurrent threads processing a request */ +@REPLY + user_handle_t handle; /* handle to the i/o completion port */ +@END + +/* Post data to an I/O completion port */ +@REQ(set_io_completion) + user_handle_t handle; /* handle to the i/o completion port */ + unsigned int bytes_transferred; /* number of bytes transferred */ + void* completion_key; /* user data to send to waiting client */ + void* overlapped; /* overlapped structure to send to client */ +@END + +/* Get or wait for data posted to an I/O completion port */ +@REQ(remove_io_completion) + user_handle_t handle; /* handle to the i/o completion port */ + void* cookie; /* magic cookie to return to client */ + abs_time_t timeout; /* absolute timeout */ +@REPLY + unsigned int bytes_transferred; /* number of bytes transferred */ + void* completion_key; /* user data to send to waiting client */ + void* overlapped; /* overlapped structure to send to client */ +@END + +/* Get data posted to an I/O completion port after waiting for it */ +@REQ(remove_io_completion_assigned) + user_handle_t handle; /* handle to the i/o completion port */ +@REPLY + unsigned int bytes_transferred; /* number of bytes transferred */ + void* completion_key; /* user data to send to waiting client */ + void* overlapped; /* overlapped structure to send to client */ +@END Index: server/request.h =================================================================== RCS file: /home/wine/wine/server/request.h,v retrieving revision 1.83 diff -u -r1.83 request.h --- server/request.h 26 Mar 2003 23:41:43 -0000 1.83 +++ server/request.h 21 Apr 2003 14:47:24 -0000 @@ -278,6 +278,10 @@ DECL_HANDLER(start_hook_chain); DECL_HANDLER(finish_hook_chain); DECL_HANDLER(get_next_hook); +DECL_HANDLER(create_io_completion); +DECL_HANDLER(set_io_completion); +DECL_HANDLER(remove_io_completion); +DECL_HANDLER(remove_io_completion_assigned); #ifdef WANT_REQUEST_HANDLERS @@ -459,6 +463,10 @@ (req_handler)req_start_hook_chain, (req_handler)req_finish_hook_chain, (req_handler)req_get_next_hook, + (req_handler)req_create_io_completion, + (req_handler)req_set_io_completion, + (req_handler)req_remove_io_completion, + (req_handler)req_remove_io_completion_assigned, }; #endif /* WANT_REQUEST_HANDLERS */ Index: server/thread.c =================================================================== RCS file: /home/wine/wine/server/thread.c,v retrieving revision 1.97 diff -u -r1.97 thread.c --- server/thread.c 4 Apr 2003 22:26:34 -0000 1.97 +++ server/thread.c 21 Apr 2003 14:47:27 -0000 @@ -519,7 +519,7 @@ } /* select on a list of handles */ -static void select_on( int count, void *cookie, const obj_handle_t *handles, +void select_on( int count, void *cookie, const obj_handle_t *handles, int flags, const abs_time_t *timeout ) { int ret, i; Index: server/trace.c =================================================================== RCS file: /home/wine/wine/server/trace.c,v retrieving revision 1.162 diff -u -r1.162 trace.c --- server/trace.c 4 Apr 2003 22:26:34 -0000 1.162 +++ server/trace.c 21 Apr 2003 14:47:32 -0000 @@ -2439,6 +2439,52 @@ dump_varargs_unicode_str( cur_size ); } +static void dump_create_io_completion_request( const struct create_io_completion_request *req ) +{ + fprintf( stderr, " access=%08x,", req->access ); + fprintf( stderr, " concurrent_threads=%08x", req->concurrent_threads ); +} + +static void dump_create_io_completion_reply( const struct create_io_completion_reply *req ) +{ + fprintf( stderr, " handle=%p", req->handle ); +} + +static void dump_set_io_completion_request( const struct set_io_completion_request *req ) +{ + fprintf( stderr, " handle=%p,", req->handle ); + fprintf( stderr, " bytes_transferred=%08x,", req->bytes_transferred ); + fprintf( stderr, " completion_key=%p,", req->completion_key ); + fprintf( stderr, " overlapped=%p", req->overlapped ); +} + +static void dump_remove_io_completion_request( const struct remove_io_completion_request *req ) +{ + fprintf( stderr, " handle=%p,", req->handle ); + fprintf( stderr, " cookie=%p,", req->cookie ); + fprintf( stderr, " timeout=" ); + dump_abs_time( &req->timeout ); +} + +static void dump_remove_io_completion_reply( const struct remove_io_completion_reply *req ) +{ + fprintf( stderr, " bytes_transferred=%08x,", req->bytes_transferred ); + fprintf( stderr, " completion_key=%p,", req->completion_key ); + fprintf( stderr, " overlapped=%p", req->overlapped ); +} + +static void dump_remove_io_completion_assigned_request( const struct remove_io_completion_assigned_request *req ) +{ + fprintf( stderr, " handle=%p", req->handle ); +} + +static void dump_remove_io_completion_assigned_reply( const struct remove_io_completion_assigned_reply *req ) +{ + fprintf( stderr, " bytes_transferred=%08x,", req->bytes_transferred ); + fprintf( stderr, " completion_key=%p,", req->completion_key ); + fprintf( stderr, " overlapped=%p", req->overlapped ); +} + static const dump_func req_dumpers[REQ_NB_REQUESTS] = { (dump_func)dump_new_process_request, (dump_func)dump_get_new_process_info_request, @@ -2615,6 +2661,10 @@ (dump_func)dump_start_hook_chain_request, (dump_func)dump_finish_hook_chain_request, (dump_func)dump_get_next_hook_request, + (dump_func)dump_create_io_completion_request, + (dump_func)dump_set_io_completion_request, + (dump_func)dump_remove_io_completion_request, + (dump_func)dump_remove_io_completion_assigned_request, }; static const dump_func reply_dumpers[REQ_NB_REQUESTS] = { @@ -2793,6 +2843,10 @@ (dump_func)dump_start_hook_chain_reply, (dump_func)0, (dump_func)dump_get_next_hook_reply, + (dump_func)dump_create_io_completion_reply, + (dump_func)0, + (dump_func)dump_remove_io_completion_reply, + (dump_func)dump_remove_io_completion_assigned_reply, }; static const char * const req_names[REQ_NB_REQUESTS] = { @@ -2971,6 +3025,10 @@ "start_hook_chain", "finish_hook_chain", "get_next_hook", + "create_io_completion", + "set_io_completion", + "remove_io_completion", + "remove_io_completion_assigned", }; /* ### make_requests end ### */ --- /dev/null Mon Jun 24 01:53:01 2002 +++ dlls/ntdll/iocompletion.c Wed Apr 16 20:37:00 2003 @@ -0,0 +1,211 @@ +/* + * I/O Completion Ports + * + * Copyright (C) 2003 Robert Shearman + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include "winbase.h" +#include "winnt.h" +#include "winternl.h" + +#include "config.h" + +#include "ntdll_misc.h" +#include "wine/server.h" +#include "wine/debug.h" + +WINE_DEFAULT_DEBUG_CHANNEL(ntdll); + +extern int NTDLL_wait_reply( void *cookie ); + +/************************************************************************** + * NtCreateIoCompletion (NTDLL.@) + * + * Params: + * CompletionPort [O]: the handle created + * DesiredAccess [I}: the access desired (e.g. GENERIC_ALL) + * Reserved [I}: unknown + * NumberOfConcurrentThreads [I]: the desired number of concurrent + * threads + * Returns: + * Status + * Notes: + * It is effectively a FIFO queue for data and + * a LIFO queue for threads to "minimize context switches". + * The aim is to keep a small number of threads constantly + * active. + * See: + * MSDN for CreateIoCompletionPort spec and + * the article "Inside I/O Completion Ports" + * (http://www.sysinternals.com/ntw2k/info/comport.shtml) + */ +NTSTATUS WINAPI NtCreateIoCompletion ( + OUT PHANDLE CompletionPort, + IN ACCESS_MASK DesiredAccess, + IN ULONG_PTR Reserved, + IN ULONG NumberOfConcurrentThreads + ) +{ + NTSTATUS ret; + + TRACE("(%p, %lx, %lx, %ld)\n", + CompletionPort, + DesiredAccess, + Reserved, + NumberOfConcurrentThreads); + + if (Reserved != 0) + { + FIXME("Reserved != 0 not supported\n"); + return STATUS_INVALID_PARAMETER; + } + + if (DesiredAccess && GENERIC_ALL) + DesiredAccess |= GENERIC_READ | GENERIC_WRITE; + + SERVER_START_REQ( create_io_completion ) + { + req->access = DesiredAccess; + req->concurrent_threads = NumberOfConcurrentThreads; + ret = wine_server_call( req ); + *CompletionPort = reply->handle; + } + SERVER_END_REQ; + + TRACE("returning %lx\n", ret); + return ret; +} + +/************************************************************************** + * NtSetIoCompletion (NTDLL.@) + * + * Params: + * CompletionPort [I]: port to send data to + * CompletionKey [I}: user key to identify this set of data + * lpOverlapped [I}: OVERLAPPED structure to send to port + * NumberOfBytesTransferred [I}: unknown - seems to be set to zero always + * NumberOfBytesToTransfer [I]: Bytes to transfer in this packet of data + * Returns: + * Status + * See: + * MSDN for PostQueuedCompletionStatus spec and + * the article "Inside I/O Completion Ports" + * (http://www.sysinternals.com/ntw2k/info/comport.shtml) + */ +NTSTATUS WINAPI NtSetIoCompletion( + IN HANDLE CompletionPort, + IN ULONG_PTR CompletionKey, + IN LPOVERLAPPED lpOverlapped, + IN ULONG NumberOfBytesTransferred, /* normally set to 0 */ + IN ULONG NumberOfBytesToTransfer /* will become number of bytes transferred in the io operation */ + ) +{ + NTSTATUS ret; + + TRACE("(%p, %lx, %p, %ld, %ld)\n", + CompletionPort, + CompletionKey, + lpOverlapped, + NumberOfBytesTransferred, + NumberOfBytesToTransfer); + + if (NumberOfBytesTransferred != 0) + { + FIXME("NumberOfBytesTransferred != 0 not supported\n"); + return STATUS_INVALID_PARAMETER; + } + + SERVER_START_REQ( set_io_completion ) + { + req->handle = CompletionPort; + req->completion_key = (void *)CompletionKey; + req->overlapped = lpOverlapped; + req->bytes_transferred = NumberOfBytesToTransfer; + ret = wine_server_call( req ); + } + SERVER_END_REQ; + + TRACE("returning %lx\n", ret); + return ret; +} + +/************************************************************************** + * NtRemoveIoCompletion (NTDLL.@) + * + * See: MSDN for GetQueuedCompletionStatus spec and + * the article "Inside I/O Completion Ports" + * (http://www.sysinternals.com/ntw2k/info/comport.shtml) + */ +NTSTATUS WINAPI NtRemoveIoCompletion ( + IN HANDLE CompletionPort, + OUT PULONG_PTR CompletionKey, + OUT LPOVERLAPPED * lplpOverlapped, + OUT PIO_STATUS_BLOCK CompletionStatus, + IN PLARGE_INTEGER WaitTime + ) +{ + NTSTATUS ret; + int cookie; + + TRACE("(%p, %p, %p, %p, %p)\n", + CompletionPort, + CompletionKey, + lplpOverlapped, + CompletionStatus, + WaitTime); + + for (;;) + { + SERVER_START_REQ( remove_io_completion ) + { + req->handle = CompletionPort; + req->cookie = &cookie; + NTDLL_get_server_timeout( &req->timeout, WaitTime ); + ret = wine_server_call( req ); + if (ret == STATUS_SUCCESS) + { + *CompletionKey = (ULONG_PTR)reply->completion_key; + *lplpOverlapped = reply->overlapped; + CompletionStatus->u.Status = STATUS_SUCCESS; + CompletionStatus->Information = reply->bytes_transferred; + } + } + SERVER_END_REQ; + if (ret == STATUS_PENDING) + ret = NTDLL_wait_reply( &cookie ); + if (ret == STATUS_ABANDONED) + { + SERVER_START_REQ( remove_io_completion_assigned ) + { + req->handle = CompletionPort; + ret = wine_server_call( req ); + if (ret == STATUS_SUCCESS) + { + *CompletionKey = (ULONG_PTR)reply->completion_key; + *lplpOverlapped = reply->overlapped; + CompletionStatus->u.Status = STATUS_SUCCESS; + CompletionStatus->Information = reply->bytes_transferred; + } + } + SERVER_END_REQ; + } + break; + } + + TRACE("returning %lx\n", ret); + return ret; +} --- /dev/null Mon Jun 24 01:53:01 2002 +++ server/iocompletion.c Mon Apr 21 16:10:27 2003 @@ -0,0 +1,309 @@ +/* + * I/O Completion Ports + * + * Copyright (C) 2003 Robert Shearman + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include "config.h" +#include "wine/port.h" + +#include <stdio.h> + +#include "windef.h" + +#include "handle.h" +#include "thread.h" +#include "request.h" +#include "list.h" + +static void io_completion_dump( struct object *obj, int verbose ); +static void io_completion_destroy( struct object *obj ); +static int io_completion_signaled( struct object * obj, struct thread * thread ); +static int io_completion_satisfied( struct object * obj, struct thread * thread ); + +extern void select_on( int count, void *cookie, const obj_handle_t *handles, + int flags, const abs_time_t *timeout ); + +struct io_completion_data +{ + struct list entry; + unsigned int bytes_transferred; + void * completion_key; + void * overlapped; +}; + +struct io_completion_assigned_data +{ + struct list entry; + struct thread * thread; + struct io_completion_data * data; +}; + +struct io_completion_port +{ + struct object obj; + unsigned int concurrent_threads; + unsigned int max_concurrent_threads; /* FIXME: should we honour this? */ + struct io_completion_data * data_head; /* fifo queue for data */ + struct io_completion_data * data_tail; + + struct io_completion_assigned_data * assigned_data; + + /* Used to determine whether we have initiated the select() + * through GetQueuedCompletionStatus or whether the client + * has done WaitForSingleObject */ + int satisfied; +}; + +static const struct object_ops io_completion_ops = +{ + sizeof(struct io_completion_port), /* size */ + io_completion_dump, /* dump */ + add_queue, /* add_queue */ + remove_queue, /* remove_queue */ + io_completion_signaled, /* signaled */ + io_completion_satisfied, /* satisfied */ + no_get_fd, /* get_fd */ + io_completion_destroy /* destroy */ +}; + +static void io_completion_dump( struct object *obj, int verbose ) +{ + struct io_completion_port *port = (struct io_completion_port *)obj; + assert( obj->ops == &io_completion_ops ); + fprintf( stderr, "I/O completion port max_threads=%d data_head=%p data_tail=%p\n", + port->max_concurrent_threads, port->data_head, port->data_tail ); +} + +static void io_completion_destroy( struct object *obj ) +{ + struct list * current; + struct list * next; + struct io_completion_port *port = (struct io_completion_port *)obj; + assert( obj->ops == &io_completion_ops ); + + /* free data queue */ + if (port->data_head) + { + for (current = &port->data_head->entry, next = current->next; + next != current; current = next) + { + next = current->next; + free( current ); + } + } + + /* free assigned data queue */ + if (port->assigned_data) + { + for (current = &port->assigned_data->entry, next = current->next; + next != current; current = next) + { + next = current->next; + free( ((struct io_completion_assigned_data *)current)->data ); + free( current ); + } + } +} + +static int io_completion_signaled( struct object * obj, struct thread * thread ) +{ + struct io_completion_port * port = (struct io_completion_port *)obj; + assert( obj->ops == &io_completion_ops ); + return (port->data_head != NULL); +} + +static int io_completion_satisfied( struct object * obj, struct thread * thread ) +{ + struct io_completion_port * port = (struct io_completion_port *)obj; + assert( obj->ops == &io_completion_ops ); + return port->satisfied; +} + +static struct object * create_io_completion(unsigned int concurrent_threads) +{ + struct io_completion_port * port; + if (!(port = alloc_object( &io_completion_ops ))) + { + return NULL; + } + + port->data_head = NULL; + port->data_tail = NULL; + port->assigned_data = NULL; + port->concurrent_threads = 0; + port->max_concurrent_threads = concurrent_threads; + port->satisfied = 1; /* abandon any waits on the port immediately */ + + return &port->obj; +} + +static void assign_data(struct io_completion_port * port, struct thread * thread) +{ + struct io_completion_data * data = port->data_head; + struct io_completion_assigned_data * assigned_data; + if ((assigned_data = mem_alloc(sizeof(*data))) != NULL) + { + list_init(&assigned_data->entry); + assigned_data->data = data; + assigned_data->thread = thread; + + if (port->assigned_data) + list_add_head( &port->assigned_data->entry, &assigned_data->entry ); + port->assigned_data = assigned_data; + + port->data_head = (struct io_completion_data *) + list_next( &data->entry, &data->entry ); + list_remove( &data->entry ); + if (port->data_tail == data) /* i.e. we removed the last one */ + port->data_tail = port->data_head = NULL; + } +} + +DECL_HANDLER(create_io_completion) +{ + struct object * obj; + + reply->handle = 0; + if ((obj = create_io_completion(req->concurrent_threads)) != NULL) + { + reply->handle = alloc_handle(current->process, obj, req->access | SYNCHRONIZE, FALSE /*inherit flag*/); + release_object( obj ); + } +} + +DECL_HANDLER(remove_io_completion) +{ + struct io_completion_port * port = (struct io_completion_port *)get_handle_obj( + current->process, + req->handle, + GENERIC_READ, + &io_completion_ops); + + if (!port) + { + reply->bytes_transferred = 0; + reply->completion_key = NULL; + reply->overlapped = NULL; + return; + } + + if (port->data_head) /* there is waiting data */ + { + struct io_completion_data * data = port->data_head; + reply->bytes_transferred = data->bytes_transferred; + reply->completion_key = data->completion_key; + reply->overlapped = data->overlapped; + port->data_head = (struct io_completion_data *) + list_next( &data->entry, &data->entry ); + list_remove( &data->entry ); + if (port->data_tail == data) /* i.e. we removed the last one */ + port->data_tail = port->data_head = NULL; + free( data ); + } + else /* there is no waiting data */ + { + port->satisfied = 0; /* don't abandon wait on the port */ + select_on(1, req->cookie, &req->handle, SELECT_TIMEOUT, &req->timeout); + port->satisfied = 1; /* abandon any waits on the port immediately */ + reply->bytes_transferred = 0; + reply->completion_key = NULL; + reply->overlapped = NULL; + } + release_object( &port->obj ); +} + +DECL_HANDLER(set_io_completion) +{ + struct io_completion_data * data; + struct io_completion_port * port = (struct io_completion_port *)get_handle_obj( + current->process, + req->handle, + GENERIC_WRITE, + &io_completion_ops); + + if (!port) + return; + + if ((data = mem_alloc(sizeof(*data))) != NULL) + { + list_init(&data->entry); + data->bytes_transferred = req->bytes_transferred; + data->completion_key = req->completion_key; + data->overlapped = req->overlapped; + + if (port->data_head == NULL) /* need to update head as well */ + port->data_head = data; + else /* need to update existing data to reflect new data added */ + list_add_tail(&port->data_tail->entry, &data->entry); + + port->data_tail = data; + + if (port->obj.tail != NULL) /* there is a waiting thread */ + { + struct wait_queue_entry * waiting = port->obj.tail; + assign_data( port, waiting->thread ); + wake_thread( waiting->thread ); + } + } + release_object( &port->obj ); +} + +DECL_HANDLER(remove_io_completion_assigned) +{ + struct io_completion_assigned_data * assigned_data; + struct io_completion_port * port = (struct io_completion_port *)get_handle_obj( + current->process, + req->handle, + GENERIC_WRITE, + &io_completion_ops); + + if (!port) + { + reply->bytes_transferred = 0; + reply->completion_key = NULL; + reply->overlapped = NULL; + return; + } + + if (port->assigned_data) + { + LIST_FOR_EACH((struct list *)assigned_data, &port->assigned_data->entry) + { + if (assigned_data->thread == current) + { + reply->bytes_transferred = assigned_data->data->bytes_transferred; + reply->completion_key = assigned_data->data->completion_key; + reply->overlapped = assigned_data->data->overlapped; + if (port->assigned_data == assigned_data) + { + port->assigned_data = (struct io_completion_assigned_data *) + list_next( &port->assigned_data->entry, &port->assigned_data->entry ); + } + list_remove( &assigned_data->entry ); + free( assigned_data->data ); + free( assigned_data ); + return; + } + } + } + + set_error(STATUS_INVALID_PARAMETER); + reply->bytes_transferred = 0; + reply->completion_key = NULL; + reply->overlapped = NULL; +}