diff mbox series

[v2,30/48] multipathd: uxlsnr: add idle notification

Message ID 20211118225840.19810-31-mwilck@suse.com (mailing list archive)
State Not Applicable, archived
Delegated to: christophe varoqui
Headers show
Series multipathd: uxlsnr overhaul | expand

Commit Message

Martin Wilck Nov. 18, 2021, 10:58 p.m. UTC
From: Martin Wilck <mwilck@suse.com>

The previous patches added the state machine and the timeout handling,
but there was no wakeup mechanism for the uxlsnr for cases where
client connections were waiting for the vecs lock.

This patch uses the previously introduced wakeup mechanism of
struct mutex_lock for this purpose. Processes which unlock the
"global" vecs lock send an event in an eventfd which the uxlsnr
loop is polling for.

As we are now woken up for servicing client handlers that don't
wait for input but for the lock, we need to set up the pollfds
differently, and iterate over all clients when handling events,
not only over the ones that are receiving. The hangup handling
is changed, too. We have to look at every client, even if one has
hung up. Note that I don't take client_lock for the loop in
uxsock_listen(), it's not necessary and will be removed elsewhere
in a follow-up patch.

With this in place, the lock need not be taken in execute_handler()
any more. The uxlsnr only ever calls trylock() on the vecs lock,
avoiding any waiting for other threads to finish.

Signed-off-by: Martin Wilck <mwilck@suse.com>
---
 multipathd/uxlsnr.c | 200 ++++++++++++++++++++++++++++----------------
 1 file changed, 129 insertions(+), 71 deletions(-)

Comments

Benjamin Marzinski Nov. 25, 2021, 1:08 a.m. UTC | #1
On Thu, Nov 18, 2021 at 11:58:22PM +0100, mwilck@suse.com wrote:
> From: Martin Wilck <mwilck@suse.com>
> 
> The previous patches added the state machine and the timeout handling,
> but there was no wakeup mechanism for the uxlsnr for cases where
> client connections were waiting for the vecs lock.
> 
> This patch uses the previously introduced wakeup mechanism of
> struct mutex_lock for this purpose. Processes which unlock the
> "global" vecs lock send an event in an eventfd which the uxlsnr
> loop is polling for.
> 
> As we are now woken up for servicing client handlers that don't
> wait for input but for the lock, we need to set up the pollfds
> differently, and iterate over all clients when handling events,
> not only over the ones that are receiving. The hangup handling
> is changed, too. We have to look at every client, even if one has
> hung up. Note that I don't take client_lock for the loop in
> uxsock_listen(), it's not necessary and will be removed elsewhere
> in a follow-up patch.
> 
> With this in place, the lock need not be taken in execute_handler()
> any more. The uxlsnr only ever calls trylock() on the vecs lock,
> avoiding any waiting for other threads to finish.
> 
> Signed-off-by: Martin Wilck <mwilck@suse.com>
> ---
>  multipathd/uxlsnr.c | 200 ++++++++++++++++++++++++++++----------------
>  1 file changed, 129 insertions(+), 71 deletions(-)
> 
> diff --git a/multipathd/uxlsnr.c b/multipathd/uxlsnr.c
> index 87134d5..bf9780d 100644
> --- a/multipathd/uxlsnr.c
> +++ b/multipathd/uxlsnr.c
> @@ -24,6 +24,7 @@
>  #include <signal.h>
>  #include <stdbool.h>
>  #include <sys/inotify.h>
> +#include <sys/eventfd.h>
>  #include "checkers.h"
>  #include "memory.h"
>  #include "debug.h"
> @@ -70,6 +71,7 @@ struct client {
>  enum {
>  	POLLFD_UX = 0,
>  	POLLFD_NOTIFY,
> +	POLLFD_IDLE,
>  	POLLFDS_BASE,
>  };
>  
> @@ -90,6 +92,7 @@ static LIST_HEAD(clients);
>  static pthread_mutex_t client_lock = PTHREAD_MUTEX_INITIALIZER;
>  static struct pollfd *polls;
>  static int notify_fd = -1;
> +static int idle_fd = -1;
>  static char *watch_config_dir;
>  
>  static bool _socket_client_is_root(int fd)
> @@ -187,6 +190,17 @@ void uxsock_cleanup(void *arg)
>  	free_polls();
>  }
>  
> +void wakeup_cleanup(void *arg)
> +{
> +	struct mutex_lock *lck = arg;
> +	int fd = idle_fd;
> +
> +	idle_fd = -1;
> +	set_wakeup_fn(lck, NULL);
> +	if (fd != -1)
> +		close(fd);
> +}
> +
>  struct watch_descriptors {
>  	int conf_wd;
>  	int dir_wd;
> @@ -293,6 +307,18 @@ static void handle_inotify(int fd, struct watch_descriptors *wds)
>  
>  static const struct timespec ts_zero = { .tv_sec = 0, };
>  
> +/* call with clients lock held */
> +static bool __need_vecs_lock(void)
> +{
> +	struct client *c;
> +
> +	list_for_each_entry(c, &clients, node) {
> +		if (c->state == CLT_WAIT_LOCK)
> +			return true;
> +	}
> +	return false;
> +}
> +
>  static int parse_cmd(struct client *c)
>  {
>  	int r;
> @@ -310,40 +336,31 @@ static int parse_cmd(struct client *c)
>  	return r;
>  }
>  
> -static int execute_handler(struct client *c, struct vectors *vecs, int timeout)
> +static int execute_handler(struct client *c, struct vectors *vecs)
>  {
> -	int r;
> -	struct timespec tmo;
>  
> -	if (!c->handler)
> +	if (!c->handler || !c->handler->fn)
>  		return -EINVAL;
>  
> -	if (clock_gettime(CLOCK_REALTIME, &tmo) == 0) {
> -		tmo.tv_sec += timeout;
> -	} else {
> -		tmo.tv_sec = 0;
> -	}
> +	return c->handler->fn(c->cmdvec, &c->reply, vecs);
> +}
>  
> -	if (c->handler->locked) {
> -		int locked = 0;
> +static void wakeup_listener(void)
> +{
> +	uint64_t one = 1;
>  
> -		pthread_cleanup_push(cleanup_lock, &vecs->lock);
> -		if (tmo.tv_sec) {
> -			r = timedlock(&vecs->lock, &tmo);
> -		} else {
> -			lock(&vecs->lock);
> -			r = 0;
> -		}
> -		if (r == 0) {
> -			locked = 1;
> -			pthread_testcancel();
> -			r = c->handler->fn(c->cmdvec, &c->reply, vecs);
> -		}
> -		pthread_cleanup_pop(locked);
> -	} else
> -		r = c->handler->fn(c->cmdvec, &c->reply, vecs);
> +	if (idle_fd != -1 &&
> +	    write(idle_fd, &one, sizeof(one)) != sizeof(one))
> +		condlog(1, "%s: failed", __func__);
> +}
>  
> -	return r;
> +static void drain_idle_fd(int fd)
> +{
> +	uint64_t val;
> +	int rc;
> +
> +	rc = read(fd, &val, sizeof(val));
> +	condlog(4, "%s: %d, %"PRIu64, __func__, rc, val);
>  }
>  
>  void default_reply(struct client *c, int r)
> @@ -397,16 +414,19 @@ enum {
>  	STM_BREAK,
>  };
>  
> -static int client_state_machine(struct client *c, struct vectors *vecs)
> +static int client_state_machine(struct client *c, struct vectors *vecs,
> +				short revents)
>  {
>  	ssize_t n;
>  	const char *buf;
>  
> -	condlog(4, "%s: cli[%d] state=%d cmd=\"%s\" repl \"%s\"", __func__,
> -		c->fd, c->state, c->cmd, get_strbuf_str(&c->reply));
> +	condlog(4, "%s: cli[%d] poll=%x state=%d cmd=\"%s\" repl \"%s\"", __func__,
> +		c->fd, revents, c->state, c->cmd, get_strbuf_str(&c->reply));
>  
>          switch (c->state) {
>  	case CLT_RECV:
> +		if (!(revents & POLLIN))
> +			return STM_BREAK;
>  		if (c->cmd_len == 0) {
>  			/*
>  			 * We got POLLIN; assume that at least the length can
> @@ -462,17 +482,30 @@ static int client_state_machine(struct client *c, struct vectors *vecs)
>  		}
>  		if (c->error)
>  			set_client_state(c, CLT_SEND);
> +		else if (c->handler->locked)
> +			set_client_state(c, CLT_WAIT_LOCK);
>  		else
>  			set_client_state(c, CLT_WORK);
>  		return STM_CONT;
>  
>  	case CLT_WAIT_LOCK:

It's not a big deal, but I would prefer a name like CLT_LOCKED_WORK
instead of CLT_WAIT_LOCK, to make it obvious that these are alternate
possibilites.


> -		/* tbd */
> -		set_client_state(c, CLT_WORK);
> -		return STM_CONT;
> +                if (trylock(&vecs->lock) == 0) {
> +			/* don't use cleanup_lock(), lest we wakeup ourselves */
> +			pthread_cleanup_push_cast(__unlock, &vecs->lock);
> +			c->error = execute_handler(c, vecs);
> +			pthread_cleanup_pop(1);
> +			condlog(4, "%s: cli[%d] grabbed lock", __func__, c->fd);
> +			free_keys(c->cmdvec);
> +			c->cmdvec = NULL;
> +			set_client_state(c, CLT_SEND);
> +			return STM_CONT;
> +		} else {
> +			condlog(4, "%s: cli[%d] waiting for lock", __func__, c->fd);
> +			return STM_BREAK;
> +		}
>  
>  	case CLT_WORK:
> -		c->error = execute_handler(c, vecs, uxsock_timeout / 1000);
> +		c->error = execute_handler(c, vecs);
>  		free_keys(c->cmdvec);
>  		c->cmdvec = NULL;
>  		set_client_state(c, CLT_SEND);
> @@ -499,9 +532,14 @@ static int client_state_machine(struct client *c, struct vectors *vecs)
>  	}
>  }
>  
> -static void handle_client(struct client *c, struct vectors *vecs)
> +static void handle_client(struct client *c, struct vectors *vecs, short revents)
>  {
> -	while (client_state_machine(c, vecs) == STM_CONT);
> +	if (revents & (POLLHUP|POLLERR)) {
> +		c->error = -ECONNRESET;
> +		return;
> +	}
> +
> +        while (client_state_machine(c, vecs, revents) == STM_CONT);
>  }
>  
>  /*
> @@ -514,6 +552,8 @@ void *uxsock_listen(long ux_sock, void *trigger_data)
>  	/* conf->sequence_nr will be 1 when uxsock_listen is first called */
>  	unsigned int sequence_nr = 0;
>  	struct watch_descriptors wds = { .conf_wd = -1, .dir_wd = -1 };
> +	bool need_lock = false;
> +	struct vectors *vecs = trigger_data;
>  
>  	condlog(3, "uxsock: startup listener");
>  	polls = MALLOC(max_pfds * sizeof(*polls));
> @@ -524,6 +564,14 @@ void *uxsock_listen(long ux_sock, void *trigger_data)
>  	notify_fd = inotify_init1(IN_NONBLOCK);
>  	if (notify_fd == -1) /* it's fine if notifications fail */
>  		condlog(3, "failed to start up configuration notifications");
> +
> +	pthread_cleanup_push(wakeup_cleanup, &vecs->lock);
> +	idle_fd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
> +	if (idle_fd == -1)
> +		condlog(1, "failed to create idle fd");

If the idle_fd doesn't get correctly set, that seems like a fatal error
to me.

> +	else
> +		set_wakeup_fn(&vecs->lock, wakeup_listener);
> +
>  	sigfillset(&mask);
>  	sigdelset(&mask, SIGINT);
>  	sigdelset(&mask, SIGTERM);
> @@ -575,16 +623,30 @@ void *uxsock_listen(long ux_sock, void *trigger_data)
>  		else
>  			polls[POLLFD_NOTIFY].events = POLLIN;
>  
> +		need_lock = __need_vecs_lock();
> +		polls[POLLFD_IDLE].fd = idle_fd;
> +		if (need_lock)
> +			polls[POLLFD_IDLE].events = POLLIN;
> +		else
> +			polls[POLLFD_IDLE].events = 0;
> +
>  		/* setup the clients */
> -		i = POLLFDS_BASE;
> -		list_for_each_entry(c, &clients, node) {
> -			polls[i].fd = c->fd;
> -			polls[i].events = POLLIN;
> -			i++;
> -			if (i >= max_pfds)
> -				break;
> -		}
> -		n_pfds = i;
> +                i = POLLFDS_BASE;
> +                list_for_each_entry(c, &clients, node) {

Nitpick: This would look clearer to me if, instead of a switch
statement, it was just

if (c->state != CLT_RECV)
        continue;

polls[i].events = POLLIN;
polls[i].fd = c->fd;
...


-Ben

> +                        switch(c->state) {
> +                        case CLT_RECV:
> +                                polls[i].events = POLLIN;
> +                                break;
> +                        default:
> +				/* don't poll for this client */
> +                                continue;
> +                        }
> +                        polls[i].fd = c->fd;
> +                        i++;
> +                        if (i >= max_pfds)
> +                                break;
> +                }
> +                n_pfds = i;
>  		pthread_cleanup_pop(1);
>  
>  		/* most of our life is spent in this call */
> @@ -607,33 +669,28 @@ void *uxsock_listen(long ux_sock, void *trigger_data)
>  			handle_signals(true);
>  			continue;
>  		}
> +		if (polls[POLLFD_IDLE].fd != -1 &&
> +		    polls[POLLFD_IDLE].revents & POLLIN)
> +			drain_idle_fd(idle_fd);
>  
> -		/* see if a client wants to speak to us */
> -		for (i = POLLFDS_BASE; i < n_pfds; i++) {
> -			if (polls[i].revents & (POLLIN|POLLHUP|POLLERR)) {
> -				c = NULL;
> -				pthread_mutex_lock(&client_lock);
> -				list_for_each_entry(tmp, &clients, node) {
> -					if (tmp->fd == polls[i].fd) {
> -						c = tmp;
> -						break;
> -					}
> -				}
> -				pthread_mutex_unlock(&client_lock);
> -				if (!c) {
> -					condlog(4, "cli%d: new fd %d",
> -						i, polls[i].fd);
> -					continue;
> -				}
> -				if (polls[i].revents & (POLLHUP|POLLERR)) {
> -					condlog(4, "cli[%d]: Disconnected",
> -						c->fd);
> -					dead_client(c);
> -					continue;
> -				}
> -				handle_client(c, trigger_data);
> -				if (c->error == -ECONNRESET)
> -					dead_client(c);
> +		/* see if a client needs handling */
> +		list_for_each_entry_safe(c, tmp, &clients, node) {
> +			short revents = 0;
> +
> +			for (i = POLLFDS_BASE; i < n_pfds; i++) {
> +                                if (polls[i].fd == c->fd) {
> +                                        revents = polls[i].revents;
> +                                        break;
> +                                }
> +                        }
> +
> +			handle_client(c, trigger_data, revents);
> +
> +			if (c->error == -ECONNRESET) {
> +				condlog(4, "cli[%d]: disconnected", c->fd);
> +				dead_client(c);
> +				if (i < n_pfds)
> +					polls[i].fd = -1;
>  			}
>  		}
>  		/* see if we got a non-fatal signal */
> @@ -649,5 +706,6 @@ void *uxsock_listen(long ux_sock, void *trigger_data)
>  			handle_inotify(notify_fd, &wds);
>  	}
>  
> +	pthread_cleanup_pop(1);
>  	return NULL;
>  }
> -- 
> 2.33.1

--
dm-devel mailing list
dm-devel@redhat.com
https://listman.redhat.com/mailman/listinfo/dm-devel
Martin Wilck Nov. 26, 2021, 2:23 p.m. UTC | #2
On Wed, 2021-11-24 at 19:08 -0600, Benjamin Marzinski wrote:
> On Thu, Nov 18, 2021 at 11:58:22PM +0100, mwilck@suse.com wrote:
> > From: Martin Wilck <mwilck@suse.com>
> > 
> > The previous patches added the state machine and the timeout
> > handling,
> > but there was no wakeup mechanism for the uxlsnr for cases where
> > client connections were waiting for the vecs lock.
> > 
> > This patch uses the previously introduced wakeup mechanism of
> > struct mutex_lock for this purpose. Processes which unlock the
> > "global" vecs lock send an event in an eventfd which the uxlsnr
> > loop is polling for.
> > 
> > As we are now woken up for servicing client handlers that don't
> > wait for input but for the lock, we need to set up the pollfds
> > differently, and iterate over all clients when handling events,
> > not only over the ones that are receiving. The hangup handling
> > is changed, too. We have to look at every client, even if one has
> > hung up. Note that I don't take client_lock for the loop in
> > uxsock_listen(), it's not necessary and will be removed elsewhere
> > in a follow-up patch.
> > 
> > With this in place, the lock need not be taken in execute_handler()
> > any more. The uxlsnr only ever calls trylock() on the vecs lock,
> > avoiding any waiting for other threads to finish.
> > 
> > Signed-off-by: Martin Wilck <mwilck@suse.com>
> > ---
> >  multipathd/uxlsnr.c | 200 ++++++++++++++++++++++++++++------------
> > ----
> >  1 file changed, 129 insertions(+), 71 deletions(-)
> > 
> > diff --git a/multipathd/uxlsnr.c b/multipathd/uxlsnr.c
> > index 87134d5..bf9780d 100644
> > --- a/multipathd/uxlsnr.c
> > +++ b/multipathd/uxlsnr.c
> 
> Nitpick: This would look clearer to me if, instead of a switch
> statement, it was just
> 
> if (c->state != CLT_RECV)
>         continue;
> 
> polls[i].events = POLLIN;
> polls[i].fd = c->fd;
> ...

That's true if you look at this patch in isolation. The reason I use a
switch statement is that with patch 32, we get another case to treat
here (CLT_SEND). At that point, the switch is at least on par wrt
clarity, IMO.

No?

Martin


--
dm-devel mailing list
dm-devel@redhat.com
https://listman.redhat.com/mailman/listinfo/dm-devel
Benjamin Marzinski Nov. 29, 2021, 4:26 p.m. UTC | #3
On Fri, Nov 26, 2021 at 03:23:18PM +0100, Martin Wilck wrote:
> On Wed, 2021-11-24 at 19:08 -0600, Benjamin Marzinski wrote:
> > On Thu, Nov 18, 2021 at 11:58:22PM +0100, mwilck@suse.com wrote:
> > > From: Martin Wilck <mwilck@suse.com>
> > > 
> > > The previous patches added the state machine and the timeout
> > > handling,
> > > but there was no wakeup mechanism for the uxlsnr for cases where
> > > client connections were waiting for the vecs lock.
> > > 
> > > This patch uses the previously introduced wakeup mechanism of
> > > struct mutex_lock for this purpose. Processes which unlock the
> > > "global" vecs lock send an event in an eventfd which the uxlsnr
> > > loop is polling for.
> > > 
> > > As we are now woken up for servicing client handlers that don't
> > > wait for input but for the lock, we need to set up the pollfds
> > > differently, and iterate over all clients when handling events,
> > > not only over the ones that are receiving. The hangup handling
> > > is changed, too. We have to look at every client, even if one has
> > > hung up. Note that I don't take client_lock for the loop in
> > > uxsock_listen(), it's not necessary and will be removed elsewhere
> > > in a follow-up patch.
> > > 
> > > With this in place, the lock need not be taken in execute_handler()
> > > any more. The uxlsnr only ever calls trylock() on the vecs lock,
> > > avoiding any waiting for other threads to finish.
> > > 
> > > Signed-off-by: Martin Wilck <mwilck@suse.com>
> > > ---
> > >  multipathd/uxlsnr.c | 200 ++++++++++++++++++++++++++++------------
> > > ----
> > >  1 file changed, 129 insertions(+), 71 deletions(-)
> > > 
> > > diff --git a/multipathd/uxlsnr.c b/multipathd/uxlsnr.c
> > > index 87134d5..bf9780d 100644
> > > --- a/multipathd/uxlsnr.c
> > > +++ b/multipathd/uxlsnr.c
> > 
> > Nitpick: This would look clearer to me if, instead of a switch
> > statement, it was just
> > 
> > if (c->state != CLT_RECV)
> >         continue;
> > 
> > polls[i].events = POLLIN;
> > polls[i].fd = c->fd;
> > ...
> 
> That's true if you look at this patch in isolation. The reason I use a
> switch statement is that with patch 32, we get another case to treat
> here (CLT_SEND). At that point, the switch is at least on par wrt
> clarity, IMO.
> 
> No?

Yep. I retrack my objection

Reviewed-by: Benjamin Marzinski <bmarzins@redhat.com>

> 
> Martin

--
dm-devel mailing list
dm-devel@redhat.com
https://listman.redhat.com/mailman/listinfo/dm-devel
diff mbox series

Patch

diff --git a/multipathd/uxlsnr.c b/multipathd/uxlsnr.c
index 87134d5..bf9780d 100644
--- a/multipathd/uxlsnr.c
+++ b/multipathd/uxlsnr.c
@@ -24,6 +24,7 @@ 
 #include <signal.h>
 #include <stdbool.h>
 #include <sys/inotify.h>
+#include <sys/eventfd.h>
 #include "checkers.h"
 #include "memory.h"
 #include "debug.h"
@@ -70,6 +71,7 @@  struct client {
 enum {
 	POLLFD_UX = 0,
 	POLLFD_NOTIFY,
+	POLLFD_IDLE,
 	POLLFDS_BASE,
 };
 
@@ -90,6 +92,7 @@  static LIST_HEAD(clients);
 static pthread_mutex_t client_lock = PTHREAD_MUTEX_INITIALIZER;
 static struct pollfd *polls;
 static int notify_fd = -1;
+static int idle_fd = -1;
 static char *watch_config_dir;
 
 static bool _socket_client_is_root(int fd)
@@ -187,6 +190,17 @@  void uxsock_cleanup(void *arg)
 	free_polls();
 }
 
+void wakeup_cleanup(void *arg)
+{
+	struct mutex_lock *lck = arg;
+	int fd = idle_fd;
+
+	idle_fd = -1;
+	set_wakeup_fn(lck, NULL);
+	if (fd != -1)
+		close(fd);
+}
+
 struct watch_descriptors {
 	int conf_wd;
 	int dir_wd;
@@ -293,6 +307,18 @@  static void handle_inotify(int fd, struct watch_descriptors *wds)
 
 static const struct timespec ts_zero = { .tv_sec = 0, };
 
+/* call with clients lock held */
+static bool __need_vecs_lock(void)
+{
+	struct client *c;
+
+	list_for_each_entry(c, &clients, node) {
+		if (c->state == CLT_WAIT_LOCK)
+			return true;
+	}
+	return false;
+}
+
 static int parse_cmd(struct client *c)
 {
 	int r;
@@ -310,40 +336,31 @@  static int parse_cmd(struct client *c)
 	return r;
 }
 
-static int execute_handler(struct client *c, struct vectors *vecs, int timeout)
+static int execute_handler(struct client *c, struct vectors *vecs)
 {
-	int r;
-	struct timespec tmo;
 
-	if (!c->handler)
+	if (!c->handler || !c->handler->fn)
 		return -EINVAL;
 
-	if (clock_gettime(CLOCK_REALTIME, &tmo) == 0) {
-		tmo.tv_sec += timeout;
-	} else {
-		tmo.tv_sec = 0;
-	}
+	return c->handler->fn(c->cmdvec, &c->reply, vecs);
+}
 
-	if (c->handler->locked) {
-		int locked = 0;
+static void wakeup_listener(void)
+{
+	uint64_t one = 1;
 
-		pthread_cleanup_push(cleanup_lock, &vecs->lock);
-		if (tmo.tv_sec) {
-			r = timedlock(&vecs->lock, &tmo);
-		} else {
-			lock(&vecs->lock);
-			r = 0;
-		}
-		if (r == 0) {
-			locked = 1;
-			pthread_testcancel();
-			r = c->handler->fn(c->cmdvec, &c->reply, vecs);
-		}
-		pthread_cleanup_pop(locked);
-	} else
-		r = c->handler->fn(c->cmdvec, &c->reply, vecs);
+	if (idle_fd != -1 &&
+	    write(idle_fd, &one, sizeof(one)) != sizeof(one))
+		condlog(1, "%s: failed", __func__);
+}
 
-	return r;
+static void drain_idle_fd(int fd)
+{
+	uint64_t val;
+	int rc;
+
+	rc = read(fd, &val, sizeof(val));
+	condlog(4, "%s: %d, %"PRIu64, __func__, rc, val);
 }
 
 void default_reply(struct client *c, int r)
@@ -397,16 +414,19 @@  enum {
 	STM_BREAK,
 };
 
-static int client_state_machine(struct client *c, struct vectors *vecs)
+static int client_state_machine(struct client *c, struct vectors *vecs,
+				short revents)
 {
 	ssize_t n;
 	const char *buf;
 
-	condlog(4, "%s: cli[%d] state=%d cmd=\"%s\" repl \"%s\"", __func__,
-		c->fd, c->state, c->cmd, get_strbuf_str(&c->reply));
+	condlog(4, "%s: cli[%d] poll=%x state=%d cmd=\"%s\" repl \"%s\"", __func__,
+		c->fd, revents, c->state, c->cmd, get_strbuf_str(&c->reply));
 
         switch (c->state) {
 	case CLT_RECV:
+		if (!(revents & POLLIN))
+			return STM_BREAK;
 		if (c->cmd_len == 0) {
 			/*
 			 * We got POLLIN; assume that at least the length can
@@ -462,17 +482,30 @@  static int client_state_machine(struct client *c, struct vectors *vecs)
 		}
 		if (c->error)
 			set_client_state(c, CLT_SEND);
+		else if (c->handler->locked)
+			set_client_state(c, CLT_WAIT_LOCK);
 		else
 			set_client_state(c, CLT_WORK);
 		return STM_CONT;
 
 	case CLT_WAIT_LOCK:
-		/* tbd */
-		set_client_state(c, CLT_WORK);
-		return STM_CONT;
+                if (trylock(&vecs->lock) == 0) {
+			/* don't use cleanup_lock(), lest we wakeup ourselves */
+			pthread_cleanup_push_cast(__unlock, &vecs->lock);
+			c->error = execute_handler(c, vecs);
+			pthread_cleanup_pop(1);
+			condlog(4, "%s: cli[%d] grabbed lock", __func__, c->fd);
+			free_keys(c->cmdvec);
+			c->cmdvec = NULL;
+			set_client_state(c, CLT_SEND);
+			return STM_CONT;
+		} else {
+			condlog(4, "%s: cli[%d] waiting for lock", __func__, c->fd);
+			return STM_BREAK;
+		}
 
 	case CLT_WORK:
-		c->error = execute_handler(c, vecs, uxsock_timeout / 1000);
+		c->error = execute_handler(c, vecs);
 		free_keys(c->cmdvec);
 		c->cmdvec = NULL;
 		set_client_state(c, CLT_SEND);
@@ -499,9 +532,14 @@  static int client_state_machine(struct client *c, struct vectors *vecs)
 	}
 }
 
-static void handle_client(struct client *c, struct vectors *vecs)
+static void handle_client(struct client *c, struct vectors *vecs, short revents)
 {
-	while (client_state_machine(c, vecs) == STM_CONT);
+	if (revents & (POLLHUP|POLLERR)) {
+		c->error = -ECONNRESET;
+		return;
+	}
+
+        while (client_state_machine(c, vecs, revents) == STM_CONT);
 }
 
 /*
@@ -514,6 +552,8 @@  void *uxsock_listen(long ux_sock, void *trigger_data)
 	/* conf->sequence_nr will be 1 when uxsock_listen is first called */
 	unsigned int sequence_nr = 0;
 	struct watch_descriptors wds = { .conf_wd = -1, .dir_wd = -1 };
+	bool need_lock = false;
+	struct vectors *vecs = trigger_data;
 
 	condlog(3, "uxsock: startup listener");
 	polls = MALLOC(max_pfds * sizeof(*polls));
@@ -524,6 +564,14 @@  void *uxsock_listen(long ux_sock, void *trigger_data)
 	notify_fd = inotify_init1(IN_NONBLOCK);
 	if (notify_fd == -1) /* it's fine if notifications fail */
 		condlog(3, "failed to start up configuration notifications");
+
+	pthread_cleanup_push(wakeup_cleanup, &vecs->lock);
+	idle_fd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
+	if (idle_fd == -1)
+		condlog(1, "failed to create idle fd");
+	else
+		set_wakeup_fn(&vecs->lock, wakeup_listener);
+
 	sigfillset(&mask);
 	sigdelset(&mask, SIGINT);
 	sigdelset(&mask, SIGTERM);
@@ -575,16 +623,30 @@  void *uxsock_listen(long ux_sock, void *trigger_data)
 		else
 			polls[POLLFD_NOTIFY].events = POLLIN;
 
+		need_lock = __need_vecs_lock();
+		polls[POLLFD_IDLE].fd = idle_fd;
+		if (need_lock)
+			polls[POLLFD_IDLE].events = POLLIN;
+		else
+			polls[POLLFD_IDLE].events = 0;
+
 		/* setup the clients */
-		i = POLLFDS_BASE;
-		list_for_each_entry(c, &clients, node) {
-			polls[i].fd = c->fd;
-			polls[i].events = POLLIN;
-			i++;
-			if (i >= max_pfds)
-				break;
-		}
-		n_pfds = i;
+                i = POLLFDS_BASE;
+                list_for_each_entry(c, &clients, node) {
+                        switch(c->state) {
+                        case CLT_RECV:
+                                polls[i].events = POLLIN;
+                                break;
+                        default:
+				/* don't poll for this client */
+                                continue;
+                        }
+                        polls[i].fd = c->fd;
+                        i++;
+                        if (i >= max_pfds)
+                                break;
+                }
+                n_pfds = i;
 		pthread_cleanup_pop(1);
 
 		/* most of our life is spent in this call */
@@ -607,33 +669,28 @@  void *uxsock_listen(long ux_sock, void *trigger_data)
 			handle_signals(true);
 			continue;
 		}
+		if (polls[POLLFD_IDLE].fd != -1 &&
+		    polls[POLLFD_IDLE].revents & POLLIN)
+			drain_idle_fd(idle_fd);
 
-		/* see if a client wants to speak to us */
-		for (i = POLLFDS_BASE; i < n_pfds; i++) {
-			if (polls[i].revents & (POLLIN|POLLHUP|POLLERR)) {
-				c = NULL;
-				pthread_mutex_lock(&client_lock);
-				list_for_each_entry(tmp, &clients, node) {
-					if (tmp->fd == polls[i].fd) {
-						c = tmp;
-						break;
-					}
-				}
-				pthread_mutex_unlock(&client_lock);
-				if (!c) {
-					condlog(4, "cli%d: new fd %d",
-						i, polls[i].fd);
-					continue;
-				}
-				if (polls[i].revents & (POLLHUP|POLLERR)) {
-					condlog(4, "cli[%d]: Disconnected",
-						c->fd);
-					dead_client(c);
-					continue;
-				}
-				handle_client(c, trigger_data);
-				if (c->error == -ECONNRESET)
-					dead_client(c);
+		/* see if a client needs handling */
+		list_for_each_entry_safe(c, tmp, &clients, node) {
+			short revents = 0;
+
+			for (i = POLLFDS_BASE; i < n_pfds; i++) {
+                                if (polls[i].fd == c->fd) {
+                                        revents = polls[i].revents;
+                                        break;
+                                }
+                        }
+
+			handle_client(c, trigger_data, revents);
+
+			if (c->error == -ECONNRESET) {
+				condlog(4, "cli[%d]: disconnected", c->fd);
+				dead_client(c);
+				if (i < n_pfds)
+					polls[i].fd = -1;
 			}
 		}
 		/* see if we got a non-fatal signal */
@@ -649,5 +706,6 @@  void *uxsock_listen(long ux_sock, void *trigger_data)
 			handle_inotify(notify_fd, &wds);
 	}
 
+	pthread_cleanup_pop(1);
 	return NULL;
 }