Hi All,
This patch gets even further along in Dan's named pipe test at:
http://www.kegel.com/pipe.c
Mike
ChangeLog:
* rewrite the named pipe code
* allow NtFlushBuffersFile to wait until the flush has finished
* allow DisconnectNamedPipe to invalidate the client-side cached fd
* fix the winehq pipe test now that one extra test passes
Index: server/protocol.def =================================================================== RCS file: /home/wine/wine/server/protocol.def,v retrieving revision 1.65 diff -u -r1.65 protocol.def --- server/protocol.def 4 Apr 2003 22:26:34 -0000 1.65 +++ server/protocol.def 14 May 2003 08:56:24 -0000 @@ -665,6 +665,8 @@ /* Flush a file buffers */ @REQ(flush_file) obj_handle_t handle; /* handle to the file */ +@REPLY + obj_handle_t event; /* event set when finished */ @END @@ -1739,6 +1741,8 @@ /* Disconnect a named pipe */ @REQ(disconnect_named_pipe) obj_handle_t handle; +@REPLY + int fd; /* associated fd to close */ @END Index: server/fd.c =================================================================== RCS file: /home/wine/wine/server/fd.c,v retrieving revision 1.10 diff -u -r1.10 fd.c --- server/fd.c 11 May 2003 02:45:33 -0000 1.10 +++ server/fd.c 14 May 2003 08:56:25 -0000 @@ -963,7 +963,7 @@ } /* default flush() routine */ -int no_flush( struct fd *fd ) +int no_flush( struct fd *fd, struct event **event ) { set_error( STATUS_OBJECT_TYPE_MISMATCH ); return 0; @@ -1002,10 +1002,15 @@ DECL_HANDLER(flush_file) { struct fd *fd = get_handle_fd_obj( current->process, req->handle, 0 ); + struct event * event = NULL; if (fd) { - fd->fd_ops->flush( fd ); + fd->fd_ops->flush( fd, &event ); + if( event ) + { + reply->event = alloc_handle( current->process, event, SYNCHRONIZE, 0 ); + } release_object( fd ); } } Index: server/file.c =================================================================== RCS file: /home/wine/wine/server/file.c,v retrieving revision 1.67 diff -u -r1.67 file.c --- server/file.c 26 Mar 2003 23:41:43 -0000 1.67 +++ server/file.c 14 May 2003 08:56:25 -0000 @@ -72,7 +72,7 @@ static int file_get_poll_events( struct fd *fd ); static void file_poll_event( struct fd *fd, int event ); -static int file_flush( struct fd *fd ); +static int file_flush( struct fd *fd, struct event **event ); static int file_get_info( struct fd *fd, struct get_file_info_reply 
*reply, int *flags ); static void file_queue_async( struct fd *fd, void *ptr, unsigned int status, int type, int count ); @@ -301,7 +301,7 @@ } -static int file_flush( struct fd *fd ) +static int file_flush( struct fd *fd, struct event **event ) { int ret = (fsync( get_unix_fd(fd) ) != -1); if (!ret) file_set_error(); Index: server/file.h =================================================================== RCS file: /home/wine/wine/server/file.h,v retrieving revision 1.7 diff -u -r1.7 file.h --- server/file.h 26 Mar 2003 23:41:43 -0000 1.7 +++ server/file.h 14 May 2003 08:56:25 -0000 @@ -35,7 +35,7 @@ /* a poll() event occured */ void (*poll_event)(struct fd *,int event); /* flush the object buffers */ - int (*flush)(struct fd *); + int (*flush)(struct fd *, struct event **); /* get file information */ int (*get_file_info)(struct fd *,struct get_file_info_reply *, int *flags); /* queue an async operation - see register_async handler in async.c*/ @@ -55,12 +55,13 @@ extern void set_fd_events( struct fd *fd, int events ); extern obj_handle_t lock_fd( struct fd *fd, file_pos_t offset, file_pos_t count, int shared, int wait ); extern void unlock_fd( struct fd *fd, file_pos_t offset, file_pos_t count ); +extern int flush_cached_fd( struct process *process, obj_handle_t handle, int *fd ); extern int default_fd_add_queue( struct object *obj, struct wait_queue_entry *entry ); extern void default_fd_remove_queue( struct object *obj, struct wait_queue_entry *entry ); extern int default_fd_signaled( struct object *obj, struct thread *thread ); extern void default_poll_event( struct fd *fd, int event ); -extern int no_flush( struct fd *fd ); +extern int no_flush( struct fd *fd, struct event **event ); extern int no_get_file_info( struct fd *fd, struct get_file_info_reply *info, int *flags ); extern void no_queue_async( struct fd *fd, void* ptr, unsigned int status, int type, int count ); extern void main_loop(void); Index: server/handle.c 
=================================================================== RCS file: /home/wine/wine/server/handle.c,v retrieving revision 1.26 diff -u -r1.26 handle.c --- server/handle.c 19 Feb 2003 00:33:33 -0000 1.26 +++ server/handle.c 14 May 2003 08:56:25 -0000 @@ -397,6 +397,17 @@ return entry->fd; } +int flush_cached_fd( struct process *process, obj_handle_t handle, int *fd ) +{ + struct handle_entry *entry = get_handle( process, handle ); + + if( !entry ) + return -1; + *fd = entry->fd; + entry->fd = -1; + return 0; +} + /* find the first inherited handle of the given type */ /* this is needed for window stations and desktops (don't ask...) */ obj_handle_t find_inherited_handle( struct process *process, const struct object_ops *ops ) Index: server/named_pipe.c =================================================================== RCS file: /home/wine/wine/server/named_pipe.c,v retrieving revision 1.23 diff -u -r1.23 named_pipe.c --- server/named_pipe.c 17 Apr 2003 02:14:04 -0000 1.23 +++ server/named_pipe.c 14 May 2003 08:56:26 -0000 @@ -19,7 +19,7 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * TODO: - * improve error handling + * message mode */ #include "config.h" @@ -50,26 +50,47 @@ ps_none, ps_idle_server, ps_wait_open, - ps_wait_connect, ps_connected_server, - ps_connected_client, - ps_disconnected + ps_wait_disconnect, + ps_disconnected_server, + ps_wait_connect +}; + +struct wait_info +{ + struct thread *thread; + void *func; + void *overlapped; }; struct named_pipe; -struct pipe_user +struct pipe_server +{ + struct object obj; + struct fd *fd; + enum pipe_state state; + struct pipe_client *client; + struct named_pipe *pipe; + struct pipe_server *next; + struct pipe_server *prev; + struct timeout_user *flush_poll; + struct event *event; + struct wait_info wait; +}; + +struct pipe_client { - struct object obj; - struct fd *fd; - enum pipe_state state; - struct pipe_user *other; - struct named_pipe *pipe; - struct pipe_user 
*next; - struct pipe_user *prev; - struct thread *thread; - void *func; - void *overlapped; + struct object obj; + struct fd *fd; + struct pipe_server *server; + struct wait_info wait; +}; + +struct connect_wait +{ + struct wait_info wait; + struct connect_wait *next; }; struct named_pipe @@ -80,11 +101,13 @@ unsigned int outsize; unsigned int insize; unsigned int timeout; - struct pipe_user *users; + unsigned int instances; + struct pipe_server *servers; + struct connect_wait *connect_waiters; }; static void named_pipe_dump( struct object *obj, int verbose ); -static void named_pipe_destroy( struct object *obj); +static void named_pipe_destroy( struct object *obj ); static const struct object_ops named_pipe_ops = { @@ -98,120 +121,359 @@ named_pipe_destroy /* destroy */ }; -static void pipe_user_dump( struct object *obj, int verbose ); -static struct fd *pipe_user_get_fd( struct object *obj ); -static void pipe_user_destroy( struct object *obj); +/* common to clients and servers */ +static int pipe_end_get_poll_events( struct fd *fd ); +static int pipe_end_get_info( struct fd *fd, + struct get_file_info_reply *reply, int *flags ); + +/* server end functions */ +static void pipe_server_dump( struct object *obj, int verbose ); +static struct fd *pipe_server_get_fd( struct object *obj ); +static void pipe_server_destroy( struct object *obj); +static int pipe_server_flush( struct fd *fd, struct event **event ); + +static const struct object_ops pipe_server_ops = +{ + sizeof(struct pipe_server), /* size */ + pipe_server_dump, /* dump */ + default_fd_add_queue, /* add_queue */ + default_fd_remove_queue, /* remove_queue */ + default_fd_signaled, /* signaled */ + no_satisfied, /* satisfied */ + pipe_server_get_fd, /* get_fd */ + pipe_server_destroy /* destroy */ +}; + +static const struct fd_ops pipe_server_fd_ops = +{ + pipe_end_get_poll_events, /* get_poll_events */ + default_poll_event, /* poll_event */ + pipe_server_flush, /* flush */ + pipe_end_get_info, /* 
get_file_info */ + no_queue_async /* queue_async */ +}; -static int pipe_user_get_poll_events( struct fd *fd ); -static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags ); +/* client end functions */ +static void pipe_client_dump( struct object *obj, int verbose ); +static struct fd *pipe_client_get_fd( struct object *obj ); +static void pipe_client_destroy( struct object *obj ); +static int pipe_client_flush( struct fd *fd, struct event **event ); -static const struct object_ops pipe_user_ops = +static const struct object_ops pipe_client_ops = { - sizeof(struct pipe_user), /* size */ - pipe_user_dump, /* dump */ + sizeof(struct pipe_client), /* size */ + pipe_client_dump, /* dump */ default_fd_add_queue, /* add_queue */ default_fd_remove_queue, /* remove_queue */ default_fd_signaled, /* signaled */ no_satisfied, /* satisfied */ - pipe_user_get_fd, /* get_fd */ - pipe_user_destroy /* destroy */ + pipe_client_get_fd, /* get_fd */ + pipe_client_destroy /* destroy */ }; -static const struct fd_ops pipe_user_fd_ops = +static const struct fd_ops pipe_client_fd_ops = { - pipe_user_get_poll_events, /* get_poll_events */ + pipe_end_get_poll_events, /* get_poll_events */ default_poll_event, /* poll_event */ - no_flush, /* flush */ - pipe_user_get_info, /* get_file_info */ + pipe_client_flush, /* flush */ + pipe_end_get_info, /* get_file_info */ no_queue_async /* queue_async */ }; static void named_pipe_dump( struct object *obj, int verbose ) { - struct named_pipe *pipe = (struct named_pipe *)obj; + struct named_pipe *pipe = (struct named_pipe *) obj; assert( obj->ops == &named_pipe_ops ); fprintf( stderr, "named pipe %p\n" ,pipe); } -static void pipe_user_dump( struct object *obj, int verbose ) +static void pipe_server_dump( struct object *obj, int verbose ) +{ + struct pipe_server *server = (struct pipe_server *) obj; + assert( obj->ops == &pipe_server_ops ); + fprintf( stderr, "named pipe server %p (state %d)\n", + server, server->state 
); +} + +static void pipe_client_dump( struct object *obj, int verbose ) { - struct pipe_user *user = (struct pipe_user *)obj; - assert( obj->ops == &pipe_user_ops ); - fprintf( stderr, "named pipe user %p (state %d)\n", user, user->state ); + struct pipe_client *client = (struct pipe_client *) obj; + assert( obj->ops == &pipe_server_ops ); + fprintf( stderr, "named pipe client %p (server state %d)\n", + client, client->server->state ); } static void named_pipe_destroy( struct object *obj) { - struct named_pipe *pipe = (struct named_pipe *)obj; - assert( !pipe->users ); + struct named_pipe *pipe = (struct named_pipe *) obj; + assert( !pipe->servers ); + assert( !pipe->instances ); } -static void notify_waiter( struct pipe_user *user, unsigned int status) +static void notify_waiter( struct wait_info *wait, + unsigned int status ) { - if(user->thread && user->func && user->overlapped) + if( wait->thread && wait->func && wait->overlapped ) { /* queue a system APC, to notify a waiting thread */ - thread_queue_apc(user->thread, NULL, user->func, APC_ASYNC, 1, - user->overlapped, (void *)status, NULL); + thread_queue_apc( wait->thread, NULL, wait->func, APC_ASYNC, + 1, wait->overlapped, (void *)status, NULL ); + } + if( wait->thread ) release_object( wait->thread ); + memset( wait, 0, sizeof (struct wait_info) ); +} + +static void set_waiter( struct wait_info *wait, void *func, void *ov ) +{ + wait->thread = (struct thread *) grab_object( current ); + wait->func = func; + wait->overlapped = ov; +} + +static void notify_connect_waiters( struct named_pipe *pipe ) +{ + struct connect_wait *cw, **x = &pipe->connect_waiters; + + while( *x ) + { + cw = *x; + notify_waiter( &cw->wait, STATUS_SUCCESS ); + release_object( pipe ); + *x = cw->next; + free( cw ); + } +} + +static void queue_connect_waiter( struct named_pipe *pipe, + void *func, void *overlapped ) +{ + struct connect_wait *waiter; + + waiter = malloc( sizeof *waiter ); + if( waiter ) + { + set_waiter( &waiter->wait, 
func, overlapped ); + waiter->next = pipe->connect_waiters; + pipe->connect_waiters = waiter; + grab_object( pipe ); } - if (user->thread) release_object(user->thread); - user->thread = NULL; - user->func = NULL; - user->overlapped=NULL; + else + set_error( STATUS_NO_MEMORY ); } -static struct fd *pipe_user_get_fd( struct object *obj ) +static struct fd *pipe_client_get_fd( struct object *obj ) { - struct pipe_user *user = (struct pipe_user *)obj; - if (user->fd) return (struct fd *)grab_object( user->fd ); + struct pipe_client *client = (struct pipe_client *) obj; + if( client->fd ) + return (struct fd *) grab_object( client->fd ); set_error( STATUS_PIPE_DISCONNECTED ); return NULL; } -static void pipe_user_destroy( struct object *obj) +static struct fd *pipe_server_get_fd( struct object *obj ) +{ + struct pipe_server *server = (struct pipe_server *) obj; + + switch(server->state) + { + case ps_connected_server: + case ps_wait_disconnect: + assert( server->fd ); + return (struct fd *) grab_object( server->fd ); + + case ps_wait_open: + case ps_idle_server: + set_error( STATUS_PIPE_LISTENING ); + break; + + case ps_disconnected_server: + case ps_wait_connect: + set_error( STATUS_PIPE_DISCONNECTED ); + break; + + default: + assert( 0 ); + } + return NULL; +} + + +static void notify_empty( struct pipe_server *server ) +{ + if( !server->flush_poll ) + return; + assert( server->state == ps_connected_server ); + assert( server->event ); + remove_timeout_user( server->flush_poll ); + server->flush_poll = NULL; + set_event( server->event ); + release_object( server->event ); + server->event = NULL; +} + +static void do_disconnect( struct pipe_server *server ) { - struct pipe_user *user = (struct pipe_user *)obj; + /* we may only have a server fd, if the client disconnected */ + if( server->client ) + { + assert( server->client->server == server ); + assert( server->client->fd ); + release_object( server->client->fd ); + server->client->fd = NULL; + } + assert( server->fd 
); + release_object( server->fd ); + server->fd = NULL; +} + +static void pipe_server_destroy( struct object *obj) +{ + struct pipe_server *server = (struct pipe_server *)obj; - assert( obj->ops == &pipe_user_ops ); + assert( obj->ops == &pipe_server_ops ); - if(user->overlapped) - notify_waiter(user,STATUS_HANDLES_CLOSED); + if( server->fd ) + { + notify_empty( server ); + do_disconnect( server ); + } - if(user->other) + if( server->client ) { - release_object( user->other->fd ); - user->other->fd = NULL; - switch(user->other->state) + server->client->server = NULL; + server->client = NULL; + } + + notify_waiter( &server->wait, STATUS_HANDLES_CLOSED ); + + assert( server->pipe->instances ); + server->pipe->instances--; + + /* remove server from pipe's server list */ + if( server->next ) server->next->prev = server->prev; + if( server->prev ) server->prev->next = server->next; + else server->pipe->servers = server->next; + release_object( server->pipe ); +} + +static void pipe_client_destroy( struct object *obj) +{ + struct pipe_client *client = (struct pipe_client *)obj; + struct pipe_server *server = client->server; + + assert( obj->ops == &pipe_client_ops ); + + notify_waiter( &client->wait, STATUS_HANDLES_CLOSED ); + + if( server ) + { + notify_empty( server ); + + switch( server->state ) { case ps_connected_server: - user->other->state = ps_idle_server; + /* Don't destroy the server's fd here as we can't + do a successful flush without it. 
*/ + server->state = ps_wait_disconnect; + release_object( client->fd ); + client->fd = NULL; break; - case ps_connected_client: - user->other->state = ps_disconnected; + case ps_disconnected_server: + server->state = ps_wait_connect; break; default: - fprintf(stderr,"connected pipe has strange state %d!\n", - user->other->state); + assert( 0 ); } - user->other->other=NULL; - user->other = NULL; + assert( server->client ); + server->client = NULL; + client->server = NULL; } - - /* remove user from pipe's user list */ - if (user->next) user->next->prev = user->prev; - if (user->prev) user->prev->next = user->next; - else user->pipe->users = user->next; - if (user->thread) release_object(user->thread); - release_object(user->pipe); - if (user->fd) release_object( user->fd ); + assert( !client->fd ); } -static int pipe_user_get_poll_events( struct fd *fd ) +static int pipe_end_get_poll_events( struct fd *fd ) { return POLLIN | POLLOUT; /* FIXME */ } -static int pipe_user_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags ) +static int pipe_data_remaining( struct pipe_server *server ) +{ + struct pollfd pfd; + int fd; + + assert( server->client ); + + fd = get_unix_fd( server->client->fd ); + if( fd < 0 ) + return 0; + pfd.fd = fd; + pfd.events = POLLIN; + pfd.revents = 0; + + if( 0 > poll( &pfd, 1, 0 ) ) + return 0; + + return pfd.revents&POLLIN; +} + +static void check_flushed( void *arg ) +{ + struct pipe_server *server = (struct pipe_server*) arg; + + assert( server->event ); + if( pipe_data_remaining( server ) ) + { + struct timeval tv; + + gettimeofday( &tv, 0 ); + add_timeout( &tv, 100 ); + server->flush_poll = add_timeout_user( &tv, check_flushed, server ); + } + else + notify_empty( server ); +} + +static int pipe_server_flush( struct fd *fd, struct event **event ) +{ + struct pipe_server *server = get_fd_user( fd ); + + if( !server ) + return 0; + + if( server->state != ps_connected_server ) + return 0; + + /* FIXME: if multiple threads 
flush the same pipe, + maybe should create a list of processes to notify */ + if( server->flush_poll ) + return 0; + + if( pipe_data_remaining( server ) ) + { + struct timeval tv; + + /* this kind of sux - + there's no unix way to be alerted when a pipe becomes empty */ + server->event = create_event( NULL, 0, 0, 0 ); + if( !server->event ) + return 0; + gettimeofday( &tv, 0 ); + add_timeout( &tv, 100 ); + server->flush_poll = add_timeout_user( &tv, check_flushed, server ); + *event = server->event; + } + + return 0; +} + +static int pipe_client_flush( struct fd *fd, struct event **event ) +{ + /* FIXME: what do we have to do for this? */ + return 0; +} + +static int pipe_end_get_info( struct fd *fd, + struct get_file_info_reply *reply, int *flags ) { if (reply) { @@ -234,12 +496,15 @@ { struct named_pipe *pipe; - if ((pipe = create_named_object( sync_namespace, &named_pipe_ops, name, len ))) + pipe = create_named_object( sync_namespace, &named_pipe_ops, name, len ); + if( pipe ) { - if (get_error() != STATUS_OBJECT_NAME_COLLISION) + if( get_error() != STATUS_OBJECT_NAME_COLLISION ) { /* initialize it if it didn't already exist */ - pipe->users = 0; + pipe->servers = 0; + pipe->instances = 0; + pipe->connect_waiters = NULL; } } return pipe; @@ -260,65 +525,80 @@ return NULL; } -static struct pipe_user *get_pipe_user_obj( struct process *process, obj_handle_t handle, - unsigned int access ) +static struct pipe_server *get_pipe_server_obj( struct process *process, + obj_handle_t handle, unsigned int access ) { - return (struct pipe_user *)get_handle_obj( process, handle, access, &pipe_user_ops ); + struct object *obj; + obj = get_handle_obj( process, handle, access, &pipe_server_ops ); + return (struct pipe_server *) obj; } -static struct pipe_user *create_pipe_user( struct named_pipe *pipe ) +static struct pipe_server *create_pipe_server( struct named_pipe *pipe ) { - struct pipe_user *user; + struct pipe_server *server; - user = alloc_object( &pipe_user_ops ); - 
if(!user) + server = alloc_object( &pipe_server_ops ); + if( !server ) return NULL; - user->fd = NULL; - user->pipe = pipe; - user->state = ps_none; - user->other = NULL; - user->thread = NULL; - user->func = NULL; - user->overlapped = NULL; + server->fd = NULL; + server->pipe = pipe; + server->state = ps_none; + server->client = NULL; + server->flush_poll = NULL; + memset( &server->wait, 0, sizeof (struct wait_info) ); - /* add to list of pipe users */ - if ((user->next = pipe->users)) user->next->prev = user; - user->prev = NULL; - pipe->users = user; + /* add to list of pipe servers */ + if ((server->next = pipe->servers)) server->next->prev = server; + server->prev = NULL; + pipe->servers = server; - grab_object(pipe); + grab_object( pipe ); - return user; + return server; } -static struct pipe_user *find_partner(struct named_pipe *pipe, enum pipe_state state) +static struct pipe_client *create_pipe_client( struct pipe_server *server ) { - struct pipe_user *x; + struct pipe_client *client; - for(x = pipe->users; x; x=x->next) - { - if(x->state==state) - break; - } + client = alloc_object( &pipe_client_ops ); + if( !client ) + return NULL; + + client->fd = NULL; + client->server = server; + memset( &client->wait, 0, sizeof (struct wait_info) ); + + return client; +} + +static struct pipe_server *find_server( struct named_pipe *pipe, + enum pipe_state state ) +{ + struct pipe_server *x; + + for( x = pipe->servers; x; x = x->next ) + if( x->state == state ) + break; - if(!x) + if( !x ) return NULL; - return (struct pipe_user *)grab_object( x ); + return (struct pipe_server *) grab_object( x ); } DECL_HANDLER(create_named_pipe) { struct named_pipe *pipe; - struct pipe_user *user; + struct pipe_server *server; reply->handle = 0; pipe = create_named_pipe( get_req_data(), get_req_data_size() ); - if(!pipe) + if( !pipe ) return; - if (get_error() != STATUS_OBJECT_NAME_COLLISION) + if( get_error() != STATUS_OBJECT_NAME_COLLISION ) { pipe->insize = req->insize; 
pipe->outsize = req->outsize; @@ -326,14 +606,33 @@ pipe->timeout = req->timeout; pipe->pipemode = req->pipemode; } + else + { + set_error( 0 ); /* clear the name collision */ + if( pipe->maxinstances <= pipe->instances ) + { + set_error( STATUS_PIPE_BUSY ); + release_object( pipe ); + return; + } + if( ( pipe->maxinstances != req->maxinstances ) || + ( pipe->timeout != req->timeout ) || + ( pipe->pipemode != req->pipemode ) ) + { + set_error( STATUS_ACCESS_DENIED ); + release_object( pipe ); + return; + } + } - user = create_pipe_user( pipe ); - - if(user) + server = create_pipe_server( pipe ); + if(server) { - user->state = ps_idle_server; - reply->handle = alloc_handle( current->process, user, GENERIC_READ|GENERIC_WRITE, 0 ); - release_object( user ); + server->state = ps_idle_server; + reply->handle = alloc_handle( current->process, server, + GENERIC_READ|GENERIC_WRITE, 0 ); + server->pipe->instances++; + release_object( server ); } release_object( pipe ); @@ -341,147 +640,176 @@ DECL_HANDLER(open_named_pipe) { - struct pipe_user *user, *partner; + struct pipe_server *server; + struct pipe_client *client; struct named_pipe *pipe; + int fds[2]; reply->handle = 0; - if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() ))) + pipe = open_named_pipe( get_req_data(), get_req_data_size() ); + if ( !pipe ) { set_error( STATUS_NO_SUCH_FILE ); return; } - if (!(partner = find_partner(pipe, ps_wait_open))) + + for( server = pipe->servers; server; server = server->next ) + if( ( server->state==ps_idle_server ) || + ( server->state==ps_wait_open ) ) + break; + release_object( pipe ); + + if ( !server ) { - release_object(pipe); set_error( STATUS_PIPE_NOT_AVAILABLE ); return; } - if ((user = create_pipe_user( pipe ))) - { - int fds[2]; - if(!socketpair(PF_UNIX, SOCK_STREAM, 0, fds)) + client = create_pipe_client( server ); + if( client ) + { + if( !socketpair( PF_UNIX, SOCK_STREAM, 0, fds ) ) { - user->fd = create_anonymous_fd( &pipe_user_fd_ops, fds[1], 
&user->obj ); - partner->fd = create_anonymous_fd( &pipe_user_fd_ops, fds[0], &partner->obj ); - if (user->fd && partner->fd) + fcntl( fds[0], F_SETFL, 0 ); + fcntl( fds[1], F_SETFL, 0 ); + assert( !client->fd ); + assert( !server->fd ); + client->fd = create_anonymous_fd( &pipe_server_fd_ops, + fds[1], &client->obj ); + server->fd = create_anonymous_fd( &pipe_server_fd_ops, + fds[0], &server->obj ); + if (client->fd && server->fd) { - notify_waiter(partner,STATUS_SUCCESS); - partner->state = ps_connected_server; - partner->other = user; - user->state = ps_connected_client; - user->other = partner; - reply->handle = alloc_handle( current->process, user, req->access, 0 ); + if( server->state == ps_wait_open ) + notify_waiter( &server->wait, STATUS_SUCCESS ); + assert( !server->wait.thread ); + server->state = ps_connected_server; + server->client = client; + client->server = server; + reply->handle = alloc_handle( current->process, client, + req->access, 0 ); } } - else file_set_error(); + else + file_set_error(); - release_object( user ); + release_object( client ); } - release_object( partner ); - release_object( pipe ); } DECL_HANDLER(connect_named_pipe) { - struct pipe_user *user, *partner; + struct pipe_server *server; - user = get_pipe_user_obj(current->process, req->handle, 0); - if(!user) + server = get_pipe_server_obj(current->process, req->handle, 0); + if(!server) return; - if( user->state != ps_idle_server ) - { - set_error(STATUS_PORT_ALREADY_SET); - } - else + switch( server->state ) { - user->state = ps_wait_open; - user->thread = (struct thread *)grab_object(current); - user->func = req->func; - user->overlapped = req->overlapped; - - /* notify all waiters that a pipe just became available */ - while( (partner = find_partner(user->pipe,ps_wait_connect)) ) - { - notify_waiter(partner,STATUS_SUCCESS); - release_object(partner); - } + case ps_idle_server: + case ps_wait_connect: + assert( !server->fd ); + server->state = ps_wait_open; + set_waiter( 
&server->wait, req->func, req->overlapped ); + notify_connect_waiters( server->pipe ); + break; + case ps_connected_server: + assert( server->fd ); + set_error( STATUS_PIPE_CONNECTED ); + break; + case ps_disconnected_server: + set_error( STATUS_PIPE_BUSY ); + break; + case ps_wait_disconnect: + set_error( STATUS_NO_DATA_DETECTED ); + break; + default: + set_error( STATUS_INVALID_HANDLE ); + break; } - release_object(user); + release_object(server); } DECL_HANDLER(wait_named_pipe) { struct named_pipe *pipe; - struct pipe_user *partner; + struct pipe_server *server; - if (!(pipe = open_named_pipe( get_req_data(), get_req_data_size() ))) + pipe = open_named_pipe( get_req_data(), get_req_data_size() ); + if ( pipe ) { set_error( STATUS_PIPE_NOT_AVAILABLE ); return; } - if( (partner = find_partner(pipe,ps_wait_open)) ) + server = find_server( pipe, ps_wait_open ); + if( server ) { - /* this should use notify_waiter, - but no pipe_user object exists now... */ - thread_queue_apc(current,NULL,req->func, - APC_ASYNC, 1, req->overlapped, STATUS_SUCCESS, NULL); - release_object(partner); + /* there's already a server waiting for a client to connect */ + struct wait_info wait; + set_waiter( &wait, req->func, req->overlapped ); + notify_waiter( &wait, STATUS_SUCCESS ); + release_object( server ); } else - { - struct pipe_user *user; + queue_connect_waiter( pipe, req->func, req->overlapped ); - if( (user = create_pipe_user( pipe )) ) - { - user->state = ps_wait_connect; - user->thread = (struct thread *)grab_object(current); - user->func = req->func; - user->overlapped = req->overlapped; - /* don't release it */ - } - } - release_object(pipe); + release_object( pipe ); } DECL_HANDLER(disconnect_named_pipe) { - struct pipe_user *user; + struct pipe_server *server; - user = get_pipe_user_obj(current->process, req->handle, 0); - if(!user) + reply->fd = -1; + server = get_pipe_server_obj( current->process, req->handle, 0 ); + if( !server ) return; - if( (user->state == 
ps_connected_server) && - (user->other->state == ps_connected_client) ) + switch( server->state ) { - release_object( user->other->fd ); - user->other->fd = NULL; - user->other->state = ps_disconnected; - user->other->other = NULL; - - release_object( user->fd ); - user->fd = NULL; - user->state = ps_idle_server; - user->other = NULL; + case ps_connected_server: + assert( server->fd ); + assert( server->client ); + assert( server->client->fd ); + + notify_empty( server ); + notify_waiter( &server->client->wait, STATUS_PIPE_DISCONNECTED ); + + /* Dump the client and server fds, but keep the pointers + around - client loses all waiting data */ + server->state = ps_disconnected_server; + do_disconnect( server ); + flush_cached_fd( current->process, req->handle, &reply->fd ); + break; + + case ps_wait_disconnect: + assert( !server->client ); + assert( server->fd ); + do_disconnect( server ); + server->state = ps_wait_connect; + flush_cached_fd( current->process, req->handle, &reply->fd ); + break; + + default: + set_error( STATUS_PIPE_DISCONNECTED ); } - release_object(user); + release_object( server ); } DECL_HANDLER(get_named_pipe_info) { - struct pipe_user *user; + struct pipe_server *server; - user = get_pipe_user_obj(current->process, req->handle, 0); - if(!user) + server = get_pipe_server_obj( current->process, req->handle, 0 ); + if(!server) return; - reply->flags = user->pipe->pipemode; - reply->maxinstances = user->pipe->maxinstances; - reply->insize = user->pipe->insize; - reply->outsize = user->pipe->outsize; + reply->flags = server->pipe->pipemode; + reply->maxinstances = server->pipe->maxinstances; + reply->insize = server->pipe->insize; + reply->outsize = server->pipe->outsize; - release_object(user); + release_object(server); } Index: server/serial.c =================================================================== RCS file: /home/wine/wine/server/serial.c,v retrieving revision 1.29 diff -u -r1.29 serial.c --- server/serial.c 12 Mar 2003 22:38:14 -0000 
1.29 +++ server/serial.c 14 May 2003 08:56:26 -0000 @@ -58,7 +58,7 @@ static int serial_get_poll_events( struct fd *fd ); static void serial_poll_event( struct fd *fd, int event ); static int serial_get_info( struct fd *fd, struct get_file_info_reply *reply, int *flags ); -static int serial_flush( struct fd *fd ); +static int serial_flush( struct fd *fd, struct event **event ); static void serial_queue_async(struct fd *fd, void *ptr, unsigned int status, int type, int count); struct serial @@ -329,7 +329,7 @@ set_fd_events ( fd, serial_get_poll_events( fd )); } -static int serial_flush( struct fd *fd ) +static int serial_flush( struct fd *fd, struct event **event ) { /* MSDN says: If hFile is a handle to a communications device, * the function only flushes the transmit buffer. Index: dlls/kernel/sync.c =================================================================== RCS file: /home/wine/wine/dlls/kernel/sync.c,v retrieving revision 1.29 diff -u -r1.29 sync.c --- dlls/kernel/sync.c 12 May 2003 03:28:26 -0000 1.29 +++ dlls/kernel/sync.c 14 May 2003 08:56:26 -0000 @@ -754,6 +754,8 @@ { req->handle = hPipe; ret = !wine_server_call_err( req ); + if ( ret ) + close( reply->fd ); } SERVER_END_REQ; Index: dlls/ntdll/file.c =================================================================== RCS file: /home/wine/wine/dlls/ntdll/file.c,v retrieving revision 1.21 diff -u -r1.21 file.c --- dlls/ntdll/file.c 22 Apr 2003 04:04:17 -0000 1.21 +++ dlls/ntdll/file.c 14 May 2003 08:56:27 -0000 @@ -498,11 +498,19 @@ NTSTATUS WINAPI NtFlushBuffersFile( HANDLE hFile, IO_STATUS_BLOCK* IoStatusBlock ) { NTSTATUS ret; + HANDLE hEvent = NULL; + SERVER_START_REQ( flush_file ) { req->handle = hFile; ret = wine_server_call( req ); + hEvent = reply->event; } SERVER_END_REQ; + if( !ret && hEvent ) + { + ret = NtWaitForSingleObject( hEvent, FALSE, NULL ); + NtClose( hEvent ); + } return ret; }
Index: dlls/kernel/tests/pipe.c =================================================================== RCS file: /home/wine/wine/dlls/kernel/tests/pipe.c,v retrieving revision 1.3 diff -u -r1.3 pipe.c --- dlls/kernel/tests/pipe.c 13 May 2003 04:48:23 -0000 1.3 +++ dlls/kernel/tests/pipe.c 14 May 2003 08:10:17 -0000 @@ -90,10 +90,7 @@ hFile = CreateFileA(PIPENAME, GENERIC_READ|GENERIC_WRITE, 0, NULL, OPEN_EXISTING, 0, 0); - todo_wine - { - ok(hFile != INVALID_HANDLE_VALUE, "CreateFile failed"); - } + ok(hFile != INVALID_HANDLE_VALUE, "CreateFile failed"); /* don't try to do i/o if one side couldn't be opened, as it hangs */ if (hFile != INVALID_HANDLE_VALUE) {