Re: [PATCH v3 30/35] multipathd: uxlsnr: add idle notification

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Sat, Nov 27, 2021 at 04:19:23PM +0100, mwilck@xxxxxxxx wrote:
> From: Martin Wilck <mwilck@xxxxxxxx>
> 
> The previous patches added the state machine and the timeout handling,
> but there was no wakeup mechanism for the uxlsnr for cases where
> client connections were waiting for the vecs lock.
> 
> This patch uses the previously introduced wakeup mechanism of
> struct mutex_lock for this purpose. Threads that unlock the
> "global" vecs lock write an event to an eventfd on which the
> uxlsnr loop polls.
> 
> As we are now woken up for servicing client handlers that don't
> wait for input but for the lock, we need to set up the pollfds
> differently, and iterate over all clients when handling events,
> not only over the ones that are receiving. The hangup handling
> is changed, too. We have to look at every client, even if one has
> hung up. Note that I don't take client_lock for the loop in
> uxsock_listen(); it's not necessary and will be removed elsewhere
> in a follow-up patch.
> 
> With this in place, the lock need not be taken in execute_handler()
> any more. The uxlsnr only ever calls trylock() on the vecs lock,
> avoiding any waiting for other threads to finish.
> 
> Signed-off-by: Martin Wilck <mwilck@xxxxxxxx>
Reviewed-by: Benjamin Marzinski <bmarzins@xxxxxxxxxx>
> ---
>  multipathd/uxlsnr.c | 183 +++++++++++++++++++++++++++++---------------
>  1 file changed, 121 insertions(+), 62 deletions(-)
> 
> diff --git a/multipathd/uxlsnr.c b/multipathd/uxlsnr.c
> index c393477..f559a23 100644
> --- a/multipathd/uxlsnr.c
> +++ b/multipathd/uxlsnr.c
> @@ -24,6 +24,7 @@
>  #include <signal.h>
>  #include <stdbool.h>
>  #include <sys/inotify.h>
> +#include <sys/eventfd.h>
>  #include "checkers.h"
>  #include "debug.h"
>  #include "vector.h"
> @@ -69,6 +70,7 @@ struct client {
>  enum {
>  	POLLFD_UX = 0,
>  	POLLFD_NOTIFY,
> +	POLLFD_IDLE,
>  	POLLFDS_BASE,
>  };
>  
> @@ -89,6 +91,7 @@ static LIST_HEAD(clients);
>  static pthread_mutex_t client_lock = PTHREAD_MUTEX_INITIALIZER;
>  static struct pollfd *polls;
>  static int notify_fd = -1;
> +static int idle_fd = -1;
>  static char *watch_config_dir;
>  
>  static bool _socket_client_is_root(int fd)
> @@ -187,6 +190,17 @@ void uxsock_cleanup(void *arg)
>  	free_polls();
>  }
>  
> +void wakeup_cleanup(void *arg)
> +{
> +	struct mutex_lock *lck = arg;
> +	int fd = idle_fd;
> +
> +	idle_fd = -1;
> +	set_wakeup_fn(lck, NULL);
> +	if (fd != -1)
> +		close(fd);
> +}
> +
>  struct watch_descriptors {
>  	int conf_wd;
>  	int dir_wd;
> @@ -293,6 +307,18 @@ static void handle_inotify(int fd, struct watch_descriptors *wds)
>  
>  static const struct timespec ts_zero = { .tv_sec = 0, };
>  
> +/* call with clients lock held */
> +static bool __need_vecs_lock(void)
> +{
> +	struct client *c;
> +
> +	list_for_each_entry(c, &clients, node) {
> +		if (c->state == CLT_LOCKED_WORK)
> +			return true;
> +	}
> +	return false;
> +}
> +
>  static int parse_cmd(struct client *c)
>  {
>  	int r;
> @@ -310,40 +336,31 @@ static int parse_cmd(struct client *c)
>  	return r;
>  }
>  
> -static int execute_handler(struct client *c, struct vectors *vecs, int timeout)
> +static int execute_handler(struct client *c, struct vectors *vecs)
>  {
> -	int r;
> -	struct timespec tmo;
>  
> -	if (!c->handler)
> +	if (!c->handler || !c->handler->fn)
>  		return -EINVAL;
>  
> -	if (clock_gettime(CLOCK_REALTIME, &tmo) == 0) {
> -		tmo.tv_sec += timeout;
> -	} else {
> -		tmo.tv_sec = 0;
> -	}
> +	return c->handler->fn(c->cmdvec, &c->reply, vecs);
> +}
>  
> -	if (c->handler->locked) {
> -		int locked = 0;
> +static void wakeup_listener(void)
> +{
> +	uint64_t one = 1;
>  
> -		pthread_cleanup_push(cleanup_lock, &vecs->lock);
> -		if (tmo.tv_sec) {
> -			r = timedlock(&vecs->lock, &tmo);
> -		} else {
> -			lock(&vecs->lock);
> -			r = 0;
> -		}
> -		if (r == 0) {
> -			locked = 1;
> -			pthread_testcancel();
> -			r = c->handler->fn(c->cmdvec, &c->reply, vecs);
> -		}
> -		pthread_cleanup_pop(locked);
> -	} else
> -		r = c->handler->fn(c->cmdvec, &c->reply, vecs);
> +	if (idle_fd != -1 &&
> +	    write(idle_fd, &one, sizeof(one)) != sizeof(one))
> +		condlog(1, "%s: failed", __func__);
> +}
>  
> -	return r;
> +static void drain_idle_fd(int fd)
> +{
> +	uint64_t val;
> +	int rc;
> +
> +	rc = read(fd, &val, sizeof(val));
> +	condlog(4, "%s: %d, %"PRIu64, __func__, rc, val);
>  }
>  
>  void default_reply(struct client *c, int r)
> @@ -397,16 +414,19 @@ enum {
>  	STM_BREAK,
>  };
>  
> -static int client_state_machine(struct client *c, struct vectors *vecs)
> +static int client_state_machine(struct client *c, struct vectors *vecs,
> +				short revents)
>  {
>  	ssize_t n;
>  	const char *buf;
>  
> -	condlog(4, "%s: cli[%d] state=%d cmd=\"%s\" repl \"%s\"", __func__,
> -		c->fd, c->state, c->cmd, get_strbuf_str(&c->reply));
> +	condlog(4, "%s: cli[%d] poll=%x state=%d cmd=\"%s\" repl \"%s\"", __func__,
> +		c->fd, revents, c->state, c->cmd, get_strbuf_str(&c->reply));
>  
>  	switch (c->state) {
>  	case CLT_RECV:
> +		if (!(revents & POLLIN))
> +			return STM_BREAK;
>  		if (c->cmd_len == 0) {
>  			/*
>  			 * We got POLLIN; assume that at least the length can
> @@ -462,17 +482,30 @@ static int client_state_machine(struct client *c, struct vectors *vecs)
>  		}
>  		if (c->error)
>  			set_client_state(c, CLT_SEND);
> +		else if (c->handler->locked)
> +			set_client_state(c, CLT_LOCKED_WORK);
>  		else
>  			set_client_state(c, CLT_WORK);
>  		return STM_CONT;
>  
>  	case CLT_LOCKED_WORK:
> -		/* tbd */
> -		set_client_state(c, CLT_WORK);
> -		return STM_CONT;
> +                if (trylock(&vecs->lock) == 0) {
> +			/* don't use cleanup_lock(), lest we wakeup ourselves */
> +			pthread_cleanup_push_cast(__unlock, &vecs->lock);
> +			c->error = execute_handler(c, vecs);
> +			pthread_cleanup_pop(1);
> +			condlog(4, "%s: cli[%d] grabbed lock", __func__, c->fd);
> +			free_keys(c->cmdvec);
> +			c->cmdvec = NULL;
> +			set_client_state(c, CLT_SEND);
> +			return STM_CONT;
> +		} else {
> +			condlog(4, "%s: cli[%d] waiting for lock", __func__, c->fd);
> +			return STM_BREAK;
> +		}
>  
>  	case CLT_WORK:
> -		c->error = execute_handler(c, vecs, uxsock_timeout / 1000);
> +		c->error = execute_handler(c, vecs);
>  		free_keys(c->cmdvec);
>  		c->cmdvec = NULL;
>  		set_client_state(c, CLT_SEND);
> @@ -499,9 +532,14 @@ static int client_state_machine(struct client *c, struct vectors *vecs)
>  	}
>  }
>  
> -static void handle_client(struct client *c, struct vectors *vecs)
> +static void handle_client(struct client *c, struct vectors *vecs, short revents)
>  {
> -	while (client_state_machine(c, vecs) == STM_CONT);
> +	if (revents & (POLLHUP|POLLERR)) {
> +		c->error = -ECONNRESET;
> +		return;
> +	}
> +
> +        while (client_state_machine(c, vecs, revents) == STM_CONT);
>  }
>  
>  /*
> @@ -514,6 +552,8 @@ void *uxsock_listen(long ux_sock, void *trigger_data)
>  	/* conf->sequence_nr will be 1 when uxsock_listen is first called */
>  	unsigned int sequence_nr = 0;
>  	struct watch_descriptors wds = { .conf_wd = -1, .dir_wd = -1 };
> +	bool need_lock = false;
> +	struct vectors *vecs = trigger_data;
>  
>  	condlog(3, "uxsock: startup listener");
>  	polls = calloc(1, max_pfds * sizeof(*polls));
> @@ -524,6 +564,15 @@ void *uxsock_listen(long ux_sock, void *trigger_data)
>  	notify_fd = inotify_init1(IN_NONBLOCK);
>  	if (notify_fd == -1) /* it's fine if notifications fail */
>  		condlog(3, "failed to start up configuration notifications");
> +
> +	pthread_cleanup_push(wakeup_cleanup, &vecs->lock);
> +	idle_fd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
> +	if (idle_fd == -1) {
> +		condlog(1, "failed to create idle fd");
> +		exit_daemon();
> +	} else
> +		set_wakeup_fn(&vecs->lock, wakeup_listener);
> +
>  	sigfillset(&mask);
>  	sigdelset(&mask, SIGINT);
>  	sigdelset(&mask, SIGTERM);
> @@ -575,11 +624,25 @@ void *uxsock_listen(long ux_sock, void *trigger_data)
>  		else
>  			polls[POLLFD_NOTIFY].events = POLLIN;
>  
> +		need_lock = __need_vecs_lock();
> +		polls[POLLFD_IDLE].fd = idle_fd;
> +		if (need_lock)
> +			polls[POLLFD_IDLE].events = POLLIN;
> +		else
> +			polls[POLLFD_IDLE].events = 0;
> +
>  		/* setup the clients */
>  		i = POLLFDS_BASE;
>  		list_for_each_entry(c, &clients, node) {
> +			switch(c->state) {
> +			case CLT_RECV:
> +				polls[i].events = POLLIN;
> +				break;
> +			default:
> +				/* don't poll for this client */
> +				continue;
> +			}
>  			polls[i].fd = c->fd;
> -			polls[i].events = POLLIN;
>  			i++;
>  			if (i >= max_pfds)
>  				break;
> @@ -607,33 +670,28 @@ void *uxsock_listen(long ux_sock, void *trigger_data)
>  			handle_signals(true);
>  			continue;
>  		}
> +		if (polls[POLLFD_IDLE].fd != -1 &&
> +		    polls[POLLFD_IDLE].revents & POLLIN)
> +			drain_idle_fd(idle_fd);
>  
> -		/* see if a client wants to speak to us */
> -		for (i = POLLFDS_BASE; i < n_pfds; i++) {
> -			if (polls[i].revents & (POLLIN|POLLHUP|POLLERR)) {
> -				c = NULL;
> -				pthread_mutex_lock(&client_lock);
> -				list_for_each_entry(tmp, &clients, node) {
> -					if (tmp->fd == polls[i].fd) {
> -						c = tmp;
> -						break;
> -					}
> +		/* see if a client needs handling */
> +		list_for_each_entry_safe(c, tmp, &clients, node) {
> +			short revents = 0;
> +
> +			for (i = POLLFDS_BASE; i < n_pfds; i++) {
> +				if (polls[i].fd == c->fd) {
> +					revents = polls[i].revents;
> +					break;
>  				}
> -				pthread_mutex_unlock(&client_lock);
> -				if (!c) {
> -					condlog(4, "cli%d: new fd %d",
> -						i, polls[i].fd);
> -					continue;
> -				}
> -				if (polls[i].revents & (POLLHUP|POLLERR)) {
> -					condlog(4, "cli[%d]: Disconnected",
> -						c->fd);
> -					dead_client(c);
> -					continue;
> -				}
> -				handle_client(c, trigger_data);
> -				if (c->error == -ECONNRESET)
> -					dead_client(c);
> +			}
> +
> +			handle_client(c, trigger_data, revents);
> +
> +			if (c->error == -ECONNRESET) {
> +				condlog(4, "cli[%d]: disconnected", c->fd);
> +				dead_client(c);
> +				if (i < n_pfds)
> +					polls[i].fd = -1;
>  			}
>  		}
>  		/* see if we got a non-fatal signal */
> @@ -649,5 +707,6 @@ void *uxsock_listen(long ux_sock, void *trigger_data)
>  			handle_inotify(notify_fd, &wds);
>  	}
>  
> +	pthread_cleanup_pop(1);
>  	return NULL;
>  }
> -- 
> 2.33.1

--
dm-devel mailing list
dm-devel@xxxxxxxxxx
https://listman.redhat.com/mailman/listinfo/dm-devel




[Index of Archives]     [DM Crypt]     [Fedora Desktop]     [ATA RAID]     [Fedora Marketing]     [Fedora Packaging]     [Fedora SELinux]     [Yosemite Discussion]     [KDE Users]     [Fedora Docs]

  Powered by Linux